123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- import atexit
- import logging
- import multiprocessing
- import os
- import socket
- import sys
- from celery import __version__
- from celery import platforms
- from celery import signals
- from celery.app import app_or_default
- from celery.exceptions import ImproperlyConfigured
- from celery.utils import get_full_cls_name, LOG_LEVELS
- from celery.worker import WorkController
- STARTUP_INFO_FMT = """
- Configuration ->
- . broker -> %(conninfo)s
- . queues ->
- %(queues)s
- . concurrency -> %(concurrency)s
- . loader -> %(loader)s
- . logfile -> %(logfile)s@%(loglevel)s
- . events -> %(events)s
- . beat -> %(celerybeat)s
- %(tasks)s
- """.strip()
- TASK_LIST_FMT = """ . tasks ->\n%s"""
- class Worker(object):
- WorkController = WorkController
- def __init__(self, concurrency=None, loglevel=None, logfile=None,
- hostname=None, discard=False, run_clockservice=False,
- schedule=None, task_time_limit=None, task_soft_time_limit=None,
- max_tasks_per_child=None, queues=None, events=False, db=None,
- include=None, app=None, pidfile=None, **kwargs):
- self.app = app = app_or_default(app)
- self.concurrency = (concurrency or
- app.conf.CELERYD_CONCURRENCY or
- multiprocessing.cpu_count())
- self.loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
- self.logfile = logfile or app.conf.CELERYD_LOG_FILE
- self.hostname = hostname or socket.gethostname()
- self.discard = discard
- self.run_clockservice = run_clockservice
- self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
- self.events = events
- self.task_time_limit = (task_time_limit or
- app.conf.CELERYD_TASK_TIME_LIMIT)
- self.task_soft_time_limit = (task_soft_time_limit or
- app.conf.CELERYD_TASK_SOFT_TIME_LIMIT)
- self.max_tasks_per_child = (max_tasks_per_child or
- app.conf.CELERYD_MAX_TASKS_PER_CHILD)
- self.db = db
- self.use_queues = queues or []
- self.queues = None
- self.include = include or []
- self.pidfile = pidfile
- self._isatty = sys.stdout.isatty()
- if isinstance(self.use_queues, basestring):
- self.use_queues = self.use_queues.split(",")
- if isinstance(self.include, basestring):
- self.include = self.include.split(",")
- if not isinstance(self.loglevel, int):
- self.loglevel = LOG_LEVELS[self.loglevel.upper()]
- def run(self):
- self.init_loader()
- self.init_queues()
- self.worker_init()
- self.redirect_stdouts_to_logger()
- print("celery@%s v%s is starting." % (self.hostname, __version__))
- if getattr(os, "geteuid", None) and os.geteuid() == 0:
- warnings.warn(
- "Running celeryd with superuser privileges is not encouraged!")
- if self.discard:
- self.purge_messages()
- # Dump configuration to screen so we have some basic information
- # for when users sends bug reports.
- print(self.startup_info())
- self.set_process_status("Running...")
- self.run_worker()
- def on_listener_ready(self, listener):
- signals.worker_ready.send(sender=listener)
- print("celery@%s has started." % self.hostname)
- def init_queues(self):
- if self.use_queues:
- create_missing = self.app.conf.CELERY_CREATE_MISSING_QUEUES
- try:
- self.app.amqp.queues.select_subset(self.use_queues,
- create_missing)
- except KeyError, exc:
- raise ImproperlyConfigured(
- "Trying to select queue subset of %r, but queue %s"
- "is not defined in CELERY_QUEUES. If you want to "
- "automatically declare unknown queues you have to "
- "enable CELERY_CREATE_MISSING_QUEUES" % (
- self.use_queues, exc))
- self.queues = self.app.amqp.queues
- def init_loader(self):
- self.loader = self.app.loader
- self.settings = self.app.conf
- map(self.loader.import_module, self.include)
- def redirect_stdouts_to_logger(self):
- handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
- logfile=self.logfile)
- if not handled:
- logger = self.app.log.get_default_logger()
- self.app.log.redirect_stdouts_to_logger(logger,
- loglevel=logging.WARNING)
- def purge_messages(self):
- count = self.app.control.discard_all()
- what = (not count or count > 1) and "messages" or "message"
- print("discard: Erased %d %s from the queue.\n" % (count, what))
- def worker_init(self):
- # Run the worker init handler.
- # (Usually imports task modules and such.)
- self.loader.init_worker()
- def tasklist(self, include_builtins=True):
- from celery.registry import tasks
- tasklist = tasks.keys()
- if not include_builtins:
- tasklist = filter(lambda s: not s.startswith("celery."),
- tasklist)
- return TASK_LIST_FMT % "\n".join("\t. %s" % task
- for task in sorted(tasklist))
- def startup_info(self):
- tasklist = ""
- if self.loglevel <= logging.INFO:
- include_builtins = self.loglevel <= logging.DEBUG
- tasklist = self.tasklist(include_builtins=include_builtins)
- return STARTUP_INFO_FMT % {
- "conninfo": self.app.amqp.format_broker_info(),
- "queues": self.queues.format(indent=8),
- "concurrency": self.concurrency,
- "loglevel": LOG_LEVELS[self.loglevel],
- "logfile": self.logfile or "[stderr]",
- "celerybeat": self.run_clockservice and "ON" or "OFF",
- "events": self.events and "ON" or "OFF",
- "tasks": tasklist,
- "loader": get_full_cls_name(self.loader.__class__),
- }
- def run_worker(self):
- if self.pidfile:
- pidlock = platforms.create_pidlock(self.pidfile).acquire()
- atexit.register(pidlock.release)
- worker = self.WorkController(app=self.app,
- concurrency=self.concurrency,
- loglevel=self.loglevel,
- logfile=self.logfile,
- hostname=self.hostname,
- ready_callback=self.on_listener_ready,
- embed_clockservice=self.run_clockservice,
- schedule_filename=self.schedule,
- send_events=self.events,
- db=self.db,
- queues=self.queues,
- max_tasks_per_child=self.max_tasks_per_child,
- task_time_limit=self.task_time_limit,
- task_soft_time_limit=self.task_soft_time_limit)
- self.install_platform_tweaks(worker)
- worker.start()
- def install_platform_tweaks(self, worker):
- """Install platform specific tweaks and workarounds."""
- if self.app.IS_OSX:
- self.osx_proxy_detection_workaround()
- # Install signal handler so SIGHUP restarts the worker.
- if not self._isatty:
- # only install HUP handler if detached from terminal,
- # so closing the terminal window doesn't restart celeryd
- # into the background.
- if self.app.IS_OSX:
- # OS X can't exec from a process using threads.
- # See http://github.com/ask/celery/issues#issue/152
- install_HUP_not_supported_handler(worker)
- else:
- install_worker_restart_handler(worker)
- install_worker_term_handler(worker)
- install_worker_int_handler(worker)
- signals.worker_init.send(sender=worker)
- def osx_proxy_detection_workaround(self):
- """See http://github.com/ask/celery/issues#issue/161"""
- os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
- def set_process_status(self, info):
- info = "%s (%s)" % (info, platforms.strargv(sys.argv))
- return platforms.set_mp_process_title("celeryd",
- info=info,
- hostname=self.hostname)
- def install_worker_int_handler(worker):
- def _stop(signum, frame):
- process_name = multiprocessing.current_process().name
- if process_name == "MainProcess":
- worker.logger.warn(
- "celeryd: Hitting Ctrl+C again will terminate "
- "all running tasks!")
- install_worker_int_again_handler(worker)
- worker.logger.warn("celeryd: Warm shutdown (%s)" % (
- process_name))
- worker.stop()
- raise SystemExit()
- platforms.install_signal_handler("SIGINT", _stop)
- def install_worker_int_again_handler(worker):
- def _stop(signum, frame):
- process_name = multiprocessing.current_process().name
- if process_name == "MainProcess":
- worker.logger.warn("celeryd: Cold shutdown (%s)" % (
- process_name))
- worker.terminate()
- raise SystemExit()
- platforms.install_signal_handler("SIGINT", _stop)
- def install_worker_term_handler(worker):
- def _stop(signum, frame):
- process_name = multiprocessing.current_process().name
- if process_name == "MainProcess":
- worker.logger.warn("celeryd: Warm shutdown (%s)" % (
- process_name))
- worker.stop()
- raise SystemExit()
- platforms.install_signal_handler("SIGTERM", _stop)
- def install_worker_restart_handler(worker):
- def restart_worker_sig_handler(signum, frame):
- """Signal handler restarting the current python program."""
- worker.logger.warn("Restarting celeryd (%s)" % (
- " ".join(sys.argv)))
- worker.stop()
- os.execv(sys.executable, [sys.executable] + sys.argv)
- platforms.install_signal_handler("SIGHUP", restart_worker_sig_handler)
- def install_HUP_not_supported_handler(worker):
- def warn_on_HUP_handler(signum, frame):
- worker.logger.error("SIGHUP not supported: "
- "Restarting with HUP is unstable on this platform!")
- platforms.install_signal_handler("SIGHUP", warn_on_HUP_handler)
- def run_worker(*args, **kwargs):
- return Worker(*args, **kwargs).run()
|