celeryd_detach.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. # -*- coding: utf-8 -*-
  """
  celery.bin.celeryd_detach
  ~~~~~~~~~~~~~~~~~~~~~~~~~

  Program used to daemonize the worker.

  Uses :func:`os.execv` because forking and multiprocessing
  lead to weird issues (this was observed a long time ago, but
  it could have something to do with the threading mutex bug).
  """
  10. from __future__ import absolute_import, unicode_literals
  11. import celery
  12. import os
  13. import sys
  14. from optparse import OptionParser, BadOptionError
  15. from celery.platforms import EX_FAILURE, detached
  16. from celery.utils import default_nodename, node_format
  17. from celery.utils.log import get_logger
  18. from celery.bin.base import daemon_options
  # Public names exported by this module.
  __all__ = ['detached_celeryd', 'detach']

  # Module-level logger, named after this module.
  logger = get_logger(__name__)

  # Raw value of the ``C_FAKEFORK`` environment variable (``None`` when
  # unset); a truthy value makes ``detach`` force ``fake`` on.
  C_FAKEFORK = os.getenv('C_FAKEFORK')
  def detach(path, argv, logfile=None, pidfile=None, uid=None,
             gid=None, umask=None, working_directory=None, fake=False,
             app=None, executable=None, hostname=None):
      """Exec ``path`` with ``argv`` from inside a ``detached`` context.

      The hostname is first normalized with ``default_nodename`` and
      then used to expand ``logfile`` and ``pidfile`` through
      ``node_format``.

      :param path: program to exec; also passed as the first element
          of the new process' argument list.
      :param argv: list of arguments appended after the program path.
      :keyword logfile: log file name, expanded with ``node_format``.
      :keyword pidfile: pid file name, expanded with ``node_format``.
      :keyword uid: forwarded unchanged to ``detached``.
      :keyword gid: forwarded unchanged to ``detached``.
      :keyword umask: forwarded unchanged to ``detached``.
      :keyword working_directory: forwarded unchanged to ``detached``.
      :keyword fake: forwarded to ``detached``; replaced by ``1``
          whenever ``C_FAKEFORK`` is truthy.
      :keyword app: app whose ``log`` is used to set up logging when the
          exec fails; ``celery.current_app`` is used if omitted.
      :keyword executable: when not ``None``, used instead of ``path``.
      :keyword hostname: node name passed through ``default_nodename``.

      :returns: ``EX_FAILURE``.  This is only reached when
          :func:`os.execv` did not replace the current process.

      """
      hostname = default_nodename(hostname)
      logfile = node_format(logfile, hostname)
      pidfile = node_format(pidfile, hostname)
      if C_FAKEFORK:
          fake = 1

      context = detached(
          logfile, pidfile, uid, gid, umask, working_directory, fake,
          after_forkers=False,
      )
      with context:
          try:
              if executable is not None:
                  path = executable
              # On success this call never returns: the process image
              # is replaced by the new program.
              os.execv(path, [path] + argv)
          except Exception:
              if app is None:
                  # Imported lazily, only needed on the failure path.
                  from celery import current_app
                  app = current_app
              app.log.setup_logging_subsystem(
                  'ERROR', logfile, hostname=hostname,
              )
              logger.critical(
                  "Can't exec %r", ' '.join([path] + argv),
                  exc_info=True,
              )
          return EX_FAILURE
  class PartialOptionParser(OptionParser):
      """Option parser that sets aside options it does not recognize.

      Unknown options do not abort parsing; the original argument
      strings are collected, in order, in :attr:`leftovers` so the
      caller can forward them to another program.
      """

      def __init__(self, *args, **kwargs):
          # Unrecognized arguments, in the order they were encountered.
          self.leftovers = []
          OptionParser.__init__(self, *args, **kwargs)

      def _process_long_opt(self, rargs, values):
          """Handle one ``--long`` option at the front of ``rargs``.

          Known options are processed like the base parser does.  An
          unknown option is appended to :attr:`leftovers` exactly as
          written (including any ``=value`` part) and nothing else is
          consumed or pushed back onto ``rargs``.
          """
          arg = rargs.pop(0)

          # Split ``--opt=value`` but do not push the value back onto
          # ``rargs`` yet: if the option is unknown the value must stay
          # attached to ``arg`` and never be re-parsed on its own
          # (it would become a stray positional, or be mistaken for a
          # short option when it starts with ``-``).
          opt, separator, explicit_value = arg.partition('=')
          had_explicit_value = bool(separator)

          try:
              opt = self._match_long_opt(opt)
              option = self._long_opt.get(opt)
          except BadOptionError:
              option = None

          if not option:
              self.leftovers.append(arg)
              return

          if option.takes_value():
              if had_explicit_value:
                  # Make the explicit value the next argument to consume.
                  rargs.insert(0, explicit_value)
              nargs = option.nargs
              if len(rargs) < nargs:
                  if nargs == 1:
                      self.error('{0} requires an argument'.format(opt))
                  else:
                      self.error('{0} requires {1} arguments'.format(
                          opt, nargs))
              elif nargs == 1:
                  value = rargs.pop(0)
              else:
                  value = tuple(rargs[0:nargs])
                  del rargs[0:nargs]
          elif had_explicit_value:
              self.error('{0} option does not take a value'.format(opt))
          else:
              value = None
          option.process(opt, value, values, self)

      def _process_short_opts(self, rargs, values):
          """Handle a ``-x`` style argument at the front of ``rargs``.

          If the base parser rejects it, the argument is stored in
          :attr:`leftovers`, together with the following argument when
          that one does not look like an option (it is assumed to be the
          unknown option's value).
          """
          arg = rargs[0]
          try:
              OptionParser._process_short_opts(self, rargs, values)
          except BadOptionError:
              self.leftovers.append(arg)
              # ``startswith`` instead of ``[0]`` so that an empty-string
              # argument is treated as a value rather than raising
              # IndexError.
              if rargs and not rargs[0].startswith('-'):
                  self.leftovers.append(rargs.pop(0))
  class detached_celeryd(object):
      """Command-line front end that execs a worker via ``detach``.

      Options understood by this command are parsed here; everything
      else is collected and passed on in the argument list of the
      program that gets exec'ed.
      """

      usage = '%prog [options] [celeryd options]'
      version = celery.VERSION_BANNER
      description = ('Detaches Celery worker nodes. See `celery worker --help` '
                     'for the list of supported worker arguments.')
      # NOTE(review): ``command`` is not read anywhere in this class.
      command = sys.executable
      # Program handed to ``detach`` and the arguments that always
      # precede the forwarded ones.
      execv_path = sys.executable
      execv_argv = ['-m', 'celery', 'worker']

      def __init__(self, app=None):
          self.app = app

      def create_parser(self, prog_name):
          """Return a ``PartialOptionParser`` set up for ``prog_name``."""
          parser = PartialOptionParser(
              prog=prog_name,
              usage=self.usage,
              description=self.description,
              version=self.version,
          )
          self.prepare_arguments(parser)
          return parser

      def parse_options(self, prog_name, argv):
          """Parse ``argv`` and return ``(options, values, leftovers)``.

          ``logfile``, ``pidfile`` and ``hostname`` are appended to the
          leftovers again (when set) so they are forwarded as well.
          """
          parser = self.create_parser(prog_name)
          options, values = parser.parse_args(argv)
          forwarded = (
              ('logfile', '--logfile={0}'),
              ('pidfile', '--pidfile={0}'),
              ('hostname', '--hostname={0}'),
          )
          for attribute, template in forwarded:
              setting = getattr(options, attribute)
              if setting:
                  parser.leftovers.append(template.format(setting))
          return options, values, parser.leftovers

      def execute_from_commandline(self, argv=None):
          """Parse ``argv`` (default ``sys.argv``), call ``detach`` and exit.

          The process exits with the value returned by ``detach``.
          """
          if argv is None:
              argv = sys.argv

          # Everything from the first ``--`` onwards (marker included) is
          # passed along untouched.
          config = []
          for position, arg in enumerate(argv):
              if arg == '--':
                  config = list(argv[position:])
                  break

          prog_name = os.path.basename(argv[0])
          options, values, leftovers = self.parse_options(prog_name, argv[1:])
          exec_argv = self.execv_argv + leftovers + config
          status = detach(
              app=self.app, path=self.execv_path,
              argv=exec_argv,
              **vars(options)
          )
          sys.exit(status)

      def prepare_arguments(self, parser):
          """Register this command's options on ``parser``."""
          daemon_options(parser, default_pidfile='celeryd.pid')
          add_option = parser.add_option
          add_option('--workdir', default=None, dest='working_directory')
          add_option('-n', '--hostname')
          add_option(
              '--fake',
              default=False, action='store_true', dest='fake',
              help="Don't fork (for debugging purposes)",
          )
  def main(app=None):
      """Create a ``detached_celeryd`` for ``app`` and run it."""
      command = detached_celeryd(app)
      command.execute_from_commandline()


  if __name__ == '__main__':  # pragma: no cover
      main()