celeryd_detach.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.bin.celeryd_detach
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~
  5. Program used to daemonize celeryd.
  6. Using :func:`os.execv` because forking and multiprocessing
  7. lead to weird issues (it was a long time ago now, but it
  8. could have something to do with the threading mutex bug).
  9. """
  10. from __future__ import absolute_import
  11. from __future__ import with_statement
  12. import os
  13. import sys
  14. from optparse import OptionParser, BadOptionError
  15. from celery import __version__
  16. from celery.platforms import EX_FAILURE, detached
  17. from celery.utils.log import get_logger
  18. from celery.bin.base import daemon_options, Option
  19. logger = get_logger(__name__)
  20. OPTION_LIST = daemon_options(default_pidfile='celeryd.pid') + (
  21. Option('--fake',
  22. default=False, action='store_true', dest='fake',
  23. help="Don't fork (for debugging purposes)"), )
  24. def detach(path, argv, logfile=None, pidfile=None, uid=None,
  25. gid=None, umask=0, working_directory=None, fake=False, ):
  26. with detached(logfile, pidfile, uid, gid, umask, working_directory, fake):
  27. try:
  28. os.execv(path, [path] + argv)
  29. except Exception:
  30. from celery import current_app
  31. current_app.log.setup_logging_subsystem('ERROR', logfile)
  32. logger.critical("Can't exec %r", ' '.join([path] + argv),
  33. exc_info=True)
  34. return EX_FAILURE
  35. class PartialOptionParser(OptionParser):
  36. def __init__(self, *args, **kwargs):
  37. self.leftovers = []
  38. OptionParser.__init__(self, *args, **kwargs)
  39. def _process_long_opt(self, rargs, values):
  40. arg = rargs.pop(0)
  41. if '=' in arg:
  42. opt, next_arg = arg.split('=', 1)
  43. rargs.insert(0, next_arg)
  44. had_explicit_value = True
  45. else:
  46. opt = arg
  47. had_explicit_value = False
  48. try:
  49. opt = self._match_long_opt(opt)
  50. option = self._long_opt.get(opt)
  51. except BadOptionError:
  52. option = None
  53. if option:
  54. if option.takes_value():
  55. nargs = option.nargs
  56. if len(rargs) < nargs:
  57. if nargs == 1:
  58. self.error('%s option requires an argument' % opt)
  59. else:
  60. self.error('%s option requires %d arguments' % (
  61. opt, nargs))
  62. elif nargs == 1:
  63. value = rargs.pop(0)
  64. else:
  65. value = tuple(rargs[0:nargs])
  66. del rargs[0:nargs]
  67. elif had_explicit_value:
  68. self.error('%s option does not take a value' % opt)
  69. else:
  70. value = None
  71. option.process(opt, value, values, self)
  72. else:
  73. self.leftovers.append(arg)
  74. def _process_short_opts(self, rargs, values):
  75. arg = rargs[0]
  76. try:
  77. OptionParser._process_short_opts(self, rargs, values)
  78. except BadOptionError:
  79. self.leftovers.append(arg)
  80. if rargs and not rargs[0][0] == '-':
  81. self.leftovers.append(rargs.pop(0))
  82. class detached_celeryd(object):
  83. option_list = OPTION_LIST
  84. usage = '%prog [options] [celeryd options]'
  85. version = __version__
  86. description = ('Detaches Celery worker nodes. See `celeryd --help` '
  87. 'for the list of supported worker arguments.')
  88. command = sys.executable
  89. execv_path = sys.executable
  90. execv_argv = ['-m', 'celery.bin.celeryd']
  91. def Parser(self, prog_name):
  92. return PartialOptionParser(prog=prog_name,
  93. option_list=self.option_list,
  94. usage=self.usage,
  95. description=self.description,
  96. version=self.version)
  97. def parse_options(self, prog_name, argv):
  98. parser = self.Parser(prog_name)
  99. options, values = parser.parse_args(argv)
  100. if options.logfile:
  101. parser.leftovers.append('--logfile=%s' % (options.logfile, ))
  102. if options.pidfile:
  103. parser.leftovers.append('--pidfile=%s' % (options.pidfile, ))
  104. return options, values, parser.leftovers
  105. def execute_from_commandline(self, argv=None):
  106. if argv is None:
  107. argv = sys.argv
  108. config = []
  109. seen_cargs = 0
  110. for arg in argv:
  111. if seen_cargs:
  112. config.append(arg)
  113. else:
  114. if arg == '--':
  115. seen_cargs = 1
  116. config.append(arg)
  117. prog_name = os.path.basename(argv[0])
  118. options, values, leftovers = self.parse_options(prog_name, argv[1:])
  119. sys.exit(detach(path=self.execv_path,
  120. argv=self.execv_argv + leftovers + config,
  121. **vars(options)))
  122. def main():
  123. detached_celeryd().execute_from_commandline()
  124. if __name__ == '__main__': # pragma: no cover
  125. main()