Browse Source

Use custom logger for tasks, that supports task magic kwargs in formats. The default format for tasks (CELERYD_TASK_LOG_FORMAT) now includes task_id and
task_name so the origin of task log messages can easily be traced.

Ask Solem 15 năm trước cách đây
mục cha
commit
9a7d18570a
3 tập tin đã thay đổi với 43 bổ sung8 xóa
  1. 4 0
      celery/conf.py
  2. 34 4
      celery/log.py
  3. 5 4
      celery/task/base.py

+ 4 - 0
celery/conf.py

@@ -6,6 +6,8 @@ from celery.loaders import load_settings
 
 DEFAULT_P_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
 DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
+DEFAULT_TASK_LOG_FMT = ('[%(asctime)s: %(levelname)s/%(processName)s] '
+                        '[%(task_name)s(%(task_id)s)] %(message)s')
 
 LOG_LEVELS = dict(logging._levelNames)
 LOG_LEVELS["FATAL"] = logging.FATAL
@@ -33,6 +35,7 @@ _DEFAULTS = {
     "CELERYD_CONCURRENCY": 0, # defaults to cpu count
     "CELERYD_PREFETCH_MULTIPLIER": 4,
     "CELERYD_LOG_FORMAT": DEFAULT_P_LOG_FMT,
+    "CELERYD_TASK_LOG_FORMAT": DEFAULT_TASK_LOG_FMT,
     "CELERYD_LOG_LEVEL": "WARN",
     "CELERYD_LOG_FILE": None, # stderr
     "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
@@ -99,6 +102,7 @@ CELERY_SEND_TASK_ERROR_EMAILS = _get("CELERY_SEND_TASK_ERROR_EMAILS",
                                      compat=["SEND_CELERY_TASK_ERROR_EMAILS"])
 CELERYD_LOG_FORMAT = _get("CELERYD_LOG_FORMAT",
                           compat=["CELERYD_DAEMON_LOG_FORMAT"])
+CELERYD_TASK_LOG_FORMAT = _get("CELERYD_TASK_LOG_FORMAT")
 CELERYD_LOG_FILE = _get("CELERYD_LOG_FILE")
 CELERYD_LOG_LEVEL = _get("CELERYD_LOG_LEVEL",
                         compat=["CELERYD_DAEMON_LOG_LEVEL"])

+ 34 - 4
celery/log.py

@@ -13,6 +13,14 @@ _hijacked = False
 _monkeypatched = False
 
 
+def get_task_logger(loglevel=None):
+    ensure_process_aware_logger()
+    logger = logging.getLogger("celery.Task")
+    if loglevel is not None:
+        logger.setLevel(loglevel)
+    return logger
+
+
 def _hijack_multiprocessing_logger():
     from multiprocessing import util as mputil
     global _hijacked
@@ -54,21 +62,43 @@ def get_default_logger(loglevel=None):
     return logger
 
 
+
 def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
         format=conf.CELERYD_LOG_FORMAT, **kwargs):
     """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
+    then ``stderr`` is used.
+
+    Returns logger object.
+
+    """
+    return _setup_logger(get_default_logger(loglevel),
+                         logfile, format, **kwargs)
+
+
+def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
+        format=conf.CELERYD_TASK_LOG_FORMAT, task_kwargs=None, **kwargs):
+    """Setup the task logger. If ``logfile`` is not specified, then
     ``stderr`` is used.
 
     Returns logger object.
 
     """
-    logger = get_default_logger(loglevel=loglevel)
+    if task_kwargs is None:
+        task_kwargs = {}
+    task_kwargs.setdefault("task_id", "-?-")
+    task_kwargs.setdefault("task_name", "-?-")
+    logger = _setup_logger(get_task_logger(loglevel),
+                           logfile, format, **kwargs)
+    return logging.LoggerAdapter(logger, task_kwargs)
+
+
+def _setup_logger(logger, logfile, format,
+        formatter=logging.Formatter, **kwargs):
+
     if logger.handlers: # Logger already configured
         return logger
-
     handler = _detect_handler(logfile)
-    formatter = logging.Formatter(format)
-    handler.setFormatter(formatter)
+    handler.setFormatter(formatter(format))
     logger.addHandler(handler)
     return logger
 

+ 5 - 4
celery/task/base.py

@@ -6,7 +6,7 @@ from Queue import Queue
 from billiard.serialization import pickle
 
 from celery import conf
-from celery.log import setup_logger
+from celery.log import setup_task_logger
 from celery.utils import gen_unique_id, padlist, timedelta_seconds
 from celery.result import BaseAsyncResult, TaskSetResult, EagerResult
 from celery.execute import apply_async, apply
@@ -208,12 +208,13 @@ class Task(object):
 
     @classmethod
     def get_logger(self, loglevel=None, logfile=None, **kwargs):
-        """Get process-aware logger object.
+        """Get task-aware logger object.
 
-        See :func:`celery.log.setup_logger`.
+        See :func:`celery.log.setup_task_logger`.
 
         """
-        return setup_logger(loglevel=loglevel, logfile=logfile)
+        return setup_task_logger(loglevel=loglevel, logfile=logfile,
+                                 task_kwargs=kwargs)
 
     @classmethod
     def establish_connection(self,