Ver Fonte

Massive logging improvements

To conform with best practices:

- Classes used by the worker no longer uses app.get_default_logger, but uses
  `celery.utils.log.get_logger` which simply gets the logger not setting the
  level, and adds a NullHandler.

- Loggers are no longer passed around, instead every module using logging
  defines a module global logger that is used throughout.

- All loggers inherit from a common logger called "celery".

- Before task.get_logger would setup a new logger for every task,
  and even set the loglevel.  This is no longer the case.

  - Instead all task loggers now inherit from a common "celery.task" logger
  that is set up when programs call `setup_logging_subsystem`.

  - Instead of using LoggerAdapter to augment the formatter with
    the task_id and task_name field, the task base logger now use
    a special formatter adding these values at runtime from the
    currently executing task.

- Redirected output from stdout/stderr is now logged to a "celery.redirected"
  logger.

- In addition a few warnings.warn have been replaced with logger.warn.
Ask Solem há 13 anos atrás
pai
commit
0406058b27

+ 0 - 2
celery/__compat__.py

@@ -54,8 +54,6 @@ COMPAT_MODULES = {
         "log": {
             "get_default_logger": "log.get_default_logger",
             "setup_logger": "log.setup_logger",
-            "setup_task_logger": "log.setup_task_logger",
-            "get_task_logger": "log.get_task_logger",
             "setup_loggig_subsystem": "log.setup_logging_subsystem",
             "redirect_stdouts_to_logger": "log.redirect_stdouts_to_logger",
         },

+ 82 - 97
celery/app/log.py

@@ -4,20 +4,37 @@ import logging
 import os
 import sys
 
+from kombu.log import NullHandler
+
 from celery import signals
 from celery.utils import isatty
-from celery.utils.compat import LoggerAdapter, WatchedFileHandler
+from celery.utils.compat import WatchedFileHandler
 from celery.utils.log import (
-    get_logger, NullHandler,
+    get_logger, mlevel,
     ColorFormatter, ensure_process_aware_logger,
     LoggingProxy, get_multiprocessing_logger,
-    reset_multiprocessing_logger, mlevel
+    reset_multiprocessing_logger,
 )
 from celery.utils.term import colored
 
+from .state import get_current_task
+
 is_py3k = sys.version_info[0] == 3
 
 
+class TaskFormatter(ColorFormatter):
+
+    def format(self, record):
+        task = get_current_task()
+        if task:
+            record.__dict__.update(task_id=task.request.id,
+                                   task_name=task.name)
+        else:
+            record.__dict__.setdefault("task_name", "???")
+            record.__dict__.setdefault("task_id", "???")
+        return ColorFormatter.format(self, record)
+
+
 class Logging(object):
     #: The logging subsystem is only configured once per process.
     #: setup_logging_subsystem sets this flag, and subsequent calls
@@ -31,29 +48,25 @@ class Logging(object):
         self.task_format = self.app.conf.CELERYD_TASK_LOG_FORMAT
         self.colorize = self.app.conf.CELERYD_LOG_COLOR
 
-    def supports_color(self, logfile=None):
-        if self.app.IS_WINDOWS:
-            # Windows does not support ANSI color codes.
-            return False
-        if self.colorize is None:
-            # Only use color if there is no active log file
-            # and stderr is an actual terminal.
-            return logfile is None and isatty(sys.stderr)
-        return self.colorize
-
-    def colored(self, logfile=None):
-        return colored(enabled=self.supports_color(logfile))
-
-    def get_task_logger(self, loglevel=None, name=None):
-        logger = get_logger(name or "celery.task.default")
-        if loglevel is not None:
-            logger.setLevel(mlevel(loglevel))
-        return logger
+    def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
+            redirect_level="WARNING"):
+        handled = self.setup_logging_subsystem(loglevel, logfile)
+        if not handled:
+            logger = get_logger("celery.redirected")
+            if redirect_stdouts:
+                self.redirect_stdouts_to_logger(logger,
+                                loglevel=redirect_level)
+        os.environ.update(
+            CELERY_LOG_LEVEL=str(loglevel) if loglevel else "",
+            CELERY_LOG_FILE=str(logfile) if logfile else "",
+            CELERY_LOG_REDIRECT="1" if redirect_stdouts else "",
+            CELERY_LOG_REDIRECT_LEVEL=str(redirect_level))
 
     def setup_logging_subsystem(self, loglevel=None, logfile=None,
             format=None, colorize=None, **kwargs):
         if Logging._setup:
             return
+        Logging._setup = True
         loglevel = mlevel(loglevel or self.loglevel)
         format = format or self.format
         if colorize is None:
@@ -71,81 +84,26 @@ class Logging(object):
                 root.handlers = []
 
             for logger in filter(None, (root, get_multiprocessing_logger())):
-                self._setup_logger(logger, logfile, format, colorize, **kwargs)
+                self.setup_handlers(logger, logfile, format,
+                                    colorize, **kwargs)
                 if loglevel:
                     logger.setLevel(loglevel)
                 signals.after_setup_logger.send(sender=None, logger=logger,
                                             loglevel=loglevel, logfile=logfile,
                                             format=format, colorize=colorize)
+            # then setup the root task logger.
+            self.setup_task_loggers(loglevel, logfile, colorize=colorize)
 
         # This is a hack for multiprocessing's fork+exec, so that
         # logging before Process.run works.
+        logfile_name = logfile if isinstance(logfile, basestring) else ""
         os.environ.update(_MP_FORK_LOGLEVEL_=str(loglevel),
-                          _MP_FORK_LOGFILE_=logfile or "",
+                          _MP_FORK_LOGFILE_=logfile_name,
                           _MP_FORK_LOGFORMAT_=format)
-        Logging._setup = True
-
         return receivers
 
-    def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
-            redirect_level="WARNING"):
-        handled = self.setup_logging_subsystem(loglevel=loglevel,
-                                               logfile=logfile)
-        if not handled:
-            logger = self.get_default_logger()
-            if redirect_stdouts:
-                self.redirect_stdouts_to_logger(logger,
-                                loglevel=redirect_level)
-        os.environ.update(
-            CELERY_LOG_LEVEL=str(loglevel) if loglevel else "",
-            CELERY_LOG_FILE=str(logfile) if logfile else "",
-            CELERY_LOG_REDIRECT="1" if redirect_stdouts else "",
-            CELERY_LOG_REDIRECT_LEVEL=str(redirect_level))
-
-    def _detect_handler(self, logfile=None):
-        """Create log handler with either a filename, an open stream
-        or :const:`None` (stderr)."""
-        logfile = sys.__stderr__ if logfile is None else logfile
-        if hasattr(logfile, "write"):
-            return logging.StreamHandler(logfile)
-        return WatchedFileHandler(logfile)
-
-    def get_default_logger(self, loglevel=None, name="celery"):
-        """Get default logger instance.
-
-        :keyword loglevel: Initial log level.
-
-        """
-        logger = get_logger(name)
-        if loglevel is not None:
-            logger.setLevel(mlevel(loglevel))
-        return logger
-
-    def setup_logger(self, loglevel=None, logfile=None,
-            format=None, colorize=None, name="celery", root=True,
-            app=None, **kwargs):
-        """Setup the :mod:`multiprocessing` logger.
-
-        If `logfile` is not specified, then `sys.stderr` is used.
-
-        Returns logger object.
-
-        """
-        loglevel = mlevel(loglevel or self.loglevel)
-        format = format or self.format
-        if colorize is None:
-            colorize = self.supports_color(logfile)
-
-        if not root or self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
-            return self._setup_logger(self.get_default_logger(loglevel, name),
-                                      logfile, format, colorize, **kwargs)
-        self.setup_logging_subsystem(loglevel, logfile,
-                                     format, colorize, **kwargs)
-        return self.get_default_logger(name=name)
-
-    def setup_task_logger(self, loglevel=None, logfile=None, format=None,
-            colorize=None, task_name=None, task_id=None, propagate=False,
-            app=None, **kwargs):
+    def setup_task_loggers(self, loglevel=None, logfile=None, format=None,
+            colorize=None, propagate=False, **kwargs):
         """Setup the task logger.
 
         If `logfile` is not specified, then `sys.stderr` is used.
@@ -158,23 +116,16 @@ class Logging(object):
         if colorize is None:
             colorize = self.supports_color(logfile)
 
-        logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
-                                    logfile, format, colorize, **kwargs)
+        logger = self.setup_handlers(get_logger("celery.task"),
+                                     logfile, format, colorize,
+                                     formatter=TaskFormatter, **kwargs)
+        logger.setLevel(loglevel)
         logger.propagate = int(propagate)    # this is an int for some reason.
                                              # better to not question why.
         signals.after_setup_task_logger.send(sender=None, logger=logger,
                                      loglevel=loglevel, logfile=logfile,
                                      format=format, colorize=colorize)
-        return LoggerAdapter(logger, {"task_id": task_id,
-                                      "task_name": task_name})
-
-    def _has_handler(self, logger):
-        return (logger.handlers and
-                    not isinstance(logger.handlers[0], NullHandler))
-
-    def _is_configured(self, logger):
-        return self._has_handler(logger) and not getattr(
-                logger, "_rudimentary_setup", False)
+        return logger
 
     def redirect_stdouts_to_logger(self, logger, loglevel=None,
             stdout=True, stderr=True):
@@ -192,7 +143,20 @@ class Logging(object):
             sys.stderr = proxy
         return proxy
 
-    def _setup_logger(self, logger, logfile, format, colorize,
+    def supports_color(self, logfile=None):
+        if self.app.IS_WINDOWS:
+            # Windows does not support ANSI color codes.
+            return False
+        if self.colorize is None:
+            # Only use color if there is no active log file
+            # and stderr is an actual terminal.
+            return logfile is None and isatty(sys.stderr)
+        return self.colorize
+
+    def colored(self, logfile=None):
+        return colored(enabled=self.supports_color(logfile))
+
+    def setup_handlers(self, logger, logfile, format, colorize,
             formatter=ColorFormatter, **kwargs):
         if self._is_configured(logger):
             return logger
@@ -201,3 +165,24 @@ class Logging(object):
         handler.setFormatter(formatter(format, use_color=colorize))
         logger.addHandler(handler)
         return logger
+
+    def _detect_handler(self, logfile=None):
+        """Create log handler with either a filename, an open stream
+        or :const:`None` (stderr)."""
+        logfile = sys.__stderr__ if logfile is None else logfile
+        if hasattr(logfile, "write"):
+            return logging.StreamHandler(logfile)
+        return WatchedFileHandler(logfile)
+
+    def _has_handler(self, logger):
+        return (logger.handlers and
+                    not isinstance(logger.handlers[0], NullHandler))
+
+    def _is_configured(self, logger):
+        return self._has_handler(logger) and not getattr(
+                logger, "_rudimentary_setup", False)
+
+    def setup_logger(self, name="celery", *args, **kwargs):
+        """Deprecated: No longer used."""
+        self.setup_logging_subsystem(*args, **kwargs)
+        return logging.root

+ 13 - 7
celery/app/task.py

@@ -12,9 +12,12 @@
 
 from __future__ import absolute_import
 
+import logging
 import sys
 import threading
 
+from kombu.utils import cached_property
+
 from celery import current_app
 from celery import states
 from celery.datastructures import ExceptionInfo
@@ -23,6 +26,7 @@ from celery.result import EagerResult
 from celery.utils import fun_takes_kwargs, uuid, maybe_reraise
 from celery.utils.functional import mattrgetter, maybe_list
 from celery.utils.imports import instantiate
+from celery.utils.log import get_logger
 from celery.utils.mail import ErrorMail
 
 from .annotations import resolve_all as resolve_all_annotations
@@ -349,13 +353,12 @@ class BaseTask(object):
     def start_strategy(self, app, consumer):
         return instantiate(self.Strategy, self, app, consumer)
 
-    def get_logger(self, loglevel=None, logfile=None, propagate=False,
-            **kwargs):
+    def get_logger(self, **kwargs):
         """Get task-aware logger object."""
-        return self._get_app().log.setup_task_logger(
-            loglevel=self.request.loglevel if loglevel is None else loglevel,
-            logfile=self.request.logfile if logfile is None else logfile,
-            propagate=propagate, task_name=self.name, task_id=self.request.id)
+        logger = get_logger(self.name)
+        if logger.parent is logging.root:
+            logger.parent = get_logger("celery.task")
+        return logger
 
     def establish_connection(self, connect_timeout=None):
         """Establish a connection to the message broker."""
@@ -706,7 +709,6 @@ class BaseTask(object):
         this task, wrapping arguments and execution options
         for a single task invocation."""
         from celery.canvas import subtask
-        print("SUBTASK: %r" % (subtask, ))
         return subtask(self, *args, **kwargs)
 
     def s(self, *args, **kwargs):
@@ -821,6 +823,10 @@ class BaseTask(object):
         """`repr(task)`"""
         return "<@task: %s>" % (self.name, )
 
+    @cached_property
+    def logger(self):
+        return self.get_logger()
+
     @property
     def __name__(self):
         return self.__class__.__name__

+ 6 - 7
celery/apps/beat.py

@@ -9,7 +9,7 @@ from celery import __version__, platforms, beat
 from celery.app import app_or_default
 from celery.app.abstract import configurated, from_config
 from celery.utils.imports import qualname
-from celery.utils.log import LOG_LEVELS
+from celery.utils.log import LOG_LEVELS, get_logger
 from celery.utils.timeutils import humanize_seconds
 
 STARTUP_INFO_FMT = """
@@ -22,6 +22,8 @@ Configuration ->
     . maxinterval -> %(hmax_interval)s (%(max_interval)ss)
 """.strip()
 
+logger = get_logger("celery.beat")
+
 
 class Beat(configurated):
     Service = beat.Service
@@ -49,29 +51,26 @@ class Beat(configurated):
             self.loglevel = LOG_LEVELS[self.loglevel.upper()]
 
     def run(self):
-        logger = self.setup_logging()
+        self.setup_logging()
         print(str(self.colored.cyan(
                     "celerybeat v%s is starting." % __version__)))
         self.init_loader()
         self.set_process_title()
-        self.start_scheduler(logger)
+        self.start_scheduler()
 
     def setup_logging(self):
         handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
                                                        logfile=self.logfile)
-        logger = self.app.log.get_default_logger(name="celery.beat")
         if self.redirect_stdouts and not handled:
             self.app.log.redirect_stdouts_to_logger(logger,
                     loglevel=self.redirect_stdouts_level)
-        return logger
 
-    def start_scheduler(self, logger=None):
+    def start_scheduler(self):
         c = self.colored
         if self.pidfile:
             pidlock = platforms.create_pidlock(self.pidfile).acquire()
             atexit.register(pidlock.release)
         beat = self.Service(app=self.app,
-                            logger=logger,
                             max_interval=self.max_interval,
                             scheduler_cls=self.scheduler_cls,
                             schedule_filename=self.schedule)

+ 6 - 4
celery/apps/worker.py

@@ -18,7 +18,7 @@ from celery.app.abstract import configurated, from_config
 from celery.exceptions import ImproperlyConfigured, SystemTerminate
 from celery.utils import cry, isatty
 from celery.utils.imports import qualname
-from celery.utils.log import LOG_LEVELS, mlevel
+from celery.utils.log import LOG_LEVELS, get_logger, mlevel
 from celery.utils.text import pluralize
 from celery.worker import WorkController
 
@@ -28,6 +28,8 @@ try:
 except ImportError:
     IGNORE_ERRORS = ()
 
+logger = get_logger(__name__)
+
 
 BANNER = """
  -------------- celery@%(hostname)s v%(version)s
@@ -249,7 +251,7 @@ class Worker(configurated):
         install_worker_term_handler(worker)
         install_worker_term_hard_handler(worker)
         install_worker_int_handler(worker)
-        install_cry_handler(worker.logger)
+        install_cry_handler()
         install_rdb_handler()
 
     def osx_proxy_detection_workaround(self):
@@ -329,7 +331,7 @@ def install_worker_restart_handler(worker):
     platforms.signals["SIGHUP"] = restart_worker_sig_handler
 
 
-def install_cry_handler(logger):
+def install_cry_handler():
     # Jython/PyPy does not have sys._current_frames
     is_jython = sys.platform.startswith("java")
     is_pypy = hasattr(sys, "pypy_version_info")
@@ -356,7 +358,7 @@ def install_rdb_handler(envvar="CELERY_RDBSIG"):  # pragma: no cover
 def install_HUP_not_supported_handler(worker):
 
     def warn_on_HUP_handler(signum, frame):
-        worker.logger.error("SIGHUP not supported: "
+        logger.error("SIGHUP not supported: "
             "Restarting with HUP is unstable on this platform!")
 
     platforms.signals["SIGHUP"] = warn_on_HUP_handler

+ 4 - 3
celery/backends/cassandra.py

@@ -14,10 +14,13 @@ import time
 
 from celery import states
 from celery.exceptions import ImproperlyConfigured
+from celery.utils.log import get_logger
 from celery.utils.timeutils import maybe_timedelta, timedelta_seconds
 
 from .base import BaseDictBackend
 
+logger = get_logger(__name__)
+
 
 class CassandraBackend(BaseDictBackend):
     """Highly fault tolerant Cassandra backend.
@@ -46,8 +49,6 @@ class CassandraBackend(BaseDictBackend):
 
         """
         super(CassandraBackend, self).__init__(**kwargs)
-        self.logger = self.app.log.setup_logger(
-                            name="celery.backends.cassandra")
 
         self.expires = kwargs.get("expires") or maybe_timedelta(
                                     self.app.conf.CELERY_TASK_RESULT_EXPIRES)
@@ -104,7 +105,7 @@ class CassandraBackend(BaseDictBackend):
                     Thrift.TException), exc:
                 if time.time() > ts:
                     raise
-                self.logger.warn('Cassandra error: %r. Retrying...', exc)
+                logger.warn('Cassandra error: %r. Retrying...', exc)
                 time.sleep(self._retry_wait)
 
     def _get_column_family(self):

+ 22 - 27
celery/beat.py

@@ -35,6 +35,10 @@ from .schedules import maybe_schedule, crontab
 from .utils import cached_property
 from .utils.imports import instantiate
 from .utils.timeutils import humanize_seconds
+from .utils.log import get_logger
+
+logger = get_logger(__name__)
+debug, info, error = logger.debug, logger.info, logger.error
 
 
 class SchedulingError(Exception):
@@ -127,7 +131,6 @@ class Scheduler(object):
     """Scheduler for periodic tasks.
 
     :keyword schedule: see :attr:`schedule`.
-    :keyword logger: see :attr:`logger`.
     :keyword max_interval: see :attr:`max_interval`.
 
     """
@@ -137,9 +140,6 @@ class Scheduler(object):
     #: The schedule dict/shelve.
     schedule = None
 
-    #: Current logger.
-    logger = None
-
     #: Maximum time to sleep between re-checking the schedule.
     max_interval = 1
 
@@ -148,11 +148,10 @@ class Scheduler(object):
 
     _last_sync = None
 
-    def __init__(self, schedule=None, logger=None, max_interval=None,
+    def __init__(self, schedule=None, max_interval=None,
             app=None, Publisher=None, lazy=False, **kwargs):
         app = self.app = app_or_default(app)
         self.data = maybe_promise({} if schedule is None else schedule)
-        self.logger = logger or app.log.get_default_logger(name="celery.beat")
         self.max_interval = max_interval or \
                                 app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
         self.Publisher = Publisher or app.amqp.TaskPublisher
@@ -173,15 +172,14 @@ class Scheduler(object):
         is_due, next_time_to_run = entry.is_due()
 
         if is_due:
-            self.logger.info("Scheduler: Sending due task %s", entry.task)
+            info("Scheduler: Sending due task %s", entry.task)
             try:
                 result = self.apply_async(entry, publisher=publisher)
             except Exception, exc:
-                self.logger.error("Message Error: %s\n%s", exc,
-                                  traceback.format_stack(),
-                                  exc_info=True)
+                error("Message Error: %s\n%s",
+                      exc, traceback.format_stack(), exc_info=True)
             else:
-                self.logger.debug("%s sent. id->%s", entry.task, result.id)
+                debug("%s sent. id->%s", entry.task, result.id)
         return next_time_to_run
 
     def tick(self):
@@ -242,7 +240,7 @@ class Scheduler(object):
 
     def _do_sync(self):
         try:
-            self.logger.debug("Celerybeat: Synchronizing schedule...")
+            debug("Celerybeat: Synchronizing schedule...")
             self.sync()
         finally:
             self._last_sync = time.time()
@@ -293,8 +291,8 @@ class Scheduler(object):
         # callback called for each retry while the connection
         # can't be established.
         def _error_handler(exc, interval):
-            self.logger.error("Celerybeat: Connection error: %s. "
-                              "Trying again in %s seconds...", exc, interval)
+            error("Celerybeat: Connection error: %s. "
+                  "Trying again in %s seconds...", exc, interval)
 
         return self.connection.ensure_connection(_error_handler,
                     self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
@@ -339,8 +337,8 @@ class PersistentScheduler(Scheduler):
                                                 writeback=True)
             entries = self._store.setdefault("entries", {})
         except Exception, exc:
-            self.logger.error("Removing corrupted schedule file %r: %r",
-                              self.schedule_filename, exc, exc_info=True)
+            error("Removing corrupted schedule file %r: %r",
+                  self.schedule_filename, exc, exc_info=True)
             self._remove_db()
             self._store = self.persistence.open(self.schedule_filename,
                                                 writeback=True)
@@ -352,8 +350,7 @@ class PersistentScheduler(Scheduler):
         self.install_default_entries(self.schedule)
         self._store["__version__"] = __version__
         self.sync()
-        self.logger.debug("Current schedule:\n" +
-                          "\n".join(repr(entry)
+        debug("Current schedule:\n" + "\n".join(repr(entry)
                                     for entry in entries.itervalues()))
 
     def get_schedule(self):
@@ -375,13 +372,12 @@ class PersistentScheduler(Scheduler):
 class Service(object):
     scheduler_cls = PersistentScheduler
 
-    def __init__(self, logger=None, max_interval=None, schedule_filename=None,
+    def __init__(self, max_interval=None, schedule_filename=None,
             scheduler_cls=None, app=None):
         app = self.app = app_or_default(app)
         self.max_interval = max_interval or \
                                 app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
         self.scheduler_cls = scheduler_cls or self.scheduler_cls
-        self.logger = logger or app.log.get_default_logger(name="celery.beat")
         self.schedule_filename = schedule_filename or \
                                     app.conf.CELERYBEAT_SCHEDULE_FILENAME
 
@@ -389,9 +385,9 @@ class Service(object):
         self._is_stopped = threading.Event()
 
     def start(self, embedded_process=False):
-        self.logger.info("Celerybeat: Starting...")
-        self.logger.debug("Celerybeat: Ticking with max interval->%s",
-                          humanize_seconds(self.scheduler.max_interval))
+        info("Celerybeat: Starting...")
+        debug("Celerybeat: Ticking with max interval->%s",
+              humanize_seconds(self.scheduler.max_interval))
 
         signals.beat_init.send(sender=self)
         if embedded_process:
@@ -401,8 +397,8 @@ class Service(object):
         try:
             while not self._is_shutdown.isSet():
                 interval = self.scheduler.tick()
-                self.logger.debug("Celerybeat: Waking up %s.",
-                                   humanize_seconds(interval, prefix="in "))
+                debug("Celerybeat: Waking up %s.",
+                      humanize_seconds(interval, prefix="in "))
                 time.sleep(interval)
         except (KeyboardInterrupt, SystemExit):
             self._is_shutdown.set()
@@ -414,7 +410,7 @@ class Service(object):
         self._is_stopped.set()
 
     def stop(self, wait=False):
-        self.logger.info("Celerybeat: Shutting down...")
+        info("Celerybeat: Shutting down...")
         self._is_shutdown.set()
         wait and self._is_stopped.wait()  # block until shutdown done.
 
@@ -423,7 +419,6 @@ class Service(object):
         scheduler = instantiate(self.scheduler_cls,
                                 app=self.app,
                                 schedule_filename=filename,
-                                logger=self.logger,
                                 max_interval=self.max_interval,
                                 lazy=lazy)
         return scheduler

+ 1 - 1
celery/bin/celeryd.py

@@ -73,7 +73,7 @@
 """
 from __future__ import absolute_import
 
-if __name__ == "__main__" and globals().get("__package__") is None:
+if __name__ == "__main__" and __package__ is None:
     __package__ = "celery.bin.celeryd"
 
 import sys

+ 4 - 1
celery/bin/celeryd_detach.py

@@ -12,9 +12,12 @@ from optparse import OptionParser, BadOptionError
 
 from celery import __version__
 from celery.platforms import detached
+from celery.utils.log import get_logger
 
 from .base import daemon_options, Option
 
+logger = get_logger(__name__)
+
 OPTION_LIST = daemon_options(default_pidfile="celeryd.pid") + (
                 Option("--fake",
                        default=False, action="store_true", dest="fake",
@@ -28,7 +31,7 @@ def detach(path, argv, logfile=None, pidfile=None, uid=None,
             os.execv(path, [path] + argv)
         except Exception:
             from celery import current_app
-            logger = current_app.log.setup_logger("ERROR", logfile)
+            current_app.log.setup_logging_subsystem("ERROR", logfile)
             logger.critical("Can't exec %r", " ".join([path] + argv),
                             exc_info=True)
 

+ 6 - 7
celery/concurrency/base.py

@@ -5,12 +5,12 @@ import logging
 import os
 import time
 
-from kombu.log import anon_logger
 from kombu.utils.encoding import safe_repr
 
 from celery.utils import timer2
+from celery.utils.log import get_logger
 
-_default_logger = anon_logger("celery.concurrency")
+logger = get_logger("celery.concurrency")
 
 
 def apply_target(target, args=(), kwargs={}, callback=None,
@@ -46,12 +46,11 @@ class BasePool(object):
     _state = None
     _pool = None
 
-    def __init__(self, limit=None, putlocks=True, logger=None, **options):
+    def __init__(self, limit=None, putlocks=True, **options):
         self.limit = limit
         self.putlocks = putlocks
-        self.logger = logger or _default_logger
         self.options = options
-        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
+        self._does_debug = logger.isEnabledFor(logging.DEBUG)
 
     def on_start(self):
         pass
@@ -94,8 +93,8 @@ class BasePool(object):
 
         """
         if self._does_debug:
-            self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
-                            target, safe_repr(args), safe_repr(kwargs))
+            logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
+                         target, safe_repr(args), safe_repr(kwargs))
 
         return self.on_apply(target, args, kwargs,
                              waitforslot=self.putlocks,

+ 1 - 5
celery/contrib/batches.py

@@ -46,7 +46,7 @@ from itertools import count
 from Queue import Empty, Queue
 
 from celery.task import Task
-from celery.utils import cached_property, timer2
+from celery.utils import timer2
 from celery.worker import state
 
 
@@ -192,7 +192,3 @@ class Batches(Task):
 
     def debug(self, msg):
         self.logger.debug("%s: %s", self.name, msg)
-
-    @cached_property
-    def logger(self):
-        return self.app.log.get_default_logger()

+ 10 - 14
celery/events/snapshot.py

@@ -24,9 +24,11 @@ from celery.app import app_or_default
 from celery.utils import timer2
 from celery.utils.dispatch import Signal
 from celery.utils.imports import instantiate
-from celery.utils.log import LOG_LEVELS
+from celery.utils.log import get_logger
 from celery.utils.timeutils import rate
 
+logger = get_logger("celery.evcam")
+
 
 class Polaroid(object):
     timer = timer2
@@ -38,14 +40,13 @@ class Polaroid(object):
     _ctref = None
 
     def __init__(self, state, freq=1.0, maxrate=None,
-            cleanup_freq=3600.0, logger=None, timer=None, app=None):
+            cleanup_freq=3600.0, timer=None, app=None):
         self.app = app_or_default(app)
         self.state = state
         self.freq = freq
         self.cleanup_freq = cleanup_freq
         self.timer = timer or self.timer
-        self.logger = logger or \
-                self.app.log.get_default_logger(name="celery.cam")
+        self.logger = logger
         self.maxrate = maxrate and TokenBucket(rate(maxrate))
 
     def install(self):
@@ -61,13 +62,13 @@ class Polaroid(object):
         pass
 
     def cleanup(self):
-        self.logger.debug("Cleanup: Running...")
+        logger.debug("Cleanup: Running...")
         self.cleanup_signal.send(None)
         self.on_cleanup()
 
     def shutter(self):
         if self.maxrate is None or self.maxrate.can_consume():
-            self.logger.debug("Shutter: %s", self.state)
+            logger.debug("Shutter: %s", self.state)
             self.shutter_signal.send(self.state)
             self.on_shutter(self.state)
 
@@ -97,19 +98,14 @@ def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
         pidlock = platforms.create_pidlock(pidfile).acquire()
         atexit.register(pidlock.release)
 
-    if not isinstance(loglevel, int):
-        loglevel = LOG_LEVELS[loglevel.upper()]
-    logger = app.log.setup_logger(loglevel=loglevel,
-                                  logfile=logfile,
-                                  name="celery.evcam")
+    app.log.setup_logging_subsystem(loglevel, logfile)
 
     logger.info(
         "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
             camera, freq))
     state = app.events.State()
-    cam = instantiate(camera, state, app=app,
-                      freq=freq, maxrate=maxrate, logger=logger,
-                      timer=timer)
+    cam = instantiate(camera, state, app=app, freq=freq,
+                      maxrate=maxrate, timer=timer)
     cam.install()
     conn = app.broker_connection()
     recv = app.events.Receiver(conn, handlers={"*": state.event})

+ 3 - 3
celery/task/http.py

@@ -131,7 +131,7 @@ class HttpDispatch(object):
     user_agent = "celery/%s" % celery_version
     timeout = 5
 
-    def __init__(self, url, method, task_kwargs, logger):
+    def __init__(self, url, method, task_kwargs, logger=None):
         self.url = url
         self.method = method
         self.task_kwargs = task_kwargs
@@ -186,12 +186,12 @@ class HttpDispatchTask(BaseTask):
 
     url = None
     method = None
+    accept_magic_kwargs = False
 
     def run(self, url=None, method="GET", **kwargs):
         url = url or self.url
         method = method or self.method
-        logger = self.get_logger(**kwargs)
-        return HttpDispatch(url, method, kwargs, logger).dispatch()
+        return HttpDispatch(url, method, kwargs, self.logger).dispatch()
 
 
 class URL(MutableURL):

+ 5 - 3
celery/task/trace.py

@@ -34,6 +34,9 @@ from celery.app.task import BaseTask
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import RetryTaskError
 from celery.utils.serialization import get_pickleable_exception
+from celery.utils.log import get_logger
+
+_logger = get_logger(__name__)
 
 send_prerun = signals.task_prerun.send
 prerun_receivers = signals.task_prerun.receivers
@@ -231,9 +234,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     except (KeyboardInterrupt, SystemExit, MemoryError):
                         raise
                     except Exception, exc:
-                        logger = current_app.log.get_default_logger()
-                        logger.error("Process cleanup failed: %r", exc,
-                                     exc_info=True)
+                        _logger.error("Process cleanup failed: %r", exc,
+                                      exc_info=True)
         except Exception, exc:
             if eager:
                 raise

+ 4 - 19
celery/tests/test_app/test_beat.py

@@ -1,8 +1,7 @@
 from __future__ import absolute_import
 
-import logging
-
 from datetime import datetime, timedelta
+from mock import patch
 from nose import SkipTest
 
 from celery import beat
@@ -97,22 +96,11 @@ class test_ScheduleEntry(Case):
         self.assertDictEqual(entry.options, {"routing_key": "urgent"})
 
 
-class MockLogger(logging.Logger):
-
-    def __init__(self, *args, **kwargs):
-        self.logged = []
-        logging.Logger.__init__(self, *args, **kwargs)
-
-    def _log(self, level, msg, args, **kwargs):
-        self.logged.append((level, msg, args, kwargs))
-
-
 class mScheduler(beat.Scheduler):
 
     def __init__(self, *args, **kwargs):
         self.sent = []
         beat.Scheduler.__init__(self, *args, **kwargs)
-        self.logger = MockLogger("celery.beat", logging.ERROR)
 
     def send_task(self, name=None, args=None, kwargs=None, **options):
         self.sent.append({"name": name,
@@ -183,16 +171,13 @@ class test_Scheduler(Case):
                       kwargs={"foo": "bar"})
         self.assertEqual(scheduler.tick(), 1)
 
-    def test_due_tick_SchedulingError(self):
+    @patch("celery.beat.error")
+    def test_due_tick_SchedulingError(self, error):
         scheduler = mSchedulerSchedulingError()
         scheduler.add(name="test_due_tick_SchedulingError",
                       schedule=always_due)
         self.assertEqual(scheduler.tick(), 1)
-        self.assertTrue(scheduler.logger.logged[0])
-        level, msg, args, kwargs = scheduler.logger.logged[0]
-        self.assertEqual(level, logging.ERROR)
-        self.assertIn("Couldn't apply scheduled task",
-                      repr(args[0].args[0]))
+        self.assertTrue(error.called)
 
     def test_due_tick_RuntimeError(self):
         scheduler = mSchedulerRuntimeError()

+ 29 - 62
celery/tests/test_app/test_log.py

@@ -6,11 +6,13 @@ import logging
 from tempfile import mktemp
 
 from celery import current_app
+from celery.app.log import Logging
 from celery.utils.log import LoggingProxy
 from celery.utils import uuid
-from celery.utils.compat import _CompatLoggerAdapter
-from celery.tests.utils import (Case, override_stdouts, wrap_logger,
-                                get_handlers, set_handlers)
+from celery.utils.log import get_logger
+from celery.tests.utils import (
+    Case, override_stdouts, wrap_logger, get_handlers,
+)
 
 log = current_app.log
 
@@ -19,8 +21,8 @@ class test_default_logger(Case):
 
     def setUp(self):
         self.setup_logger = log.setup_logger
-        self.get_logger = log.get_default_logger
-        log._setup = False
+        self.get_logger = lambda n=None: get_logger(n) if n else logging.root
+        Logging._setup = False
 
     def test_setup_logging_subsystem_colorize(self):
         log.setup_logging_subsystem(colorize=None)
@@ -51,29 +53,28 @@ class test_default_logger(Case):
     def test_setup_logger(self):
         logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
                                    root=False, colorize=True)
-        set_handlers(logger, [])
+        logger.handlers = []
+        Logging._setup = False
         logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
                                    root=False, colorize=None)
+        print(logger.handlers)
         self.assertIs(get_handlers(logger)[0].stream, sys.__stderr__,
                 "setup_logger logs to stderr without logfile argument.")
-        self.assertDidLogFalse(logger, "Logging something",
-                "Logger doesn't info when loglevel is ERROR",
-                loglevel=logging.INFO)
 
     def test_setup_logger_no_handlers_stream(self):
         l = self.get_logger()
-        set_handlers(l, [])
+        l.handlers = []
 
         with override_stdouts() as outs:
             stdout, stderr = outs
-            l = self.setup_logger(logfile=stderr, loglevel=logging.INFO,
-                                  root=False)
+            l = self.setup_logger(logfile=sys.stderr, loglevel=logging.INFO,
+                                root=False)
             l.info("The quick brown fox...")
             self.assertIn("The quick brown fox...", stderr.getvalue())
 
     def test_setup_logger_no_handlers_file(self):
         l = self.get_logger()
-        set_handlers(l, [])
+        l.handlers = []
         tempfile = mktemp(suffix="unittest", prefix="celery")
         l = self.setup_logger(logfile=tempfile, loglevel=0, root=False)
         self.assertIsInstance(get_handlers(l)[0],
@@ -115,17 +116,28 @@ class test_default_logger(Case):
 class test_task_logger(test_default_logger):
 
     def setUp(self):
-        logger = log.get_task_logger()
+        logger = self.logger = get_logger("celery.task")
         logger.handlers = []
         logging.root.manager.loggerDict.pop(logger.name, None)
         self.uid = uuid()
 
+        @current_app.task
+        def test_task():
+            pass
+        test_task.logger.handlers = []
+        self.task = test_task
+        from celery.app.state import _tls
+        _tls.current_task = test_task
+
+    def tearDown(self):
+        from celery.app.state import _tls
+        _tls.current_task = None
+
     def setup_logger(self, *args, **kwargs):
-        return log.setup_task_logger(*args, **dict(kwargs, task_name=self.uid,
-                                                   task_id=self.uid))
+        return log.setup_task_loggers(*args, **kwargs)
 
     def get_logger(self, *args, **kwargs):
-        return log.get_task_logger(*args, **dict(kwargs, name=self.uid))
+        return self.task.logger
 
 
 class MockLogger(logging.Logger):
@@ -140,48 +152,3 @@ class MockLogger(logging.Logger):
 
     def isEnabledFor(self, level):
         return True
-
-
-class test_CompatLoggerAdapter(Case):
-    levels = ("debug",
-              "info",
-              "warn", "warning",
-              "error",
-              "fatal", "critical")
-
-    def setUp(self):
-        self.logger, self.adapter = self.createAdapter()
-
-    def createAdapter(self, name=None, extra={"foo": "bar"}):
-        logger = MockLogger(name=name or uuid())
-        return logger, _CompatLoggerAdapter(logger, extra)
-
-    def test_levels(self):
-        for level in self.levels:
-            msg = "foo bar %s" % (level, )
-            logger, adapter = self.createAdapter()
-            getattr(adapter, level)(msg)
-            self.assertEqual(logger._records[0].msg, msg)
-
-    def test_exception(self):
-        try:
-            raise KeyError("foo")
-        except KeyError:
-            self.adapter.exception("foo bar exception")
-        self.assertEqual(self.logger._records[0].msg, "foo bar exception")
-
-    def test_setLevel(self):
-        self.adapter.setLevel(logging.INFO)
-        self.assertEqual(self.logger.level, logging.INFO)
-
-    def test_process(self):
-        msg, kwargs = self.adapter.process("foo bar baz", {"exc_info": 1})
-        self.assertDictEqual(kwargs, {"exc_info": 1,
-                                      "extra": {"foo": "bar"}})
-
-    def test_add_remove_handlers(self):
-        handler = logging.StreamHandler()
-        self.adapter.addHandler(handler)
-        self.assertIs(self.logger.handlers[0], handler)
-        self.adapter.removeHandler(handler)
-        self.assertListEqual(self.logger.handlers, [])

+ 5 - 14
celery/tests/test_bin/test_celerybeat.py

@@ -7,6 +7,7 @@ import sys
 from collections import defaultdict
 
 from kombu.tests.utils import redirect_stdouts
+from mock import patch
 
 from celery import beat
 from celery import platforms
@@ -119,21 +120,11 @@ class test_Beat(AppCase):
             sys.stdout.logger
 
     @redirect_stdouts
-    def test_logs_errors(self, stdout, stderr):
-        class MockLogger(object):
-            _critical = []
-
-            def debug(self, *args, **kwargs):
-                pass
-
-            def critical(self, msg, *args, **kwargs):
-                self._critical.append(msg)
-
-        logger = MockLogger()
+    @patch("celery.apps.beat.logger")
+    def test_logs_errors(self, logger, stdout, stderr):
         b = MockBeat3(socket_timeout=None)
-        b.start_scheduler(logger)
-
-        self.assertTrue(logger._critical)
+        b.start_scheduler()
+        self.assertTrue(logger.critical.called)
 
     @redirect_stdouts
     def test_use_pidfile(self, stdout, stderr):

+ 7 - 14
celery/tests/test_bin/test_celeryd.py

@@ -47,7 +47,7 @@ def disable_stdouts(fun):
 class _WorkController(object):
 
     def __init__(self, *args, **kwargs):
-        self.logger = current_app.log.get_default_logger()
+        pass
 
     def start(self):
         pass
@@ -331,7 +331,7 @@ class test_Worker(AppCase):
             controller.hup_not_supported_installed = True
 
         class Controller(object):
-            logger = logging.getLogger("celery.tests")
+            pass
 
         prev = cd.install_HUP_not_supported_handler
         cd.install_HUP_not_supported_handler = install_HUP_nosupport
@@ -353,7 +353,7 @@ class test_Worker(AppCase):
             restart_worker_handler_installed[0] = True
 
         class Controller(object):
-            logger = logging.getLogger("celery.tests")
+            pass
 
         prev = cd.install_worker_restart_handler
         cd.install_worker_restart_handler = install_worker_restart_handler
@@ -427,7 +427,6 @@ class test_signal_handlers(AppCase):
     class _Worker(object):
         stopped = False
         terminated = False
-        logger = current_app.log.get_default_logger()
 
         def stop(self, in_sighandler=False):
             self.stopped = True
@@ -516,22 +515,16 @@ class test_signal_handlers(AppCase):
             handlers["SIGTERM"]("SIGTERM", object())
         self.assertTrue(worker.stopped)
 
-    def test_worker_cry_handler(self):
+    @patch("celery.apps.worker.logger")
+    def test_worker_cry_handler(self, logger):
         if sys.platform.startswith("java"):
             raise SkipTest("Cry handler does not work on Jython")
         if hasattr(sys, "pypy_version_info"):
             raise SkipTest("Cry handler does not work on PyPy")
         if sys.version_info > (2, 5):
-
-            class Logger(object):
-                _errors = []
-
-                def error(self, msg, *args, **kwargs):
-                    self._errors.append(msg)
-            logger = Logger()
-            handlers = self.psig(cd.install_cry_handler, logger)
+            handlers = self.psig(cd.install_cry_handler)
             self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
-            self.assertTrue(Logger._errors)
+            self.assertTrue(logger.error.called)
         else:
             raise SkipTest("Needs Python 2.5 or later")
 

+ 0 - 2
celery/tests/test_concurrency/test_pool.py

@@ -2,7 +2,6 @@ from __future__ import absolute_import
 
 import sys
 import time
-import logging
 import itertools
 
 from nose import SkipTest
@@ -39,7 +38,6 @@ class TestTaskPool(Case):
     def test_attrs(self):
         p = self.TaskPool(2)
         self.assertEqual(p.limit, 2)
-        self.assertIsInstance(p.logger, logging.Logger)
         self.assertIsNone(p._pool)
 
     def x_apply(self):

+ 4 - 5
celery/tests/test_utils/test_timer2.py

@@ -108,17 +108,16 @@ class test_Timer(Case):
 
         self.assertEqual(t.enter_after.call_count, 2)
 
-    @redirect_stdouts
-    def test_apply_entry_error_handled(self, stdout, stderr):
+    @patch("celery.utils.timer2.logger")
+    def test_apply_entry_error_handled(self, logger):
         t = timer2.Timer()
         t.schedule.on_error = None
 
         fun = Mock()
         fun.side_effect = ValueError()
 
-        with self.assertWarns(timer2.TimedFunctionFailed):
-            t.apply_entry(fun)
-            fun.assert_called_with()
+        t.apply_entry(fun)
+        self.assertTrue(logger.error.called)
 
     @redirect_stdouts
     def test_apply_entry_error_not_handled(self, stdout, stderr):

+ 67 - 66
celery/tests/test_worker/__init__.py

@@ -102,7 +102,7 @@ class test_QoS(Case):
     class _QoS(QoS):
         def __init__(self, value):
             self.value = value
-            QoS.__init__(self, None, value, None)
+            QoS.__init__(self, None, value)
 
         def set(self, value):
             return value
@@ -153,8 +153,7 @@ class test_QoS(Case):
         self.assertEqual(qos.value, 1000)
 
     def test_exceeds_short(self):
-        qos = QoS(Mock(), PREFETCH_COUNT_MAX - 1,
-                current_app.log.get_default_logger())
+        qos = QoS(Mock(), PREFETCH_COUNT_MAX - 1)
         qos.update()
         self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
         qos.increment()
@@ -168,7 +167,7 @@ class test_QoS(Case):
 
     def test_consumer_increment_decrement(self):
         consumer = Mock()
-        qos = QoS(consumer, 10, current_app.log.get_default_logger())
+        qos = QoS(consumer, 10)
         qos.update()
         self.assertEqual(qos.value, 10)
         self.assertIn({"prefetch_count": 10}, consumer.qos.call_args)
@@ -188,7 +187,7 @@ class test_QoS(Case):
 
     def test_consumer_decrement_eventually(self):
         consumer = Mock()
-        qos = QoS(consumer, 10, current_app.log.get_default_logger())
+        qos = QoS(consumer, 10)
         qos.decrement_eventually()
         self.assertEqual(qos.value, 9)
         qos.value = 0
@@ -197,7 +196,7 @@ class test_QoS(Case):
 
     def test_set(self):
         consumer = Mock()
-        qos = QoS(consumer, 10, current_app.log.get_default_logger())
+        qos = QoS(consumer, 10)
         qos.set(12)
         self.assertEqual(qos.prev, 12)
         qos.set(qos.prev)
@@ -208,16 +207,14 @@ class test_Consumer(Case):
     def setUp(self):
         self.ready_queue = FastQueue()
         self.eta_schedule = Timer()
-        self.logger = current_app.log.get_default_logger()
-        self.logger.setLevel(0)
 
     def tearDown(self):
         self.eta_schedule.stop()
 
     def test_info(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                            send_events=False)
-        l.qos = QoS(l.task_consumer, 10, l.logger)
+        l.qos = QoS(l.task_consumer, 10)
         info = l.info
         self.assertEqual(info["prefetch_count"], 10)
         self.assertFalse(info["broker"])
@@ -227,13 +224,13 @@ class test_Consumer(Case):
         self.assertTrue(info["broker"])
 
     def test_start_when_closed(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                             send_events=False)
         l._state = CLOSE
         l.start()
 
     def test_connection(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                            send_events=False)
 
         l.reset_connection()
@@ -259,12 +256,12 @@ class test_Consumer(Case):
         self.assertIsNone(l.task_consumer)
 
     def test_close_connection(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                            send_events=False)
         l._state = RUN
         l.close_connection()
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                            send_events=False)
         eventer = l.event_dispatcher = Mock()
         eventer.enabled = True
@@ -274,21 +271,22 @@ class test_Consumer(Case):
         self.assertTrue(eventer.close.call_count)
         self.assertTrue(heart.closed)
 
-    def test_receive_message_unknown(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+    @patch("celery.worker.consumer.warn")
+    def test_receive_message_unknown(self, warn):
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                            send_events=False)
         backend = Mock()
         m = create_message(backend, unknown={"baz": "!!!"})
         l.event_dispatcher = Mock()
         l.pidbox_node = MockNode()
 
-        with self.assertWarnsRegex(RuntimeWarning, r'unknown message'):
-            l.receive_message(m.decode(), m)
+        l.receive_message(m.decode(), m)
+        self.assertTrue(warn.call_count)
 
     @patch("celery.utils.timer2.to_timestamp")
     def test_receive_message_eta_OverflowError(self, to_timestamp):
         to_timestamp.side_effect = OverflowError()
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                              send_events=False)
         m = create_message(Mock(), task=foo_task.name,
                                    args=("2, 2"),
@@ -302,9 +300,9 @@ class test_Consumer(Case):
         self.assertTrue(m.acknowledged)
         self.assertTrue(to_timestamp.call_count)
 
-    def test_receive_message_InvalidTaskError(self):
-        logger = Mock()
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
+    @patch("celery.worker.consumer.error")
+    def test_receive_message_InvalidTaskError(self, error):
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                            send_events=False)
         m = create_message(Mock(), task=foo_task.name,
                            args=(1, 2), kwargs="foobarbaz", id=1)
@@ -313,12 +311,11 @@ class test_Consumer(Case):
         l.pidbox_node = MockNode()
 
         l.receive_message(m.decode(), m)
-        self.assertIn("Received invalid task message",
-                      logger.error.call_args[0][0])
+        self.assertIn("Received invalid task message", error.call_args[0][0])
 
-    def test_on_decode_error(self):
-        logger = Mock()
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
+    @patch("celery.worker.consumer.crit")
+    def test_on_decode_error(self, crit):
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                            send_events=False)
 
         class MockMessage(Mock):
@@ -329,11 +326,10 @@ class test_Consumer(Case):
         message = MockMessage()
         l.on_decode_error(message, KeyError("foo"))
         self.assertTrue(message.ack.call_count)
-        self.assertIn("Can't decode message body",
-                      logger.critical.call_args[0][0])
+        self.assertIn("Can't decode message body", crit.call_args[0][0])
 
     def test_receieve_message(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                            send_events=False)
         m = create_message(Mock(), task=foo_task.name,
                            args=[2, 4, 8], kwargs={})
@@ -359,7 +355,7 @@ class test_Consumer(Case):
                     raise KeyError("foo")
                 raise SyntaxError("bar")
 
-        l = MockConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MockConsumer(self.ready_queue, self.eta_schedule,
                              send_events=False, pool=BasePool())
         l.connection_errors = (KeyError, )
         with self.assertRaises(SyntaxError):
@@ -380,7 +376,7 @@ class test_Consumer(Case):
                     raise KeyError("foo")
                 raise SyntaxError("bar")
 
-        l = MockConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MockConsumer(self.ready_queue, self.eta_schedule,
                              send_events=False, pool=BasePool())
 
         l.channel_errors = (KeyError, )
@@ -397,12 +393,12 @@ class test_Consumer(Case):
                 self.obj.connection = None
                 raise socket.timeout(10)
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                             send_events=False)
         l.connection = Connection()
         l.task_consumer = Mock()
         l.connection.obj = l
-        l.qos = QoS(l.task_consumer, 10, l.logger)
+        l.qos = QoS(l.task_consumer, 10)
         l.consume_messages()
 
     def test_consume_messages_when_socket_error(self):
@@ -414,13 +410,13 @@ class test_Consumer(Case):
                 self.obj.connection = None
                 raise socket.error("foo")
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                             send_events=False)
         l._state = RUN
         c = l.connection = Connection()
         l.connection.obj = l
         l.task_consumer = Mock()
-        l.qos = QoS(l.task_consumer, 10, l.logger)
+        l.qos = QoS(l.task_consumer, 10)
         with self.assertRaises(socket.error):
             l.consume_messages()
 
@@ -436,12 +432,12 @@ class test_Consumer(Case):
             def drain_events(self, **kwargs):
                 self.obj.connection = None
 
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                              send_events=False)
         l.connection = Connection()
         l.connection.obj = l
         l.task_consumer = Mock()
-        l.qos = QoS(l.task_consumer, 10, l.logger)
+        l.qos = QoS(l.task_consumer, 10)
 
         l.consume_messages()
         l.consume_messages()
@@ -452,7 +448,7 @@ class test_Consumer(Case):
         l.task_consumer.qos.assert_called_with(prefetch_count=9)
 
     def test_maybe_conn_error(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                              send_events=False)
         l.connection_errors = (KeyError, )
         l.channel_errors = (SyntaxError, )
@@ -464,9 +460,9 @@ class test_Consumer(Case):
 
     def test_apply_eta_task(self):
         from celery.worker import state
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                              send_events=False)
-        l.qos = QoS(None, 10, l.logger)
+        l.qos = QoS(None, 10)
 
         task = object()
         qos = l.qos.value
@@ -476,14 +472,14 @@ class test_Consumer(Case):
         self.assertIs(self.ready_queue.get_nowait(), task)
 
     def test_receieve_message_eta_isoformat(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                              send_events=False)
         m = create_message(Mock(), task=foo_task.name,
                            eta=datetime.now().isoformat(),
                            args=[2, 4, 8], kwargs={})
 
         l.task_consumer = Mock()
-        l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
+        l.qos = QoS(l.task_consumer, l.initial_prefetch_count)
         l.event_dispatcher = Mock()
         l.enabled = False
         l.update_strategies()
@@ -500,7 +496,7 @@ class test_Consumer(Case):
         l.eta_schedule.stop()
 
     def test_on_control(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                              send_events=False)
         l.pidbox_node = Mock()
         l.reset_pidbox_node = Mock()
@@ -521,7 +517,7 @@ class test_Consumer(Case):
 
     def test_revoke(self):
         ready_queue = FastQueue()
-        l = MyKombuConsumer(ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(ready_queue, self.eta_schedule,
                            send_events=False)
         backend = Mock()
         id = uuid()
@@ -534,7 +530,7 @@ class test_Consumer(Case):
         self.assertTrue(ready_queue.empty())
 
     def test_receieve_message_not_registered(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                           send_events=False)
         backend = Mock()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
@@ -545,28 +541,29 @@ class test_Consumer(Case):
             self.ready_queue.get_nowait()
         self.assertTrue(self.eta_schedule.empty())
 
-    def test_receieve_message_ack_raises(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+    @patch("celery.worker.consumer.warn")
+    @patch("celery.worker.consumer.logger")
+    def test_receieve_message_ack_raises(self, logger, warn):
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                           send_events=False)
         backend = Mock()
         m = create_message(backend, args=[2, 4, 8], kwargs={})
 
         l.event_dispatcher = Mock()
         l.connection_errors = (socket.error, )
-        l.logger = Mock()
         m.reject = Mock()
         m.reject.side_effect = socket.error("foo")
-        with self.assertWarnsRegex(RuntimeWarning, r'unknown message'):
-            self.assertFalse(l.receive_message(m.decode(), m))
+        self.assertFalse(l.receive_message(m.decode(), m))
+        self.assertTrue(warn.call_count)
         with self.assertRaises(Empty):
             self.ready_queue.get_nowait()
         self.assertTrue(self.eta_schedule.empty())
         m.reject.assert_called_with()
-        self.assertTrue(l.logger.critical.call_count)
+        self.assertTrue(logger.critical.call_count)
 
     def test_receieve_message_eta(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
-                          send_events=False)
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
+                            send_events=False)
         l.event_dispatcher = Mock()
         l.event_dispatcher._outbound_buffer = deque()
         backend = Mock()
@@ -597,7 +594,7 @@ class test_Consumer(Case):
             self.ready_queue.get_nowait()
 
     def test_reset_pidbox_node(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                           send_events=False)
         l.pidbox_node = Mock()
         chan = l.pidbox_node.channel = Mock()
@@ -608,7 +605,7 @@ class test_Consumer(Case):
         chan.close.assert_called_with()
 
     def test_reset_pidbox_node_green(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                           send_events=False)
         l.pool = Mock()
         l.pool.is_green = True
@@ -616,7 +613,7 @@ class test_Consumer(Case):
         l.pool.spawn_n.assert_called_with(l._green_pidbox_node)
 
     def test__green_pidbox_node(self):
-        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule,
                           send_events=False)
         l.pidbox_node = Mock()
 
@@ -684,7 +681,7 @@ class test_Consumer(Case):
                     raise KeyError("foo")
 
         init_callback = Mock()
-        l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = _Consumer(self.ready_queue, self.eta_schedule,
                       send_events=False, init_callback=init_callback)
         l.task_consumer = Mock()
         l.broadcast_consumer = Mock()
@@ -707,7 +704,7 @@ class test_Consumer(Case):
         self.assertEqual(l.qos.prev, l.qos.value)
 
         init_callback.reset_mock()
-        l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
+        l = _Consumer(self.ready_queue, self.eta_schedule,
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
         l.task_consumer = Mock()
@@ -720,8 +717,8 @@ class test_Consumer(Case):
         self.assertTrue(l.consume_messages.call_count)
 
     def test_reset_connection_with_no_node(self):
-
-        l = MainConsumer(self.ready_queue, self.eta_schedule, self.logger)
+        l = MainConsumer(self.ready_queue, self.eta_schedule,
+                         send_events=False)
         self.assertEqual(None, l.pool)
         l.reset_connection()
 
@@ -730,11 +727,17 @@ class test_WorkController(AppCase):
 
     def setup(self):
         self.worker = self.create_worker()
+        from celery import worker
+        self._logger = worker.logger
+        self.logger = worker.logger = Mock()
+
+    def teardown(self):
+        from celery import worker
+        worker.logger = self._logger
 
     def create_worker(self, **kw):
         worker = WorkController(concurrency=1, loglevel=0, **kw)
         worker._shutdown_complete.set()
-        worker.logger = Mock()
         return worker
 
     @patch("celery.platforms.signals")
@@ -810,7 +813,6 @@ class test_WorkController(AppCase):
 
     def test_on_timer_error(self):
         worker = WorkController(concurrency=1, loglevel=0)
-        worker.logger = Mock()
 
         try:
             raise KeyError("foo")
@@ -818,15 +820,14 @@ class test_WorkController(AppCase):
             exc_info = sys.exc_info()
 
         worker.on_timer_error(exc_info)
-        msg, args = worker.logger.error.call_args[0]
+        msg, args = self.logger.error.call_args[0]
         self.assertIn("KeyError", msg % args)
 
     def test_on_timer_tick(self):
         worker = WorkController(concurrency=1, loglevel=10)
-        worker.logger = Mock()
 
         worker.on_timer_tick(30.0)
-        xargs = worker.logger.debug.call_args[0]
+        xargs = self.logger.debug.call_args[0]
         fmt, arg = xargs[0], xargs[1]
         self.assertEqual(30.0, arg)
         self.assertIn("Next eta %s secs", fmt)

+ 0 - 1
celery/tests/test_worker/test_bootsteps.py

@@ -160,7 +160,6 @@ class test_Namespace(AppCase):
         ns = self.NS(app=self.app)
         self.assertIs(ns.app, self.app)
         self.assertEqual(ns.name, "test_Namespace")
-        self.assertTrue(ns.logger)
         self.assertFalse(ns.services)
 
     def test_interface_modules(self):

+ 11 - 14
celery/tests/test_worker/test_worker_autoscale.py

@@ -1,6 +1,5 @@
 from __future__ import absolute_import
 
-import logging
 import sys
 
 from time import time
@@ -12,8 +11,6 @@ from celery.worker import state
 from celery.worker import autoscale
 from celery.tests.utils import Case, sleepdeprived
 
-logger = logging.getLogger("celery.tests.autoscale")
-
 
 class Object(object):
     pass
@@ -60,7 +57,7 @@ class test_Autoscaler(Case):
             def join(self, timeout=None):
                 self.joined = True
 
-        x = Scaler(self.pool, 10, 3, logger=logger)
+        x = Scaler(self.pool, 10, 3)
         x._is_stopped.set()
         x.stop()
         self.assertTrue(x.joined)
@@ -71,7 +68,7 @@ class test_Autoscaler(Case):
 
     @sleepdeprived(autoscale)
     def test_scale(self):
-        x = autoscale.Autoscaler(self.pool, 10, 3, logger=logger)
+        x = autoscale.Autoscaler(self.pool, 10, 3)
         x.scale()
         self.assertEqual(x.pool.num_processes, 3)
         for i in range(20):
@@ -95,30 +92,30 @@ class test_Autoscaler(Case):
                 self.scale_called = True
                 self._is_shutdown.set()
 
-        x = Scaler(self.pool, 10, 3, logger=logger)
+        x = Scaler(self.pool, 10, 3)
         x.run()
         self.assertTrue(x._is_shutdown.isSet())
         self.assertTrue(x._is_stopped.isSet())
         self.assertTrue(x.scale_called)
 
     def test_shrink_raises_exception(self):
-        x = autoscale.Autoscaler(self.pool, 10, 3, logger=logger)
+        x = autoscale.Autoscaler(self.pool, 10, 3)
         x.scale_up(3)
         x._last_action = time() - 10000
         x.pool.shrink_raises_exception = True
         x.scale_down(1)
 
-    def test_shrink_raises_ValueError(self):
-        x = autoscale.Autoscaler(self.pool, 10, 3, logger=logger)
-        x.logger = Mock()
+    @patch("celery.worker.autoscale.debug")
+    def test_shrink_raises_ValueError(self, debug):
+        x = autoscale.Autoscaler(self.pool, 10, 3)
         x.scale_up(3)
         x._last_action = time() - 10000
         x.pool.shrink_raises_ValueError = True
         x.scale_down(1)
-        self.assertTrue(x.logger.debug.call_count)
+        self.assertTrue(debug.call_count)
 
     def test_update_and_force(self):
-        x = autoscale.Autoscaler(self.pool, 10, 3, logger=logger)
+        x = autoscale.Autoscaler(self.pool, 10, 3)
         self.assertEqual(x.processes, 3)
         x.force_scale_up(5)
         self.assertEqual(x.processes, 8)
@@ -130,7 +127,7 @@ class test_Autoscaler(Case):
         self.assertEqual(x.processes, 3)
 
     def test_info(self):
-        x = autoscale.Autoscaler(self.pool, 10, 3, logger=logger)
+        x = autoscale.Autoscaler(self.pool, 10, 3)
         info = x.info()
         self.assertEqual(info['max'], 10)
         self.assertEqual(info['min'], 3)
@@ -144,7 +141,7 @@ class test_Autoscaler(Case):
             def body(self):
                 self._is_shutdown.set()
                 raise OSError("foo")
-        x = _Autoscaler(self.pool, 10, 3, logger=logger)
+        x = _Autoscaler(self.pool, 10, 3)
 
         stderr = Mock()
         p, sys.stderr = sys.stderr, stderr

+ 0 - 1
celery/tests/test_worker/test_worker_control.py

@@ -63,7 +63,6 @@ class test_ControlPanel(Case):
         self.panel = self.create_panel(consumer=Consumer())
 
     def create_state(self, **kwargs):
-        kwargs.setdefault("logger", self.app.log.get_default_logger())
         kwargs.setdefault("app", self.app)
         return AttributeDict(kwargs)
 

+ 27 - 47
celery/tests/test_worker/test_worker_job.py

@@ -11,7 +11,7 @@ from datetime import datetime, timedelta
 
 from kombu.transport.base import Message
 from kombu.utils.encoding import from_utf8, default_encode
-from mock import Mock
+from mock import Mock, patch
 from nose import SkipTest
 
 from celery import current_app
@@ -29,7 +29,7 @@ from celery.utils import uuid
 from celery.worker.job import Request, TaskRequest, execute_and_trace
 from celery.worker.state import revoked
 
-from celery.tests.utils import Case, WhateverIO, wrap_logger
+from celery.tests.utils import Case
 
 
 scratch = {"ACK": False}
@@ -109,21 +109,19 @@ class test_RetryTaskError(Case):
 
 class test_trace_task(Case):
 
-    def test_process_cleanup_fails(self):
+    @patch("celery.task.trace._logger")
+    def test_process_cleanup_fails(self, _logger):
         backend = mytask.backend
         mytask.backend = Mock()
         mytask.backend.process_cleanup = Mock(side_effect=KeyError())
         try:
-
-            logger = mytask.app.log.get_default_logger()
-            with wrap_logger(logger) as sio:
-                tid = uuid()
-                ret = jail(tid, mytask.name, [2], {})
-                self.assertEqual(ret, 4)
-                mytask.backend.store_result.assert_called_with(tid, 4,
-                                                               states.SUCCESS)
-                logs = sio.getvalue().strip()
-                self.assertIn("Process cleanup failed", logs)
+            tid = uuid()
+            ret = jail(tid, mytask.name, [2], {})
+            self.assertEqual(ret, 4)
+            mytask.backend.store_result.assert_called_with(tid, 4,
+                                                           states.SUCCESS)
+            self.assertIn("Process cleanup failed",
+                          _logger.error.call_args[0][0])
         finally:
             mytask.backend = backend
 
@@ -441,38 +439,25 @@ class test_TaskRequest(Case):
         with self.assertRaises(InvalidTaskError):
             TaskRequest.from_message(None, body)
 
-    def test_on_timeout(self):
-
-        class MockLogger(object):
-
-            def __init__(self):
-                self.warnings = []
-                self.errors = []
-
-            def warning(self, msg, *args, **kwargs):
-                self.warnings.append(msg % args)
-
-            def error(self, msg, *args, **kwargs):
-                self.errors.append(msg % args)
+    @patch("celery.worker.job.error")
+    @patch("celery.worker.job.warn")
+    def test_on_timeout(self, warn, error):
 
         tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-        tw.logger = MockLogger()
         tw.on_timeout(soft=True, timeout=1337)
-        self.assertIn("Soft time limit (1337s) exceeded",
-                      tw.logger.warnings[0])
+        self.assertIn("Soft time limit", warn.call_args[0][0])
         tw.on_timeout(soft=False, timeout=1337)
-        self.assertIn("Hard time limit (1337s) exceeded", tw.logger.errors[0])
+        self.assertIn("Hard time limit", error.call_args[0][0])
         self.assertEqual(mytask.backend.get_status(tw.id),
                          states.FAILURE)
 
         mytask.ignore_result = True
         try:
             tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-            tw.logger = MockLogger()
-        finally:
             tw.on_timeout(soft=True, timeout=1336)
             self.assertEqual(mytask.backend.get_status(tw.id),
                              states.PENDING)
+        finally:
             mytask.ignore_result = False
 
     def test_execute_and_trace(self):
@@ -559,7 +544,6 @@ class test_TaskRequest(Case):
         if sys.version_info < (2, 6):
             self.assertEqual(tw.kwargs.keys()[0], us)
             self.assertIsInstance(tw.kwargs.keys()[0], str)
-        self.assertTrue(tw.logger)
 
     def test_from_message_empty_args(self):
         body = {"task": mytask.name, "id": uuid()}
@@ -672,7 +656,8 @@ class test_TaskRequest(Case):
                     "delivery_info": {"exchange": None, "routing_key": None},
                     "task_name": tw.name})
 
-    def _test_on_failure(self, exception):
+    @patch("celery.worker.job.logger")
+    def _test_on_failure(self, exception, logger):
         app = app_or_default()
         tid = uuid()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
@@ -680,20 +665,15 @@ class test_TaskRequest(Case):
             raise exception
         except Exception:
             exc_info = ExceptionInfo(sys.exc_info())
-
-            logfh = WhateverIO()
-            tw.logger.handlers = []
-            tw.logger = app.log.setup_logger("INFO", logfh, root=False)
-
             app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
-
-            tw.on_failure(exc_info)
-            logvalue = logfh.getvalue()
-            self.assertIn(mytask.name, logvalue)
-            self.assertIn(tid, logvalue)
-            self.assertIn("ERROR", logvalue)
-
-            app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
+            try:
+                tw.on_failure(exc_info)
+                self.assertTrue(logger.log.called)
+                context = logger.log.call_args[0][2]
+                self.assertEqual(mytask.name, context["name"])
+                self.assertIn(tid, context["id"])
+            finally:
+                app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
 
     def test_on_failure(self):
         self._test_on_failure(Exception("Inside unit tests"))

+ 5 - 17
celery/tests/utils.py

@@ -25,11 +25,11 @@ from contextlib import contextmanager
 
 import mock
 from nose import SkipTest
+from kombu.log import NullHandler
 
 from ..app import app_or_default
-from ..utils.compat import WhateverIO, LoggerAdapter
+from ..utils.compat import WhateverIO
 from ..utils.functional import noop
-from ..utils.log import NullHandler
 
 from .compat import catch_warnings
 
@@ -208,20 +208,8 @@ class AppCase(Case):
         pass
 
 
-def filter_NullHandler(handlers):
-    return [h for h in handlers if not isinstance(h, NullHandler)]
-
-
 def get_handlers(logger):
-    if isinstance(logger, LoggerAdapter):
-        return filter_NullHandler(logger.logger.handlers)
-    return filter_NullHandler(logger.handlers)
-
-
-def set_handlers(logger, new_handlers):
-    if isinstance(logger, LoggerAdapter):
-        logger.logger.handlers = new_handlers
-    logger.handlers = new_handlers
+    return [h for h in logger.handlers if not isinstance(h, NullHandler)]
 
 
 @contextmanager
@@ -229,11 +217,11 @@ def wrap_logger(logger, loglevel=logging.ERROR):
     old_handlers = get_handlers(logger)
     sio = WhateverIO()
     siohandler = logging.StreamHandler(sio)
-    set_handlers(logger, [siohandler])
+    logger.handlers = [siohandler]
 
     yield sio
 
-    set_handlers(logger, old_handlers)
+    logger.handlers = old_handlers
 
 
 @contextmanager

+ 1 - 113
celery/utils/compat.py

@@ -53,119 +53,6 @@ try:
 except ImportError:
     from ordereddict import OrderedDict  # noqa
 
-############## logging.LoggerAdapter ########################################
-import logging
-try:
-    import multiprocessing
-except ImportError:
-    multiprocessing = None  # noqa
-import sys
-
-
-def _checkLevel(level):
-    if isinstance(level, int):
-        rv = level
-    elif str(level) == level:
-        if level not in logging._levelNames:
-            raise ValueError("Unknown level: %r" % level)
-        rv = logging._levelNames[level]
-    else:
-        raise TypeError("Level not an integer or a valid string: %r" % level)
-    return rv
-
-
-class _CompatLoggerAdapter(object):
-
-    def __init__(self, logger, extra):
-        self.logger = logger
-        self.extra = extra
-
-    def setLevel(self, level):
-        self.logger.level = _checkLevel(level)
-
-    def process(self, msg, kwargs):
-        kwargs["extra"] = self.extra
-        return msg, kwargs
-
-    def debug(self, msg, *args, **kwargs):
-        self.log(logging.DEBUG, msg, *args, **kwargs)
-
-    def info(self, msg, *args, **kwargs):
-        self.log(logging.INFO, msg, *args, **kwargs)
-
-    def warning(self, msg, *args, **kwargs):
-        self.log(logging.WARNING, msg, *args, **kwargs)
-    warn = warning
-
-    def error(self, msg, *args, **kwargs):
-        self.log(logging.ERROR, msg, *args, **kwargs)
-
-    def exception(self, msg, *args, **kwargs):
-        kwargs.setdefault("exc_info", 1)
-        self.error(msg, *args, **kwargs)
-
-    def critical(self, msg, *args, **kwargs):
-        self.log(logging.CRITICAL, msg, *args, **kwargs)
-    fatal = critical
-
-    def log(self, level, msg, *args, **kwargs):
-        if self.logger.isEnabledFor(level):
-            msg, kwargs = self.process(msg, kwargs)
-            self._log(level, msg, args, **kwargs)
-
-    def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
-            func=None, extra=None):
-        rv = logging.LogRecord(name, level, fn, lno, msg, args, exc_info, func)
-        if extra is not None:
-            for key, value in extra.items():
-                if key in ("message", "asctime") or key in rv.__dict__:
-                    raise KeyError(
-                            "Attempt to override %r in LogRecord" % key)
-                rv.__dict__[key] = value
-        if multiprocessing is not None:
-            rv.processName = multiprocessing.current_process()._name
-        else:
-            rv.processName = ""
-        return rv
-
-    def _log(self, level, msg, args, exc_info=None, extra=None):
-        defcaller = "(unknown file)", 0, "(unknown function)"
-        if logging._srcfile:
-            # IronPython doesn't track Python frames, so findCaller
-            # throws an exception on some versions of IronPython.
-            # We trap it here so that IronPython can use logging.
-            try:
-                fn, lno, func = self.logger.findCaller()
-            except ValueError:
-                fn, lno, func = defcaller
-        else:
-            fn, lno, func = defcaller
-        if exc_info:
-            if not isinstance(exc_info, tuple):
-                exc_info = sys.exc_info()
-        record = self.makeRecord(self.logger.name, level, fn, lno, msg,
-                                 args, exc_info, func, extra)
-        self.logger.handle(record)
-
-    def isEnabledFor(self, level):
-        return self.logger.isEnabledFor(level)
-
-    def addHandler(self, hdlr):
-        self.logger.addHandler(hdlr)
-
-    def removeHandler(self, hdlr):
-        self.logger.removeHandler(hdlr)
-
-    @property
-    def level(self):
-        return self.logger.level
-
-
-try:
-    from logging import LoggerAdapter
-except ImportError:
-    LoggerAdapter = _CompatLoggerAdapter  # noqa
-
 ############## itertools.zip_longest #######################################
 
 try:
@@ -205,6 +92,7 @@ except AttributeError:
 
 
 ############## logging.handlers.WatchedFileHandler ##########################
+import logging
 import os
 from stat import ST_DEV, ST_INO
 import platform as _platform

+ 18 - 7
celery/utils/log.py

@@ -12,27 +12,38 @@ try:
 except ImportError:
     current_process = mputil = None  # noqa
 
-from kombu.log import get_logger, LOG_LEVELS, NullHandler
+from kombu.log import get_logger as _get_logger, LOG_LEVELS
 
 from .encoding import safe_str, str_t
 from .term import colored
 
 _process_aware = False
-
 is_py3k = sys.version_info[0] == 3
 
 
+# Sets up our logging hierarchy.
+#
+# Every logger in the celery package inherits from the "celery"
+# logger, and every task logger inherits from the "celery.task"
+# logger.
+logger = _get_logger("celery")
+mp_logger = _get_logger("multiprocessing")
+
+
+def get_logger(name):
+    l = _get_logger(name)
+    if l.parent is logging.root and l is not logger:
+        l.parent = logger
+    return l
+task_logger = get_logger("celery.task")
+
+
 def mlevel(level):
     if level and not isinstance(level, int):
         return LOG_LEVELS[level.upper()]
     return level
 
 
-# ensure loggers exists, to avoid 'no handler for' warnings.
-logger = get_logger("celery")
-mp_logger = get_logger("multiprocessing")
-
-
 class ColorFormatter(logging.Formatter):
     #: Loglevel -> Color mapping.
     COLORS = colored().names

+ 4 - 15
celery/utils/timer2.py

@@ -14,17 +14,15 @@ from __future__ import with_statement
 
 import atexit
 import heapq
-import logging
 import os
 import sys
-import traceback
-import warnings
 
 from itertools import count
 from threading import Condition, Event, Lock, Thread
 from time import time, sleep, mktime
 
 from datetime import datetime, timedelta
+from kombu.log import get_logger
 
 VERSION = (1, 0, 0)
 __version__ = ".".join(map(str, VERSION))
@@ -35,9 +33,7 @@ __docformat__ = "restructuredtext"
 
 DEFAULT_MAX_INTERVAL = 2
 
-
-class TimedFunctionFailed(UserWarning):
-    pass
+logger = get_logger("timer2")
 
 
 class Entry(object):
@@ -180,7 +176,6 @@ class Timer(Thread):
         self._is_shutdown = Event()
         self._is_stopped = Event()
         self.mutex = Lock()
-        self.logger = logging.getLogger("timer2.Timer")
         self.not_empty = Condition(self.mutex)
         self.setDaemon(True)
         self.setName("Timer-%s" % (self._timer_count(), ))
@@ -192,12 +187,7 @@ class Timer(Thread):
             exc_info = sys.exc_info()
             try:
                 if not self.schedule.handle_error(exc_info):
-                    warnings.warn(TimedFunctionFailed(repr(exc))),
-                    sys.stderr.write("Error in timer: %r\n" % (exc, ))
-                    traceback.print_exception(exc_info[0],
-                                              exc_info[1],
-                                              exc_info[2],
-                                              None, sys.stderr)
+                    logger.error("Error in timer: %r\n", exc, exc_info=True)
             finally:
                 del(exc_info)
 
@@ -231,8 +221,7 @@ class Timer(Thread):
                 # so gc collected built-in modules.
                 pass
         except Exception, exc:
-            self.logger.error("Thread Timer crashed: %r", exc,
-                              exc_info=True)
+            logger.error("Thread Timer crashed: %r", exc, exc_info=True)
             os._exit(1)
 
     def stop(self):

+ 17 - 19
celery/worker/__init__.py

@@ -29,6 +29,7 @@ from celery.app.abstract import configurated, from_config
 from celery.exceptions import SystemTerminate
 from celery.utils.functional import noop
 from celery.utils.imports import qualname, reload_from_cwd
+from celery.utils.log import get_logger
 
 from . import abstract
 from . import state
@@ -38,6 +39,8 @@ RUN = 0x1
 CLOSE = 0x2
 TERMINATE = 0x3
 
+logger = get_logger(__name__)
+
 
 class Namespace(abstract.Namespace):
     """This is the boot-step namespace of the :class:`WorkController`.
@@ -84,7 +87,6 @@ class Pool(abstract.StartStopComponent):
 
     def create(self, w):
         pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
-                                logger=w.logger,
                                 initargs=(w.app, w.hostname),
                                 maxtasksperchild=w.max_tasks_per_child,
                                 timeout=w.task_time_limit,
@@ -111,7 +113,6 @@ class Beat(abstract.StartStopComponent):
     def create(self, w):
         from celery.beat import EmbeddedService
         b = w.beat = EmbeddedService(app=w.app,
-                                     logger=w.logger,
                                      schedule_filename=w.schedule_filename,
                                      scheduler_cls=w.scheduler_cls)
         return b
@@ -197,8 +198,7 @@ class WorkController(configurated):
     _state = None
     _running = 0
 
-    def __init__(self, loglevel=None, hostname=None, logger=None,
-            ready_callback=noop,
+    def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
             queues=None, app=None, **kwargs):
         self.app = app_or_default(app or self.app)
 
@@ -216,7 +216,6 @@ class WorkController(configurated):
 
         # Options
         self.loglevel = loglevel or self.loglevel
-        self.logger = self.app.log.get_default_logger()
         self.hostname = hostname or socket.gethostname()
         self.ready_callback = ready_callback
         self._finalize = Finalize(self, self.stop, exitpriority=1)
@@ -225,8 +224,7 @@ class WorkController(configurated):
         # Initialize boot steps
         self.pool_cls = _concurrency.get_implementation(self.pool_cls)
         self.components = []
-        self.namespace = Namespace(app=self.app,
-                                   logger=self.logger).apply(self, **kwargs)
+        self.namespace = Namespace(app=self.app).apply(self, **kwargs)
 
     def start(self):
         """Starts the workers main loop."""
@@ -234,15 +232,15 @@ class WorkController(configurated):
 
         try:
             for i, component in enumerate(self.components):
-                self.logger.debug("Starting %s...", qualname(component))
+                logger.debug("Starting %s...", qualname(component))
                 self._running = i + 1
                 component.start()
-                self.logger.debug("%s OK!", qualname(component))
+                logger.debug("%s OK!", qualname(component))
         except SystemTerminate:
             self.terminate()
         except Exception, exc:
-            self.logger.error("Unrecoverable error: %r", exc,
-                              exc_info=True)
+            logger.error("Unrecoverable error: %r", exc,
+                         exc_info=True)
             self.stop()
         except (KeyboardInterrupt, SystemExit):
             self.stop()
@@ -257,9 +255,9 @@ class WorkController(configurated):
             request.task.execute(request, self.pool,
                                  self.loglevel, self.logfile)
         except Exception, exc:
-            self.logger.critical("Internal error %s: %s\n%s",
-                                 exc.__class__, exc, traceback.format_exc(),
-                                 exc_info=True)
+            logger.critical("Internal error %s: %s\n%s",
+                            exc.__class__, exc, traceback.format_exc(),
+                            exc_info=True)
         except SystemTerminate:
             self.terminate()
             raise
@@ -292,7 +290,7 @@ class WorkController(configurated):
         self._state = self.CLOSE
 
         for component in reversed(self.components):
-            self.logger.debug("%s %s...", what, qualname(component))
+            logger.debug("%s %s...", what, qualname(component))
             stop = component.stop
             if not warm:
                 stop = getattr(component, "terminate", None) or stop
@@ -310,18 +308,18 @@ class WorkController(configurated):
 
         for module in set(modules or ()):
             if module not in sys.modules:
-                self.logger.debug("importing module %s", module)
+                logger.debug("importing module %s", module)
                 imp(module)
             elif reload:
-                self.logger.debug("reloading module %s", module)
+                logger.debug("reloading module %s", module)
                 reload_from_cwd(sys.modules[module], reloader)
         self.pool.restart()
 
     def on_timer_error(self, einfo):
-        self.logger.error("Timer error: %r", einfo[1], exc_info=einfo)
+        logger.error("Timer error: %r", einfo[1], exc_info=einfo)
 
     def on_timer_tick(self, delay):
-        self.logger.debug("Scheduler wake-up! Next eta %s secs.", delay)
+        logger.debug("Scheduler wake-up! Next eta %s secs.", delay)
 
     @property
     def state(self):

+ 6 - 4
celery/worker/abstract.py

@@ -16,6 +16,9 @@ from importlib import import_module
 
 from celery.datastructures import DependencyGraph
 from celery.utils.imports import instantiate
+from celery.utils.log import get_logger
+
+logger = get_logger(__name__)
 
 
 class Namespace(object):
@@ -35,10 +38,9 @@ class Namespace(object):
     _unclaimed = defaultdict(dict)
     _started_count = 0
 
-    def __init__(self, name=None, app=None, logger=None):
+    def __init__(self, name=None, app=None):
         self.app = app
         self.name = name or self.name
-        self.logger = logger or self.app.log.get_default_logger()
         self.services = []
 
     def modules(self):
@@ -105,8 +107,8 @@ class Namespace(object):
         return self._unclaimed[self.name]
 
     def _debug(self, msg, *args):
-        return self.logger.debug("[%s] " + msg,
-                                *(self.name.capitalize(), ) + args)
+        return logger.debug("[%s] " + msg,
+                            *(self.name.capitalize(), ) + args)
 
 
 class ComponentType(type):

+ 6 - 6
celery/worker/autoreload.py

@@ -18,6 +18,7 @@ import time
 from collections import defaultdict
 
 from celery.utils.imports import module_file
+from celery.utils.log import get_logger
 from celery.utils.threads import bgThread, Event
 
 from .abstract import StartStopComponent
@@ -29,6 +30,8 @@ except ImportError:
     pyinotify = None        # noqa
     _ProcessEvent = object  # noqa
 
+logger = get_logger(__name__)
+
 
 class WorkerComponent(StartStopComponent):
     name = "worker.autoreloader"
@@ -40,8 +43,7 @@ class WorkerComponent(StartStopComponent):
 
     def create(self, w):
         w.autoreloader = self.instantiate(w.autoreloader_cls,
-                                          controller=w,
-                                          logger=w.logger)
+                                          controller=w)
         return w.autoreloader
 
 
@@ -201,13 +203,11 @@ class Autoreloader(bgThread):
     """Tracks changes in modules and fires reload commands"""
     Monitor = Monitor
 
-    def __init__(self, controller, modules=None, monitor_cls=None,
-            logger=None, **options):
+    def __init__(self, controller, modules=None, monitor_cls=None, **options):
         super(Autoreloader, self).__init__()
         self.controller = controller
         app = self.controller.app
         self.modules = app.loader.task_modules if modules is None else modules
-        self.logger = logger
         self.options = options
         self.Monitor = monitor_cls or self.Monitor
         self._monitor = None
@@ -235,7 +235,7 @@ class Autoreloader(bgThread):
         modified = [f for f in files if self._maybe_modified(f)]
         if modified:
             names = [self._module_name(module) for module in modified]
-            self.logger.info("Detected modified modules: %r", names)
+            logger.info("Detected modified modules: %r", names)
             self._reload(names)
 
     def _reload(self, modules):

+ 10 - 10
celery/worker/autoscale.py

@@ -21,11 +21,15 @@ import threading
 
 from time import sleep, time
 
+from celery.utils.log import get_logger
 from celery.utils.threads import bgThread
 
 from . import state
 from .abstract import StartStopComponent
 
+logger = get_logger(__name__)
+debug, info, error = logger.debug, logger.info, logger.error
+
 
 class WorkerComponent(StartStopComponent):
     name = "worker.autoscaler"
@@ -38,22 +42,19 @@ class WorkerComponent(StartStopComponent):
     def create(self, w):
         scaler = w.autoscaler = self.instantiate(w.autoscaler_cls, w.pool,
                                     max_concurrency=w.max_concurrency,
-                                    min_concurrency=w.min_concurrency,
-                                    logger=w.logger)
+                                    min_concurrency=w.min_concurrency)
         return scaler
 
 
 class Autoscaler(bgThread):
 
-    def __init__(self, pool, max_concurrency, min_concurrency=0,
-            keepalive=30, logger=None):
+    def __init__(self, pool, max_concurrency, min_concurrency=0, keepalive=30):
         super(Autoscaler, self).__init__()
         self.pool = pool
         self.mutex = threading.Lock()
         self.max_concurrency = max_concurrency
         self.min_concurrency = min_concurrency
         self.keepalive = keepalive
-        self.logger = logger
         self._last_action = None
 
         assert self.keepalive, "can't scale down too fast."
@@ -101,18 +102,17 @@ class Autoscaler(bgThread):
         return self._grow(n)
 
     def _grow(self, n):
-        self.logger.info("Scaling up %s processes.", n)
+        info("Scaling up %s processes.", n)
         self.pool.grow(n)
 
     def _shrink(self, n):
-        self.logger.info("Scaling down %s processes.", n)
+        info("Scaling down %s processes.", n)
         try:
             self.pool.shrink(n)
         except ValueError:
-            self.logger.debug(
-                "Autoscaler won't scale down: all processes busy.")
+            debug("Autoscaler won't scale down: all processes busy.")
         except Exception, exc:
-            self.logger.error("Autoscaler: scale_down: %r", exc, exc_info=True)
+            error("Autoscaler: scale_down: %r", exc, exc_info=True)
 
     def scale_down(self, n):
         if not self._last_action or not n:

+ 59 - 63
celery/worker/consumer.py

@@ -79,7 +79,6 @@ from __future__ import with_statement
 import logging
 import socket
 import threading
-import warnings
 
 from kombu.utils.encoding import safe_repr
 
@@ -88,6 +87,7 @@ from celery.datastructures import AttributeDict
 from celery.exceptions import InvalidTaskError
 from celery.utils import timer2
 from celery.utils.functional import noop
+from celery.utils.log import get_logger
 
 from . import state
 from .abstract import StartStopComponent
@@ -100,6 +100,11 @@ CLOSE = 0x2
 #: Prefetch count can't exceed short.
 PREFETCH_COUNT_MAX = 0xFFFF
 
+UNKNOWN_FORMAT = """\
+Received and deleted unknown message. Wrong destination?!?
+
+The full contents of the message body was: %s
+"""
 #: Error message for when an unregistered task is received.
 UNKNOWN_TASK_ERROR = """\
 Received unregistered task of type %s.
@@ -130,6 +135,20 @@ body: %s {content_type:%s content_encoding:%s delivery_info:%s}\
 """
 
 
+RETRY_CONNECTION = """\
+Consumer: Connection to broker lost. \
+Trying to re-establish the connection...\
+"""
+
+logger = get_logger(__name__)
+info, warn, error, crit = (logger.info, logger.warn,
+                           logger.error, logger.critical)
+
+
+def debug(msg, *args, **kwargs):
+    logger.debug("Consumer: %s" % (msg, ), *args, **kwargs)
+
+
 class Component(StartStopComponent):
     name = "worker.consumer"
     last = True
@@ -138,7 +157,7 @@ class Component(StartStopComponent):
         prefetch_count = w.concurrency * w.prefetch_multiplier
         c = w.consumer = self.instantiate(
                 w.consumer_cls, w.ready_queue, w.scheduler,
-                logger=w.logger, hostname=w.hostname,
+                hostname=w.hostname,
                 send_events=w.send_events,
                 init_callback=w.ready_callback,
                 initial_prefetch_count=prefetch_count,
@@ -156,14 +175,12 @@ class QoS(object):
 
     :param consumer: A :class:`kombu.messaging.Consumer` instance.
     :param initial_value: Initial prefetch count value.
-    :param logger: Logger used to log debug messages.
 
     """
     prev = None
 
-    def __init__(self, consumer, initial_value, logger):
+    def __init__(self, consumer, initial_value):
         self.consumer = consumer
-        self.logger = logger
         self._mutex = threading.RLock()
         self.value = initial_value
 
@@ -203,10 +220,10 @@ class QoS(object):
         if pcount != self.prev:
             new_value = pcount
             if pcount > PREFETCH_COUNT_MAX:
-                self.logger.warning("QoS: Disabled: prefetch_count exceeds %r",
-                                    PREFETCH_COUNT_MAX)
+                warn("QoS: Disabled: prefetch_count exceeds %r",
+                     PREFETCH_COUNT_MAX)
                 new_value = 0
-            self.logger.debug("basic.qos: prefetch_count->%s", new_value)
+            debug("basic.qos: prefetch_count->%s", new_value)
             self.consumer.qos(prefetch_count=new_value)
             self.prev = pcount
         return pcount
@@ -254,9 +271,6 @@ class Consumer(object):
     #: went offline/disappeared.
     heart = None
 
-    #: The logger instance to use.  Defaults to the default Celery logger.
-    logger = None
-
     #: The broker connection.
     connection = None
 
@@ -281,10 +295,10 @@ class Consumer(object):
     # Consumer state, can be RUN or CLOSE.
     _state = None
 
-    def __init__(self, ready_queue, eta_schedule, logger,
+    def __init__(self, ready_queue, eta_schedule,
             init_callback=noop, send_events=False, hostname=None,
             initial_prefetch_count=2, pool=None, app=None,
-            priority_timer=None, controller=None):
+            priority_timer=None, controller=None, **kwargs):
         self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
@@ -294,7 +308,6 @@ class Consumer(object):
         self.eta_schedule = eta_schedule
         self.send_events = send_events
         self.init_callback = init_callback
-        self.logger = logger
         self.hostname = hostname or socket.gethostname()
         self.initial_prefetch_count = initial_prefetch_count
         self.event_dispatcher = None
@@ -302,7 +315,6 @@ class Consumer(object):
         self.pool = pool
         self.priority_timer = priority_timer or timer2.default_timer
         pidbox_state = AttributeDict(app=self.app,
-                                     logger=logger,
                                      hostname=self.hostname,
                                      listener=self,     # pre 2.2
                                      consumer=self)
@@ -313,7 +325,7 @@ class Consumer(object):
         self.connection_errors = conninfo.connection_errors
         self.channel_errors = conninfo.channel_errors
 
-        self._does_info = self.logger.isEnabledFor(logging.INFO)
+        self._does_info = logger.isEnabledFor(logging.INFO)
         self.strategies = {}
 
     def update_strategies(self):
@@ -337,15 +349,13 @@ class Consumer(object):
                 self.reset_connection()
                 self.consume_messages()
             except self.connection_errors + self.channel_errors:
-                self.logger.error("Consumer: Connection to broker lost."
-                                + " Trying to re-establish the connection...",
-                                exc_info=True)
+                error(RETRY_CONNECTION, exc_info=True)
 
     def consume_messages(self):
         """Consume messages forever (or until an exception is raised)."""
-        self._debug("Starting message consumer...")
+        debug("Starting message consumer...")
         self.task_consumer.consume()
-        self._debug("Ready to accept tasks!")
+        debug("Ready to accept tasks!")
 
         while self._state != CLOSE and self.connection:
             if self.qos.prev != self.qos.value:
@@ -370,7 +380,7 @@ class Consumer(object):
             return
 
         if self._does_info:
-            self.logger.info("Got task from broker: %s", task.shortinfo())
+            info("Got task from broker: %s", task.shortinfo())
 
         if self.event_dispatcher.enabled:
             self.event_dispatcher.send("task-received", uuid=task.id,
@@ -384,10 +394,8 @@ class Consumer(object):
             try:
                 eta = timer2.to_timestamp(task.eta)
             except OverflowError, exc:
-                self.logger.error(
-                    "Couldn't convert eta %s to timestamp: %r. Task: %r",
-                    task.eta, exc, task.info(safe=True),
-                    exc_info=True)
+                error("Couldn't convert eta %s to timestamp: %r. Task: %r",
+                      task.eta, exc, task.info(safe=True), exc_info=True)
                 task.acknowledge()
             else:
                 self.qos.increment()
@@ -402,11 +410,9 @@ class Consumer(object):
         try:
             self.pidbox_node.handle_message(body, message)
         except KeyError, exc:
-            self.logger.error("No such control command: %s", exc)
+            error("No such control command: %s", exc)
         except Exception, exc:
-            self.logger.error(
-                "Error occurred while handling control command: %r",
-                    exc, exc_info=True)
+            error("Control command error: %r", exc, exc_info=True)
             self.reset_pidbox_node()
 
     def apply_eta_task(self, task):
@@ -432,23 +438,18 @@ class Consumer(object):
         try:
             name = body["task"]
         except (KeyError, TypeError):
-            warnings.warn(RuntimeWarning(
-                "Received and deleted unknown message. Wrong destination?!? \
-                the full contents of the message body was: %s" % (
-                 self._message_report(body, message), )))
-            message.reject_log_error(self.logger, self.connection_errors)
+            warn(UNKNOWN_FORMAT, self._message_report(body, message))
+            message.reject_log_error(logger, self.connection_errors)
             return
 
         try:
             self.strategies[name](message, body, message.ack_log_error)
         except KeyError, exc:
-            self.logger.error(UNKNOWN_TASK_ERROR, exc, safe_repr(body),
-                              exc_info=True)
-            message.reject_log_error(self.logger, self.connection_errors)
+            error(UNKNOWN_TASK_ERROR, exc, safe_repr(body), exc_info=True)
+            message.reject_log_error(logger, self.connection_errors)
         except InvalidTaskError, exc:
-            self.logger.error(INVALID_TASK_ERROR, str(exc), safe_repr(body),
-                              exc_info=True)
-            message.reject_log_error(self.logger, self.connection_errors)
+            error(INVALID_TASK_ERROR, str(exc), safe_repr(body), exc_info=True)
+            message.reject_log_error(logger, self.connection_errors)
 
     def maybe_conn_error(self, fun):
         """Applies function but ignores any connection or channel
@@ -468,14 +469,14 @@ class Consumer(object):
         connection, self.connection = self.connection, None
 
         if self.task_consumer:
-            self._debug("Closing consumer channel...")
+            debug("Closing consumer channel...")
             self.task_consumer = \
                     self.maybe_conn_error(self.task_consumer.close)
 
         self.stop_pidbox_node()
 
         if connection:
-            self._debug("Closing broker connection...")
+            debug("Closing broker connection...")
             self.maybe_conn_error(connection.close)
 
     def stop_consumers(self, close_connection=True):
@@ -491,19 +492,19 @@ class Consumer(object):
 
         if self.heart:
             # Stop the heartbeat thread if it's running.
-            self.logger.debug("Heart: Going into cardiac arrest...")
+            debug("Heart: Going into cardiac arrest...")
             self.heart = self.heart.stop()
 
-        self._debug("Cancelling task consumer...")
+        debug("Cancelling task consumer...")
         if self.task_consumer:
             self.maybe_conn_error(self.task_consumer.cancel)
 
         if self.event_dispatcher:
-            self._debug("Shutting down event dispatcher...")
+            debug("Shutting down event dispatcher...")
             self.event_dispatcher = \
                     self.maybe_conn_error(self.event_dispatcher.close)
 
-        self._debug("Cancelling broadcast consumer...")
+        debug("Cancelling broadcast consumer...")
         if self.broadcast_consumer:
             self.maybe_conn_error(self.broadcast_consumer.cancel)
 
@@ -521,10 +522,9 @@ class Consumer(object):
         :param exc: The original exception instance.
 
         """
-        self.logger.critical(
-            "Can't decode message body: %r (type:%r encoding:%r raw:%r')",
-                    exc, message.content_type, message.content_encoding,
-                    safe_repr(message.body))
+        crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
+             exc, message.content_type, message.content_encoding,
+             safe_repr(message.body))
         message.ack()
 
     def reset_pidbox_node(self):
@@ -547,11 +547,11 @@ class Consumer(object):
     def stop_pidbox_node(self):
         if self._pidbox_node_stopped:
             self._pidbox_node_shutdown.set()
-            self._debug("Waiting for broadcast thread to shutdown...")
+            debug("Waiting for broadcast thread to shutdown...")
             self._pidbox_node_stopped.wait()
             self._pidbox_node_stopped = self._pidbox_node_shutdown = None
         elif self.broadcast_consumer:
-            self._debug("Closing broadcast channel...")
+            debug("Closing broadcast channel...")
             self.broadcast_consumer = \
                 self.maybe_conn_error(self.broadcast_consumer.channel.close)
 
@@ -579,7 +579,7 @@ class Consumer(object):
     def reset_connection(self):
         """Re-establish the broker connection and set up consumers,
         heartbeat and the event dispatcher."""
-        self._debug("Re-establishing connection to the broker...")
+        debug("Re-establishing connection to the broker...")
         self.stop_consumers()
 
         # Clear internal queues to get rid of old messages.
@@ -590,12 +590,11 @@ class Consumer(object):
 
         # Re-establish the broker connection and setup the task consumer.
         self.connection = self._open_connection()
-        self._debug("Connection established.")
+        debug("Connection established.")
         self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
                                     on_decode_error=self.on_decode_error)
         # QoS: Reset prefetch window.
-        self.qos = QoS(self.task_consumer,
-                       self.initial_prefetch_count, self.logger)
+        self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
         self.qos.update()
 
         # receive_message handles incoming messages.
@@ -643,8 +642,8 @@ class Consumer(object):
         # Callback called for each retry while the connection
         # can't be established.
         def _error_handler(exc, interval):
-            self.logger.error("Consumer: Connection Error: %s. "
-                              "Trying again in %d seconds...", exc, interval)
+            error("Consumer: Connection Error: %s. "
+                  "Trying again in %d seconds...", exc, interval)
 
         # remember that the connection is lazy, it won't establish
         # until it's needed.
@@ -667,7 +666,7 @@ class Consumer(object):
         # Notifies other threads that this instance can't be used
         # anymore.
         self._state = CLOSE
-        self._debug("Stopping consumers...")
+        debug("Stopping consumers...")
         self.stop_consumers(close_connection=False)
 
     @property
@@ -684,6 +683,3 @@ class Consumer(object):
             conninfo = self.connection.info()
             conninfo.pop("password", None)  # don't send password.
         return {"broker": conninfo, "prefetch_count": self.qos.value}
-
-    def _debug(self, msg, **kwargs):
-        self.logger.debug("Consumer: %s", msg, **kwargs)

+ 28 - 28
celery/worker/control.py

@@ -18,11 +18,13 @@ from kombu.utils.encoding import safe_repr
 from celery.platforms import signals as _signals
 from celery.utils import timeutils
 from celery.utils.compat import UserDict
+from celery.utils.log import get_logger
 
 from . import state
 from .state import revoked
 
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
+logger = get_logger(__name__)
 
 
 class Panel(UserDict):
@@ -47,7 +49,7 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
                 request.terminate(panel.consumer.pool, signal=signum)
                 break
 
-    panel.logger.info("Task %s %s.", task_id, action)
+    logger.info("Task %s %s.", task_id, action)
     return {"ok": "task %s %s" % (task_id, action)}
 
 
@@ -62,7 +64,7 @@ def enable_events(panel):
     if not dispatcher.enabled:
         dispatcher.enable()
         dispatcher.send("worker-online")
-        panel.logger.info("Events enabled by remote.")
+        logger.info("Events enabled by remote.")
         return {"ok": "events enabled"}
     return {"ok": "events already enabled"}
 
@@ -73,14 +75,14 @@ def disable_events(panel):
     if dispatcher.enabled:
         dispatcher.send("worker-offline")
         dispatcher.disable()
-        panel.logger.info("Events disabled by remote.")
+        logger.info("Events disabled by remote.")
         return {"ok": "events disabled"}
     return {"ok": "events already disabled"}
 
 
 @Panel.register
 def heartbeat(panel):
-    panel.logger.debug("Heartbeat requested by remote.")
+    logger.debug("Heartbeat requested by remote.")
     dispatcher = panel.consumer.event_dispatcher
     dispatcher.send("worker-heartbeat", freq=5, **state.SOFTWARE_INFO)
 
@@ -104,23 +106,22 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
     try:
         panel.app.tasks[task_name].rate_limit = rate_limit
     except KeyError:
-        panel.logger.error("Rate limit attempt for unknown task %s",
-                           task_name, exc_info=True)
+        logger.error("Rate limit attempt for unknown task %s",
+                     task_name, exc_info=True)
         return {"error": "unknown task"}
 
     if not hasattr(panel.consumer.ready_queue, "refresh"):
-        panel.logger.error("Rate limit attempt, but rate limits disabled.")
+        logger.error("Rate limit attempt, but rate limits disabled.")
         return {"error": "rate limits disabled"}
 
     panel.consumer.ready_queue.refresh()
 
     if not rate_limit:
-        panel.logger.info("Disabled rate limits for tasks of type %s",
-                          task_name)
+        logger.info("Rate limits disabled for tasks of type %s", task_name)
         return {"ok": "rate limit disabled successfully"}
 
-    panel.logger.info("New rate limit for tasks of type %s: %s.",
-                      task_name, rate_limit)
+    logger.info("New rate limit for tasks of type %s: %s.",
+                task_name, rate_limit)
     return {"ok": "new rate limit set successfully"}
 
 
@@ -129,15 +130,15 @@ def time_limit(panel, task_name=None, hard=None, soft=None, **kwargs):
     try:
         task = panel.app.tasks[task_name]
     except KeyError:
-        panel.logger.error("Change time limit attempt for unknown task %s",
-                           task_name, exc_info=True)
+        logger.error("Change time limit attempt for unknown task %s",
+                     task_name, exc_info=True)
         return {"error": "unknown task"}
 
     task.soft_time_limit = soft
     task.time_limit = hard
 
-    panel.logger.info("New time limits for tasks of type %s: soft=%s hard=%s",
-                      task_name, soft, hard)
+    logger.info("New time limits for tasks of type %s: soft=%s hard=%s",
+                task_name, soft, hard)
     return {"ok": "time limits set successfully"}
 
 
@@ -145,7 +146,7 @@ def time_limit(panel, task_name=None, hard=None, soft=None, **kwargs):
 def dump_schedule(panel, safe=False, **kwargs):
     schedule = panel.consumer.eta_schedule.schedule
     if not schedule.queue:
-        panel.logger.info("--Empty schedule--")
+        logger.info("--Empty schedule--")
         return []
 
     formatitem = lambda (i, item): "%s. %s pri%s %r" % (i,
@@ -153,7 +154,7 @@ def dump_schedule(panel, safe=False, **kwargs):
             item["priority"],
             item["item"])
     info = map(formatitem, enumerate(schedule.info()))
-    panel.logger.debug("* Dump of current schedule:\n%s", "\n".join(info))
+    logger.debug("* Dump of current schedule:\n%s", "\n".join(info))
     scheduled_tasks = []
     for item in schedule.info():
         scheduled_tasks.append({"eta": item["eta"],
@@ -168,10 +169,10 @@ def dump_reserved(panel, safe=False, **kwargs):
     ready_queue = panel.consumer.ready_queue
     reserved = ready_queue.items
     if not reserved:
-        panel.logger.info("--Empty queue--")
+        logger.info("--Empty queue--")
         return []
-    panel.logger.debug("* Dump of currently reserved tasks:\n%s",
-                       "\n".join(map(safe_repr, reserved)))
+    logger.debug("* Dump of currently reserved tasks:\n%s",
+                 "\n".join(map(safe_repr, reserved)))
     return [request.info(safe=safe)
             for request in reserved]
 
@@ -213,8 +214,7 @@ def dump_tasks(panel, **kwargs):
 
     info = map(_extract_info, (tasks[task]
                                     for task in sorted(tasks.keys())))
-    panel.logger.debug("* Dump of currently registered tasks:\n%s",
-                       "\n".join(info))
+    logger.debug("* Dump of currently registered tasks:\n%s", "\n".join(info))
 
     return info
 
@@ -258,9 +258,9 @@ def autoscale(panel, max=None, min=None):
 
 
 @Panel.register
-def shutdown(panel, **kwargs):
-    panel.logger.warning("Got shutdown from remote.")
-    raise SystemExit("Got shutdown from remote")
+def shutdown(panel, msg="Got shutdown from remote", **kwargs):
+    logger.warning(msg)
+    raise SystemExit(msg)
 
 
 @Panel.register
@@ -275,10 +275,10 @@ def add_consumer(panel, queue=None, exchange=None, exchange_type="direct",
                            **options)
         cset.add_consumer_from_dict(**declaration)
         cset.consume()
-        panel.logger.info("Started consuming from %r", declaration)
-        return {"ok": "started consuming from %s" % (queue, )}
+        logger.info("Started consuming from %r", declaration)
+        return {"ok": "started consuming from %r" % (queue, )}
     else:
-        return {"ok": "already consuming from %s" % (queue, )}
+        return {"ok": "already consuming from %r" % (queue, )}
 
 
 @Panel.register

+ 28 - 29
celery/worker/job.py

@@ -30,11 +30,16 @@ from celery.task.trace import build_tracer, trace_task, report_internal_error
 from celery.platforms import set_mp_process_title as setps
 from celery.utils import fun_takes_kwargs
 from celery.utils.functional import noop
+from celery.utils.log import get_logger
 from celery.utils.text import truncate
 from celery.utils.timeutils import maybe_iso8601, timezone
 
 from . import state
 
+logger = get_logger(__name__)
+debug, info, warn, error = (logger.debug, logger.info,
+                            logger.warn, logger.error)
+
 # Localize
 tz_to_local = timezone.to_local
 tz_or_local = timezone.tz_or_local
@@ -70,7 +75,7 @@ class Request(object):
     __slots__ = ("app", "name", "id", "args", "kwargs",
                  "on_ack", "delivery_info", "hostname",
                  "callbacks", "errbacks",
-                 "logger", "eventer", "connection_errors",
+                 "eventer", "connection_errors",
                  "task", "eta", "expires",
                  "_does_debug", "_does_info", "request_dict",
                  "acknowledged", "success_msg", "error_msg",
@@ -96,7 +101,7 @@ class Request(object):
     retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
 
     def __init__(self, body, on_ack=noop,
-            hostname=None, logger=None, eventer=None, app=None,
+            hostname=None, eventer=None, app=None,
             connection_errors=None, request_dict=None,
             delivery_info=None, task=None, **opts):
         self.app = app or app_or_default(app)
@@ -116,7 +121,6 @@ class Request(object):
         utc = body.get("utc", False)
         self.on_ack = on_ack
         self.hostname = hostname or socket.gethostname()
-        self.logger = logger or self.app.log.get_default_logger()
         self.eventer = eventer
         self.connection_errors = connection_errors or ()
         self.task = task or self.app.tasks[name]
@@ -145,8 +149,8 @@ class Request(object):
         }
 
         ## shortcuts
-        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
-        self._does_info = self.logger.isEnabledFor(logging.INFO)
+        self._does_debug = logger.isEnabledFor(logging.DEBUG)
+        self._does_info = logger.isEnabledFor(logging.INFO)
 
         self.request_dict = body
 
@@ -266,8 +270,7 @@ class Request(object):
         if self.expires:
             self.maybe_expire()
         if self.id in state.revoked:
-            self.logger.warn("Skipping revoked task: %s[%s]",
-                             self.name, self.id)
+            warn("Skipping revoked task: %s[%s]", self.name, self.id)
             self.send_event("task-revoked", uuid=self.id)
             self.acknowledge()
             self._already_revoked = True
@@ -287,8 +290,7 @@ class Request(object):
             self.acknowledge()
         self.send_event("task-started", uuid=self.id, pid=pid)
         if self._does_debug:
-            self.logger.debug("Task accepted: %s[%s] pid:%r",
-                              self.name, self.id, pid)
+            debug("Task accepted: %s[%s] pid:%r", self.name, self.id, pid)
         if self._terminate_on_ack is not None:
             _, pool, signal = self._terminate_on_ack
             self.terminate(pool, signal)
@@ -297,12 +299,12 @@ class Request(object):
         """Handler called if the task times out."""
         state.task_ready(self)
         if soft:
-            self.logger.warning("Soft time limit (%ss) exceeded for %s[%s]",
-                                timeout, self.name, self.id)
+            warn("Soft time limit (%ss) exceeded for %s[%s]",
+                 timeout, self.name, self.id)
             exc = exceptions.SoftTimeLimitExceeded(timeout)
         else:
-            self.logger.error("Hard time limit (%ss) exceeded for %s[%s]",
-                              timeout, self.name, self.id)
+            error("Hard time limit (%ss) exceeded for %s[%s]",
+                  timeout, self.name, self.id)
             exc = exceptions.TimeLimitExceeded(timeout)
 
         if self.store_errors:
@@ -329,11 +331,10 @@ class Request(object):
         if self._does_info:
             now = now or time.time()
             runtime = self.time_start and (time.time() - self.time_start) or 0
-            self.logger.info(self.success_msg.strip(),
-                        {"id": self.id,
-                        "name": self.name,
-                        "return_value": self.repr_result(ret_value),
-                        "runtime": runtime})
+            info(self.success_msg.strip(), {
+                    "id": self.id, "name": self.name,
+                    "return_value": self.repr_result(ret_value),
+                    "runtime": runtime})
 
     def on_retry(self, exc_info):
         """Handler called if the task should be retried."""
@@ -342,11 +343,9 @@ class Request(object):
                          traceback=safe_str(exc_info.traceback))
 
         if self._does_info:
-            self.logger.info(self.retry_msg.strip(),
-                            {"id": self.id,
-                             "name": self.name,
-                             "exc": safe_repr(exc_info.exception.exc)},
-                            exc_info=exc_info)
+            info(self.retry_msg.strip(), {
+                "id": self.id, "name": self.name,
+                "exc": safe_repr(exc_info.exception.exc)}, exc_info=exc_info)
 
     def on_failure(self, exc_info):
         """Handler called if the task raised an exception."""
@@ -390,11 +389,11 @@ class Request(object):
                    "kwargs": safe_repr(self.kwargs),
                    "description": description}
 
-        self.logger.log(severity, format.strip(), context,
-                        exc_info=exc_info.exc_info,
-                        extra={"data": {"id": self.id,
-                                        "name": self.name,
-                                        "hostname": self.hostname}})
+        logger.log(severity, format.strip(), context,
+                   exc_info=exc_info.exc_info,
+                   extra={"data": {"id": self.id,
+                                   "name": self.name,
+                                   "hostname": self.hostname}})
 
         task_obj = self.app.tasks.get(self.name, object)
         task_obj.send_error_email(context, exc_info.exception)
@@ -402,7 +401,7 @@ class Request(object):
     def acknowledge(self):
         """Acknowledge task."""
         if not self.acknowledged:
-            self.on_ack(self.logger, self.connection_errors)
+            self.on_ack(logger, self.connection_errors)
             self.acknowledged = True
 
     def repr_result(self, result, maxlen=46):

+ 13 - 12
celery/worker/mediator.py

@@ -24,9 +24,12 @@ from Queue import Empty
 
 from celery.app import app_or_default
 from celery.utils.threads import bgThread
+from celery.utils.log import get_logger
 
 from .abstract import StartStopComponent
 
+logger = get_logger(__name__)
+
 
 class WorkerComponent(StartStopComponent):
     name = "worker.mediator"
@@ -40,8 +43,7 @@ class WorkerComponent(StartStopComponent):
 
     def create(self, w):
         m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,
-                                          app=w.app, callback=w.process_task,
-                                          logger=w.logger)
+                                          app=w.app, callback=w.process_task)
         return m
 
 
@@ -53,12 +55,11 @@ class Mediator(bgThread):
     #: Callback called when a task is obtained.
     callback = None
 
-    def __init__(self, ready_queue, callback, logger=None, app=None):
+    def __init__(self, ready_queue, callback, app=None, **kw):
         self.app = app_or_default(app)
-        self.logger = logger or self.app.log.get_default_logger()
         self.ready_queue = ready_queue
         self.callback = callback
-        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
+        self._does_debug = logger.isEnabledFor(logging.DEBUG)
         super(Mediator, self).__init__()
 
     def body(self):
@@ -71,15 +72,15 @@ class Mediator(bgThread):
             return
 
         if self._does_debug:
-            self.logger.debug("Mediator: Running callback for task: %s[%s]",
-                              task.name, task.id)
+            logger.debug("Mediator: Running callback for task: %s[%s]",
+                         task.name, task.id)
 
         try:
             self.callback(task)
         except Exception, exc:
-            self.logger.error("Mediator callback raised exception %r",
-                              exc, exc_info=True,
-                              extra={"data": {"id": task.id,
-                                              "name": task.name,
-                                              "hostname": task.hostname}})
+            logger.error("Mediator callback raised exception %r",
+                         exc, exc_info=True,
+                         extra={"data": {"id": task.id,
+                                         "name": task.name,
+                                         "hostname": task.hostname}})
     move = body   # XXX compat

+ 2 - 4
celery/worker/strategy.py

@@ -4,7 +4,6 @@ from .job import Request
 
 
 def default(task, app, consumer):
-    logger = consumer.logger
     hostname = consumer.hostname
     eventer = consumer.event_dispatcher
     Req = Request
@@ -13,9 +12,8 @@ def default(task, app, consumer):
 
     def task_message_handler(message, body, ack):
         handle(Req(body, on_ack=ack, app=app, hostname=hostname,
-                         eventer=eventer, logger=logger,
+                         eventer=eventer, task=task,
                          connection_errors=connection_errors,
-                         delivery_info=message.delivery_info,
-                         task=task))
+                         delivery_info=message.delivery_info))
 
     return task_message_handler