celeryd_detach.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. # -*- coding: utf-8 -*-
  2. """Program used to daemonize the worker.
  3. Using :func:`os.execv` as forking and multiprocessing
  4. leads to weird issues (it was a long time ago now, but it
  5. could have something to do with the threading mutex bug)
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. import argparse
  9. import os
  10. import sys
  11. import celery
  12. from celery.bin.base import daemon_options
  13. from celery.platforms import EX_FAILURE, detached
  14. from celery.utils.log import get_logger
  15. from celery.utils.nodenames import default_nodename, node_format
  16. __all__ = ('detached_celeryd', 'detach')
  17. logger = get_logger(__name__)
  18. C_FAKEFORK = os.environ.get('C_FAKEFORK')
  19. def detach(path, argv, logfile=None, pidfile=None, uid=None,
  20. gid=None, umask=None, workdir=None, fake=False, app=None,
  21. executable=None, hostname=None):
  22. """Detach program by argv'."""
  23. hostname = default_nodename(hostname)
  24. logfile = node_format(logfile, hostname)
  25. pidfile = node_format(pidfile, hostname)
  26. fake = 1 if C_FAKEFORK else fake
  27. with detached(logfile, pidfile, uid, gid, umask, workdir, fake,
  28. after_forkers=False):
  29. try:
  30. if executable is not None:
  31. path = executable
  32. os.execv(path, [path] + argv)
  33. except Exception: # pylint: disable=broad-except
  34. if app is None:
  35. from celery import current_app
  36. app = current_app
  37. app.log.setup_logging_subsystem(
  38. 'ERROR', logfile, hostname=hostname)
  39. logger.critical("Can't exec %r", ' '.join([path] + argv),
  40. exc_info=True)
  41. return EX_FAILURE
  42. class detached_celeryd(object):
  43. """Daemonize the celery worker process."""
  44. usage = '%(prog)s [options] [celeryd options]'
  45. version = celery.VERSION_BANNER
  46. description = ('Detaches Celery worker nodes. See `celery worker --help` '
  47. 'for the list of supported worker arguments.')
  48. command = sys.executable
  49. execv_path = sys.executable
  50. execv_argv = ['-m', 'celery', 'worker']
  51. def __init__(self, app=None):
  52. self.app = app
  53. def create_parser(self, prog_name):
  54. parser = argparse.ArgumentParser(
  55. prog=prog_name,
  56. usage=self.usage,
  57. description=self.description,
  58. )
  59. self._add_version_argument(parser)
  60. self.add_arguments(parser)
  61. return parser
  62. def _add_version_argument(self, parser):
  63. parser.add_argument(
  64. '--version', action='version', version=self.version,
  65. )
  66. def parse_options(self, prog_name, argv):
  67. parser = self.create_parser(prog_name)
  68. options, leftovers = parser.parse_known_args(argv)
  69. if options.logfile:
  70. leftovers.append('--logfile={0}'.format(options.logfile))
  71. if options.pidfile:
  72. leftovers.append('--pidfile={0}'.format(options.pidfile))
  73. if options.hostname:
  74. leftovers.append('--hostname={0}'.format(options.hostname))
  75. return options, leftovers
  76. def execute_from_commandline(self, argv=None):
  77. argv = sys.argv if argv is None else argv
  78. prog_name = os.path.basename(argv[0])
  79. config, argv = self._split_command_line_config(argv)
  80. options, leftovers = self.parse_options(prog_name, argv[1:])
  81. sys.exit(detach(
  82. app=self.app, path=self.execv_path,
  83. argv=self.execv_argv + leftovers + config,
  84. **vars(options)
  85. ))
  86. def _split_command_line_config(self, argv):
  87. config = list(self._extract_command_line_config(argv))
  88. try:
  89. argv = argv[:argv.index('--')]
  90. except ValueError:
  91. pass
  92. return config, argv
  93. def _extract_command_line_config(self, argv):
  94. # Extracts command-line config appearing after '--':
  95. # celery worker -l info -- worker.prefetch_multiplier=10
  96. # This to make sure argparse doesn't gobble it up.
  97. seen_cargs = 0
  98. for arg in argv:
  99. if seen_cargs:
  100. yield arg
  101. else:
  102. if arg == '--':
  103. seen_cargs = 1
  104. yield arg
  105. def add_arguments(self, parser):
  106. daemon_options(parser, default_pidfile='celeryd.pid')
  107. parser.add_argument('--workdir', default=None)
  108. parser.add_argument('-n', '--hostname')
  109. parser.add_argument(
  110. '--fake',
  111. action='store_true', default=False,
  112. help="Don't fork (for debugging purposes)",
  113. )
  114. def main(app=None):
  115. detached_celeryd(app).execute_from_commandline()
  116. if __name__ == '__main__': # pragma: no cover
  117. main()