| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 | import atexitimport loggingimport multiprocessingimport osimport socketimport sysfrom celery import __version__from celery import platformsfrom celery import signalsfrom celery.app import app_or_defaultfrom celery.exceptions import ImproperlyConfiguredfrom celery.utils import get_full_cls_name, LOG_LEVELSfrom celery.worker import WorkControllerSTARTUP_INFO_FMT = """Configuration ->    . broker -> %(conninfo)s    . queues ->%(queues)s    . concurrency -> %(concurrency)s    . loader -> %(loader)s    . logfile -> %(logfile)s@%(loglevel)s    . events -> %(events)s    . beat -> %(celerybeat)s%(tasks)s""".strip()TASK_LIST_FMT = """    . tasks ->\n%s"""class Worker(object):    WorkController = WorkController    def __init__(self, concurrency=None, loglevel=None, logfile=None,            hostname=None, discard=False, run_clockservice=False,            schedule=None, task_time_limit=None, task_soft_time_limit=None,            max_tasks_per_child=None, queues=None, events=False, db=None,            include=None, app=None, pidfile=None, **kwargs):        self.app = app = app_or_default(app)        self.concurrency = (concurrency or                            app.conf.CELERYD_CONCURRENCY or                            multiprocessing.cpu_count())        self.loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL        self.logfile = logfile or app.conf.CELERYD_LOG_FILE        self.hostname = hostname or socket.gethostname()        self.discard = discard        self.run_clockservice = run_clockservice        self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME        self.events = events        self.task_time_limit = (task_time_limit or                                app.conf.CELERYD_TASK_TIME_LIMIT)        self.task_soft_time_limit = (task_soft_time_limit or                                     app.conf.CELERYD_TASK_SOFT_TIME_LIMIT)        self.max_tasks_per_child = (max_tasks_per_child or                                    app.conf.CELERYD_MAX_TASKS_PER_CHILD)        self.db = db        self.use_queues = queues or []        self.queues = None        self.include = include or []        self.pidfile = pidfile        self._isatty = sys.stdout.isatty()        if isinstance(self.use_queues, basestring):            self.use_queues = self.use_queues.split(",")        if isinstance(self.include, basestring):            self.include = self.include.split(",")        if not isinstance(self.loglevel, int):            self.loglevel = LOG_LEVELS[self.loglevel.upper()]    def run(self):        self.init_loader()        self.init_queues()        self.worker_init()        self.redirect_stdouts_to_logger()        print("celery@%s v%s is starting." % (self.hostname, __version__))        if getattr(os, "geteuid", None) and os.geteuid() == 0:            warnings.warn(                "Running celeryd with superuser privileges is not encouraged!")        if self.discard:            self.purge_messages()        # Dump configuration to screen so we have some basic information        # for when users sends bug reports.        print(self.startup_info())        self.set_process_status("Running...")        self.run_worker()    def on_listener_ready(self, listener):        signals.worker_ready.send(sender=listener)        print("celery@%s has started." % self.hostname)    def init_queues(self):        if self.use_queues:            create_missing = self.app.conf.CELERY_CREATE_MISSING_QUEUES            try:                self.app.amqp.queues.select_subset(self.use_queues,                                                   create_missing)            except KeyError, exc:                raise ImproperlyConfigured(                    "Trying to select queue subset of %r, but queue %s"                    "is not defined in CELERY_QUEUES. If you want to "                    "automatically declare unknown queues you have to "                    "enable CELERY_CREATE_MISSING_QUEUES" % (                        self.use_queues, exc))        self.queues = self.app.amqp.queues    def init_loader(self):        self.loader = self.app.loader        self.settings = self.app.conf        map(self.loader.import_module, self.include)    def redirect_stdouts_to_logger(self):        handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,                                                       logfile=self.logfile)        if not handled:            logger = self.app.log.get_default_logger()            self.app.log.redirect_stdouts_to_logger(logger,                                                    loglevel=logging.WARNING)    def purge_messages(self):        count = self.app.control.discard_all()        what = (not count or count > 1) and "messages" or "message"        print("discard: Erased %d %s from the queue.\n" % (count, what))    def worker_init(self):        # Run the worker init handler.        # (Usually imports task modules and such.)        self.loader.init_worker()    def tasklist(self, include_builtins=True):        from celery.registry import tasks        tasklist = tasks.keys()        if not include_builtins:            tasklist = filter(lambda s: not s.startswith("celery."),                              tasklist)        return TASK_LIST_FMT % "\n".join("\t. %s" % task                                            for task in sorted(tasklist))    def startup_info(self):        tasklist = ""        if self.loglevel <= logging.INFO:            include_builtins = self.loglevel <= logging.DEBUG            tasklist = self.tasklist(include_builtins=include_builtins)        return STARTUP_INFO_FMT % {            "conninfo": self.app.amqp.format_broker_info(),            "queues": self.queues.format(indent=8),            "concurrency": self.concurrency,            "loglevel": LOG_LEVELS[self.loglevel],            "logfile": self.logfile or "[stderr]",            "celerybeat": self.run_clockservice and "ON" or "OFF",            "events": self.events and "ON" or "OFF",            "tasks": tasklist,            "loader": get_full_cls_name(self.loader.__class__),        }    def run_worker(self):        if self.pidfile:            pidlock = platforms.create_pidlock(self.pidfile).acquire()            atexit.register(pidlock.release)        worker = self.WorkController(app=self.app,                                concurrency=self.concurrency,                                loglevel=self.loglevel,                                logfile=self.logfile,                                hostname=self.hostname,                                ready_callback=self.on_listener_ready,                                embed_clockservice=self.run_clockservice,                                schedule_filename=self.schedule,                                send_events=self.events,                                db=self.db,                                queues=self.queues,                                max_tasks_per_child=self.max_tasks_per_child,                                task_time_limit=self.task_time_limit,                                task_soft_time_limit=self.task_soft_time_limit)        self.install_platform_tweaks(worker)        worker.start()    def install_platform_tweaks(self, worker):        """Install platform specific tweaks and workarounds."""        if self.app.IS_OSX:            self.osx_proxy_detection_workaround()        # Install signal handler so SIGHUP restarts the worker.        if not self._isatty:            # only install HUP handler if detached from terminal,            # so closing the terminal window doesn't restart celeryd            # into the background.            if self.app.IS_OSX:                # OS X can't exec from a process using threads.                # See http://github.com/ask/celery/issues#issue/152                install_HUP_not_supported_handler(worker)            else:                install_worker_restart_handler(worker)        install_worker_term_handler(worker)        install_worker_int_handler(worker)        signals.worker_init.send(sender=worker)    def osx_proxy_detection_workaround(self):        """See http://github.com/ask/celery/issues#issue/161"""        os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")    def set_process_status(self, info):        info = "%s (%s)" % (info, platforms.strargv(sys.argv))        return platforms.set_mp_process_title("celeryd",                                              info=info,                                              hostname=self.hostname)def install_worker_int_handler(worker):    def _stop(signum, frame):        process_name = multiprocessing.current_process().name        if process_name == "MainProcess":            worker.logger.warn(                "celeryd: Hitting Ctrl+C again will terminate "                "all running tasks!")            install_worker_int_again_handler(worker)            worker.logger.warn("celeryd: Warm shutdown (%s)" % (                process_name))            worker.stop()        raise SystemExit()    platforms.install_signal_handler("SIGINT", _stop)def install_worker_int_again_handler(worker):    def _stop(signum, frame):        process_name = multiprocessing.current_process().name        if process_name == "MainProcess":            worker.logger.warn("celeryd: Cold shutdown (%s)" % (                process_name))            worker.terminate()        raise SystemExit()    platforms.install_signal_handler("SIGINT", _stop)def install_worker_term_handler(worker):    def _stop(signum, frame):        process_name = multiprocessing.current_process().name        if process_name == "MainProcess":            worker.logger.warn("celeryd: Warm shutdown (%s)" % (                process_name))            worker.stop()        raise SystemExit()    platforms.install_signal_handler("SIGTERM", _stop)def install_worker_restart_handler(worker):    def restart_worker_sig_handler(signum, frame):        """Signal handler restarting the current python program."""        worker.logger.warn("Restarting celeryd (%s)" % (            " ".join(sys.argv)))        worker.stop()        os.execv(sys.executable, [sys.executable] + sys.argv)    platforms.install_signal_handler("SIGHUP", restart_worker_sig_handler)def install_HUP_not_supported_handler(worker):    def warn_on_HUP_handler(signum, frame):        worker.logger.error("SIGHUP not supported: "            "Restarting with HUP is unstable on this platform!")    platforms.install_signal_handler("SIGHUP", warn_on_HUP_handler)def run_worker(*args, **kwargs):    return Worker(*args, **kwargs).run()
 |