123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- # -*- coding: utf-8 -*-
- """Program used to daemonize the worker.
- Using :func:`os.execv` as forking and multiprocessing
- leads to weird issues (it was a long time ago now, but it
- could have something to do with the threading mutex bug)
- """
- from __future__ import absolute_import, unicode_literals
- import argparse
- import os
- import sys
- import celery
- from celery.bin.base import daemon_options
- from celery.platforms import EX_FAILURE, detached
- from celery.utils.log import get_logger
- from celery.utils.nodenames import default_nodename, node_format
- __all__ = ('detached_celeryd', 'detach')
- logger = get_logger(__name__)
- C_FAKEFORK = os.environ.get('C_FAKEFORK')
- def detach(path, argv, logfile=None, pidfile=None, uid=None,
- gid=None, umask=None, workdir=None, fake=False, app=None,
- executable=None, hostname=None):
- """Detach program by argv'."""
- hostname = default_nodename(hostname)
- logfile = node_format(logfile, hostname)
- pidfile = node_format(pidfile, hostname)
- fake = 1 if C_FAKEFORK else fake
- with detached(logfile, pidfile, uid, gid, umask, workdir, fake,
- after_forkers=False):
- try:
- if executable is not None:
- path = executable
- os.execv(path, [path] + argv)
- except Exception: # pylint: disable=broad-except
- if app is None:
- from celery import current_app
- app = current_app
- app.log.setup_logging_subsystem(
- 'ERROR', logfile, hostname=hostname)
- logger.critical("Can't exec %r", ' '.join([path] + argv),
- exc_info=True)
- return EX_FAILURE
- class detached_celeryd(object):
- """Daemonize the celery worker process."""
- usage = '%(prog)s [options] [celeryd options]'
- version = celery.VERSION_BANNER
- description = ('Detaches Celery worker nodes. See `celery worker --help` '
- 'for the list of supported worker arguments.')
- command = sys.executable
- execv_path = sys.executable
- execv_argv = ['-m', 'celery', 'worker']
- def __init__(self, app=None):
- self.app = app
- def create_parser(self, prog_name):
- parser = argparse.ArgumentParser(
- prog=prog_name,
- usage=self.usage,
- description=self.description,
- )
- self._add_version_argument(parser)
- self.add_arguments(parser)
- return parser
- def _add_version_argument(self, parser):
- parser.add_argument(
- '--version', action='version', version=self.version,
- )
- def parse_options(self, prog_name, argv):
- parser = self.create_parser(prog_name)
- options, leftovers = parser.parse_known_args(argv)
- if options.logfile:
- leftovers.append('--logfile={0}'.format(options.logfile))
- if options.pidfile:
- leftovers.append('--pidfile={0}'.format(options.pidfile))
- if options.hostname:
- leftovers.append('--hostname={0}'.format(options.hostname))
- return options, leftovers
- def execute_from_commandline(self, argv=None):
- argv = sys.argv if argv is None else argv
- prog_name = os.path.basename(argv[0])
- config, argv = self._split_command_line_config(argv)
- options, leftovers = self.parse_options(prog_name, argv[1:])
- sys.exit(detach(
- app=self.app, path=self.execv_path,
- argv=self.execv_argv + leftovers + config,
- **vars(options)
- ))
- def _split_command_line_config(self, argv):
- config = list(self._extract_command_line_config(argv))
- try:
- argv = argv[:argv.index('--')]
- except ValueError:
- pass
- return config, argv
- def _extract_command_line_config(self, argv):
- # Extracts command-line config appearing after '--':
- # celery worker -l info -- worker.prefetch_multiplier=10
- # This to make sure argparse doesn't gobble it up.
- seen_cargs = 0
- for arg in argv:
- if seen_cargs:
- yield arg
- else:
- if arg == '--':
- seen_cargs = 1
- yield arg
- def add_arguments(self, parser):
- daemon_options(parser, default_pidfile='celeryd.pid')
- parser.add_argument('--workdir', default=None)
- parser.add_argument('-n', '--hostname')
- parser.add_argument(
- '--fake',
- action='store_true', default=False,
- help="Don't fork (for debugging purposes)",
- )
- def main(app=None):
- detached_celeryd(app).execute_from_commandline()
- if __name__ == '__main__': # pragma: no cover
- main()
|