celeryd_detach.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. # -*- coding: utf-8 -*-
  2. """Program used to daemonize the worker.
This program uses :func:`os.execv` because forking combined with
multiprocessing leads to weird issues (it was a long time ago now, but it
could have something to do with the threading mutex bug).
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. import celery
  9. import os
  10. import sys
  11. from optparse import OptionParser, BadOptionError
  12. from celery.platforms import EX_FAILURE, detached
  13. from celery.utils.log import get_logger
  14. from celery.utils.nodenames import default_nodename, node_format
  15. from celery.bin.base import daemon_options
  16. __all__ = ['detached_celeryd', 'detach']
  17. logger = get_logger(__name__)
  18. C_FAKEFORK = os.environ.get('C_FAKEFORK')
  19. def detach(path, argv, logfile=None, pidfile=None, uid=None,
  20. gid=None, umask=None, workdir=None, fake=False, app=None,
  21. executable=None, hostname=None):
  22. hostname = default_nodename(hostname)
  23. logfile = node_format(logfile, hostname)
  24. pidfile = node_format(pidfile, hostname)
  25. fake = 1 if C_FAKEFORK else fake
  26. with detached(logfile, pidfile, uid, gid, umask, workdir, fake,
  27. after_forkers=False):
  28. try:
  29. if executable is not None:
  30. path = executable
  31. os.execv(path, [path] + argv)
  32. except Exception:
  33. if app is None:
  34. from celery import current_app
  35. app = current_app
  36. app.log.setup_logging_subsystem(
  37. 'ERROR', logfile, hostname=hostname)
  38. logger.critical("Can't exec %r", ' '.join([path] + argv),
  39. exc_info=True)
  40. return EX_FAILURE
  41. class PartialOptionParser(OptionParser):
  42. def __init__(self, *args, **kwargs):
  43. self.leftovers = []
  44. OptionParser.__init__(self, *args, **kwargs)
  45. def _process_long_opt(self, rargs, values):
  46. arg = rargs.pop(0)
  47. if '=' in arg:
  48. opt, next_arg = arg.split('=', 1)
  49. rargs.insert(0, next_arg)
  50. had_explicit_value = True
  51. else:
  52. opt = arg
  53. had_explicit_value = False
  54. try:
  55. opt = self._match_long_opt(opt)
  56. option = self._long_opt.get(opt)
  57. except BadOptionError:
  58. option = None
  59. if option:
  60. if option.takes_value():
  61. nargs = option.nargs
  62. if len(rargs) < nargs:
  63. if nargs == 1:
  64. self.error('{0} requires an argument'.format(opt))
  65. else:
  66. self.error('{0} requires {1} arguments'.format(
  67. opt, nargs))
  68. elif nargs == 1:
  69. value = rargs.pop(0)
  70. else:
  71. value = tuple(rargs[0:nargs])
  72. del rargs[0:nargs]
  73. elif had_explicit_value:
  74. self.error('{0} option does not take a value'.format(opt))
  75. else:
  76. value = None
  77. option.process(opt, value, values, self)
  78. else:
  79. self.leftovers.append(arg)
  80. def _process_short_opts(self, rargs, values):
  81. arg = rargs[0]
  82. try:
  83. OptionParser._process_short_opts(self, rargs, values)
  84. except BadOptionError:
  85. self.leftovers.append(arg)
  86. if rargs and not rargs[0][0] == '-':
  87. self.leftovers.append(rargs.pop(0))
  88. class detached_celeryd(object):
  89. usage = '%prog [options] [celeryd options]'
  90. version = celery.VERSION_BANNER
  91. description = ('Detaches Celery worker nodes. See `celery worker --help` '
  92. 'for the list of supported worker arguments.')
  93. command = sys.executable
  94. execv_path = sys.executable
  95. execv_argv = ['-m', 'celery', 'worker']
  96. def __init__(self, app=None):
  97. self.app = app
  98. def create_parser(self, prog_name):
  99. p = PartialOptionParser(
  100. prog=prog_name,
  101. usage=self.usage,
  102. description=self.description,
  103. version=self.version,
  104. )
  105. self.prepare_arguments(p)
  106. return p
  107. def parse_options(self, prog_name, argv):
  108. parser = self.create_parser(prog_name)
  109. options, values = parser.parse_args(argv)
  110. if options.logfile:
  111. parser.leftovers.append('--logfile={0}'.format(options.logfile))
  112. if options.pidfile:
  113. parser.leftovers.append('--pidfile={0}'.format(options.pidfile))
  114. if options.hostname:
  115. parser.leftovers.append('--hostname={0}'.format(options.hostname))
  116. return options, values, parser.leftovers
  117. def execute_from_commandline(self, argv=None):
  118. argv = sys.argv if argv is None else argv
  119. config = []
  120. seen_cargs = 0
  121. for arg in argv:
  122. if seen_cargs:
  123. config.append(arg)
  124. else:
  125. if arg == '--':
  126. seen_cargs = 1
  127. config.append(arg)
  128. prog_name = os.path.basename(argv[0])
  129. options, values, leftovers = self.parse_options(prog_name, argv[1:])
  130. sys.exit(detach(
  131. app=self.app, path=self.execv_path,
  132. argv=self.execv_argv + leftovers + config,
  133. **vars(options)
  134. ))
  135. def prepare_arguments(self, parser):
  136. daemon_options(parser, default_pidfile='celeryd.pid')
  137. parser.add_option('--workdir', default=None)
  138. parser.add_option('-n', '--hostname')
  139. parser.add_option(
  140. '--fake',
  141. default=False, action='store_true',
  142. help="Don't fork (for debugging purposes)",
  143. )
  144. def main(app=None):
  145. detached_celeryd(app).execute_from_commandline()
  146. if __name__ == '__main__': # pragma: no cover
  147. main()