Browse Source

Cosmetics and cleanup

Ask Solem 14 years ago
parent
commit
156c8edb02

+ 30 - 1
Changelog

@@ -246,6 +246,35 @@ News
 
 
 * The internal module `celery.task.builtins` has been removed.
 * The internal module `celery.task.builtins` has been removed.
 
 
+* The module `celery.task.schedules` is deprecated, and
+  `celery.schedules` should be used instead.
+
+    For example if you have::
+
+        from celery.task.schedules import crontab
+
+    You should replace that with::
+
+        from celery.schedules import crontab
+
+    The module needs to be renamed because it must be possible
+    to import schedules without importing the `celery.task` module.
+
+* The following functions have been deprecated and is scheduled for
+  removed in version 2.3:
+
+    * `celery.execute.apply_async`
+
+        Use `task.apply_async()` instead.
+
+    * `celery.execute.apply`
+
+        Use `task.apply()` instead.
+
+    * `celery.execute.delay_task`
+
+        Use `registry.tasks[name].delay()` instead.
+
 * Importing `TaskSet` from `celery.task.base` is now deprecated.
 * Importing `TaskSet` from `celery.task.base` is now deprecated.
 
 
     You should use::
     You should use::
@@ -2322,7 +2351,7 @@ News
 
 
     .. code-block:: python
     .. code-block:: python
 
 
-        from celery.task.schedules import crontab
+        from celery.schedules import crontab
         from celery.decorators import periodic_task
         from celery.decorators import periodic_task
 
 
         @periodic_task(run_every=crontab(hour=7, minute=30))
         @periodic_task(run_every=crontab(hour=7, minute=30))

+ 1 - 1
FAQ

@@ -667,7 +667,7 @@ Can I schedule tasks to execute at a specific time?
 **Answer**: Yes. You can use the `eta` argument of :meth:`Task.apply_async`.
 **Answer**: Yes. You can use the `eta` argument of :meth:`Task.apply_async`.
 
 
 Or to schedule a periodic task at a specific time, use the
 Or to schedule a periodic task at a specific time, use the
-:class:`celery.task.schedules.crontab` schedule behavior:
+:class:`celery.schedules.crontab` schedule behavior:
 
 
 
 
 .. code-block:: python
 .. code-block:: python

+ 13 - 41
celery/app/amqp.py

@@ -15,7 +15,7 @@ from kombu import BrokerConnection
 from kombu.connection import Resource
 from kombu.connection import Resource
 from kombu import compat as messaging
 from kombu import compat as messaging
 
 
-from celery import routes
+from celery import routes as _routes
 from celery import signals
 from celery import signals
 from celery.utils import gen_unique_id, textindent, cached_property
 from celery.utils import gen_unique_id, textindent, cached_property
 from celery.utils import promise, maybe_promise
 from celery.utils import promise, maybe_promise
@@ -23,9 +23,8 @@ from celery.utils.compat import UserDict
 
 
 #: List of known options to a Kombu producers send method.
 #: List of known options to a Kombu producers send method.
 #: Used to extract the message related options out of any `dict`.
 #: Used to extract the message related options out of any `dict`.
-MSG_OPTIONS = ("mandatory", "priority", "immediate",
-               "routing_key", "serializer", "delivery_mode",
-               "compression")
+MSG_OPTIONS = ("mandatory", "priority", "immediate", "routing_key",
+                "serializer", "delivery_mode", "compression")
 
 
 #: Human readable queue declaration.
 #: Human readable queue declaration.
 QUEUE_FORMAT = """
 QUEUE_FORMAT = """
@@ -33,12 +32,7 @@ QUEUE_FORMAT = """
 binding:%(binding_key)s
 binding:%(binding_key)s
 """
 """
 
 
-#: Broker connection info -> URI
-BROKER_FORMAT = """\
-%(transport)s://%(userid)s@%(hostname)s%(port)s%(virtual_host)s\
-"""
-
-#: Set of exchange names that has already been declared.
+#: Set of exchange names that have already been declared.
 _exchanges_declared = set()
 _exchanges_declared = set()
 
 
 
 
@@ -253,14 +247,13 @@ class AMQP(object):
     _queues_declared = False
     _queues_declared = False
 
 
     #: Cached and prepared routing table.
     #: Cached and prepared routing table.
-    _routes = None
+    _rtable = None
 
 
     def __init__(self, app):
     def __init__(self, app):
         self.app = app
         self.app = app
 
 
     def flush_routes(self):
     def flush_routes(self):
-        self._routes = routes.prepare(
-                        self.app.conf.get("CELERY_ROUTES") or {})
+        self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
 
 
     def Queues(self, queues):
     def Queues(self, queues):
         """Create new :class:`Queues` instance, using queue defaults
         """Create new :class:`Queues` instance, using queue defaults
@@ -271,17 +264,14 @@ class AMQP(object):
                         "exchange": conf.CELERY_DEFAULT_EXCHANGE,
                         "exchange": conf.CELERY_DEFAULT_EXCHANGE,
                         "exchange_type": conf.CELERY_DEFAULT_EXCHANGE_TYPE,
                         "exchange_type": conf.CELERY_DEFAULT_EXCHANGE_TYPE,
                         "binding_key": conf.CELERY_DEFAULT_ROUTING_KEY}}
                         "binding_key": conf.CELERY_DEFAULT_ROUTING_KEY}}
-        return Queues.with_defaults(queues,
-                                    conf.CELERY_DEFAULT_EXCHANGE,
-                                    conf.CELERY_DEFAULT_EXCHANGE_TYPE)
+        return Queues.with_defaults(queues, conf.CELERY_DEFAULT_EXCHANGE,
+                                            conf.CELERY_DEFAULT_EXCHANGE_TYPE)
 
 
     def Router(self, queues=None, create_missing=None):
     def Router(self, queues=None, create_missing=None):
         """Returns the current task router."""
         """Returns the current task router."""
-        return routes.Router(self.routes,
-                             queues or self.queues,
-                             self.app.either("CELERY_CREATE_MISSING_QUEUES",
-                                             create_missing),
-                             app=self.app)
+        return _routes.Router(self.routes, queues or self.queues,
+                              self.app.either("CELERY_CREATE_MISSING_QUEUES",
+                                              create_missing), app=self.app)
 
 
     def TaskConsumer(self, *args, **kwargs):
     def TaskConsumer(self, *args, **kwargs):
         """Returns consumer for a single task queue."""
         """Returns consumer for a single task queue."""
@@ -331,24 +321,6 @@ class AMQP(object):
         q = self.app.conf.CELERY_DEFAULT_QUEUE
         q = self.app.conf.CELERY_DEFAULT_QUEUE
         return q, self.queues[q]
         return q, self.queues[q]
 
 
-    def get_broker_info(self, broker_connection=None):
-        """Returns information about the current broker connection
-        as a `dict`."""
-        if broker_connection is None:
-            broker_connection = self.app.broker_connection()
-        info = broker_connection.info()
-        port = info["port"]
-        if port:
-            info["port"] = ":%s" % (port, )
-        vhost = info["virtual_host"]
-        if not vhost.startswith("/"):
-            info["virtual_host"] = "/" + vhost
-        return info
-
-    def format_broker_info(self, info=None):
-        """Get message broker connection info string for log dumps."""
-        return BROKER_FORMAT % self.get_broker_info()
-
     @cached_property
     @cached_property
     def queues(self):
     def queues(self):
         """Queue name⇒ declaration mapping."""
         """Queue name⇒ declaration mapping."""
@@ -360,6 +332,6 @@ class AMQP(object):
 
 
     @property
     @property
     def routes(self):
     def routes(self):
-        if self._routes is None:
+        if self._rtable is None:
             self.flush_routes()
             self.flush_routes()
-        return self._routes
+        return self._rtable

+ 3 - 7
celery/app/base.py

@@ -15,7 +15,7 @@ from datetime import timedelta
 
 
 from celery.app.defaults import DEFAULTS
 from celery.app.defaults import DEFAULTS
 from celery.datastructures import ConfigurationView
 from celery.datastructures import ConfigurationView
-from celery.utils import instantiate, cached_property, maybe_promise
+from celery.utils import cached_property, instantiate, lpmerge, maybe_promise
 from celery.utils.functional import wraps
 from celery.utils.functional import wraps
 
 
 
 
@@ -224,14 +224,10 @@ class BaseApp(object):
                 return value
                 return value
         return self.conf.get(default_key)
         return self.conf.get(default_key)
 
 
-    def merge(self, a, b):
+    def merge(self, l, r):
         """Like `dict(a, **b)` except it will keep values from `a`
         """Like `dict(a, **b)` except it will keep values from `a`
         if the value in `b` is :const:`None`."""
         if the value in `b` is :const:`None`."""
-        b = dict(b)
-        for key, value in a.items():
-            if b.get(key) is None:
-                b[key] = value
-        return b
+        return lpmerge(l, r)
 
 
     def _get_backend(self):
     def _get_backend(self):
         from celery.backends import get_backend_cls
         from celery.backends import get_backend_cls

+ 8 - 9
celery/app/defaults.py

@@ -4,22 +4,20 @@ DEFAULT_PROCESS_LOG_FMT = """
     [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
     [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
 """.strip()
 """.strip()
 DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
 DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
-DEFAULT_TASK_LOG_FMT = " ".join("""
-    [%(asctime)s: %(levelname)s/%(processName)s]
-    [%(task_name)s(%(task_id)s)] %(message)s
-""".strip().split())
+DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
+[%(task_name)s(%(task_id)s)] %(message)s"""
 
 
 
 
-def str_to_bool(s, table={"false": False, "no": False, "0": False,
-                          "true":  True, "yes": True,  "1": True}):
+def str_to_bool(term, table={"false": False, "no": False, "0": False,
+                             "true":  True, "yes": True,  "1": True}):
     try:
     try:
-        return table[s.lower()]
+        return table[term.lower()]
     except KeyError:
     except KeyError:
-        raise TypeError("%r can not be converted to type bool" % (s, ))
+        raise TypeError("%r can not be converted to type bool" % (term, ))
 
 
 
 
 class Option(object):
 class Option(object):
-    typemap = dict(string=str, int=int, float=float,
+    typemap = dict(string=str, int=int, float=float, any=lambda v: v,
                    bool=str_to_bool, dict=dict, tuple=tuple)
                    bool=str_to_bool, dict=dict, tuple=tuple)
 
 
     def __init__(self, default=None, *args, **kwargs):
     def __init__(self, default=None, *args, **kwargs):
@@ -75,6 +73,7 @@ NAMESPACES = {
         "RESULT_EXCHANGE_TYPE": Option("direct"),
         "RESULT_EXCHANGE_TYPE": Option("direct"),
         "RESULT_SERIALIZER": Option("pickle"),
         "RESULT_SERIALIZER": Option("pickle"),
         "RESULT_PERSISTENT": Option(False, type="bool"),
         "RESULT_PERSISTENT": Option(False, type="bool"),
+        "ROUTES": Option(None, type="any"),
         "SEND_EVENTS": Option(False, type="bool"),
         "SEND_EVENTS": Option(False, type="bool"),
         "SEND_TASK_ERROR_EMAILS": Option(False, type="bool"),
         "SEND_TASK_ERROR_EMAILS": Option(False, type="bool"),
         "SEND_TASK_SENT_EVENT": Option(False, type="bool"),
         "SEND_TASK_SENT_EVENT": Option(False, type="bool"),

+ 1 - 1
celery/apps/beat.py

@@ -99,7 +99,7 @@ class Beat(object):
     def startup_info(self, beat):
     def startup_info(self, beat):
         scheduler = beat.get_scheduler(lazy=True)
         scheduler = beat.get_scheduler(lazy=True)
         return STARTUP_INFO_FMT % {
         return STARTUP_INFO_FMT % {
-            "conninfo": self.app.amqp.format_broker_info(),
+            "conninfo": self.app.broker_connection().as_uri(),
             "logfile": self.logfile or "[stderr]",
             "logfile": self.logfile or "[stderr]",
             "loglevel": LOG_LEVELS[self.loglevel],
             "loglevel": LOG_LEVELS[self.loglevel],
             "loader": get_full_cls_name(self.app.loader.__class__),
             "loader": get_full_cls_name(self.app.loader.__class__),

+ 1 - 1
celery/apps/worker.py

@@ -179,7 +179,7 @@ class Worker(object):
             tasklist = self.tasklist(include_builtins=include_builtins)
             tasklist = self.tasklist(include_builtins=include_builtins)
 
 
         return STARTUP_INFO_FMT % {
         return STARTUP_INFO_FMT % {
-            "conninfo": self.app.amqp.format_broker_info(),
+            "conninfo": self.app.broker_connection().as_uri(),
             "queues": self.queues.format(indent=8),
             "queues": self.queues.format(indent=8),
             "concurrency": self.concurrency,
             "concurrency": self.concurrency,
             "loglevel": LOG_LEVELS[self.loglevel],
             "loglevel": LOG_LEVELS[self.loglevel],

+ 1 - 2
celery/bin/camqadm.py

@@ -340,9 +340,8 @@ class AMQPAdmin(object):
     def connect(self, conn=None):
     def connect(self, conn=None):
         if conn:
         if conn:
             conn.close()
             conn.close()
-        self.say("-> connecting to %s." % (
-                    self.app.amqp.format_broker_info(), ))
         conn = self.app.broker_connection()
         conn = self.app.broker_connection()
+        self.say("-> connecting to %s." % conn.as_uri())
         conn.connect()
         conn.connect()
         self.say("-> connected.")
         self.say("-> connected.")
         return conn
         return conn

+ 3 - 3
celery/bin/celeryctl.py

@@ -225,10 +225,10 @@ class inspect(Command):
         if destination and isinstance(destination, basestring):
         if destination and isinstance(destination, basestring):
             destination = map(str.strip, destination.split(","))
             destination = map(str.strip, destination.split(","))
 
 
-        def on_reply(message_data):
+        def on_reply(body):
             c = self.colored
             c = self.colored
-            node = message_data.keys()[0]
-            reply = message_data[node]
+            node = body.keys()[0]
+            reply = body[node]
             status, preply = self.prettify(reply)
             status, preply = self.prettify(reply)
             self.say("->", c.cyan(node, ": ") + status, indent(preply))
             self.say("->", c.cyan(node, ": ") + status, indent(preply))
 
 

+ 8 - 14
celery/datastructures.py

@@ -144,8 +144,8 @@ class ConfigurationView(AttributeDictMixin):
 class ExceptionInfo(object):
 class ExceptionInfo(object):
     """Exception wrapping an exception and its traceback.
     """Exception wrapping an exception and its traceback.
 
 
-    :param exc_info: The exception tuple info as returned by
-        :func:`traceback.format_exception`.
+    :param exc_info: The exception info tuple as returned by
+        :func:`sys.exc_info`.
 
 
     """
     """
 
 
@@ -156,7 +156,7 @@ class ExceptionInfo(object):
     traceback = None
     traceback = None
 
 
     def __init__(self, exc_info):
     def __init__(self, exc_info):
-        type_, exception, tb = exc_info
+        _, exception, _ = exc_info
         self.exception = exception
         self.exception = exception
         self.traceback = ''.join(traceback.format_exception(*exc_info))
         self.traceback = ''.join(traceback.format_exception(*exc_info))
 
 
@@ -164,10 +164,7 @@ class ExceptionInfo(object):
         return self.traceback
         return self.traceback
 
 
     def __repr__(self):
     def __repr__(self):
-        return "<%s.%s: %s>" % (
-                self.__class__.__module__,
-                self.__class__.__name__,
-                str(self.exception))
+        return "<ExceptionInfo: %r>" % (self.exception, )
 
 
 
 
 def consume_queue(queue):
 def consume_queue(queue):
@@ -176,7 +173,7 @@ def consume_queue(queue):
 
 
     The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
     The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
 
 
-    Example
+    *Examples*
 
 
         >>> q = Queue()
         >>> q = Queue()
         >>> map(q.put, range(4))
         >>> map(q.put, range(4))
@@ -251,7 +248,7 @@ class SharedCounter(object):
         return self._update_value()
         return self._update_value()
 
 
     def __repr__(self):
     def __repr__(self):
-        return "<SharedCounter: int(%s)>" % str(int(self))
+        return repr(int(self))
 
 
 
 
 class LimitedSet(object):
 class LimitedSet(object):
@@ -293,7 +290,7 @@ class LimitedSet(object):
                 if not self.expires or time.time() > when + self.expires:
                 if not self.expires or time.time() > when + self.expires:
                     try:
                     try:
                         self.pop_value(value)
                         self.pop_value(value)
-                    except TypeError:                   # pragma: no cover
+                    except TypeError:  # pragma: no cover
                         continue
                         continue
             break
             break
 
 
@@ -354,10 +351,7 @@ class TokenBucket(object):
 
 
     .. admonition:: Thread safety
     .. admonition:: Thread safety
 
 
-        This implementation is not thread safe.
-
-    :param fill_rate: Refill rate in tokens/second.
-    :keyword capacity: Max number of tokens.  Default is 1.
+        This implementation may not be thread safe.
 
 
     """
     """
 
 

+ 4 - 4
celery/decorators.py

@@ -17,7 +17,7 @@ import warnings
 from celery import task as _task
 from celery import task as _task
 
 
 
 
-DEPRECATION_TEXT = """\
+warnings.warn("""
 The `celery.decorators` module and the magic keyword arguments
 The `celery.decorators` module and the magic keyword arguments
 are pending deprecation and will be deprecated in 2.4, then removed
 are pending deprecation and will be deprecated in 2.4, then removed
 in 3.0.
 in 3.0.
@@ -25,16 +25,16 @@ in 3.0.
 `task.request` should be used instead of magic keyword arguments,
 `task.request` should be used instead of magic keyword arguments,
 and `celery.task.task` used instead of `celery.decorators.task`.
 and `celery.task.task` used instead of `celery.decorators.task`.
 
 
-"""
+See the 2.2 Changelog for more information.
+
+""")
 
 
 
 
 def task(*args, **kwargs):  # ✞
 def task(*args, **kwargs):  # ✞
-    warnings.warn(PendingDeprecationWarning(DEPRECATION_TEXT))
     kwargs.setdefault("accept_magic_kwargs", True)
     kwargs.setdefault("accept_magic_kwargs", True)
     return _task.task(*args, **kwargs)
     return _task.task(*args, **kwargs)
 
 
 
 
 def periodic_task(*args, **kwargs):  # ✞
 def periodic_task(*args, **kwargs):  # ✞
-    warnings.warn(PendingDeprecationWarning(DEPRECATION_TEXT))
     kwargs.setdefault("accept_magic_kwargs", True)
     kwargs.setdefault("accept_magic_kwargs", True)
     return _task.periodic_task(*args, **kwargs)
     return _task.periodic_task(*args, **kwargs)

+ 3 - 3
celery/events/__init__.py

@@ -205,9 +205,9 @@ class EventReceiver(object):
             except socket.error:
             except socket.error:
                 pass
                 pass
 
 
-    def _receive(self, message_data, message):
-        type = message_data.pop("type").lower()
-        self.process(type, create_event(type, message_data))
+    def _receive(self, body, message):
+        type = body.pop("type").lower()
+        self.process(type, create_event(type, body))
 
 
 
 
 class Events(object):
 class Events(object):

+ 6 - 23
celery/exceptions.py

@@ -1,65 +1,50 @@
+UNREGISTERED_FMT = """\
+Task of kind %s is not registered, please make sure it's imported.\
 """
 """
 
 
-Common Exceptions
-
-"""
-
-UNREGISTERED_FMT = """
-Task of kind %s is not registered, please make sure it's imported.
-""".strip()
-
 
 
 class SystemTerminate(SystemExit):
 class SystemTerminate(SystemExit):
-    pass
+    """Signals that the worker should terminate."""
 
 
 
 
 class QueueNotFound(KeyError):
 class QueueNotFound(KeyError):
     """Task routed to a queue not in CELERY_QUEUES."""
     """Task routed to a queue not in CELERY_QUEUES."""
-    pass
 
 
 
 
 class TimeLimitExceeded(Exception):
 class TimeLimitExceeded(Exception):
     """The time limit has been exceeded and the job has been terminated."""
     """The time limit has been exceeded and the job has been terminated."""
-    pass
 
 
 
 
 class SoftTimeLimitExceeded(Exception):
 class SoftTimeLimitExceeded(Exception):
     """The soft time limit has been exceeded. This exception is raised
     """The soft time limit has been exceeded. This exception is raised
     to give the task a chance to clean up."""
     to give the task a chance to clean up."""
-    pass
 
 
 
 
 class WorkerLostError(Exception):
 class WorkerLostError(Exception):
     """The worker processing a job has exited prematurely."""
     """The worker processing a job has exited prematurely."""
-    pass
 
 
 
 
 class ImproperlyConfigured(Exception):
 class ImproperlyConfigured(Exception):
     """Celery is somehow improperly configured."""
     """Celery is somehow improperly configured."""
-    pass
 
 
 
 
 class NotRegistered(KeyError):
 class NotRegistered(KeyError):
     """The task is not registered."""
     """The task is not registered."""
 
 
-    def __init__(self, message, *args, **kwargs):
-        message = UNREGISTERED_FMT % str(message)
-        KeyError.__init__(self, message, *args, **kwargs)
+    def __repr__(self):
+        return UNREGISTERED_FMT % str(self)
 
 
 
 
 class AlreadyRegistered(Exception):
 class AlreadyRegistered(Exception):
     """The task is already registered."""
     """The task is already registered."""
-    pass
 
 
 
 
 class TimeoutError(Exception):
 class TimeoutError(Exception):
     """The operation timed out."""
     """The operation timed out."""
-    pass
 
 
 
 
 class MaxRetriesExceededError(Exception):
 class MaxRetriesExceededError(Exception):
     """The tasks max restart limit has been exceeded."""
     """The tasks max restart limit has been exceeded."""
-    pass
 
 
 
 
 class RetryTaskError(Exception):
 class RetryTaskError(Exception):
@@ -67,13 +52,11 @@ class RetryTaskError(Exception):
 
 
     def __init__(self, message, exc, *args, **kwargs):
     def __init__(self, message, exc, *args, **kwargs):
         self.exc = exc
         self.exc = exc
-        Exception.__init__(self, message, exc, *args,
-                           **kwargs)
+        Exception.__init__(self, message, exc, *args, **kwargs)
 
 
 
 
 class TaskRevokedError(Exception):
 class TaskRevokedError(Exception):
     """The task has been revoked, so no result available."""
     """The task has been revoked, so no result available."""
-    pass
 
 
 
 
 class NotConfigured(UserWarning):
 class NotConfigured(UserWarning):

+ 8 - 17
celery/execute/__init__.py

@@ -1,25 +1,16 @@
-from celery.app import app_or_default
-from celery.registry import tasks
+from celery import current_app
+from celery.utils import deprecated
 
 
+send_task = current_app.send_task
 
 
+
+@deprecated(removal="2.3", alternative="Use task.apply_async() instead.")
 def apply_async(task, *args, **kwargs):
 def apply_async(task, *args, **kwargs):
-    """Deprecated. See :meth:`celery.task.base.BaseTask.apply_async`."""
-    # FIXME Deprecate!
+    """*[Deprecated]* Use `task.apply_async()`"""
     return task.apply_async(*args, **kwargs)
     return task.apply_async(*args, **kwargs)
 
 
 
 
+@deprecated(removal="2.3", alternative="Use task.apply() instead.")
 def apply(task, *args, **kwargs):
 def apply(task, *args, **kwargs):
-    """Deprecated. See :meth:`celery.task.base.BaseTask.apply`."""
-    # FIXME Deprecate!
+    """*[Deprecated]* Use `task.apply()`"""
     return task.apply(*args, **kwargs)
     return task.apply(*args, **kwargs)
-
-
-def send_task(*args, **kwargs):
-    """Deprecated. See :meth:`celery.app.App.send_task`."""
-    # FIXME Deprecate!
-    return app_or_default().send_task(*args, **kwargs)
-
-
-def delay_task(task_name, *args, **kwargs):
-    # FIXME Deprecate!
-    return tasks[task_name].apply_async(args, kwargs)

+ 26 - 32
celery/log.py

@@ -3,30 +3,24 @@ import logging
 import threading
 import threading
 import sys
 import sys
 import traceback
 import traceback
-import types
 
 
 from multiprocessing import current_process
 from multiprocessing import current_process
 from multiprocessing import util as mputil
 from multiprocessing import util as mputil
 
 
 from celery import signals
 from celery import signals
-from celery.app import app_or_default
+from celery import current_app
 from celery.utils import LOG_LEVELS, isatty
 from celery.utils import LOG_LEVELS, isatty
 from celery.utils.compat import LoggerAdapter
 from celery.utils.compat import LoggerAdapter
 from celery.utils.patch import ensure_process_aware_logger
 from celery.utils.patch import ensure_process_aware_logger
 from celery.utils.term import colored
 from celery.utils.term import colored
 
 
-# The logging subsystem is only configured once per process.
-# setup_logging_subsystem sets this flag, and subsequent calls
-# will do nothing.
-_setup = False
-
-COLORS = {"DEBUG": "blue",
-          "WARNING": "yellow",
-          "ERROR": "red",
-          "CRITICAL": "magenta"}
 
 
 
 
 class ColorFormatter(logging.Formatter):
 class ColorFormatter(logging.Formatter):
+    #: Loglevel -> Color mapping.
+    COLORS = colored().names
+    colors = {"DEBUG": COLORS["blue"], "WARNING": COLORS["yellow"],
+              "ERROR": COLORS["red"],  "CRITICAL": COLORS["magenta"]}
 
 
     def __init__(self, msg, use_color=True):
     def __init__(self, msg, use_color=True):
         logging.Formatter.__init__(self, msg)
         logging.Formatter.__init__(self, msg)
@@ -34,16 +28,16 @@ class ColorFormatter(logging.Formatter):
 
 
     def formatException(self, ei):
     def formatException(self, ei):
         r = logging.Formatter.formatException(self, ei)
         r = logging.Formatter.formatException(self, ei)
-        if type(r) in [types.StringType]:
-            r = r.decode("utf-8", "replace")    # Convert to unicode
+        if isinstance(r, str):
+            return r.decode("utf-8", "replace")    # Convert to unicode
         return r
         return r
 
 
     def format(self, record):
     def format(self, record):
         levelname = record.levelname
         levelname = record.levelname
+        color = self.colors.get(levelname)
 
 
-        if self.use_color and levelname in COLORS:
-            record.msg = unicode(colored().names[COLORS[levelname]](
-                            record.msg))
+        if self.use_color and color:
+            record.msg = unicode(color(record.msg))
 
 
         # Very ugly, but have to make sure processName is supported
         # Very ugly, but have to make sure processName is supported
         # by foreign logger instances.
         # by foreign logger instances.
@@ -51,12 +45,15 @@ class ColorFormatter(logging.Formatter):
         if "processName" not in record.__dict__:
         if "processName" not in record.__dict__:
             record.__dict__["processName"] = current_process()._name
             record.__dict__["processName"] = current_process()._name
         t = logging.Formatter.format(self, record)
         t = logging.Formatter.format(self, record)
-        if type(t) in [types.UnicodeType]:
-            t = t.encode('utf-8', 'replace')
+        if isinstance(t, unicode):
+            return t.encode("utf-8", "replace")
         return t
         return t
 
 
 
 
 class Logging(object):
 class Logging(object):
+    #: The logging subsystem is only configured once per process.
+    #: setup_logging_subsystem sets this flag, and subsequent calls
+    #: will do nothing.
     _setup = False
     _setup = False
 
 
     def __init__(self, app):
     def __init__(self, app):
@@ -86,14 +83,13 @@ class Logging(object):
 
 
     def setup_logging_subsystem(self, loglevel=None, logfile=None,
     def setup_logging_subsystem(self, loglevel=None, logfile=None,
             format=None, colorize=None, **kwargs):
             format=None, colorize=None, **kwargs):
+        if Logging._setup:
+            return
         loglevel = loglevel or self.loglevel
         loglevel = loglevel or self.loglevel
         format = format or self.format
         format = format or self.format
         if colorize is None:
         if colorize is None:
             colorize = self.supports_color(logfile)
             colorize = self.supports_color(logfile)
 
 
-        if self.__class__._setup:
-            return
-
         try:
         try:
             mputil._logger = None
             mputil._logger = None
         except AttributeError:
         except AttributeError:
@@ -113,10 +109,9 @@ class Logging(object):
 
 
             mp = mputil.get_logger()
             mp = mputil.get_logger()
             for logger in (root, mp):
             for logger in (root, mp):
-                self._setup_logger(logger, logfile,
-                                   format, colorize, **kwargs)
+                self._setup_logger(logger, logfile, format, colorize, **kwargs)
                 logger.setLevel(loglevel)
                 logger.setLevel(loglevel)
-        self.__class__._setup = True
+        Logging._setup = True
         return receivers
         return receivers
 
 
     def _detect_handler(self, logfile=None):
     def _detect_handler(self, logfile=None):
@@ -202,7 +197,7 @@ class Logging(object):
     def _setup_logger(self, logger, logfile, format, colorize,
     def _setup_logger(self, logger, logfile, format, colorize,
             formatter=ColorFormatter, **kwargs):
             formatter=ColorFormatter, **kwargs):
 
 
-        if logger.handlers:                 # Logger already configured
+        if logger.handlers:  # Logger already configured
             return logger
             return logger
 
 
         handler = self._detect_handler(logfile)
         handler = self._detect_handler(logfile)
@@ -211,13 +206,12 @@ class Logging(object):
         return logger
         return logger
 
 
 
 
-_default_logging = Logging(app_or_default())
-setup_logging_subsystem = _default_logging.setup_logging_subsystem
-get_default_logger = _default_logging.get_default_logger
-setup_logger = _default_logging.setup_logger
-setup_task_logger = _default_logging.setup_task_logger
-get_task_logger = _default_logging.get_task_logger
-redirect_stdouts_to_logger = _default_logging.redirect_stdouts_to_logger
+setup_logging_subsystem = current_app.log.setup_logging_subsystem
+get_default_logger = current_app.log.get_default_logger
+setup_logger = current_app.log.setup_logger
+setup_task_logger = current_app.log.setup_task_logger
+get_task_logger = current_app.log.get_task_logger
+redirect_stdouts_to_logger = current_app.log.redirect_stdouts_to_logger
 
 
 
 
 class LoggingProxy(object):
 class LoggingProxy(object):

+ 8 - 38
celery/messaging.py

@@ -1,38 +1,8 @@
-"""
-
-Sending and Receiving Messages
-
-"""
-
-from celery.app import app_or_default
-
-default_app = app_or_default()
-TaskPublisher = default_app.amqp.TaskPublisher
-ConsumerSet = default_app.amqp.ConsumerSet
-TaskConsumer = default_app.amqp.TaskConsumer
-
-
-def establish_connection(**kwargs):
-    """Establish a connection to the message broker."""
-    # FIXME: # Deprecate
-    app = app_or_default(kwargs.pop("app", None))
-    return app.broker_connection(**kwargs)
-
-
-def with_connection(fun):
-    """Decorator for providing default message broker connection for functions
-    supporting the `connection` and `connect_timeout` keyword
-    arguments."""
-    # FIXME: Deprecate!
-    return default_app.with_default_connection(fun)
-
-
-def get_consumer_set(connection, queues=None, **options):
-    """Get the :class:`kombu.messaging.Consumer` for a queue
-    configuration.
-
-    Defaults to the queues in :setting:`CELERY_QUEUES`.
-
-    """
-    # FIXME: Deprecate!
-    return default_app.amqp.get_task_consumer(connection, queues, **options)
+from celery import current_app
+
+TaskPublisher = current_app.amqp.TaskPublisher
+ConsumerSet = current_app.amqp.ConsumerSet
+TaskConsumer = current_app.amqp.TaskConsumer
+establish_connection = current_app.broker_connection
+with_connection = current_app.with_default_connection
+get_consumer_set = current_app.amqp.get_task_consumer

+ 8 - 16
celery/registry.py

@@ -28,8 +28,7 @@ class TaskRegistry(UserDict):
 
 
         """
         """
         task = inspect.isclass(task) and task() or task
         task = inspect.isclass(task) and task() or task
-        name = task.name
-        self.data[name] = task
+        self.data[task.name] = task
 
 
     def unregister(self, name):
     def unregister(self, name):
         """Unregister task by name.
         """Unregister task by name.
@@ -46,34 +45,27 @@ class TaskRegistry(UserDict):
             name = name.name
             name = name.name
         except AttributeError:
         except AttributeError:
             pass
             pass
-
         self.pop(name)
         self.pop(name)
 
 
     def filter_types(self, type):
     def filter_types(self, type):
         """Return all tasks of a specific type."""
         """Return all tasks of a specific type."""
-        return dict((task_name, task)
-                        for task_name, task in self.data.items()
-                            if task.type == type)
+        return dict((task_name, task) for task_name, task in self.data.items()
+                                        if task.type == type)
 
 
     def __getitem__(self, key):
     def __getitem__(self, key):
         try:
         try:
             return UserDict.__getitem__(self, key)
             return UserDict.__getitem__(self, key)
-        except KeyError, exc:
-            raise self.NotRegistered(str(exc))
+        except KeyError:
+            raise self.NotRegistered(key)
 
 
     def pop(self, key, *args):
     def pop(self, key, *args):
         try:
         try:
             return UserDict.pop(self, key, *args)
             return UserDict.pop(self, key, *args)
-        except KeyError, exc:
-            raise self.NotRegistered(str(exc))
-
-
-"""
-.. data:: tasks
+        except KeyError:
+            raise self.NotRegistered(key)
 
 
-    The global task registry.
 
 
-"""
+#: Global task registry.
 tasks = TaskRegistry()
 tasks = TaskRegistry()
 
 
 
 

+ 16 - 24
celery/routes.py

@@ -1,15 +1,9 @@
 from celery.exceptions import QueueNotFound
 from celery.exceptions import QueueNotFound
-from celery.utils import instantiate, firstmethod, mpromise
+from celery.utils import firstmethod, instantiate, lpmerge, mpromise
 
 
 _first_route = firstmethod("route_for_task")
 _first_route = firstmethod("route_for_task")
 
 
 
 
-def merge(a, b):
-    """Like `dict(a, **b)` except it will keep values from `a`, if the value
-    in `b` is :const:`None`."""
-    return dict(a, **dict((k, v) for k, v in b.iteritems() if v is not None))
-
-
 class MapRoute(object):
 class MapRoute(object):
     """Creates a router out of a :class:`dict`."""
     """Creates a router out of a :class:`dict`."""
 
 
@@ -27,47 +21,43 @@ class Router(object):
     def __init__(self, routes=None, queues=None, create_missing=False,
     def __init__(self, routes=None, queues=None, create_missing=False,
             app=None):
             app=None):
         from celery.app import app_or_default
         from celery.app import app_or_default
+        self.app = app_or_default(app)
         if queues is None:
         if queues is None:
             queues = {}
             queues = {}
         if routes is None:
         if routes is None:
             routes = []
             routes = []
-        self.app = app_or_default(app)
         self.queues = queues
         self.queues = queues
         self.routes = routes
         self.routes = routes
         self.create_missing = create_missing
         self.create_missing = create_missing
 
 
     def route(self, options, task, args=(), kwargs={}):
     def route(self, options, task, args=(), kwargs={}):
-        # Expand "queue" keys in options.
-        options = self.expand_destination(options)
+        options = self.expand_destination(options)  # expands 'queue'
         if self.routes:
         if self.routes:
             route = self.lookup_route(task, args, kwargs)
             route = self.lookup_route(task, args, kwargs)
-            if route:
-                # Also expand "queue" keys in route.
-                return merge(self.expand_destination(route), options)
+            if route:  # expands 'queue' in route.
+                return lpmerge(self.expand_destination(route), options)
         return options
         return options
 
 
     def expand_destination(self, route):
     def expand_destination(self, route):
-        # The route can simply be a queue name,
-        # this is convenient for direct exchanges.
+        # Route can be a queue name: convenient for direct exchanges.
         if isinstance(route, basestring):
         if isinstance(route, basestring):
             queue, route = route, {}
             queue, route = route, {}
         else:
         else:
-            # For topic exchanges you can use the defaults from a queue
-            # definition, and override e.g. just the routing_key.
+            # can use defaults from configured queue, but override specific
+            # things (like the routing_key): great for topic exchanges.
             queue = route.pop("queue", None)
             queue = route.pop("queue", None)
 
 
-        if queue:
+        if queue:  # expand config from configured queue.
             try:
             try:
                 dest = dict(self.queues[queue])
                 dest = dict(self.queues[queue])
             except KeyError:
             except KeyError:
-                if self.create_missing:
-                    dest = self.app.amqp.queues.add(queue, queue, queue)
-                else:
+                if not self.create_missing:
                     raise QueueNotFound(
                     raise QueueNotFound(
-                        "Queue '%s' is not defined in CELERY_QUEUES" % queue)
+                        "Queue %r is not defined in CELERY_QUEUES" % queue)
+                dest = self.app.amqp.queues.add(queue, queue, queue)
+            # routing_key and binding_key are synonyms.
             dest.setdefault("routing_key", dest.get("binding_key"))
             dest.setdefault("routing_key", dest.get("binding_key"))
-            return merge(dest, route)
-
+            return lpmerge(dest, route)
         return route
         return route
 
 
     def lookup_route(self, task, args=None, kwargs=None):
     def lookup_route(self, task, args=None, kwargs=None):
@@ -84,6 +74,8 @@ def prepare(routes):
             return mpromise(instantiate, route)
             return mpromise(instantiate, route)
         return route
         return route
 
 
+    if routes is None:
+        return ()
     if not isinstance(routes, (list, tuple)):
     if not isinstance(routes, (list, tuple)):
         routes = (routes, )
         routes = (routes, )
     return map(expand_route, routes)
     return map(expand_route, routes)

+ 26 - 59
celery/task/base.py

@@ -1,4 +1,4 @@
-# -*- coding: utf-8 -*-
+# -*- coding: utf-8 -*-"
 import sys
 import sys
 import threading
 import threading
 import warnings
 import warnings
@@ -10,32 +10,19 @@ from celery.execute.trace import TaskTrace
 from celery.registry import tasks, _unpickle_task
 from celery.registry import tasks, _unpickle_task
 from celery.result import EagerResult
 from celery.result import EagerResult
 from celery.schedules import maybe_schedule
 from celery.schedules import maybe_schedule
-from celery.utils import mattrgetter, gen_unique_id, fun_takes_kwargs
+from celery.utils import deprecated, mattrgetter, gen_unique_id, \
+                         fun_takes_kwargs
 from celery.utils.functional import wraps
 from celery.utils.functional import wraps
 from celery.utils.timeutils import timedelta_seconds
 from celery.utils.timeutils import timedelta_seconds
 
 
 from celery.task import sets
 from celery.task import sets
 
 
-IMPORT_DEPRECATION_TEXT = """
-Importing %(symbol)s from `celery.task.base` is deprecated,
-and is scheduled for removal in 2.4.
-
-Please use `from celery.task import %(symbol)s` instead.
-
-"""
-
-
-def __deprecated_import(fun):
-
-    @wraps(fun)
-    def _inner(*args, **kwargs):
-        warnings.warn(DeprecationWarning(
-            IMPORT_DEPRECATION_TEXT % {"symbol": fun.__name__, }))
-        return fun(*args, **kwargs)
-
-    return _inner
-TaskSet = __deprecated_import(sets.TaskSet)  # ✟
-subtask = __deprecated_import(sets.subtask)  # ✟
+TaskSet = deprecated("Importing TaskSet from celery.task.base",
+                     alternative="Use celery.task.TaskSet instead.",
+                     removal="2.4")(sets.TaskSet)
+subtask = deprecated("Importing subtask from celery.task.base",
+                     alternative="Use celery.task.subtask instead.",
+                     removal="2.4")(sets.subtask)
 
 
 extract_exec_options = mattrgetter("queue", "routing_key",
 extract_exec_options = mattrgetter("queue", "routing_key",
                                    "exchange", "immediate",
                                    "exchange", "immediate",
@@ -249,39 +236,13 @@ class BaseTask(object):
         return (_unpickle_task, (self.name, ), None)
         return (_unpickle_task, (self.name, ), None)
 
 
     def run(self, *args, **kwargs):
     def run(self, *args, **kwargs):
-        """The body of the task executed by the worker.
-
-        The following standard keyword arguments are reserved and is
-        automatically passed by the worker if the function/method
-        supports them:
-
-            * `task_id`
-            * `task_name`
-            * `task_retries`
-            * `task_is_eager`
-            * `logfile`
-            * `loglevel`
-            * `delivery_info`
-
-        To take these default arguments, the task can either list the ones
-        it wants explicitly or just take an arbitrary list of keyword
-        arguments (\*\*kwargs).
-
-        Magic keyword arguments can be disabled using the
-        :attr:`accept_magic_kwargs` flag.  The information can then
-        be found in the :attr:`request` attribute.
-
-        """
+        """The body of the task executed by workers."""
         raise NotImplementedError("Tasks must define the run method.")
         raise NotImplementedError("Tasks must define the run method.")
 
 
     @classmethod
     @classmethod
     def get_logger(self, loglevel=None, logfile=None, propagate=False,
     def get_logger(self, loglevel=None, logfile=None, propagate=False,
             **kwargs):
             **kwargs):
-        """Get task-aware logger object.
-
-        See :func:`celery.log.setup_task_logger`.
-
-        """
+        """Get task-aware logger object."""
         if loglevel is None:
         if loglevel is None:
             loglevel = self.request.loglevel
             loglevel = self.request.loglevel
         if logfile is None:
         if logfile is None:
@@ -303,13 +264,17 @@ class BaseTask(object):
 
 
         :rtype :class:`~celery.app.amqp.TaskPublisher`:
         :rtype :class:`~celery.app.amqp.TaskPublisher`:
 
 
-        Please be sure to close the AMQP connection after you're done
-        with this object.  Example::
+        Please be sure to close the connection after use::
 
 
             >>> publisher = self.get_publisher()
             >>> publisher = self.get_publisher()
             >>> # ... do something with publisher
             >>> # ... do something with publisher
             >>> publisher.connection.close()
             >>> publisher.connection.close()
 
 
+        The connection can also be used as a context::
+
+            >>> with self.get_publisher() as publisher:
+            ...     # ... do something with publisher
+
         """
         """
         if exchange is None:
         if exchange is None:
             exchange = self.exchange
             exchange = self.exchange
@@ -329,8 +294,9 @@ class BaseTask(object):
 
 
         .. warning::
         .. warning::
 
 
-            Please be sure to close the AMQP connection when you're done
-            with this object.  Example::
+            If you don't specify a connection, one will automatically
+            be established for you, in that case you need to close this
+            connection after use::
 
 
                 >>> consumer = self.get_consumer()
                 >>> consumer = self.get_consumer()
                 >>> # do something with consumer
                 >>> # do something with consumer
@@ -345,8 +311,9 @@ class BaseTask(object):
 
 
     @classmethod
     @classmethod
     def delay(self, *args, **kwargs):
     def delay(self, *args, **kwargs):
-        """Shortcut to :meth:`apply_async` giving star arguments, but without
-        options.
+        """Star argument version of :meth:`apply_async`.
+
+        Does not support the extra options enabled by :meth:`apply_async`.
 
 
         :param \*args: positional arguments passed on to the task.
         :param \*args: positional arguments passed on to the task.
         :param \*\*kwargs: keyword arguments passed on to the task.
         :param \*\*kwargs: keyword arguments passed on to the task.
@@ -361,7 +328,7 @@ class BaseTask(object):
             eta=None, task_id=None, publisher=None, connection=None,
             eta=None, task_id=None, publisher=None, connection=None,
             connect_timeout=None, router=None, expires=None, queues=None,
             connect_timeout=None, router=None, expires=None, queues=None,
             **options):
             **options):
-        """Run a task asynchronously by the celery daemon(s).
+        """Apply tasks asynchronously by sending a message.
 
 
         :keyword args: The positional arguments to pass on to the
         :keyword args: The positional arguments to pass on to the
                        task (a :class:`list` or :class:`tuple`).
                        task (a :class:`list` or :class:`tuple`).
@@ -767,7 +734,7 @@ class PeriodicTask(Task):
 
 
         *REQUIRED* Defines how often the task is run (its interval),
         *REQUIRED* Defines how often the task is run (its interval),
         it can be a :class:`~datetime.timedelta` object, a
         it can be a :class:`~datetime.timedelta` object, a
-        :class:`~celery.task.schedules.crontab` object or an integer
+        :class:`~celery.schedules.crontab` object or an integer
         specifying the time in seconds.
         specifying the time in seconds.
 
 
     .. attribute:: relative
     .. attribute:: relative
@@ -791,7 +758,7 @@ class PeriodicTask(Task):
         ...         logger.info("Execute every 30 seconds")
         ...         logger.info("Execute every 30 seconds")
 
 
         >>> from celery.task import PeriodicTask
         >>> from celery.task import PeriodicTask
-        >>> from celery.task.schedules import crontab
+        >>> from celery.schedules import crontab
 
 
         >>> class EveryMondayMorningTask(PeriodicTask):
         >>> class EveryMondayMorningTask(PeriodicTask):
         ...     run_every = crontab(hour=7, minute=30, day_of_week=1)
         ...     run_every = crontab(hour=7, minute=30, day_of_week=1)

+ 5 - 1
celery/task/schedules.py

@@ -1,3 +1,7 @@
+import warnings
 from celery.schedules import schedule, crontab_parser, crontab
 from celery.schedules import schedule, crontab_parser, crontab
 
 
-__all__ = ["schedule", "crontab_parser", "crontab"]
+warnings.warn(DeprecationWarning(
+    "celery.task.schedules is deprecated and renamed to celery.schedules"))
+
+

+ 7 - 5
celery/tests/__init__.py

@@ -55,12 +55,14 @@ def find_distribution_modules(name=__name__, file=__file__):
                     yield ".".join([package, filename])[:-3]
                     yield ".".join([package, filename])[:-3]
 
 
 
 
-def import_all_modules(name=__name__, file=__file__):
+def import_all_modules(name=__name__, file=__file__,
+        skip=["celery.decorators"]):
     for module in find_distribution_modules(name, file):
     for module in find_distribution_modules(name, file):
-        try:
-            import_module(module)
-        except ImportError:
-            pass
+        if module not in skip:
+            try:
+                import_module(module)
+            except ImportError:
+                pass
 
 
 
 
 if os.environ.get("COVER_ALL_MODULES") or "--with-coverage3" in sys.argv:
 if os.environ.get("COVER_ALL_MODULES") or "--with-coverage3" in sys.argv:

+ 5 - 5
celery/tests/test_app.py

@@ -146,15 +146,15 @@ class test_App(unittest.TestCase):
                                        "userid": "guest",
                                        "userid": "guest",
                                        "password": "guest",
                                        "password": "guest",
                                        "virtual_host": "/"},
                                        "virtual_host": "/"},
-                                      self.app.amqp.get_broker_info())
+                                      self.app.broker_connection().info())
         self.app.conf.BROKER_PORT = 1978
         self.app.conf.BROKER_PORT = 1978
         self.app.conf.BROKER_VHOST = "foo"
         self.app.conf.BROKER_VHOST = "foo"
-        self.assertDictContainsSubset({"port": ":1978",
-                                       "virtual_host": "/foo"},
-                                      self.app.amqp.get_broker_info())
+        self.assertDictContainsSubset({"port": 1978,
+                                       "virtual_host": "foo"},
+                                      self.app.broker_connection().info())
         conn = self.app.broker_connection(virtual_host="/value")
         conn = self.app.broker_connection(virtual_host="/value")
         self.assertDictContainsSubset({"virtual_host": "/value"},
         self.assertDictContainsSubset({"virtual_host": "/value"},
-                                      self.app.amqp.get_broker_info(conn))
+                                      conn.info())
 
 
     def test_send_task_sent_event(self):
     def test_send_task_sent_event(self):
         from celery.app import amqp
         from celery.app import amqp

+ 1 - 1
celery/tests/test_datastructures.py

@@ -133,7 +133,7 @@ class test_SharedCounter(unittest.TestCase):
         self.assertEqual(int(c), -10)
         self.assertEqual(int(c), -10)
 
 
     def test_repr(self):
     def test_repr(self):
-        self.assertIn("<SharedCounter:", repr(SharedCounter(10)))
+        self.assertIn("10", repr(SharedCounter(10)))
 
 
 
 
 class test_LimitedSet(unittest.TestCase):
 class test_LimitedSet(unittest.TestCase):

+ 8 - 9
celery/tests/test_decorators.py

@@ -1,6 +1,5 @@
 import warnings
 import warnings
 
 
-from celery import decorators
 from celery.task import base
 from celery.task import base
 
 
 from celery.tests.compat import catch_warnings
 from celery.tests.compat import catch_warnings
@@ -14,24 +13,24 @@ def add(x, y):
 
 
 class test_decorators(unittest.TestCase):
 class test_decorators(unittest.TestCase):
 
 
-    def assertCompatDecorator(self, decorator, type, **opts):
+    def setUp(self):
         warnings.resetwarnings()
         warnings.resetwarnings()
-
         def with_catch_warnings(log):
         def with_catch_warnings(log):
-            return decorator(**opts)(add), log[0].message
-
+            from celery import decorators
+            return decorators
         context = catch_warnings(record=True)
         context = catch_warnings(record=True)
-        task, w = execute_context(context, with_catch_warnings)
+        self.decorators = execute_context(context, with_catch_warnings)
 
 
+    def assertCompatDecorator(self, decorator, type, **opts):
+        task = decorator(**opts)(add)
         self.assertEqual(task(8, 8), 16)
         self.assertEqual(task(8, 8), 16)
         self.assertTrue(task.accept_magic_kwargs)
         self.assertTrue(task.accept_magic_kwargs)
         self.assertIsInstance(task, type)
         self.assertIsInstance(task, type)
-        self.assertIsInstance(w, PendingDeprecationWarning)
 
 
     def test_task(self):
     def test_task(self):
-        self.assertCompatDecorator(decorators.task, base.Task)
+        self.assertCompatDecorator(self.decorators.task, base.Task)
 
 
     def test_periodic_task(self):
     def test_periodic_task(self):
-        self.assertCompatDecorator(decorators.periodic_task,
+        self.assertCompatDecorator(self.decorators.periodic_task,
                                    base.PeriodicTask,
                                    base.PeriodicTask,
                                    run_every=1)
                                    run_every=1)

+ 1 - 1
celery/tests/test_task.py

@@ -10,7 +10,7 @@ from celery.task import task as task_dec
 from celery.exceptions import RetryTaskError
 from celery.exceptions import RetryTaskError
 from celery.execute import send_task
 from celery.execute import send_task
 from celery.result import EagerResult
 from celery.result import EagerResult
-from celery.task.schedules import crontab, crontab_parser
+from celery.schedules import crontab, crontab_parser
 from celery.utils import timeutils
 from celery.utils import timeutils
 from celery.utils import gen_unique_id
 from celery.utils import gen_unique_id
 from celery.utils.functional import wraps
 from celery.utils.functional import wraps

+ 1 - 1
celery/tests/test_task_builtins.py

@@ -39,7 +39,7 @@ class test_deprecated(unittest.TestCase):
 
 
         for w in execute_context(catch_warnings(record=True), block):
         for w in execute_context(catch_warnings(record=True), block):
             self.assertIsInstance(w, DeprecationWarning)
             self.assertIsInstance(w, DeprecationWarning)
-            self.assertIn("Please use", w.args[0])
+            self.assertIn("is deprecated", w.args[0])
 
 
 
 
 class test_backend_cleanup(unittest.TestCase):
 class test_backend_cleanup(unittest.TestCase):

+ 0 - 3
celery/tests/test_utils_info.py

@@ -62,6 +62,3 @@ class TestInfo(unittest.TestCase):
         celery = Celery(set_as_current=False)
         celery = Celery(set_as_current=False)
         celery.amqp.queues = QUEUES
         celery.amqp.queues = QUEUES
         self.assertEqual(celery.amqp.queues.format(), QUEUE_FORMAT)
         self.assertEqual(celery.amqp.queues.format(), QUEUE_FORMAT)
-
-    def test_broker_info(self):
-        app_or_default().amqp.format_broker_info()

+ 2 - 2
celery/tests/test_worker.py

@@ -59,8 +59,8 @@ class MyKombuConsumer(MainConsumer):
 class MockNode(object):
 class MockNode(object):
     commands = []
     commands = []
 
 
-    def handle_message(self, message_data, message):
-        self.commands.append(message.pop("command", None))
+    def handle_message(self, body, message):
+        self.commands.append(body.pop("command", None))
 
 
 
 
 class MockEventDispatcher(object):
 class MockEventDispatcher(object):

+ 3 - 3
celery/tests/test_worker_job.py

@@ -335,9 +335,9 @@ class test_TaskRequest(unittest.TestCase):
             mytask.acks_late = False
             mytask.acks_late = False
 
 
     def test_from_message_invalid_kwargs(self):
     def test_from_message_invalid_kwargs(self):
-        message_data = dict(task="foo", id=1, args=(), kwargs="foo")
-        self.assertRaises(InvalidTaskError, TaskRequest.from_message, None,
-                message_data)
+        body = dict(task="foo", id=1, args=(), kwargs="foo")
+        self.assertRaises(InvalidTaskError,
+                          TaskRequest.from_message, None, body)
 
 
     def test_on_timeout(self):
     def test_on_timeout(self):
 
 

+ 39 - 1
celery/utils/__init__.py

@@ -7,6 +7,7 @@ import importlib
 import logging
 import logging
 import threading
 import threading
 import traceback
 import traceback
+import warnings
 
 
 from inspect import getargspec
 from inspect import getargspec
 from itertools import islice
 from itertools import islice
@@ -15,13 +16,50 @@ from pprint import pprint
 from kombu.utils import gen_unique_id, rpartition
 from kombu.utils import gen_unique_id, rpartition
 
 
 from celery.utils.compat import StringIO
 from celery.utils.compat import StringIO
-from celery.utils.functional import partial
+from celery.utils.functional import partial, wraps
 
 
 
 
 LOG_LEVELS = dict(logging._levelNames)
 LOG_LEVELS = dict(logging._levelNames)
 LOG_LEVELS["FATAL"] = logging.FATAL
 LOG_LEVELS["FATAL"] = logging.FATAL
 LOG_LEVELS[logging.FATAL] = "FATAL"
 LOG_LEVELS[logging.FATAL] = "FATAL"
 
 
+PENDING_DEPRECATION_FMT = """
+    %(description)s is scheduled for deprecation in \
+    version %(deprecation)s and removal in version v%(removal)s. \
+    %(alternative)s
+"""
+
+DEPRECATION_FMT = """
+    %(description)s is deprecated and scheduled for removal in
+    version %(removal)s. %(alternative)s
+"""
+
+
+def deprecated(description=None, deprecation=None, removal=None,
+        alternative=None):
+
+    def _inner(fun):
+
+        @wraps(fun)
+        def __inner(*args, **kwargs):
+            ctx = {"description": description or get_full_cls_name(fun),
+                   "deprecation": deprecation, "removal": removal,
+                   "alternative": alternative}
+            if deprecation is not None:
+                w = PendingDeprecationWarning(PENDING_DEPRECATION_FMT % ctx)
+            else:
+                w = DeprecationWarning(DEPRECATION_FMT % ctx)
+            warnings.warn(w)
+            return fun(*args, **kwargs)
+        return __inner
+    return _inner
+
+
+def lpmerge(L, R):
+    """Left precedent dictionary merge.  Keeps values from `l`, if the value
+    in `r` is :const:`None`."""
+    return dict(L, **dict((k, v) for k, v in R.iteritems() if v is not None))
+
 
 
 class promise(object):
 class promise(object):
     """A promise.
     """A promise.

+ 12 - 12
celery/worker/consumer.py

@@ -292,47 +292,47 @@ class Consumer(object):
             state.task_reserved(task)
             state.task_reserved(task)
             self.ready_queue.put(task)
             self.ready_queue.put(task)
 
 
-    def on_control(self, message, message_data):
+    def on_control(self, body, message):
         try:
         try:
-            self.pidbox_node.handle_message(message, message_data)
+            self.pidbox_node.handle_message(body, message)
         except KeyError, exc:
         except KeyError, exc:
             self.logger.error("No such control command: %s" % exc)
             self.logger.error("No such control command: %s" % exc)
         except Exception, exc:
         except Exception, exc:
             self.logger.error(
             self.logger.error(
                 "Error occurred while handling control command: %r\n%r" % (
                 "Error occurred while handling control command: %r\n%r" % (
-                    exc, traceback.format_exc()))
+                    exc, traceback.format_exc()), exc_info=sys.exc_info())
 
 
     def apply_eta_task(self, task):
     def apply_eta_task(self, task):
         state.task_reserved(task)
         state.task_reserved(task)
         self.ready_queue.put(task)
         self.ready_queue.put(task)
         self.qos.decrement_eventually()
         self.qos.decrement_eventually()
 
 
-    def receive_message(self, message_data, message):
+    def receive_message(self, body, message):
         """The callback called when a new message is received. """
         """The callback called when a new message is received. """
 
 
         # Handle task
         # Handle task
-        if message_data.get("task"):
+        if body.get("task"):
             def ack():
             def ack():
                 try:
                 try:
                     message.ack()
                     message.ack()
                 except self.connection_errors, exc:
                 except self.connection_errors, exc:
                     self.logger.critical(
                     self.logger.critical(
                             "Couldn't ack %r: message:%r reason:%r" % (
                             "Couldn't ack %r: message:%r reason:%r" % (
-                                message.delivery_tag, message_data, exc))
+                                message.delivery_tag, body, exc))
 
 
             try:
             try:
-                task = TaskRequest.from_message(message, message_data, ack,
+                task = TaskRequest.from_message(message, body, ack,
                                                 app=self.app,
                                                 app=self.app,
                                                 logger=self.logger,
                                                 logger=self.logger,
                                                 hostname=self.hostname,
                                                 hostname=self.hostname,
                                                 eventer=self.event_dispatcher)
                                                 eventer=self.event_dispatcher)
             except NotRegistered, exc:
             except NotRegistered, exc:
-                self.logger.error("Unknown task ignored: %s: %s" % (
-                        str(exc), message_data), exc_info=sys.exc_info())
+                self.logger.error("Unknown task ignored: %r Body->%r" % (
+                        exc, body), exc_info=sys.exc_info())
                 message.ack()
                 message.ack()
             except InvalidTaskError, exc:
             except InvalidTaskError, exc:
                 self.logger.error("Invalid task ignored: %s: %s" % (
                 self.logger.error("Invalid task ignored: %s: %s" % (
-                        str(exc), message_data), exc_info=sys.exc_info())
+                        str(exc), body), exc_info=sys.exc_info())
                 message.ack()
                 message.ack()
             else:
             else:
                 self.on_task(task)
                 self.on_task(task)
@@ -340,7 +340,7 @@ class Consumer(object):
 
 
         warnings.warn(RuntimeWarning(
         warnings.warn(RuntimeWarning(
             "Received and deleted unknown message. Wrong destination?!? \
             "Received and deleted unknown message. Wrong destination?!? \
-             the message was: %s" % message_data))
+             the message was: %s" % body))
         message.ack()
         message.ack()
 
 
     def maybe_conn_error(self, fun):
     def maybe_conn_error(self, fun):
@@ -477,7 +477,7 @@ class Consumer(object):
     def info(self):
     def info(self):
         conninfo = {}
         conninfo = {}
         if self.connection:
         if self.connection:
-            conninfo = self.app.amqp.get_broker_info(self.connection)
+            conninfo = self.connection.info()
             conninfo.pop("password", None)  # don't send password.
             conninfo.pop("password", None)  # don't send password.
         return {"broker": conninfo,
         return {"broker": conninfo,
                 "prefetch_count": self.qos.next}
                 "prefetch_count": self.qos.next}

+ 8 - 8
celery/worker/job.py

@@ -262,7 +262,7 @@ class TaskRequest(object):
             self._store_errors = self.task.store_errors_even_if_ignored
             self._store_errors = self.task.store_errors_even_if_ignored
 
 
     @classmethod
     @classmethod
-    def from_message(cls, message, message_data, on_ack=noop, **kw):
+    def from_message(cls, message, body, on_ack=noop, **kw):
         """Create request from a task message.
         """Create request from a task message.
 
 
         :raises UnknownTaskError: if the message does not describe a task,
         :raises UnknownTaskError: if the message does not describe a task,
@@ -273,17 +273,17 @@ class TaskRequest(object):
         delivery_info = dict((key, _delivery_info.get(key))
         delivery_info = dict((key, _delivery_info.get(key))
                                 for key in WANTED_DELIVERY_INFO)
                                 for key in WANTED_DELIVERY_INFO)
 
 
-        kwargs = message_data["kwargs"]
+        kwargs = body["kwargs"]
         if not hasattr(kwargs, "items"):
         if not hasattr(kwargs, "items"):
             raise InvalidTaskError("Task keyword arguments is not a mapping.")
             raise InvalidTaskError("Task keyword arguments is not a mapping.")
 
 
-        return cls(task_name=message_data["task"],
-                   task_id=message_data["id"],
-                   args=message_data["args"],
+        return cls(task_name=body["task"],
+                   task_id=body["id"],
+                   args=body["args"],
                    kwargs=kwdict(kwargs),
                    kwargs=kwdict(kwargs),
-                   retries=message_data.get("retries", 0),
-                   eta=maybe_iso8601(message_data.get("eta")),
-                   expires=maybe_iso8601(message_data.get("expires")),
+                   retries=body.get("retries", 0),
+                   eta=maybe_iso8601(body.get("eta")),
+                   expires=maybe_iso8601(body.get("expires")),
                    on_ack=on_ack,
                    on_ack=on_ack,
                    delivery_info=delivery_info,
                    delivery_info=delivery_info,
                    **kw)
                    **kw)

+ 3 - 102
setup.py

@@ -34,120 +34,25 @@ os.environ.pop("CELERY_NO_EVAL", None)
 sys.modules.pop("celery", None)
 sys.modules.pop("celery", None)
 
 
 
 
-def with_dist_not_in_path(fun):
-
-    def _inner(*args, **kwargs):
-        cwd = os.getcwd()
-        removed = []
-        for path in (cwd, cwd + "/", "."):
-            try:
-                i = sys.path.index(path)
-            except ValueError:
-                pass
-            else:
-                removed.append((i, path))
-                sys.path.remove(path)
-
-        try:
-            dist_module = sys.modules.pop("celery", None)
-            try:
-                import celery as existing_module
-            except ImportError:
-                pass
-            else:
-                kwargs["celery"] = existing_module
-                return fun(*args, **kwargs)
-        finally:
-            for i, path in removed:
-                sys.path.insert(i, path)
-            if dist_module:
-                sys.modules["celery"] = dist_module
-
-    return _inner
-
-
-class Upgrade(object):
-    old_modules = ("platform", )
-
-    def run(self, dist=False):
-        detect_ = self.detect_existing_installation
-        if not dist:
-            detect = with_dist_not_in_path(detect_)
-        else:
-            detect = lambda: detect_(distmeta)
-        path = detect()
-        if path:
-            self.remove_modules(path)
-
-    def detect_existing_installation(self, celery=None):
-        path = os.path.dirname(celery.__file__)
-        sys.stderr.write("* Upgrading old Celery from: \n\t%r\n" % path)
-        return path
-
-    def try_remove(self, file):
-        try:
-            os.remove(file)
-        except OSError:
-            pass
-
-    def remove_modules(self, path):
-        for module_name in self.old_modules:
-            sys.stderr.write("* Removing old %s.py...\n" % module_name)
-            self.try_remove(os.path.join(path, "%s.py" % module_name))
-            self.try_remove(os.path.join(path, "%s.pyc" % module_name))
-
-
-class mytest(test):
-
-    def run(self, *args, **kwargs):
-        Upgrade().run(dist=True)
-        test.run(self, *args, **kwargs)
-
-
-class quicktest(mytest):
+class quicktest(test):
     extra_env = dict(SKIP_RLIMITS=1, QUICKTEST=1)
     extra_env = dict(SKIP_RLIMITS=1, QUICKTEST=1)
 
 
     def run(self, *args, **kwargs):
     def run(self, *args, **kwargs):
         for env_name, env_value in self.extra_env.items():
         for env_name, env_value in self.extra_env.items():
             os.environ[env_name] = str(env_value)
             os.environ[env_name] = str(env_value)
-        mytest.run(self, *args, **kwargs)
-
-
-class upgrade(Command):
-    user_options = []
-
-    def run(self, *args, **kwargs):
-        Upgrade().run()
-
-    def initialize_options(self):
-        pass
-
-    def finalize_options(self):
-        pass
-
-
-class upgrade_and_install(install):
-
-    def run(self, *args, **kwargs):
-        Upgrade().run()
-        install.run(self, *args, **kwargs)
-
+        test.run(self, *args, **kwargs)
 
 
 install_requires = []
 install_requires = []
-
 try:
 try:
     import importlib
     import importlib
 except ImportError:
 except ImportError:
     install_requires.append("importlib")
     install_requires.append("importlib")
-
-
 install_requires.extend([
 install_requires.extend([
     "python-dateutil",
     "python-dateutil",
     "anyjson",
     "anyjson",
     "kombu>=1.0.0b4",
     "kombu>=1.0.0b4",
     "pyparsing>=1.5.0",
     "pyparsing>=1.5.0",
 ])
 ])
-
 py_version = sys.version_info
 py_version = sys.version_info
 if sys.version_info < (2, 6) and not sys.platform.startswith("java"):
 if sys.version_info < (2, 6) and not sys.platform.startswith("java"):
     install_requires.append("multiprocessing")
     install_requires.append("multiprocessing")
@@ -167,8 +72,6 @@ console_scripts = [
         'celeryctl = celery.bin.celeryctl:main',
         'celeryctl = celery.bin.celeryctl:main',
         'celeryd-multi = celery.bin.celeryd_multi:main',
         'celeryd-multi = celery.bin.celeryd_multi:main',
 ]
 ]
-
-import platform
 if platform.system() == "Windows":
 if platform.system() == "Windows":
     console_scripts.append('celeryd = celery.bin.celeryd:windows_main')
     console_scripts.append('celeryd = celery.bin.celeryd:windows_main')
 else:
 else:
@@ -188,9 +91,7 @@ setup(
     zip_safe=False,
     zip_safe=False,
     install_requires=install_requires,
     install_requires=install_requires,
     tests_require=tests_require,
     tests_require=tests_require,
-    cmdclass={"install": upgrade_and_install,
-              "upgrade": upgrade,
-              "test": mytest,
+    cmdclass={"test": test,
               "quicktest": quicktest},
               "quicktest": quicktest},
     test_suite="nose.collector",
     test_suite="nose.collector",
     classifiers=[
     classifiers=[