|
@@ -11,6 +11,7 @@ from celery import signals
|
|
|
from celery.app import app_or_default
|
|
|
from celery.exceptions import ImproperlyConfigured
|
|
|
from celery.utils import get_full_cls_name, LOG_LEVELS
|
|
|
+from celery.utils import term
|
|
|
from celery.worker import WorkController
|
|
|
|
|
|
|
|
@@ -37,7 +38,8 @@ class Worker(object):
|
|
|
hostname=None, discard=False, run_clockservice=False,
|
|
|
schedule=None, task_time_limit=None, task_soft_time_limit=None,
|
|
|
max_tasks_per_child=None, queues=None, events=False, db=None,
|
|
|
- include=None, app=None, pidfile=None, **kwargs):
|
|
|
+ include=None, app=None, pidfile=None,
|
|
|
+ redirect_stdouts=None, redirect_stdouts_level=None, **kwargs):
|
|
|
self.app = app = app_or_default(app)
|
|
|
self.concurrency = (concurrency or
|
|
|
app.conf.CELERYD_CONCURRENCY or
|
|
@@ -55,12 +57,17 @@ class Worker(object):
|
|
|
app.conf.CELERYD_TASK_SOFT_TIME_LIMIT)
|
|
|
self.max_tasks_per_child = (max_tasks_per_child or
|
|
|
app.conf.CELERYD_MAX_TASKS_PER_CHILD)
|
|
|
+ self.redirect_stdouts = (redirect_stdouts or
|
|
|
+ app.conf.CELERY_REDIRECT_STDOUTS)
|
|
|
+ self.redirect_stdouts_level = (redirect_stdouts_level or
|
|
|
+ app.conf.CELERY_REDIRECT_STDOUTS_LEVEL)
|
|
|
self.db = db
|
|
|
self.use_queues = queues or []
|
|
|
self.queues = None
|
|
|
self.include = include or []
|
|
|
self.pidfile = pidfile
|
|
|
self._isatty = sys.stdout.isatty()
|
|
|
+ self.colored = term.colored(enabled=defaults.CELERYD_LOG_COLOR)
|
|
|
|
|
|
if isinstance(self.use_queues, basestring):
|
|
|
self.use_queues = self.use_queues.split(",")
|
|
@@ -81,7 +88,8 @@ class Worker(object):
|
|
|
self.init_queues()
|
|
|
self.worker_init()
|
|
|
self.redirect_stdouts_to_logger()
|
|
|
- print("celery@%s v%s is starting." % (self.hostname, __version__))
|
|
|
+ print(str(self.colored.cyan(
|
|
|
+ "celery@%s v%s is starting." % (self.hostname, __version__))))
|
|
|
|
|
|
if getattr(os, "geteuid", None) and os.geteuid() == 0:
|
|
|
warnings.warn(
|
|
@@ -92,7 +100,7 @@ class Worker(object):
|
|
|
|
|
|
# Dump configuration to screen so we have some basic information
|
|
|
# for when users sends bug reports.
|
|
|
- print(self.startup_info())
|
|
|
+ print(str(self.colored.reset(" \n", self.startup_info())))
|
|
|
self.set_process_status("Running...")
|
|
|
|
|
|
self.run_worker()
|
|
@@ -126,8 +134,9 @@ class Worker(object):
|
|
|
logfile=self.logfile)
|
|
|
if not handled:
|
|
|
logger = self.app.log.get_default_logger()
|
|
|
- self.app.log.redirect_stdouts_to_logger(logger,
|
|
|
- loglevel=logging.WARNING)
|
|
|
+ if self.redirect_stdouts:
|
|
|
+ self.app.log.redirect_stdouts_to_logger(logger,
|
|
|
+ loglevel=self.redirect_stdouts_level)
|
|
|
|
|
|
def purge_messages(self):
|
|
|
count = self.app.control.discard_all()
|