123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- """celeryd
.. program:: celeryd

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue. The default
    is the number of CPUs available on your system.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, ``stderr`` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
    ``ERROR``, ``CRITICAL``, or ``FATAL``.

.. cmdoption:: -p, --pidfile

    Path to pidfile.

.. cmdoption:: -s, --statistics

    Turn on reporting of statistics (remember to flush the statistics message
    queue from time to time).

.. cmdoption:: -d, --detach, --daemon

    Run in the background as a daemon.

.. cmdoption:: -S, --supervised

    Restart the worker server if it dies.

.. cmdoption:: --discard

    Discard all waiting tasks before the daemon is started.
    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

.. cmdoption:: -u, --uid

    User-id to run ``celeryd`` as when in daemon mode.

.. cmdoption:: -g, --gid

    Group-id to run ``celeryd`` as when in daemon mode.

.. cmdoption:: --umask

    umask of the process when in daemon mode.

.. cmdoption:: --workdir

    Directory to change to when in daemon mode.

.. cmdoption:: --chroot

    Change root directory to this path when in daemon mode.

- """
- import os
- import sys
- from celery.loaders import current_loader
- from celery.loaders import settings
- from celery import __version__
- from celery.supervisor import OFASupervisor
- from celery.log import emergency_error
- from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
- from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
- from celery import conf
- from celery.task import discard_all
- from celery.worker import WorkController
- from celery import platform
- import multiprocessing
- import traceback
- import optparse
# Default for the ``--statistics`` option: the CELERY_STATISTICS setting if
# the loaded settings define it, otherwise False.
USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
# Write the resolved value back so the attribute always exists on ``settings``;
# run_worker() reads ``settings.CELERY_STATISTICS`` directly.
settings.CELERY_STATISTICS = USE_STATISTICS
# Banner printed by run_worker() at startup. It is a %-style template filled
# from a mapping; keys in the mapping that the template does not mention are
# simply ignored. ``.strip()`` removes the newlines that the triple-quoted
# layout adds at both ends.
STARTUP_INFO_FMT = """
Configuration ->
* Broker -> amqp://%(vhost)s@%(host)s:%(port)s
* Exchange -> %(exchange)s (%(exchange_type)s)
* Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
* Concurrency -> %(concurrency)s
* Statistics -> %(statistics)s
""".strip()
# Command-line options understood by ``celeryd``. Every ``dest`` matches a
# keyword argument of run_worker(), so the parsed options can be passed
# straight through as ``run_worker(**vars(options))``.
OPTION_LIST = (
    optparse.make_option("-c", "--concurrency", default=DAEMON_CONCURRENCY,
            action="store", dest="concurrency", type="int",
            help="Number of child processes processing the queue."),
    optparse.make_option("--discard", default=False,
            action="store_true", dest="discard",
            help="Discard all waiting tasks before the server is started. "
                 "WARNING: This is unrecoverable, and the tasks will be "
                 "deleted from the messaging server."),
    optparse.make_option("-s", "--statistics", default=USE_STATISTICS,
            action="store_true", dest="statistics",
            help="Collect statistics."),
    optparse.make_option("-f", "--logfile", default=DAEMON_LOG_FILE,
            action="store", dest="logfile",
            help="Path to log file."),
    optparse.make_option("-l", "--loglevel", default=DAEMON_LOG_LEVEL,
            action="store", dest="loglevel",
            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
    optparse.make_option("-p", "--pidfile", default=DAEMON_PID_FILE,
            action="store", dest="pidfile",
            help="Path to pidfile."),
    optparse.make_option("-d", "--detach", "--daemon", default=False,
            action="store_true", dest="detach",
            help="Run in the background as a daemon."),
    optparse.make_option("-S", "--supervised", default=False,
            action="store_true", dest="supervised",
            help="Restart the worker server if it dies."),
    optparse.make_option("-u", "--uid", default=None,
            action="store", dest="uid",
            help="User-id to run celeryd as when in daemon mode."),
    optparse.make_option("-g", "--gid", default=None,
            action="store", dest="gid",
            help="Group-id to run celeryd as when in daemon mode."),
    optparse.make_option("--umask", default=0,
            action="store", type="int", dest="umask",
            help="umask of the process when in daemon mode."),
    optparse.make_option("--workdir", default=None,
            action="store", dest="working_directory",
            help="Directory to change to when in daemon mode."),
    optparse.make_option("--chroot", default=None,
            action="store", dest="chroot",
            help="Change root directory to this path when in daemon mode."),
)
def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
        loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
        pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
        supervised=False, working_directory=None, chroot=None,
        statistics=None, **kwargs):
    """Start the celery worker server.

    :keyword concurrency: Number of worker processes handed to
        ``WorkController``. A false value means "use
        ``multiprocessing.cpu_count()``". Forced to 1 (with a
        ``UserWarning``) when the database backend runs on sqlite3.
    :keyword detach: Run as a daemon: a daemon context is opened, logging is
        set up against ``logfile`` and stdout/stderr are redirected to the
        logger.
    :keyword loglevel: Either an int, or a level name that is upper-cased
        and looked up in ``LOG_LEVELS``.
    :keyword logfile: Log file path. Ignored (reset to ``None``) unless
        ``detach`` is true.
    :keyword discard: Call ``discard_all()`` first and report how many
        messages were erased.
    :keyword pidfile: Passed to the daemon context when detaching.
    :keyword umask: Passed to the daemon context when detaching.
    :keyword uid: Passed to the daemon context when detaching.
    :keyword gid: Passed to the daemon context when detaching.
    :keyword supervised: Run the worker under ``OFASupervisor`` instead of
        calling it directly.
    :keyword working_directory: Passed to the daemon context when detaching.
    :keyword chroot: Passed to the daemon context as ``chroot_directory``.
    :keyword statistics: When not ``None``, overrides
        ``settings.CELERY_STATISTICS`` for this process.
    :keyword \*\*kwargs: Accepted and ignored, so a full options mapping can
        be passed in.

    An ``Exception`` raised by ``worker.start()`` is not re-raised; it is
    reported through ``emergency_error`` together with its traceback.
    Anything else that escapes is re-raised after the daemon context (if
    any) has been closed.

    """
    print("Celery %s is starting." % __version__)

    # NOTE(review): presumably puts SIGCLD back to its default disposition so
    # child processes can be waited on normally -- confirm in celery.platform.
    platform.reset_signal("SIGCLD")

    if statistics is not None:
        settings.CELERY_STATISTICS = statistics

    if not concurrency:
        concurrency = multiprocessing.cpu_count()

    if (conf.CELERY_BACKEND == "database"
            and settings.DATABASE_ENGINE == "sqlite3"
            and concurrency > 1):
        import warnings
        warnings.warn("The sqlite3 database engine doesn't support "
                "concurrency. We'll be using a single process only.",
                UserWarning)
        concurrency = 1

    # Accept both numeric levels and level names such as "info".
    if not isinstance(loglevel, int):
        loglevel = LOG_LEVELS[loglevel.upper()]
    if not detach:
        # In the foreground the log file is not used.
        logfile = None

    if discard:
        discarded_count = discard_all()
        # Singular only for exactly one message ("0 messages", "1 message").
        what = "message" if discarded_count == 1 else "messages"
        print("discard: Erased %d %s from the queue.\n" % (
                discarded_count, what))

    # Dump the effective configuration. The mapping carries a few keys the
    # template does not currently use; %-formatting ignores them.
    print(STARTUP_INFO_FMT % {
            "vhost": getattr(settings, "AMQP_VHOST", "(default)"),
            "host": getattr(settings, "AMQP_SERVER", "(default)"),
            "port": getattr(settings, "AMQP_PORT", "(default)"),
            "exchange": conf.AMQP_EXCHANGE,
            "exchange_type": conf.AMQP_EXCHANGE_TYPE,
            "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
            "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
            "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
            "concurrency": concurrency,
            "loglevel": loglevel,
            "pidfile": pidfile,
            "statistics": "ON" if settings.CELERY_STATISTICS else "OFF",
    })

    print("Celery has started.")
    if detach:
        from celery.log import setup_logger, redirect_stdouts_to_logger
        context = platform.create_daemon_context(logfile, pidfile,
                                        chroot_directory=chroot,
                                        working_directory=working_directory,
                                        umask=umask,
                                        uid=uid,
                                        gid=gid)
        context.open()
        logger = setup_logger(loglevel, logfile)
        redirect_stdouts_to_logger(logger, loglevel)

    # Give the loader its worker-init hook before any worker is created.
    current_loader.on_worker_init()

    def _start_worker():
        """Create one WorkController and run it, reporting any crash."""
        worker = WorkController(concurrency=concurrency,
                                loglevel=loglevel,
                                logfile=logfile,
                                is_detached=detach)

        # Install the SIGHUP handler that restarts the program.
        install_worker_restart_handler(worker)

        try:
            worker.start()
        except Exception as e:
            emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                            e.__class__, e, traceback.format_exc()))

    try:
        if supervised:
            OFASupervisor(target=_start_worker).start()
        else:
            _start_worker()
    except:
        # Deliberately bare: this only releases the daemon context and then
        # re-raises, so SystemExit/KeyboardInterrupt get the same cleanup.
        if detach:
            context.close()
        raise
def install_worker_restart_handler(worker):
    """Install a ``SIGHUP`` handler that restarts ``celeryd``.

    :param worker: The running worker. The handler uses its ``logger``,
        its ``is_detached`` flag and its ``stop()`` method.

    """

    def restart_worker_sig_handler(signum, frame):
        """Signal handler restarting the current python program."""
        worker.logger.info("Restarting celeryd (%s)" % (
            " ".join(sys.argv)))
        # NOTE(review): the ``else`` below is paired with the outer ``if`` so
        # that exactly one process stops the worker on each path -- confirm
        # against the upstream file.
        if worker.is_detached:
            pid = os.fork()
            if pid:
                # Parent: shut the old worker down and exit. The child falls
                # through to the exec below.
                worker.stop()
                sys.exit(0)
        else:
            worker.stop()
        # Replace this process with a fresh interpreter running the same
        # command line.
        os.execv(sys.executable, [sys.executable] + sys.argv)

    platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
def parse_options(arguments):
    """Parse the available options to ``celeryd``.

    :param arguments: List of command-line arguments, without the program
        name (e.g. ``sys.argv[1:]``).

    :returns: The ``optparse`` values object holding one attribute per
        ``dest`` in ``OPTION_LIST``. Leftover positional arguments are
        discarded.

    """
    parser = optparse.OptionParser(option_list=OPTION_LIST)
    options, _ = parser.parse_args(arguments)
    return options
if __name__ == "__main__":
    # Script entry point: every parsed option maps onto a run_worker()
    # keyword argument of the same name.
    options = parse_options(sys.argv[1:])
    run_worker(**vars(options))
|