Sfoglia il codice sorgente

Added CELERY_REDIRECT_STDOUTS + CELERYD_REDIRECT_STDOUTS_LEVEL settings.CELERY_REDIRECT_STDOUTS is used by celeryd and celerybeat, and if enabled all output to stdout and stderr will be redirected to the current logger.CELERY_REDIRECT_STDOUTS_LEVEL decides the loglevel used and is WARNING by default.

Ask Solem 14 anni fa
parent
commit
ad7035f169
5 ha cambiato i file con 57 aggiunte e 12 eliminazioni
  1. 11 5
      celery/apps/beat.py
  2. 16 5
      celery/apps/worker.py
  3. 4 0
      celery/conf.py
  4. 3 1
      celery/log.py
  5. 23 1
      docs/configuration.rst

+ 11 - 5
celery/apps/beat.py

@@ -27,7 +27,8 @@ class Beat(object):
 
 
     def __init__(self, loglevel=None, logfile=None, schedule=None,
     def __init__(self, loglevel=None, logfile=None, schedule=None,
             max_interval=None, scheduler_cls=None, defaults=None,
             max_interval=None, scheduler_cls=None, defaults=None,
-            socket_timeout=30, **kwargs):
+            socket_timeout=30, redirect_stdouts=None,
+            redirect_stdouts_level=None, **kwargs):
         """Starts the celerybeat task scheduler."""
         """Starts the celerybeat task scheduler."""
 
 
         if defaults is None:
         if defaults is None:
@@ -41,13 +42,16 @@ class Beat(object):
         self.max_interval = max_interval
         self.max_interval = max_interval
         self.socket_timeout = socket_timeout
         self.socket_timeout = socket_timeout
         self.colored = term.colored(enabled=defaults.CELERYD_LOG_COLOR)
         self.colored = term.colored(enabled=defaults.CELERYD_LOG_COLOR)
+        self.redirect_stdouts = redirect_stdouts or defaults.REDIRECT_STDOUTS
+        self.redirect_stdouts_level = (redirect_stdouts_level or
+                                       defaults.REDIRECT_STDOUTS_LEVEL)
 
 
         if not isinstance(self.loglevel, int):
         if not isinstance(self.loglevel, int):
             self.loglevel = LOG_LEVELS[self.loglevel.upper()]
             self.loglevel = LOG_LEVELS[self.loglevel.upper()]
 
 
     def run(self):
     def run(self):
         logger = self.setup_logging()
         logger = self.setup_logging()
-        print(str(self.colored.magenta(
+        print(str(self.colored.cyan(
                     "celerybeat v%s is starting." % __version__)))
                     "celerybeat v%s is starting." % __version__)))
         self.init_loader()
         self.init_loader()
         self.set_process_title()
         self.set_process_title()
@@ -59,7 +63,9 @@ class Beat(object):
                                               logfile=self.logfile)
                                               logfile=self.logfile)
         if not handled:
         if not handled:
             logger = log.get_default_logger(name="celery.beat")
             logger = log.get_default_logger(name="celery.beat")
-            log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
+            if self.redirect_stdouts:
+                log.redirect_stdouts_to_logger(logger,
+                        loglevel=self.redirect_stdouts_level)
         return logger
         return logger
 
 
     def start_scheduler(self, logger=None):
     def start_scheduler(self, logger=None):
@@ -69,8 +75,8 @@ class Beat(object):
                             scheduler_cls=self.scheduler_cls,
                             scheduler_cls=self.scheduler_cls,
                             schedule_filename=self.schedule)
                             schedule_filename=self.schedule)
 
 
-        print(str(c.blue("__    ", c.red("-"),
-                  c.blue("    ... __   "), c.red("-"),
+        print(str(c.blue("__    ", c.magenta("-"),
+                  c.blue("    ... __   "), c.magenta("-"),
                   c.blue("        _\n"),
                   c.blue("        _\n"),
                   c.reset(self.startup_info(beat)))))
                   c.reset(self.startup_info(beat)))))
         if self.socket_timeout:
         if self.socket_timeout:

+ 16 - 5
celery/apps/worker.py

@@ -13,7 +13,9 @@ from celery import signals
 from celery.exceptions import ImproperlyConfigured
 from celery.exceptions import ImproperlyConfigured
 from celery.routes import Router
 from celery.routes import Router
 from celery.task import discard_all
 from celery.task import discard_all
-from celery.utils import info, get_full_cls_name, LOG_LEVELS
+from celery.utils import get_full_cls_name, LOG_LEVELS
+from celery.utils import info
+from celery.utils import term
 from celery.worker import WorkController
 from celery.worker import WorkController
 
 
 
 
@@ -43,7 +45,8 @@ class Worker(object):
             hostname=None, discard=False, run_clockservice=False,
             hostname=None, discard=False, run_clockservice=False,
             schedule=None, task_time_limit=None, task_soft_time_limit=None,
             schedule=None, task_time_limit=None, task_soft_time_limit=None,
             max_tasks_per_child=None, queues=None, events=False, db=None,
             max_tasks_per_child=None, queues=None, events=False, db=None,
-            include=None, defaults=None, pidfile=None, **kwargs):
+            include=None, defaults=None, pidfile=None,
+            redirect_stdouts=None, redirect_stdouts_level=None, **kwargs):
         if defaults is None:
         if defaults is None:
             from celery import conf
             from celery import conf
             defaults = conf
             defaults = conf
@@ -64,11 +67,16 @@ class Worker(object):
                                      defaults.CELERYD_TASK_SOFT_TIME_LIMIT)
                                      defaults.CELERYD_TASK_SOFT_TIME_LIMIT)
         self.max_tasks_per_child = (max_tasks_per_child or
         self.max_tasks_per_child = (max_tasks_per_child or
                                     defaults.CELERYD_MAX_TASKS_PER_CHILD)
                                     defaults.CELERYD_MAX_TASKS_PER_CHILD)
+        self.redirect_stdouts = (redirect_stdouts or
+                                 defaults.REDIRECT_STDOUTS)
+        self.redirect_stdouts_level = (redirect_stdouts_level or
+                                       defaults.REDIRECT_STDOUTS_LEVEL)
         self.db = db
         self.db = db
         self.queues = queues or []
         self.queues = queues or []
         self.include = include or []
         self.include = include or []
         self.pidfile = pidfile
         self.pidfile = pidfile
         self._isatty = sys.stdout.isatty()
         self._isatty = sys.stdout.isatty()
+        self.colored = term.colored(enabled=defaults.CELERYD_LOG_COLOR)
 
 
         if isinstance(self.queues, basestring):
         if isinstance(self.queues, basestring):
             self.queues = self.queues.split(",")
             self.queues = self.queues.split(",")
@@ -89,7 +97,8 @@ class Worker(object):
         self.init_queues()
         self.init_queues()
         self.worker_init()
         self.worker_init()
         self.redirect_stdouts_to_logger()
         self.redirect_stdouts_to_logger()
-        print("celery@%s v%s is starting." % (self.hostname, __version__))
+        print(str(self.colored.cyan(
+                "celery@%s v%s is starting." % (self.hostname, __version__))))
 
 
         if getattr(os, "geteuid", None) and os.geteuid() == 0:
         if getattr(os, "geteuid", None) and os.geteuid() == 0:
             warnings.warn(
             warnings.warn(
@@ -104,7 +113,7 @@ class Worker(object):
 
 
         # Dump configuration to screen so we have some basic information
         # Dump configuration to screen so we have some basic information
         # for when users sends bug reports.
         # for when users sends bug reports.
-        print(self.startup_info())
+        print(str(self.colored.reset(" \n", self.startup_info())))
         self.set_process_status("Running...")
         self.set_process_status("Running...")
 
 
         self.run_worker()
         self.run_worker()
@@ -143,7 +152,9 @@ class Worker(object):
         # Redirect stdout/stderr to our logger.
         # Redirect stdout/stderr to our logger.
         if not handled:
         if not handled:
             logger = log.get_default_logger()
             logger = log.get_default_logger()
-            log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
+            if self.redirect_stdouts:
+                log.redirect_stdouts_to_logger(logger,
+                        loglevel=self.redirect_stdouts_level)
 
 
     def purge_messages(self):
     def purge_messages(self):
         discarded_count = discard_all()
         discarded_count = discard_all()

+ 4 - 0
celery/conf.py

@@ -31,6 +31,8 @@ _DEFAULTS = {
     "BROKER_USER": "guest",
     "BROKER_USER": "guest",
     "BROKER_PASSWORD": "guest",
     "BROKER_PASSWORD": "guest",
     "BROKER_VHOST": "/",
     "BROKER_VHOST": "/",
+    "CELERY_REDIRECT_STDOUTS": True,
+    "CELERY_REDIRECT_STDOUTS_LEVEL": "WARNING",
     "CELERY_RESULT_BACKEND": "database",
     "CELERY_RESULT_BACKEND": "database",
     "CELERY_ALWAYS_EAGER": False,
     "CELERY_ALWAYS_EAGER": False,
     "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
     "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
@@ -139,6 +141,8 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
     m.IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
     m.IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
     m.TRACK_STARTED = _get("CELERY_TRACK_STARTED")
     m.TRACK_STARTED = _get("CELERY_TRACK_STARTED")
     m.ACKS_LATE = _get("CELERY_ACKS_LATE")
     m.ACKS_LATE = _get("CELERY_ACKS_LATE")
+    m.REDIRECT_STDOUTS = _get("CELERY_REDIRECT_STDOUTS")
+    m.REDIRECT_STDOUTS_LEVEL = _get("CELERY_REDIRECT_STDOUTS_LEVEL")
 
 
     # Make sure TASK_RESULT_EXPIRES is a timedelta.
     # Make sure TASK_RESULT_EXPIRES is a timedelta.
     if isinstance(m.TASK_RESULT_EXPIRES, int):
     if isinstance(m.TASK_RESULT_EXPIRES, int):

+ 3 - 1
celery/log.py

@@ -11,7 +11,7 @@ from multiprocessing import util as mputil
 
 
 from celery import conf
 from celery import conf
 from celery import signals
 from celery import signals
-from celery.utils import noop
+from celery.utils import noop, LOG_LEVELS
 from celery.utils.compat import LoggerAdapter
 from celery.utils.compat import LoggerAdapter
 from celery.utils.patch import ensure_process_aware_logger
 from celery.utils.patch import ensure_process_aware_logger
 from celery.utils.term import colored
 from celery.utils.term import colored
@@ -196,6 +196,8 @@ class LoggingProxy(object):
     def __init__(self, logger, loglevel=None):
     def __init__(self, logger, loglevel=None):
         self.logger = logger
         self.logger = logger
         self.loglevel = loglevel or self.logger.level or self.loglevel
         self.loglevel = loglevel or self.logger.level or self.loglevel
+        if not isinstance(self.loglevel, int):
+            self.loglevel = LOG_LEVELS[self.loglevel.upper()]
         self._safewrap_handlers()
         self._safewrap_handlers()
 
 
     def _safewrap_handlers(self):
     def _safewrap_handlers(self):

+ 23 - 1
docs/configuration.rst

@@ -998,7 +998,7 @@ The default is :const:`None` (``stderr``)
 CELERYD_LOG_LEVEL
 CELERYD_LOG_LEVEL
 ~~~~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~~~~
 
 
-Worker log level, can be any of :const:`DEBUG`, :const:`INFO`, :const:`WARNING`,
+Worker log level, can be one of :const:`DEBUG`, :const:`INFO`, :const:`WARNING`,
 :const:`ERROR` or :const:`CRITICAL`.
 :const:`ERROR` or :const:`CRITICAL`.
 
 
 Can also be set via the :option:`--loglevel` argument to
 Can also be set via the :option:`--loglevel` argument to
@@ -1034,6 +1034,28 @@ Default is::
 See the Python :mod:`logging` module for more information about log
 See the Python :mod:`logging` module for more information about log
 formats.
 formats.
 
 
+.. setting:: CELERY_REDIRECT_STDOUTS
+
+CELERY_REDIRECT_STDOUTS
+~~~~~~~~~~~~~~~~~~~~~~~
+
+If enabled ``stdout`` and ``stderr`` will be redirected
+to the current logger.
+
+Enabled by default.
+Used by :program:`celeryd` and :program:`celerybeat`.
+
+.. setting:: CELERY_REDIRECT_STDOUTS_LEVEL
+
+CELERY_REDIRECT_STDOUTS_LEVEL
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The loglevel output to ``stdout`` and ``stderr`` is logged as.
+Can be one of :const:`DEBUG`, :const:`INFO`, :const:`WARNING`,
+:const:`ERROR` or :const:`CRITICAL`.
+
+Default is :const:`WARNING`.
+
 .. _conf-custom-components:
 .. _conf-custom-components:
 
 
 Custom Component Classes (advanced)
 Custom Component Classes (advanced)