|
@@ -51,7 +51,7 @@ EXTRA_INFO_FMT = """
|
|
|
%(tasks)s
|
|
|
"""
|
|
|
|
|
|
-UNKNOWN_QUEUE_ERROR = """\
|
|
|
+UNKNOWN_QUEUE = """\
|
|
|
Trying to select queue subset of %r, but queue %s is not
|
|
|
defined in the CELERY_QUEUES setting.
|
|
|
|
|
@@ -69,7 +69,7 @@ class Worker(configurated):
|
|
|
redirect_stdouts = from_config()
|
|
|
redirect_stdouts_level = from_config()
|
|
|
|
|
|
- def __init__(self, hostname=None, discard=False, embed_clockservice=False,
|
|
|
+ def __init__(self, hostname=None, purge=False, beat=False,
|
|
|
queues=None, include=None, app=None, pidfile=None,
|
|
|
autoscale=None, autoreload=False, no_execv=False, **kwargs):
|
|
|
self.app = app = app_or_default(app or self.app)
|
|
@@ -86,14 +86,11 @@ class Worker(configurated):
|
|
|
self.concurrency = cpu_count()
|
|
|
except NotImplementedError:
|
|
|
self.concurrency = 2
|
|
|
- self.discard = discard
|
|
|
- self.embed_clockservice = embed_clockservice
|
|
|
- if self.app.IS_WINDOWS and self.embed_clockservice:
|
|
|
- self.die("-B option does not work on Windows. "
|
|
|
- "Please run celerybeat as a separate service.")
|
|
|
+ self.purge = purge
|
|
|
+ self.beat = beat
|
|
|
self.use_queues = [] if queues is None else queues
|
|
|
self.queues = None
|
|
|
- self.include = [] if include is None else include
|
|
|
+ self.include = include
|
|
|
self.pidfile = pidfile
|
|
|
self.autoscale = None
|
|
|
self.autoreload = autoreload
|
|
@@ -107,34 +104,29 @@ class Worker(configurated):
|
|
|
|
|
|
if isinstance(self.use_queues, basestring):
|
|
|
self.use_queues = self.use_queues.split(",")
|
|
|
- if isinstance(self.include, basestring):
|
|
|
- self.include = self.include.split(",")
|
|
|
-
|
|
|
- try:
|
|
|
- self.loglevel = mlevel(self.loglevel)
|
|
|
- except KeyError:
|
|
|
- self.die("Unknown level %r. Please use one of %s." % (
|
|
|
- self.loglevel,
|
|
|
- "|".join(l for l in LOG_LEVELS.keys()
|
|
|
- if isinstance(l, basestring))))
|
|
|
+ if self.include:
|
|
|
+ if isinstance(self.include, basestring):
|
|
|
+ self.include = self.include.split(",")
|
|
|
+ app.conf.CELERY_IMPORTS = tuple(
|
|
|
+ self.include) + tuple(app.conf.CELERY_IMPORTS)
|
|
|
+ self.loglevel = mlevel(self.loglevel)
|
|
|
|
|
|
def run(self):
|
|
|
- self.init_loader()
|
|
|
self.init_queues()
|
|
|
- self.worker_init()
|
|
|
+ self.app.loader.init_worker()
|
|
|
self.redirect_stdouts_to_logger()
|
|
|
|
|
|
if getattr(os, "getuid", None) and os.getuid() == 0:
|
|
|
warnings.warn(RuntimeWarning(
|
|
|
"Running celeryd with superuser privileges is discouraged!"))
|
|
|
|
|
|
- if self.discard:
|
|
|
+ if self.purge:
|
|
|
self.purge_messages()
|
|
|
|
|
|
# Dump configuration to screen so we have some basic information
|
|
|
# for when users sends bug reports.
|
|
|
print(str(self.colored.cyan(" \n", self.startup_info())) +
|
|
|
- str(self.colored.reset(self.extra_info())))
|
|
|
+ str(self.colored.reset(self.extra_info() or "")))
|
|
|
self.set_process_status("-active-")
|
|
|
|
|
|
try:
|
|
@@ -150,50 +142,34 @@ class Worker(configurated):
|
|
|
try:
|
|
|
self.app.select_queues(self.use_queues)
|
|
|
except KeyError, exc:
|
|
|
- raise ImproperlyConfigured(
|
|
|
- UNKNOWN_QUEUE_ERROR % (self.use_queues, exc))
|
|
|
-
|
|
|
- def init_loader(self):
|
|
|
- self.loader = self.app.loader
|
|
|
- self.settings = self.app.conf
|
|
|
- for module in self.include:
|
|
|
- self.loader.import_task_module(module)
|
|
|
+ raise ImproperlyConfigured(UNKNOWN_QUEUE % (self.use_queues, exc))
|
|
|
|
|
|
def redirect_stdouts_to_logger(self):
|
|
|
self.app.log.setup(self.loglevel, self.logfile,
|
|
|
- self.redirect_stdouts,
|
|
|
- self.redirect_stdouts_level)
|
|
|
+ self.redirect_stdouts, self.redirect_stdouts_level)
|
|
|
|
|
|
def purge_messages(self):
|
|
|
- count = self.app.control.discard_all()
|
|
|
- print("discard: Erased %d %s from the queue.\n" % (
|
|
|
+ count = self.app.control.purge()
|
|
|
+ print("purge: Erased %d %s from the queue.\n" % (
|
|
|
count, pluralize(count, "message")))
|
|
|
|
|
|
- def worker_init(self):
|
|
|
- # Run the worker init handler.
|
|
|
- # (Usually imports task modules and such.)
|
|
|
- self.loader.init_worker()
|
|
|
-
|
|
|
def tasklist(self, include_builtins=True):
|
|
|
- tasklist = self.app.tasks.keys()
|
|
|
+ tasks = self.app.tasks.keys()
|
|
|
if not include_builtins:
|
|
|
- tasklist = filter(lambda s: not s.startswith("celery."),
|
|
|
- tasklist)
|
|
|
- return "\n".join(" . %s" % task for task in sorted(tasklist))
|
|
|
+ tasks = filter(lambda s: not s.startswith("celery."), tasks)
|
|
|
+ return "\n".join(" . %s" % task for task in sorted(tasks))
|
|
|
|
|
|
def extra_info(self):
|
|
|
if self.loglevel <= logging.INFO:
|
|
|
include_builtins = self.loglevel <= logging.DEBUG
|
|
|
tasklist = self.tasklist(include_builtins=include_builtins)
|
|
|
return EXTRA_INFO_FMT % {"tasks": tasklist}
|
|
|
- return ""
|
|
|
|
|
|
def startup_info(self):
|
|
|
app = self.app
|
|
|
concurrency = self.concurrency
|
|
|
if self.autoscale:
|
|
|
- cmax, cmin = self.autoscale
|
|
|
- concurrency = "{min=%s, max=%s}" % (cmin, cmax)
|
|
|
+ concurrency = "{min=%s, max=%s}" % self.autoscale
|
|
|
return BANNER % {
|
|
|
"hostname": self.hostname,
|
|
|
"version": __version__,
|
|
@@ -201,24 +177,21 @@ class Worker(configurated):
|
|
|
"concurrency": concurrency,
|
|
|
"loglevel": LOG_LEVELS[self.loglevel],
|
|
|
"logfile": self.logfile or "[stderr]",
|
|
|
- "celerybeat": "ON" if self.embed_clockservice else "OFF",
|
|
|
+ "celerybeat": "ON" if self.beat else "OFF",
|
|
|
"events": "ON" if self.send_events else "OFF",
|
|
|
- "loader": qualname(self.loader),
|
|
|
+ "loader": qualname(self.app.loader),
|
|
|
"queues": app.amqp.queues.format(indent=18, indent_first=False),
|
|
|
}
|
|
|
|
|
|
def run_worker(self):
|
|
|
if self.pidfile:
|
|
|
- pidlock = platforms.create_pidlock(self.pidfile).acquire()
|
|
|
- atexit.register(pidlock.release)
|
|
|
+ platforms.create_pidlock(self.pidfile)
|
|
|
worker = self.WorkController(app=self.app,
|
|
|
- hostname=self.hostname,
|
|
|
- ready_callback=self.on_consumer_ready,
|
|
|
- embed_clockservice=self.embed_clockservice,
|
|
|
- autoscale=self.autoscale,
|
|
|
- autoreload=self.autoreload,
|
|
|
- no_execv=self.no_execv,
|
|
|
- **self.confopts_as_dict())
|
|
|
+ hostname=self.hostname,
|
|
|
+ ready_callback=self.on_consumer_ready, beat=self.beat,
|
|
|
+ autoscale=self.autoscale, autoreload=self.autoreload,
|
|
|
+ no_execv=self.no_execv,
|
|
|
+ **self.confopts_as_dict())
|
|
|
self.install_platform_tweaks(worker)
|
|
|
signals.worker_init.send(sender=worker)
|
|
|
worker.start()
|
|
@@ -250,26 +223,20 @@ class Worker(configurated):
|
|
|
os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
|
|
|
|
|
|
def set_process_status(self, info):
|
|
|
- info = "%s (%s)" % (info, platforms.strargv(sys.argv))
|
|
|
return platforms.set_mp_process_title("celeryd",
|
|
|
- info=info,
|
|
|
- hostname=self.hostname)
|
|
|
+ info="%s (%s)" % (info, platforms.strargv(sys.argv)),
|
|
|
+ hostname=self.hostname)
|
|
|
|
|
|
- def die(self, msg, exitcode=1):
|
|
|
- sys.stderr.write("Error: %s\n" % (msg, ))
|
|
|
- sys.exit(exitcode)
|
|
|
|
|
|
|
|
|
def _shutdown_handler(worker, sig="TERM", how="stop", exc=SystemExit,
|
|
|
- callback=None):
|
|
|
- types = {"terminate": "Cold", "stop": "Warm"}
|
|
|
+ callback=None, types={"terminate": "Cold", "stop": "Warm"}):
|
|
|
|
|
|
def _handle_request(signum, frame):
|
|
|
- process_name = current_process()._name
|
|
|
- if not process_name or process_name == "MainProcess":
|
|
|
+ if current_process()._name == "MainProcess":
|
|
|
if callback:
|
|
|
callback(worker)
|
|
|
- print("celeryd: %s shutdown (%s)" % (types[how], process_name, ))
|
|
|
+ print("celeryd: %s shutdown (MainProcess)" % types[how])
|
|
|
getattr(worker, how)(in_sighandler=True)
|
|
|
raise exc()
|
|
|
_handle_request.__name__ = "worker_" + how
|