celery.messaging is now Celery().amqp

Ask Solem 14 years ago
parent
commit
a65b16d477
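
In short, the helpers that lived in celery.messaging now hang off the application instance. A minimal sketch of the new usage, assumed from the diffs below rather than part of the commit itself:

    from celery import Celery

    app = Celery()
    # previously: from celery.messaging import establish_connection, TaskPublisher
    conn = app.broker_connection()
    publisher = app.amqp.TaskPublisher(conn)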

+ 1 - 1
celery/__init__.py

@@ -11,4 +11,4 @@ __docformat__ = "restructuredtext"
 
 def Celery(*args, **kwargs):
     from celery import app
-    return app.Celery(*args, **kwargs)
+    return app.App(*args, **kwargs)

+ 22 - 8
celery/app/__init__.py

@@ -1,4 +1,10 @@
+import os
+
+from inspect import getargspec
+
+from celery import registry
 from celery.app import base
+from celery.utils.functional import wraps
 
 
 class App(base.BaseApp):
@@ -79,12 +85,20 @@ class App(base.BaseApp):
 
 default_app = App()
 
-counts = [0]
-from multiprocessing import current_process
-def app_or_default(app=None):
-    if app is None:
-        if counts[0] >= 1:
+
+if os.environ.get("CELERY_TRACE_APP"):
+    from multiprocessing import current_process
+    def app_or_default(app=None):
+        if app is None:
+            if current_process()._name == "MainProcess":
+                raise Exception("DEFAULT APP")
             print("RETURNING TO DEFAULT APP")
             print("RETURNING TO DEFAULT APP")
-        counts[0] += 1
-        return default_app
-    return app
+            import traceback
+            traceback.print_stack()
+            return default_app
+        return app
+else:
+    def app_or_default(app=None):
+        if app is None:
+            return default_app
+        return app
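
The tracing branch above is a debugging aid: with CELERY_TRACE_APP set, falling back to the default app raises in the main process and prints a stack trace elsewhere, exposing call sites that forget to pass an app. A sketch of how it would be exercised (the variable must be set before celery.app is imported):

    import os
    os.environ["CELERY_TRACE_APP"] = "1"    # enable tracing before import

    from celery.app import app_or_default
    app_or_default(None)    # raises Exception("DEFAULT APP") in MainProcess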

+ 176 - 0
celery/app/amqp.py

@@ -0,0 +1,176 @@
+
+from datetime import datetime, timedelta
+
+from carrot.connection import BrokerConnection
+from carrot import messaging
+
+from celery import routes
+from celery import signals
+from celery.utils import gen_unique_id, mitemgetter
+
+
+MSG_OPTIONS = ("mandatory", "priority", "immediate",
+               "routing_key", "serializer", "delivery_mode")
+
+get_msg_options = mitemgetter(*MSG_OPTIONS)
+extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
+
+
+_queues_declared = False
+_exchanges_declared = set()
+
+
+class TaskPublisher(messaging.Publisher):
+    auto_declare = False
+
+    def declare(self):
+        if self.exchange not in _exchanges_declared:
+            super(TaskPublisher, self).declare()
+            _exchanges_declared.add(self.exchange)
+
+    def delay_task(self, task_name, task_args=None, task_kwargs=None,
+            countdown=None, eta=None, task_id=None, taskset_id=None,
+            expires=None, exchange=None, exchange_type=None, **kwargs):
+        """Delay task for execution by the celery nodes."""
+
+        task_id = task_id or gen_unique_id()
+        task_args = task_args or []
+        task_kwargs = task_kwargs or {}
+        now = None
+        if countdown: # Convert countdown to ETA.
+            now = datetime.now()
+            eta = now + timedelta(seconds=countdown)
+
+        if not isinstance(task_args, (list, tuple)):
+            raise ValueError("task args must be a list or tuple")
+        if not isinstance(task_kwargs, dict):
+            raise ValueError("task kwargs must be a dictionary")
+
+        if isinstance(expires, int):
+            now = now or datetime.now()
+            expires = now + timedelta(seconds=expires)
+
+        message_data = {
+            "task": task_name,
+            "id": task_id,
+            "args": task_args or [],
+            "kwargs": task_kwargs or {},
+            "retries": kwargs.get("retries", 0),
+            "eta": eta and eta.isoformat(),
+            "expires": expires and expires.isoformat(),
+        }
+
+        if taskset_id:
+            message_data["taskset"] = taskset_id
+
+        # FIXME (carrot Publisher.send needs to accept exchange argument)
+        if exchange:
+            self.exchange = exchange
+        if exchange_type:
+            self.exchange_type = exchange_type
+        self.send(message_data, **extract_msg_options(kwargs))
+        signals.task_sent.send(sender=task_name, **message_data)
+
+        return task_id
+
+
+class ConsumerSet(messaging.ConsumerSet):
+    """ConsumerSet with an optional decode error callback.
+
+    For more information see :class:`carrot.messaging.ConsumerSet`.
+
+    .. attribute:: on_decode_error
+
+        Callback called if a message had decoding errors.
+        The callback is called with the signature::
+
+            callback(message, exception)
+
+    """
+    on_decode_error = None
+
+    def _receive_callback(self, raw_message):
+        message = self.backend.message_to_python(raw_message)
+        if self.auto_ack and not message.acknowledged:
+            message.ack()
+        try:
+            decoded = message.decode()
+        except Exception, exc:
+            if self.on_decode_error:
+                return self.on_decode_error(message, exc)
+            else:
+                raise
+        self.receive(decoded, message)
+
+
+class AMQP(object):
+    BrokerConnection = BrokerConnection
+    Publisher = messaging.Publisher
+    Consumer = messaging.Consumer
+    ConsumerSet = ConsumerSet
+
+    def __init__(self, app):
+        self.app = app
+
+    def get_queues(self):
+        c = self.app.conf
+        queues = c.CELERY_QUEUES
+
+        def _defaults(opts):
+            opts.setdefault("exchange", c.CELERY_DEFAULT_EXCHANGE),
+            opts.setdefault("exchange_type", c.CELERY_DEFAULT_EXCHANGE_TYPE)
+            opts.setdefault("binding_key", c.CELERY_DEFAULT_EXCHANGE)
+            opts.setdefault("routing_key", opts.get("binding_key"))
+            return opts
+
+        return dict((queue, _defaults(opts))
+                    for queue, opts in queues.items())
+
+    def get_default_queue(self):
+        q = self.app.conf.CELERY_DEFAULT_QUEUE
+        return q, self.get_queues()[q]
+
+    def Router(self, queues=None, create_missing=None):
+        return routes.Router(self.app.conf.CELERY_ROUTES,
+                             queues or self.app.conf.CELERY_QUEUES,
+                             self.app.either("CELERY_CREATE_MISSING_QUEUES",
+                                             create_missing))
+
+    def TaskConsumer(self, *args, **kwargs):
+        default_queue_name, default_queue = self.get_default_queue()
+        defaults = dict({"queue": default_queue_name}, **default_queue)
+        defaults["routing_key"] = defaults.pop("binding_key", None)
+        return self.Consumer(*args,
+                             **self.app.merge(defaults, kwargs))
+
+    def TaskPublisher(self, *args, **kwargs):
+        _, default_queue = self.get_default_queue()
+        defaults = {"exchange": default_queue["exchange"],
+                    "exchange_type": default_queue["exchange_type"],
+                    "routing_key": self.app.conf.CELERY_DEFAULT_ROUTING_KEY,
+                    "serializer": self.app.conf.CELERY_TASK_SERIALIZER}
+        publisher = TaskPublisher(*args,
+                                  **self.app.merge(defaults, kwargs))
+
+        # Make sure all queues are declared.
+        global _queues_declared
+        if not _queues_declared:
+            consumers = self.get_consumer_set(publisher.connection)
+            consumers.close()
+            _queues_declared = True
+        publisher.declare()
+
+        return publisher
+
+    def get_consumer_set(self, connection, queues=None, **options):
+        queues = queues or self.get_queues()
+
+        cset = self.ConsumerSet(connection)
+        for queue_name, queue_options in queues.items():
+            queue_options = dict(queue_options)
+            queue_options["routing_key"] = queue_options.pop("binding_key",
+                                                             None)
+            consumer = self.Consumer(connection, queue=queue_name,
+                                     backend=cset.backend, **queue_options)
+            cset.consumers.append(consumer)
+        return cset
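
The new celery.app.amqp module gathers what used to live in celery.messaging behind one object per app. A sketch of reaching the facade, assuming the `amqp` property added to BaseApp below; the task name is hypothetical:

    from celery import Celery

    app = Celery()
    queues = app.amqp.get_queues()    # CELERY_QUEUES with defaults filled in
    conn = app.broker_connection()
    publisher = app.amqp.TaskPublisher(conn)
    task_id = publisher.delay_task("tasks.add", task_args=[2, 2])
    publisher.close()
    conn.close()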

+ 68 - 32
celery/app/base.py

@@ -3,8 +3,6 @@ import sys
 
 from datetime import timedelta
 
-from carrot.connection import BrokerConnection
-
 from celery import routes
 from celery.app.defaults import DEFAULTS
 from celery.datastructures import AttributeDict
@@ -13,10 +11,12 @@ from celery.utils.functional import wraps
 
 
 class BaseApp(object):
+    _amqp = None
     _backend = None
     _conf = None
     _control = None
     _loader = None
+    _log = None
 
     def __init__(self, loader=None, backend_cls=None):
         self.loader_cls = loader or os.environ.get("CELERY_LOADER", "default")
@@ -28,29 +28,66 @@ class BaseApp(object):
                 return value
         return self.conf.get(default_key)
 
-    def get_queues(self):
-        c = self.conf
-        queues = c.CELERY_QUEUES
+    def merge(self, a, b):
+        """Like ``dict(a, **b)`` except it will keep values from ``a``
+        if the value in ``b`` is :const:`None`."""
+        b = dict(b)
+        for key, value in a.items():
+            if b.get(key) is None:
+                b[key] = value
+        return b
+
+
+    def AsyncResult(self, task_id, backend=None):
+        from celery.result import BaseAsyncResult
+        return BaseAsyncResult(task_id, app=self,
+                               backend=backend or self.backend)
+
+    def TaskSetResult(self, taskset_id, results, **kwargs):
+        from celery.result import TaskSetResult
+        return TaskSetResult(taskset_id, results, app=self)
+
 
-        def _defaults(opts):
-            opts.setdefault("exchange", c.CELERY_DEFAULT_EXCHANGE),
-            opts.setdefault("exchange_type", c.CELERY_DEFAULT_EXCHANGE_TYPE)
-            opts.setdefault("binding_key", c.CELERY_DEFAULT_EXCHANGE)
-            opts.setdefault("routing_key", opts.get("binding_key"))
-            return opts
+    def send_task(self, name, args=None, kwargs=None, countdown=None,
+            eta=None, task_id=None, publisher=None, connection=None,
+            connect_timeout=None, result_cls=None, expires=None,
+            **options):
+        """Send task by name.
 
 
-        return dict((queue, _defaults(opts))
-                    for queue, opts in queues.items())
+        Useful if you don't have access to the task class.
 
-    def get_default_queue(self):
-        q = self.conf.CELERY_DEFAULT_QUEUE
-        return q, self.get_queues()[q]
+        :param name: Name of task to execute.
+
+        Supports the same arguments as
+        :meth:`~celery.task.base.BaseTask.apply_async`.
+
+        """
+        result_cls = result_cls or self.AsyncResult
+        exchange = options.get("exchange")
+        exchange_type = options.get("exchange_type")
+
+        def _do_publish(connection=None, **_):
+            publish = publisher or self.amqp.TaskPublisher(connection,
+                                            exchange=exchange,
+                                            exchange_type=exchange_type)
+            try:
+                new_id = publish.delay_task(name, args, kwargs,
+                                            task_id=task_id,
+                                            countdown=countdown, eta=eta,
+                                            expires=expires, **options)
+            finally:
+                publisher or publish.close()
+
+            return result_cls(new_id)
+
+        return self.with_default_connection(_do_publish)(
+                connection=connection, connect_timeout=connect_timeout)
 
     def broker_connection(self, hostname=None, userid=None,
             password=None, virtual_host=None, port=None, ssl=None,
             insist=None, connect_timeout=None, backend_cls=None):
         """Establish a connection to the message broker."""
-        return BrokerConnection(
+        return self.amqp.BrokerConnection(
                     hostname or self.conf.BROKER_HOST,
                     userid or self.conf.BROKER_USER,
                     password or self.conf.BROKER_PASSWORD,
@@ -78,21 +115,6 @@ class BaseApp(object):
                 close_connection()
         return _inner
 
-    def get_consumer_set(self, connection, queues=None, **options):
-        from celery.messaging import ConsumerSet, Consumer
-
-        queues = queues or self.get_queues()
-
-        cset = ConsumerSet(connection)
-        for queue_name, queue_options in queues.items():
-            queue_options = dict(queue_options)
-            queue_options["routing_key"] = queue_options.pop("binding_key",
-                                                             None)
-            consumer = Consumer(connection, queue=queue_name,
-                                backend=cset.backend, **queue_options)
-            cset.consumers.append(consumer)
-        return cset
-
     def pre_config_merge(self, c):
         if not c.get("CELERY_RESULT_BACKEND"):
             c["CELERY_RESULT_BACKEND"] = c.get("CELERY_BACKEND")
@@ -135,6 +157,13 @@ class BaseApp(object):
                              self.conf.EMAIL_HOST_PASSWORD)
         mailer.send(message, fail_silently=fail_silently)
 
+    @property
+    def amqp(self):
+        if self._amqp is None:
+            from celery.app.amqp import AMQP
+            self._amqp = AMQP(self)
+        return self._amqp
+
     @property
     def backend(self):
         if self._backend is None:
@@ -165,3 +194,10 @@ class BaseApp(object):
             from celery.task.control import Control
             self._control = Control(app=self)
         return self._control
+
+    @property
+    def log(self):
+        if self._log is None:
+            from celery.log import Logging
+            self._log = Logging(app=self)
+        return self._log
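
With send_task and the lazy amqp/log properties, BaseApp becomes a self-contained surface. A sketch under the new API (task name hypothetical):

    from celery import Celery

    app = Celery()
    result = app.send_task("tasks.add", args=[2, 2])
    print(result.task_id)

    logger = app.log.get_default_logger()    # Logging instance built on first use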

+ 6 - 8
celery/apps/beat.py

@@ -44,13 +44,12 @@ class Beat(object):
         self.start_scheduler(logger)
 
     def setup_logging(self):
-        from celery import log
-        handled = log.setup_logging_subsystem(loglevel=self.loglevel,
-                                              logfile=self.logfile,
-                                              app=self.app)
+        handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
+                                                       logfile=self.logfile)
         if not handled:
-            logger = log.get_default_logger(name="celery.beat")
-            log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
+            logger = self.app.log.get_default_logger(name="celery.beat")
+            self.app.log.redirect_stdouts_to_logger(logger,
+                                                    loglevel=logging.WARNING)
         return logger
 
     def start_scheduler(self, logger=None):
@@ -71,8 +70,7 @@ class Beat(object):
     def init_loader(self):
         # Run the worker init handler.
         # (Usually imports task modules and such.)
-        from celery.loaders import current_loader
-        current_loader().init_worker()
+        self.app.loader.init_worker()
 
     def startup_info(self):
         return STARTUP_INFO_FMT % {

+ 8 - 10
celery/apps/worker.py

@@ -11,7 +11,6 @@ from celery import platform
 from celery import signals
 from celery.app import app_or_default
 from celery.exceptions import ImproperlyConfigured
-from celery.routes import Router
 from celery.utils import info, get_full_cls_name, LOG_LEVELS
 from celery.worker import WorkController
 
@@ -100,7 +99,8 @@ class Worker(object):
         print("celery@%s has started." % self.hostname)
         print("celery@%s has started." % self.hostname)
 
 
     def init_queues(self):
     def init_queues(self):
-        queues = self.app.get_queues()
+        amqp = self.app.amqp
+        queues = amqp.get_queues()
         if self.use_queues:
             queues = dict((queue, options)
                                 for queue, options in queues.items()
@@ -108,7 +108,7 @@ class Worker(object):
             for queue in self.use_queues:
                 if queue not in queues:
                     if self.app.conf.CELERY_CREATE_MISSING_QUEUES:
-                        Router(queues=queues).add_queue(queue)
+                        amqp.Router(queues=queues).add_queue(queue)
                     else:
                         raise ImproperlyConfigured(
                             "Queue '%s' not defined in CELERY_QUEUES" % queue)
@@ -120,14 +120,12 @@ class Worker(object):
         map(self.loader.import_module, self.include)
 
     def redirect_stdouts_to_logger(self):
-        from celery import log
-        handled = log.setup_logging_subsystem(loglevel=self.loglevel,
-                                              logfile=self.logfile,
-                                              app=self.app)
-        # Redirect stdout/stderr to our logger.
+        handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
+                                                       logfile=self.logfile)
         if not handled:
-            logger = log.get_default_logger()
-            log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
+            logger = self.app.log.get_default_logger()
+            self.app.log.redirect_stdouts_to_logger(logger,
+                                                    loglevel=logging.WARNING)
 
     def purge_messages(self):
         count = self.app.control.discard_all()

+ 1 - 2
celery/backends/amqp.py

@@ -10,7 +10,6 @@ from carrot.messaging import Consumer, Publisher
 from celery import states
 from celery.backends.base import BaseDictBackend
 from celery.exceptions import TimeoutError
-from celery.messaging import establish_connection
 from celery.utils import timeutils
 
 
@@ -193,7 +192,7 @@ class AMQPBackend(BaseDictBackend):
     @property
     def connection(self):
         if not self._connection:
-            self._connection = establish_connection()
+            self._connection = self.app.broker_connection()
         return self._connection
 
     def reload_task_result(self, task_id):

+ 2 - 3
celery/backends/cassandra.py

@@ -15,7 +15,6 @@ from datetime import datetime
 
 from celery.backends.base import BaseDictBackend
 from celery.exceptions import ImproperlyConfigured
-from celery.log import setup_logger
 from celery.serialization import pickle
 from celery import states
 
@@ -49,8 +48,8 @@ class CassandraBackend(BaseDictBackend):
 
         """
         super(CassandraBackend, self).__init__(**kwargs)
-        self.logger = setup_logger(name="celery.backends.cassandra",
-                                   app=self.app)
+        self.logger = self.app.log.setup_logger(
+                            name="celery.backends.cassandra")
 
         self.result_expires = kwargs.get("result_expires") or \
                                 self.app.conf.CELERY_TASK_RESULT_EXPIRES

+ 9 - 9
celery/beat.py

@@ -10,12 +10,10 @@ import multiprocessing
 from datetime import datetime
 from UserDict import UserDict
 
-from celery import log
 from celery import platform
 from celery.app import app_or_default
-from celery.execute import send_task
+from celery.log import SilenceRepeated
 from celery.schedules import maybe_schedule
-from celery.messaging import establish_connection
 from celery.utils import instantiate
 from celery.utils.info import humanize_seconds
 
@@ -139,7 +137,8 @@ class Scheduler(UserDict):
         self.app = app_or_default(app)
         conf = self.app.conf
         self.data = schedule
-        self.logger = logger or log.get_default_logger(name="celery.beat")
+        self.logger = logger or self.app.log.get_default_logger(
+                                                name="celery.beat")
         self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
         self.setup_schedule()
 
@@ -164,7 +163,7 @@
 
         """
         remaining_times = []
-        connection = establish_connection()
+        connection = self.app.broker_connection()
         try:
             try:
                 for entry in self.schedule.itervalues():
@@ -197,7 +196,7 @@ class Scheduler(UserDict):
         return result
 
     def send_task(self, *args, **kwargs): # pragma: no cover
-        return send_task(*args, **kwargs)
+        return self.app.send_task(*args, **kwargs)
 
     def setup_schedule(self):
         pass
@@ -276,7 +275,8 @@ class Service(object):
         self.max_interval = max_interval or \
                             self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
         self.scheduler_cls = scheduler_cls or self.scheduler_cls
-        self.logger = logger or log.get_default_logger(name="celery.beat")
+        self.logger = logger or self.app.log.get_default_logger(
+                                                name="celery.beat")
         self.schedule = schedule or self.app.conf.CELERYBEAT_SCHEDULE
         self.schedule_filename = schedule_filename or \
                                     self.app.conf.CELERYBEAT_SCHEDULE_FILENAME
@@ -285,8 +285,8 @@ class Service(object):
         self._shutdown = threading.Event()
         self._stopped = threading.Event()
         silence = self.max_interval < 60 and 10 or 1
-        self.debug = log.SilenceRepeated(self.logger.debug,
-                                         max_iterations=silence)
+        self.debug = SilenceRepeated(self.logger.debug,
+                                     max_iterations=silence)
 
     def start(self, embedded_process=False):
         self.logger.info("Celerybeat: Starting...")

+ 7 - 3
celery/bin/camqadm.py

@@ -14,9 +14,10 @@ from itertools import count
 from amqplib import client_0_8 as amqp
 from carrot.utils import partition
 
+from celery import Celery
+from celery.app import app_or_default
 from celery.utils import info
 from celery.utils import padlist
-from celery.messaging import establish_connection
 
 # Valid string -> bool coercions.
 BOOLS = {"1": True, "0": False,
@@ -331,6 +332,7 @@ class AMQPAdmin(object):
     """The celery ``camqadm`` utility."""
     """The celery ``camqadm`` utility."""
 
 
     def __init__(self, *args, **kwargs):
     def __init__(self, *args, **kwargs):
+        self.app = app_or_default(kwargs.get("app"))
         self.silent = bool(args)
         if "silent" in kwargs:
             self.silent = kwargs["silent"]
@@ -339,8 +341,9 @@ class AMQPAdmin(object):
     def connect(self, conn=None):
         if conn:
             conn.close()
-        self.say("-> connecting to %s." % info.format_broker_info())
-        conn = establish_connection()
+        self.say("-> connecting to %s." % (
+                    info.format_broker_info(app=self.app)))
+        conn = self.app.broker_connection()
         conn.connect()
         self.say("-> connected.")
         return conn
@@ -364,6 +367,7 @@ def parse_options(arguments):
 
 
 def camqadm(*args, **options):
+    options["app"] = Celery()
     return AMQPAdmin(*args, **options).run()
 
 

+ 3 - 1
celery/bin/celerybeat.py

@@ -22,6 +22,7 @@
     ``ERROR``, ``CRITICAL``, or ``FATAL``.
 
 """
+from celery import Celery
 from celery.bin.base import Command, Option
 
 
@@ -61,7 +62,8 @@ class BeatCommand(Command):
 
 
 def main():
-    beat = BeatCommand()
+    app = Celery()
+    beat = BeatCommand(app=app)
     beat.execute_from_commandline()
 
 if __name__ == "__main__":

+ 22 - 20
celery/bin/celeryctl.py

@@ -8,6 +8,8 @@ from textwrap import wrap
 from anyjson import deserialize
 
 from celery import __version__
+from celery import Celery
+from celery.app import app_or_default
 from celery.utils import term
 
 
@@ -39,7 +41,8 @@ class Command(object):
             help="Don't colorize output."),
             help="Don't colorize output."),
     )
     )
 
 
-    def __init__(self, no_color=False):
+    def __init__(self, app=None, no_color=False):
+        self.app = app_or_default(app)
         self.colored = term.colored(enabled=not no_color)
 
     def __call__(self, *args, **kwargs):
@@ -125,8 +128,6 @@ class apply(Command):
     )
 
     def run(self, name, *_, **kw):
-        from celery.execute import send_task
-
         # Positional args.
         args = kw.get("args") or ()
         if isinstance(args, basestring):
@@ -144,15 +145,14 @@ class apply(Command):
         except (TypeError, ValueError):
             pass
 
-        res = send_task(name, args=args, kwargs=kwargs,
-                        countdown=kw.get("countdown"),
-                        serializer=kw.get("serializer"),
-                        queue=kw.get("queue"),
-                        exchange=kw.get("exchange"),
-                        routing_key=kw.get("routing_key"),
-                        eta=kw.get("eta"),
-                        expires=expires)
-
+        res = self.app.send_task(name, args=args, kwargs=kwargs,
+                                 countdown=kw.get("countdown"),
+                                 serializer=kw.get("serializer"),
+                                 queue=kw.get("queue"),
+                                 exchange=kw.get("exchange"),
+                                 routing_key=kw.get("routing_key"),
+                                 eta=kw.get("eta"),
+                                 expires=expires)
         self.out(res.task_id)
 apply = command(apply)
 
@@ -165,8 +165,7 @@ class result(Command):
 
     def run(self, task_id, *args, **kwargs):
         from celery import registry
-        from celery.result import AsyncResult
-        result_cls = AsyncResult
+        result_cls = self.app.AsyncResult
         task = kwargs.get("task")
         task = kwargs.get("task")
 
 
         if task:
         if task:
@@ -206,7 +205,6 @@ class inspect(Command):
             raise Error("Did you mean 'inspect --help'?")
             raise Error("Did you mean 'inspect --help'?")
         if command not in self.choices:
         if command not in self.choices:
             raise Error("Unknown inspect command: %s" % command)
             raise Error("Unknown inspect command: %s" % command)
-        from celery.task.control import inspect
 
         destination = kwargs.get("destination")
         timeout = kwargs.get("timeout") or self.choices[command]
@@ -221,9 +219,9 @@ class inspect(Command):
             self.say("->", c.cyan(node, ": ") + status, indent(preply))
             self.say("->", c.cyan(node, ": ") + status, indent(preply))
 
 
         self.say("<-", command)
         self.say("<-", command)
-        i = inspect(destination=destination,
-                    timeout=timeout,
-                    callback=on_reply)
+        i = self.app.control.inspect(destination=destination,
+                                     timeout=timeout,
+                                     callback=on_reply)
         replies = getattr(i, command)()
         if not replies:
             raise Error("No nodes replied within time constraint.")
@@ -281,6 +279,9 @@ help = command(help)
 class celeryctl(object):
     commands = commands
 
+    def __init__(self, app=None):
+        self.app = app_or_default(app)
+
     def execute(self, command, argv=None):
         if argv is None:
             argv = sys.argv
@@ -292,7 +293,7 @@ class celeryctl(object):
             argv.insert(1, "help")
             argv.insert(1, "help")
         cls = self.commands.get(command) or self.commands["help"]
         cls = self.commands.get(command) or self.commands["help"]
         try:
         try:
-            cls().run_from_argv(argv)
+            cls(app=self.app).run_from_argv(argv)
         except Error:
             return self.execute("help", argv)
 
@@ -309,7 +310,8 @@ class celeryctl(object):
 
 def main():
     try:
-        celeryctl().execute_from_commandline()
+        app = Celery()
+        celeryctl(app).execute_from_commandline()
     except KeyboardInterrupt:
         pass
 

+ 3 - 1
celery/bin/celeryd.py

@@ -70,6 +70,7 @@
 import multiprocessing
 
 from celery import __version__
+from celery import Celery
 from celery.bin.base import Command, Option
 
 
@@ -150,7 +151,8 @@ class WorkerCommand(Command):
 
 def main():
     multiprocessing.freeze_support()
-    worker = WorkerCommand()
+    app = Celery()
+    worker = WorkerCommand(app=app)
     worker.execute_from_commandline()
 
 if __name__ == "__main__":

+ 9 - 5
celery/bin/celeryev.py

@@ -3,6 +3,8 @@ import sys
 
 from optparse import OptionParser, make_option as Option
 
+from celery import Celery
+from celery.app import app_or_default
 from celery.events.cursesmon import evtop
 from celery.events.dumper import evdump
 from celery.events.snapshot import evcam
@@ -31,13 +33,14 @@ OPTION_LIST = (
 
 
 def run_celeryev(dump=False, camera=None, frequency=1.0, maxrate=None,
-        loglevel=logging.WARNING, logfile=None, **kwargs):
+        loglevel=logging.WARNING, logfile=None, app=None, **kwargs):
+    app = app_or_default(app)
     if dump:
-        return evdump()
+        return evdump(app=app)
     if camera:
-        return evcam(camera, frequency, maxrate,
+        return evcam(camera, frequency, maxrate, app=app,
                      loglevel=loglevel, logfile=logfile)
-    return evtop()
+    return evtop(app=app)
 
 
 def parse_options(arguments):
@@ -49,7 +52,8 @@
 
 def main():
     options = parse_options(sys.argv[1:])
-    return run_celeryev(**vars(options))
+    app = Celery()
+    return run_celeryev(app=app, **vars(options))
 
 if __name__ == "__main__":
     main()

+ 3 - 60
celery/decorators.py

@@ -5,68 +5,11 @@ Decorators
 """
 """
 from inspect import getargspec
 from inspect import getargspec
 
 
-from celery import registry
-from celery.task.base import Task, PeriodicTask
-from celery.utils.functional import wraps
+from celery.app import default_app
+from celery.task.base import PeriodicTask
 
 
-def task(*args, **options):
-    """Decorator to create a task class out of any callable.
-
-    Examples:
-
-    .. code-block:: python
-
-        @task()
-        def refresh_feed(url):
-            return Feed.objects.get(url=url).refresh()
-
-    With setting extra options and using retry.
-
-    .. code-block:: python
-
-        @task(exchange="feeds")
-        def refresh_feed(url, **kwargs):
-            try:
-                return Feed.objects.get(url=url).refresh()
-            except socket.error, exc:
-                refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
-
-    Calling the resulting task:
-
-        >>> refresh_feed("http://example.com/rss") # Regular
-        <Feed: http://example.com/rss>
-        >>> refresh_feed.delay("http://example.com/rss") # Async
-        <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
-
-
-    """
-
-    def inner_create_task_cls(**options):
-
-        def _create_task_cls(fun):
-            base = options.pop("base", Task)
-
-            @wraps(fun, assigned=("__module__", "__name__"))
-            def run(self, *args, **kwargs):
-                return fun(*args, **kwargs)
-
-            # Save the argspec for this task so we can recognize
-            # which default task kwargs we're going to pass to it later.
-            # (this happens in celery.utils.fun_takes_kwargs)
-            run.argspec = getargspec(fun)
-
-            cls_dict = dict(options, run=run,
-                            __module__=fun.__module__,
-                            __doc__=fun.__doc__)
-            T = type(fun.__name__, (base, ), cls_dict)()
-            return registry.tasks[T.name] # global instance.
-
-        return _create_task_cls
-
-    if len(args) == 1 and callable(args[0]):
-        return inner_create_task_cls()(*args)
-    return inner_create_task_cls(**options)
+task = default_app.task
 
 
 def periodic_task(**options):
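
For users the decorator is unchanged; it is now simply the default app's bound method. A sketch, with the usage taken from the removed docstring above:

    from celery.decorators import task

    @task()
    def add(x, y):
        return x + y

    add.delay(2, 2)    # dispatched through default_app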

+ 10 - 8
celery/events/cursesmon.py

@@ -9,10 +9,9 @@ from itertools import count
 from textwrap import wrap
 
 from celery import states
+from celery.app import app_or_default
 from celery.events import EventReceiver
 from celery.events.state import State
-from celery.messaging import establish_connection
-from celery.task import control
 from celery.utils import abbr, abbrtask
 
 
@@ -33,7 +32,8 @@ class CursesMonitor(object):
     greet = "celeryev %s" % celery.__version__
     greet = "celeryev %s" % celery.__version__
     info_str = "Info: "
     info_str = "Info: "
 
 
-    def __init__(self, state, keymap=None):
+    def __init__(self, state, keymap=None, app=None):
+        self.app = app_or_default(app)
         self.keymap = keymap or self.keymap
         self.state = state
         default_keymap = {"J": self.move_selection_down,
@@ -129,7 +129,8 @@ class CursesMonitor(object):
         rlimit = self.readline(my - 2, 3 + len(r))
 
         if rlimit:
-            reply = control.rate_limit(task.name, rlimit.strip(), reply=True)
+            reply = self.app.control.rate_limit(task.name,
+                                                rlimit.strip(), reply=True)
             self.alert_remote_control_reply(reply)
 
     def alert_remote_control_reply(self, reply):
@@ -181,7 +182,7 @@ class CursesMonitor(object):
     def revoke_selection(self):
         if not self.selected_task:
             return curses.beep()
-        reply = control.revoke(self.selected_task, reply=True)
+        reply = self.app.control.revoke(self.selected_task, reply=True)
         self.alert_remote_control_reply(reply)
 
     def selection_info(self):
@@ -384,15 +385,16 @@ class DisplayThread(threading.Thread):
             self.display.nap()
 
 
-def evtop():
+def evtop(app=None):
     sys.stderr.write("-> evtop: starting capture...\n")
     sys.stderr.write("-> evtop: starting capture...\n")
+    app = app_or_default(app)
     state = State()
     display = CursesMonitor(state)
     display.init_screen()
     refresher = DisplayThread(display)
     refresher.start()
-    conn = establish_connection()
-    recv = EventReceiver(conn, handlers={"*": state.event})
+    conn = app.broker_connection()
+    recv = EventReceiver(conn, app=app, handlers={"*": state.event})
     try:
         recv.capture(limit=None)
     except Exception:

+ 5 - 4
celery/events/dumper.py

@@ -2,9 +2,9 @@ import sys
 
 from datetime import datetime
 
+from celery.app import app_or_default
 from celery.datastructures import LocalCache
 from celery.events import EventReceiver
-from celery.messaging import establish_connection
 
 
 TASK_NAMES = LocalCache(0xFFF)
@@ -52,11 +52,12 @@ class Dumper(object):
                                     humanize_type(type), sep, task, fields))
 
 
-def evdump():
+def evdump(app=None):
     sys.stderr.write("-> evdump: starting capture...\n")
     sys.stderr.write("-> evdump: starting capture...\n")
+    app = app_or_default(app)
     dumper = Dumper()
-    conn = establish_connection()
-    recv = EventReceiver(conn, handlers={"*": dumper.on_event})
+    conn = app.broker_connection()
+    recv = EventReceiver(conn, app=app, handlers={"*": dumper.on_event})
     try:
         recv.capture()
     except (KeyboardInterrupt, SystemExit):

+ 11 - 11
celery/events/snapshot.py

@@ -1,10 +1,8 @@
 from celery.utils import timer2
 
-from celery import log
 from celery.datastructures import TokenBucket
 from celery.events import EventReceiver
 from celery.events.state import State
-from celery.messaging import establish_connection
 from celery.utils import instantiate, LOG_LEVELS
 from celery.utils.dispatch import Signal
 from celery.utils.timeutils import rate
@@ -17,11 +15,12 @@ class Polaroid(object):
     _tref = None
 
     def __init__(self, state, freq=1.0, maxrate=None,
-            cleanup_freq=3600.0, logger=None):
+            cleanup_freq=3600.0, logger=None, app=None):
+        self.app = app_or_default(app)
         self.state = state
         self.freq = freq
         self.cleanup_freq = cleanup_freq
-        self.logger = logger or log.get_default_logger(name="celery.cam")
+        self.logger = logger or app.log.get_default_logger(name="celery.cam")
         self.maxrate = maxrate and TokenBucket(rate(maxrate))
 
     def install(self):
@@ -71,21 +70,22 @@
 
 
 def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
-        logfile=None):
+        logfile=None, app=None):
+    app = app_or_default(app)
     if not isinstance(loglevel, int):
         loglevel = LOG_LEVELS[loglevel.upper()]
-    logger = log.setup_logger(loglevel=loglevel,
-                              logfile=logfile,
-                              name="celery.evcam")
+    logger = app.log.setup_logger(loglevel=loglevel,
+                                  logfile=logfile,
+                                  name="celery.evcam")
     logger.info(
         "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
             camera, freq))
     state = State()
-    cam = instantiate(camera, state,
+    cam = instantiate(camera, state, app=app,
                       freq=freq, maxrate=maxrate, logger=logger)
     cam.install()
-    conn = establish_connection()
-    recv = EventReceiver(conn, handlers={"*": state.event})
+    conn = app.broker_connection()
+    recv = EventReceiver(conn, app=app, handlers={"*": state.event})
     try:
         try:
             recv.capture(limit=None)

+ 15 - 193
celery/execute/__init__.py

@@ -1,203 +1,25 @@
-from celery.app import app_or_default, default_app
-from celery.datastructures import ExceptionInfo
-from celery.execute.trace import TaskTrace
-from celery.messaging import TaskPublisher
+from celery.app import default_app
 from celery.registry import tasks
-from celery.result import AsyncResult, EagerResult
-from celery.routes import Router
-from celery.utils import gen_unique_id, fun_takes_kwargs, mattrgetter
 
-extract_exec_options = mattrgetter("queue", "routing_key", "exchange",
-                                   "immediate", "mandatory",
-                                   "priority", "serializer",
-                                   "delivery_mode")
 
 
+def apply_async(task, *args, **kwargs):
+    """Deprecated. See :meth:`celery.task.base.BaseTask.apply_async`."""
+    # FIXME Deprecate!
+    return task.apply_async(*args, **kwargs)
 
-@default_app.with_default_connection
-def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
-        task_id=None, publisher=None, connection=None, connect_timeout=None,
-        router=None, expires=None, queues=None, app=None, **options):
-    """Run a task asynchronously by the celery daemon(s).
 
 
-    :param task: The :class:`~celery.task.base.Task` to run.
+def apply(task, *args, **kwargs):
+    """Deprecated. See :meth:`celery.task.base.BaseTask.apply`."""
+    # FIXME Deprecate!
+    return task.apply(*args, **kwargs)
 
-    :keyword args: The positional arguments to pass on to the
-      task (a :class:`list` or :class:`tuple`).
 
-    :keyword kwargs: The keyword arguments to pass on to the
-      task (a :class:`dict`)
-
-    :keyword countdown: Number of seconds into the future that the task should
-      execute. Defaults to immediate delivery (Do not confuse that with
-      the ``immediate`` setting, they are unrelated).
-
-    :keyword eta: A :class:`~datetime.datetime` object that describes the
-      absolute time and date of when the task should execute. May not be
-      specified if ``countdown`` is also supplied. (Do not confuse this
-      with the ``immediate`` setting, they are unrelated).
-
-    :keyword expires: Either a :class:`int`, describing the number of seconds,
-      or a :class:`~datetime.datetime` object that describes the absolute time
-      and date of when the task should expire.
-      The task will not be executed after the expiration time.
-
-    :keyword connection: Re-use existing broker connection instead
-      of establishing a new one. The ``connect_timeout`` argument is
-      not respected if this is set.
-
-    :keyword connect_timeout: The timeout in seconds, before we give up
-      on establishing a connection to the AMQP server.
-
-    :keyword routing_key: The routing key used to route the task to a worker
-      server. Defaults to the tasks :attr:`~celery.task.base.Task.exchange`
-      attribute.
-
-    :keyword exchange: The named exchange to send the task to. Defaults to
-      the tasks :attr:`~celery.task.base.Task.exchange` attribute.
-
-    :keyword exchange_type: The exchange type to initalize the exchange as
-      if not already declared. Defaults to the tasks
-      :attr:`~celery.task.base.Task.exchange_type` attribute.
-
-    :keyword immediate: Request immediate delivery. Will raise an exception
-      if the task cannot be routed to a worker immediately.
-      (Do not confuse this parameter with the ``countdown`` and ``eta``
-      settings, as they are unrelated). Defaults to the tasks
-      :attr:`~celery.task.base.Task.immediate` attribute.
-
-    :keyword mandatory: Mandatory routing. Raises an exception if there's
-      no running workers able to take on this task. Defaults to the tasks
-      :attr:`~celery.task.base.Task.mandatory` attribute.
-
-    :keyword priority: The task priority, a number between ``0`` and ``9``.
-      Defaults to the tasks :attr:`~celery.task.base.Task.priority` attribute.
-
-    :keyword serializer: A string identifying the default serialization
-      method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
-      Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
-      methods that have been registered with
-      :mod:`carrot.serialization.registry`. Defaults to the tasks
-      :attr:`~celery.task.base.Task.serializer` attribute.
-
-    **Note**: If the ``CELERY_ALWAYS_EAGER`` setting is set, it will be
-    replaced by a local :func:`apply` call instead.
-
-    """
-    app = app_or_default(app)
-    queues = queues or app.get_queues()
-    router = router or Router(app.conf.CELERY_ROUTES, queues,
-                              app.conf.CELERY_CREATE_MISSING_QUEUES)
-
-    if app.conf.CELERY_ALWAYS_EAGER:
-        return apply(task, args, kwargs, task_id=task_id)
-
-    task = tasks[task.name] # get instance from registry
-
-    options = dict(extract_exec_options(task), **options)
-    options = router.route(options, task.name, args, kwargs)
-    exchange = options.get("exchange")
-    exchange_type = options.get("exchange_type")
-
-    publish = publisher or task.get_publisher(connection, exchange=exchange,
-                                              exchange_type=exchange_type)
-    try:
-        task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
-                                     countdown=countdown, eta=eta,
-                                     expires=expires, app=app, **options)
-    finally:
-        publisher or publish.close()
-
-    return task.AsyncResult(task_id)
-
-
-@default_app.with_default_connection
-def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
-        task_id=None, publisher=None, connection=None, connect_timeout=None,
-        result_cls=AsyncResult, expires=None, **options):
-    """Send task by name.
-
-    Useful if you don't have access to the :class:`~celery.task.base.Task`
-    class.
-
-    :param name: Name of task to execute.
-
-    Supports the same arguments as :func:`apply_async`.
-
-    """
-    exchange = options.get("exchange")
-    exchange_type = options.get("exchange_type")
-
-    publish = publisher or TaskPublisher(connection, exchange=exchange,
-                                         exchange_type=exchange_type)
-    try:
-        task_id = publish.delay_task(name, args, kwargs, task_id=task_id,
-                                     countdown=countdown, eta=eta,
-                                     expires=expires, **options)
-    finally:
-        publisher or publish.close()
-
-    return result_cls(task_id)
+def send_task(*args, **kwargs):
+    """Deprecated. See :meth:`celery.app.App.send_task`."""
+    # FIXME Deprecate!
+    return default_app.send_task(*args, **kwargs)
 
 
 def delay_task(task_name, *args, **kwargs):
-    """Delay a task for execution by the ``celery`` daemon.
-
-    :param task_name: the name of a task registered in the task registry.
-    :param \*args: positional arguments to pass on to the task.
-    :param \*\*kwargs: keyword arguments to pass on to the task.
-
-    :raises celery.exceptions.NotRegistered: exception if no such task
-        has been registered in the task registry.
-
-    :returns :class:`celery.result.AsyncResult`:
-
-    Example
-
-        >>> r = delay_task("update_record", name="George Costanza", age=32)
-        >>> r.ready()
-        True
-        >>> r.result
-        "Record was updated"
-
-    """
-    return apply_async(tasks[task_name], args, kwargs)
-
-
-def apply(task, args, kwargs, app=None, **options):
-    """Apply the task locally.
-
-    :keyword throw: Re-raise task exceptions. Defaults to
-        the ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` setting.
-
-    This will block until the task completes, and returns a
-    :class:`celery.result.EagerResult` instance.
-
-    """
-    app = app_or_default(app)
-    args = args or []
-    kwargs = kwargs or {}
-    task_id = options.get("task_id") or gen_unique_id()
-    retries = options.get("retries", 0)
-    throw = options.pop("throw", app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS)
-
-    task = tasks[task.name] # Make sure we get the instance, not class.
-
-    default_kwargs = {"task_name": task.name,
-                      "task_id": task_id,
-                      "task_retries": retries,
-                      "task_is_eager": True,
-                      "logfile": options.get("logfile"),
-                      "delivery_info": {"is_eager": True},
-                      "loglevel": options.get("loglevel", 0)}
-    supported_keys = fun_takes_kwargs(task.run, default_kwargs)
-    extend_with = dict((key, val) for key, val in default_kwargs.items()
-                            if key in supported_keys)
-    kwargs.update(extend_with)
-
-    trace = TaskTrace(task.name, task_id, args, kwargs, task=task)
-    retval = trace.execute()
-    if isinstance(retval, ExceptionInfo):
-        if throw:
-            raise retval.exception
-        retval = retval.exception
-    return EagerResult(task_id, retval, trace.status, traceback=trace.strtb)
+    # FIXME Deprecate!
+    return tasks[task_name].apply_async(args, kwargs)
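
The old call sites keep working through these shims; a sketch of equivalent calls under the old and new API, with a hypothetical task name:

    from celery.app import default_app
    from celery.execute import send_task

    send_task("tasks.add", args=[2, 2])              # deprecated module-level helper
    default_app.send_task("tasks.add", args=[2, 2])  # the replacement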

+ 99 - 91
celery/log.py

@@ -10,7 +10,7 @@ from multiprocessing import current_process
 from multiprocessing import util as mputil
 
 from celery import signals
-from celery.app import app_or_default
+from celery.app import default_app
 from celery.utils import noop
 from celery.utils.compat import LoggerAdapter
 from celery.utils.patch import ensure_process_aware_logger
@@ -47,25 +47,29 @@ class ColorFormatter(logging.Formatter):
         return logging.Formatter.format(self, record)
 
 
-def get_task_logger(loglevel=None, name=None):
-    logger = logging.getLogger(name or "celery.task.default")
-    if loglevel is not None:
-        logger.setLevel(loglevel)
-    return logger
+class Logging(object):
+    _setup = False
 
+    def __init__(self, app):
+        self.app = app
+        self.loglevel = self.app.conf.CELERYD_LOG_LEVEL
+        self.format = self.app.conf.CELERYD_LOG_FORMAT
 
-def setup_logging_subsystem(loglevel=None, logfile=None,
-        format=None, colorize=None, **kwargs):
-    app = app_or_default(kwargs.get("app"))
-    loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
-    format = format or app.conf.CELERYD_LOG_FORMAT
-    if colorize is None:
-        colorize = app.conf.CELERYD_LOG_COLOR
+    def get_task_logger(self, loglevel=None, name=None):
+        logger = logging.getLogger(name or "celery.task.default")
+        if loglevel is not None:
+            logger.setLevel(loglevel)
+        return logger
+
+    def setup_logging_subsystem(self, loglevel=None, logfile=None,
+            format=None, colorize=None, **kwargs):
+        loglevel = loglevel or self.loglevel
+        format = format or self.format
+        colorize = self.app.either("CELERYD_LOG_COLOR", colorize)
 
-    print("COLORIZE: %s" % (app.conf.CELERYD_LOG_COLOR, ))
+        if self.__class__._setup:
+            return
 
-    global _setup
-    if not _setup:
         try:
             mputil._logger = None
         except AttributeError:
@@ -81,88 +85,104 @@ def setup_logging_subsystem(loglevel=None, logfile=None,
             root = logging.getLogger()
             mp = mputil.get_logger()
             for logger in (root, mp):
-                _setup_logger(logger, logfile, format, colorize, **kwargs)
+                self._setup_logger(logger, logfile,
+                                   format, colorize, **kwargs)
                 logger.setLevel(loglevel)
-        _setup = True
+        self.__class__._setup = True
         return receivers
 
+    def _detect_handler(self, logfile=None):
+        """Create log handler with either a filename, an open stream
+        or ``None`` (stderr)."""
+        if not logfile or hasattr(logfile, "write"):
+            return logging.StreamHandler(logfile)
+        return logging.FileHandler(logfile)
 
-def _detect_handler(logfile=None):
-    """Create log handler with either a filename, an open stream
-    or ``None`` (stderr)."""
-    if not logfile or hasattr(logfile, "write"):
-        return logging.StreamHandler(logfile)
-    return logging.FileHandler(logfile)
+    def get_default_logger(self, loglevel=None, name="celery"):
+        """Get default logger instance.
 
+        :keyword loglevel: Initial log level.
 
-def get_default_logger(loglevel=None, name="celery"):
-    """Get default logger instance.
+        """
+        logger = logging.getLogger(name)
+        if loglevel is not None:
+            logger.setLevel(loglevel)
+        return logger
 
-    :keyword loglevel: Initial log level.
+    def setup_logger(self, loglevel=None, logfile=None,
+            format=None, colorize=None, name="celery", root=True,
+            app=None, **kwargs):
+        """Setup the ``multiprocessing`` logger.
 
-    """
-    logger = logging.getLogger(name)
-    if loglevel is not None:
-        logger.setLevel(loglevel)
-    return logger
+        If ``logfile`` is not specified, then ``sys.stderr`` is used.
+
+        Returns logger object.
 
+        """
+        loglevel = loglevel or self.loglevel
+        format = format or self.format
+        colorize = self.app.either("CELERYD_LOG_COLOR", colorize)
 
-def setup_logger(loglevel=None, logfile=None,
-        format=None, colorize=None, name="celery", root=True,
-        app=None, **kwargs):
-    """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
-    then ``stderr`` is used.
+        if not root:
+            return self._setup_logger(self.get_default_logger(loglevel, name),
+                                      logfile, format, colorize, **kwargs)
+        self.setup_logging_subsystem(loglevel, logfile,
+                                     format, colorize, **kwargs)
+        return self.get_default_logger(name=name)
 
-    Returns logger object.
+    def setup_task_logger(self, loglevel=None, logfile=None, format=None,
+            colorize=None, task_kwargs=None, app=None, **kwargs):
+        """Setup the task logger.
 
-    """
-    app = app_or_default(app)
-    loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
-    format = format or app.conf.CELERYD_LOG_FORMAT
-    if colorize is None:
-        colorize = app.conf.CELERYD_LOG_COLOR
+        If ``logfile`` is not specified, then ``sys.stderr`` is used.
 
-    if not root:
-        return _setup_logger(get_default_logger(loglevel, name),
-                             logfile, format, colorize, **kwargs)
-    setup_logging_subsystem(loglevel, logfile, format, colorize, **kwargs)
-    return get_default_logger(name=name)
+        Returns logger object.
 
+        """
+        loglevel = loglevel or self.loglevel
+        format = format or self.format
+        colorize = self.app.either("CELERYD_LOG_COLOR", colorize)
+
+        if task_kwargs is None:
+            task_kwargs = {}
+        task_kwargs.setdefault("task_id", "-?-")
+        task_name = task_kwargs.get("task_name")
+        task_kwargs.setdefault("task_name", "-?-")
+        logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
+                                    logfile, format, colorize, **kwargs)
+        return LoggerAdapter(logger, task_kwargs)
+
+    def redirect_stdouts_to_logger(self, logger, loglevel=None):
+        """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
+        logging instance.
+
+        :param logger: The :class:`logging.Logger` instance to redirect to.
+        :param loglevel: The loglevel redirected messages will be logged as.
 
-def setup_task_logger(loglevel=None, logfile=None, format=None, colorize=None,
-        task_kwargs=None, app=None, **kwargs):
-    """Setup the task logger. If ``logfile`` is not specified, then
-    ``stderr`` is used.
+        """
+        proxy = LoggingProxy(logger, loglevel)
+        sys.stdout = sys.stderr = proxy
+        return proxy
 
-    Returns logger object.
+    def _setup_logger(self, logger, logfile, format, colorize,
+            formatter=ColorFormatter, **kwargs):
 
-    """
-    app = app_or_default(app)
-    loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
-    format = format or app.conf.CELERYD_LOG_FORMAT
-    if colorize is None:
-        colorize = app.conf.CELERYD_LOG_COLOR
-
-    if task_kwargs is None:
-        task_kwargs = {}
-    task_kwargs.setdefault("task_id", "-?-")
-    task_name = task_kwargs.get("task_name")
-    task_kwargs.setdefault("task_name", "-?-")
-    logger = _setup_logger(get_task_logger(loglevel, task_name),
-                            logfile, format, colorize, **kwargs)
-    return LoggerAdapter(logger, task_kwargs)
-
-
-def _setup_logger(logger, logfile, format, colorize,
-        formatter=ColorFormatter, **kwargs):
-
-    if logger.handlers: # Logger already configured
+        if logger.handlers: # Logger already configured
+            return logger
+
+        handler = self._detect_handler(logfile)
+        handler.setFormatter(formatter(format, use_color=colorize))
+        logger.addHandler(handler)
         return logger
 
-    handler = _detect_handler(logfile)
-    handler.setFormatter(formatter(format, use_color=colorize))
-    logger.addHandler(handler)
-    return logger
+
+_default_logging = Logging(default_app)
+setup_logging_subsystem = _default_logging.setup_logging_subsystem
+get_default_logger = _default_logging.get_default_logger
+setup_logger = _default_logging.setup_logger
+setup_task_logger = _default_logging.setup_task_logger
+get_task_logger = _default_logging.get_task_logger
+redirect_stdouts_to_logger = _default_logging.redirect_stdouts_to_logger
 
 
 def emergency_error(logfile, message):
@@ -185,18 +205,6 @@ def emergency_error(logfile, message):
         closefh()
 
 
-def redirect_stdouts_to_logger(logger, loglevel=None):
-    """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
-    logging instance.
-
-    :param logger: The :class:`logging.Logger` instance to redirect to.
-    :param loglevel: The loglevel redirected messages will be logged as.
-
-    """
-    proxy = LoggingProxy(logger, loglevel)
-    sys.stdout = sys.stderr = proxy
-    return proxy
-
 
 class LoggingProxy(object):
     """Forward file object to :class:`logging.Logger` instance.

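The rewrite above moves all logging setup onto a per-application ``Logging`` object, while the old module-level names stay importable as aliases bound to ``default_app``. A short sketch of both spellings, assuming ``app.log`` is the per-app ``Logging`` instance (the ``BaseTask.get_logger`` hunk below uses it the same way):

    >>> from celery import Celery
    >>> app = Celery()
    >>> logger = app.log.setup_logger(name="celery")
    >>> # Old module-level call, still working through default_app:
    >>> from celery.log import setup_logger
    >>> logger = setup_logger(name="celery")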
+ 7 - 134
celery/messaging.py

@@ -3,145 +3,16 @@
 Sending and Receiving Messages
 
 """
-from datetime import datetime, timedelta
 
-from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
-
-from celery import signals
 from celery.app import app_or_default, default_app
-from celery.utils import gen_unique_id, mitemgetter, noop
-from celery.utils.functional import wraps
-
-
-MSG_OPTIONS = ("mandatory", "priority", "immediate",
-               "routing_key", "serializer", "delivery_mode")
-
-get_msg_options = mitemgetter(*MSG_OPTIONS)
-extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
-
-_queues_declared = False
-_exchanges_declared = set()
-
-
-class TaskPublisher(Publisher):
-    """Publish tasks."""
-    auto_declare = False
-
-    def __init__(self, *args, **kwargs):
-        self.app = app = app_or_default(kwargs.get("app"))
-        _, default_queue = app.get_default_queue()
-        kwargs["exchange"] = kwargs.get("exchange") or \
-                                    default_queue["exchange"]
-        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
-                                    default_queue["exchange_type"]
-        kwargs["routing_key"] = kwargs.get("routing_key") or \
-                                    app.conf.CELERY_DEFAULT_ROUTING_KEY
-        kwargs["serializer"] = kwargs.get("serializer") or \
-                                    app.conf.CELERY_TASK_SERIALIZER
-        super(TaskPublisher, self).__init__(*args, **kwargs)
-
-        # Make sure all queues are declared.
-        global _queues_declared
-        if not _queues_declared:
-            consumers = self.app.get_consumer_set(self.connection,
-                                                  self.app.get_queues())
-            consumers.close()
-            _queues_declared = True
-        self.declare()
-
-    def declare(self):
-        if self.exchange not in _exchanges_declared:
-            super(TaskPublisher, self).declare()
-            _exchanges_declared.add(self.exchange)
-
-    def delay_task(self, task_name, task_args=None, task_kwargs=None,
-            countdown=None, eta=None, task_id=None, taskset_id=None,
-            expires=None, **kwargs):
-        """Delay task for execution by the celery nodes."""
-
-        task_id = task_id or gen_unique_id()
-        task_args = task_args or []
-        task_kwargs = task_kwargs or {}
-        now = None
-        if countdown: # Convert countdown to ETA.
-            now = datetime.now()
-            eta = now + timedelta(seconds=countdown)
-
-        if not isinstance(task_args, (list, tuple)):
-            raise ValueError("task args must be a list or tuple")
-        if not isinstance(task_kwargs, dict):
-            raise ValueError("task kwargs must be a dictionary")
-
-        if isinstance(expires, int):
-            now = now or datetime.now()
-            expires = now + timedelta(seconds=expires)
-
-        message_data = {
-            "task": task_name,
-            "id": task_id,
-            "args": task_args or [],
-            "kwargs": task_kwargs or {},
-            "retries": kwargs.get("retries", 0),
-            "eta": eta and eta.isoformat(),
-            "expires": expires and expires.isoformat(),
-        }
-
-        if taskset_id:
-            message_data["taskset"] = taskset_id
-
-        self.send(message_data, **extract_msg_options(kwargs))
-        signals.task_sent.send(sender=task_name, **message_data)
-
-        return task_id
-
-
-class ConsumerSet(_ConsumerSet):
-    """ConsumerSet with an optional decode error callback.
-
-    For more information see :class:`carrot.messaging.ConsumerSet`.
-
-    .. attribute:: on_decode_error
-
-        Callback called if a message had decoding errors.
-        The callback is called with the signature::
-
-            callback(message, exception)
-
-    """
-    on_decode_error = None
-
-    def _receive_callback(self, raw_message):
-        message = self.backend.message_to_python(raw_message)
-        if self.auto_ack and not message.acknowledged:
-            message.ack()
-        try:
-            decoded = message.decode()
-        except Exception, exc:
-            if self.on_decode_error:
-                return self.on_decode_error(message, exc)
-            else:
-                raise
-        self.receive(decoded, message)
-
-
-class TaskConsumer(Consumer):
-    """Consume tasks"""
-
-    def __init__(self, *args, **kwargs):
-        app = app_or_default(kwargs.get("app"))
-        default_queue_name, default_queue = app.get_default_queue()
-        kwargs["queue"] = kwargs.get("queue") or default_queue_name
-        kwargs["exchange"] = kwargs.get("exchange") or \
-                                default_queue["exchange"]
-        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
-                                default_queue["exchange_type"]
-        kwargs["routing_key"] = kwargs.get("routing_key") or \
-                                    default_queue["binding_key"]
-        super(TaskConsumer, self).__init__(*args, **kwargs)
 
+TaskPublisher = default_app.amqp.TaskPublisher
+ConsumerSet = default_app.amqp.ConsumerSet
+TaskConsumer = default_app.amqp.TaskConsumer
 
 def establish_connection(**kwargs):
     """Establish a connection to the message broker."""
+    # FIXME: Deprecate!
     app = app_or_default(kwargs.pop("app", None))
     return app.broker_connection(**kwargs)
 
@@ -150,6 +21,7 @@ def with_connection(fun):
     """Decorator for providing default message broker connection for functions
     supporting the ``connection`` and ``connect_timeout`` keyword
     arguments."""
+    # FIXME: Deprecate!
     return default_app.with_default_connection(fun)
 
 
@@ -160,4 +32,5 @@ def get_consumer_set(connection, queues=None, **options):
     Defaults to the queues in ``CELERY_QUEUES``.
 
     """
-    return default_app.get_consumer_set(connection, queues, **options)
+    # FIXME: Deprecate!
+    return default_app.amqp.get_consumer_set(connection, queues, **options)

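This hunk is the commit's headline change: the publisher and consumer classes that used to live in ``celery.messaging`` are now reached through the application's ``amqp`` namespace, and the module keeps only backwards-compatible aliases. A minimal before/after sketch:

    >>> # Before: module-level class bound to an implicit default app
    >>> from celery.messaging import TaskPublisher
    >>> # After: the same class hangs off Celery().amqp
    >>> from celery import Celery
    >>> app = Celery()
    >>> publisher = app.amqp.TaskPublisher(connection=app.broker_connection())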
+ 2 - 7
celery/result.py

@@ -41,13 +41,8 @@ class BaseAsyncResult(object):
         The workers will ignore the task if received.
 
         """
-
-        def _do_revoke(connection=None, connect_timeout=None):
-            from celery.task import control
-            control.revoke(self.task_id, connection=connection,
-                           connect_timeout=connect_timeout)
-        self.app.with_default_connection(_do_revoke)(
-                connection=connection, connect_timeout=connect_timeout)
+        self.app.control.revoke(self.task_id, connection=connection,
+                                connect_timeout=connect_timeout)
 
     def wait(self, timeout=None):
         """Wait for task, and return the result when it arrives.

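``revoke()`` now delegates to the application's control interface instead of opening a connection by hand. A usage sketch (the task id is made up):

    >>> result = app.AsyncResult("d9078da5-9915-40a0-bfa1-392c7bde42ed")
    >>> result.revoke()                      # goes through app.control.revoke
    >>> app.control.revoke(result.task_id)   # equivalent direct call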
+ 537 - 414
celery/task/base.py

@@ -2,13 +2,13 @@ import sys
 import warnings
 
 from celery.app import default_app
+from celery.datastructures import ExceptionInfo
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
-from celery.execute import apply_async, apply
-from celery.log import setup_task_logger
-from celery.messaging import TaskPublisher, TaskConsumer
+from celery.execute.trace import TaskTrace
 from celery.registry import tasks
-from celery.result import BaseAsyncResult, EagerResult
+from celery.result import EagerResult
 from celery.schedules import maybe_schedule
+from celery.utils import mattrgetter, gen_unique_id, fun_takes_kwargs
 from celery.utils.timeutils import timedelta_seconds
 
 from celery.task.sets import TaskSet, subtask
@@ -25,7 +25,10 @@ Please use the CELERYBEAT_SCHEDULE setting instead:
     }
 
 """
-
+extract_exec_options = mattrgetter("queue", "routing_key",
+                                   "exchange", "immediate",
+                                   "mandatory", "priority",
+                                   "serializer", "delivery_mode")
 
 def _unpickle_task(name):
     return tasks[name]
@@ -69,518 +72,638 @@ class TaskType(type):
         return tasks[task_name].__class__
 
 
-def create_task_cls(app):
+class BaseTask(object):
+    """A celery task.
+
+    All subclasses of :class:`Task` must define the :meth:`run` method,
+    which is the actual method the ``celery`` daemon executes.
+
+    The :meth:`run` method can make use of the default keyword arguments,
+    as listed in the :meth:`run` documentation.
 
-    class Task(object):
-        """A celery task.
+    The resulting class is callable, which if called will apply the
+    :meth:`run` method.
 
-        All subclasses of :class:`Task` must define the :meth:`run` method,
-        which is the actual method the ``celery`` daemon executes.
+    .. attribute:: app
 
-        The :meth:`run` method can take use of the default keyword arguments,
-        as listed in the :meth:`run` documentation.
+        The application instance associated with this task class.
 
-        The resulting class is callable, which if called will apply the
-        :meth:`run` method.
+    .. attribute:: name
 
-        .. attribute:: name
+        Name of the task.
 
-            Name of the task.
+    .. attribute:: abstract
 
-        .. attribute:: abstract
+        If ``True`` the task is an abstract base class.
 
-            If ``True`` the task is an abstract base class.
+    .. attribute:: type
 
-        .. attribute:: type
+        The type of task, currently unused.
 
-            The type of task, currently unused.
+    .. attribute:: queue
 
-        .. attribute:: queue
+        Select a destination queue for this task. The queue needs to exist
+        in ``CELERY_QUEUES``. The ``routing_key``, ``exchange`` and
+        ``exchange_type`` attributes will be ignored if this is set.
 
-            Select a destination queue for this task. The queue needs to exist
-            in ``CELERY_QUEUES``. The ``routing_key``, ``exchange`` and
-            ``exchange_type`` attributes will be ignored if this is set.
+    .. attribute:: routing_key
 
-        .. attribute:: routing_key
+        Override the global default ``routing_key`` for this task.
 
-            Override the global default ``routing_key`` for this task.
+    .. attribute:: exchange
 
-        .. attribute:: exchange
+        Override the global default ``exchange`` for this task.
 
-            Override the global default ``exchange`` for this task.
+    .. attribute:: exchange_type
 
-        .. attribute:: exchange_type
+        Override the global default exchange type for this task.
 
-            Override the global default exchange type for this task.
+    .. attribute:: delivery_mode
 
-        .. attribute:: delivery_mode
+        Override the global default delivery mode for this task.
+        By default this is set to ``2`` (persistent). You can change this
+        to ``1`` to get non-persistent behavior, which means the messages
+        are lost if the broker is restarted.
 
-            Override the global default delivery mode for this task.
-            By default this is set to ``2`` (persistent). You can change this
-            to ``1`` to get non-persistent behavior, which means the messages
-            are lost if the broker is restarted.
+    .. attribute:: mandatory
 
-        .. attribute:: mandatory
+        Mandatory message routing. An exception will be raised if the task
+        can't be routed to a queue.
 
-            Mandatory message routing. An exception will be raised if the task
-            can't be routed to a queue.
+    .. attribute:: immediate
 
-        .. attribute:: immediate:
+        Request immediate delivery. An exception will be raised if the task
+        can't be routed to a worker immediately.
 
-            Request immediate delivery. An exception will be raised if the task
-            can't be routed to a worker immediately.
+    .. attribute:: priority
 
-        .. attribute:: priority:
+        The message priority. A number from ``0`` to ``9``, where ``0``
+        is the highest. Note that RabbitMQ doesn't support priorities yet.
 
-            The message priority. A number from ``0`` to ``9``, where ``0``
-            is the highest. Note that RabbitMQ doesn't support priorities yet.
+    .. attribute:: max_retries
 
-        .. attribute:: max_retries
+        Maximum number of retries before giving up.
+        If set to ``None``, it will never stop retrying.
 
-            Maximum number of retries before giving up.
-            If set to ``None``, it will never stop retrying.
+    .. attribute:: default_retry_delay
 
-        .. attribute:: default_retry_delay
+        Default time in seconds before a retry of the task should be
+        executed. Default is a 3 minute delay.
 
-            Default time in seconds before a retry of the task should be
-            executed. Default is a 3 minute delay.
+    .. attribute:: rate_limit
 
-        .. attribute:: rate_limit
+        Set the rate limit for this task type, Examples: ``None`` (no rate
+        limit), ``"100/s"`` (hundred tasks a second), ``"100/m"`` (hundred
+        tasks a minute), ``"100/h"`` (hundred tasks an hour)
 
-            Set the rate limit for this task type, Examples: ``None`` (no rate
-            limit), ``"100/s"`` (hundred tasks a second), ``"100/m"`` (hundred
-            tasks a minute), ``"100/h"`` (hundred tasks an hour)
+    .. attribute:: ignore_result
 
-        .. attribute:: ignore_result
+        Don't store the return value of this task.
 
-            Don't store the return value of this task.
+    .. attribute:: store_errors_even_if_ignored
 
-        .. attribute:: store_errors_even_if_ignored
+        If true, errors will be stored even if the task is configured
+        to ignore results.
 
-            If true, errors will be stored even if the task is configured
-            to ignore results.
+    .. attribute:: send_error_emails
 
-        .. attribute:: send_error_emails
+        If true, an e-mail will be sent to the admins whenever
+        a task of this type raises an exception.
 
-            If true, an e-mail will be sent to the admins whenever
-            a task of this type raises an exception.
+    .. attribute:: error_whitelist
 
-        .. attribute:: error_whitelist
+        List of exception types to send error e-mails for.
 
-            List of exception types to send error e-mails for.
+    .. attribute:: serializer
 
-        .. attribute:: serializer
+        The name of a serializer that has been registered with
+        :mod:`carrot.serialization.registry`. Example: ``"json"``.
 
-            The name of a serializer that has been registered with
-            :mod:`carrot.serialization.registry`. Example: ``"json"``.
+    .. attribute:: backend
 
-        .. attribute:: backend
+        The result store backend used for this task.
 
-            The result store backend used for this task.
+    .. attribute:: autoregister
 
-        .. attribute:: autoregister
+        If ``True`` the task is automatically registered in the task
+        registry, which is the default behaviour.
 
-            If ``True`` the task is automatically registered in the task
-            registry, which is the default behaviour.
+    .. attribute:: track_started
 
-        .. attribute:: track_started
+        If ``True`` the task will report its status as "started"
+        when the task is executed by a worker.
+        The default value is ``False`` as the normal behaviour is to not
+        report that level of granularity. Tasks are either pending,
+        finished, or waiting to be retried.
 
-            If ``True`` the task will report its status as "started"
-            when the task is executed by a worker.
-            The default value is ``False`` as the normal behaviour is to not
-            report that level of granularity. Tasks are either pending,
-            finished, or waiting to be retried.
+        Having a "started" status can be useful for when there are long
+        running tasks and there is a need to report which task is
+        currently running.
 
-            Having a "started" status can be useful for when there are long
-            running tasks and there is a need to report which task is
-            currently running.
+        The global default can be overridden with the
+        ``CELERY_TRACK_STARTED`` setting.
 
-            The global default can be overridden with the
-            ``CELERY_TRACK_STARTED`` setting.
+    .. attribute:: acks_late
 
-        .. attribute:: acks_late
+        If set to ``True`` messages for this task will be acknowledged
+        **after** the task has been executed, not *just before*, which is
+        the default behavior.
 
-            If set to ``True`` messages for this task will be acknowledged
-            **after** the task has been executed, not *just before*, which is
-            the default behavior.
+        Note that this means the task may be executed twice if the worker
+        crashes in the middle of execution, which may be acceptable for some
+        applications.
 
-            Note that this means the task may be executed twice if the worker
-            crashes in the middle of execution, which may be acceptable for some
-            applications.
+        The global default can be overridden by the ``CELERY_ACKS_LATE``
+        setting.
 
-            The global default can be overriden by the ``CELERY_ACKS_LATE``
-            setting.
+    """
+    __metaclass__ = TaskType
+
+    app = None
+    name = None
+    abstract = True
+    autoregister = True
+    type = "regular"
+
+    queue = None
+    routing_key = None
+    exchange = None
+    exchange_type = None
+    delivery_mode = None
+    immediate = False
+    mandatory = False
+    priority = None
+
+    ignore_result = False
+    store_errors_even_if_ignored = False
+    send_error_emails = False
+    error_whitelist = ()
+    disable_error_emails = False # FIXME
+    max_retries = 3
+    default_retry_delay = 3 * 60
+    serializer = "pickle"
+    rate_limit = None
+    backend = None
+    track_started = False
+    acks_late = False
+
+    MaxRetriesExceededError = MaxRetriesExceededError
+
+    def __call__(self, *args, **kwargs):
+        return self.run(*args, **kwargs)
+
+    def __reduce__(self):
+        return (_unpickle_task, (self.name, ), None)
+
+    def run(self, *args, **kwargs):
+        """The body of the task executed by the worker.
+
+        The following standard keyword arguments are reserved and are
+        automatically passed by the worker if the function/method
+        supports them:
+
+            * task_id
+            * task_name
+            * task_retries
+            * task_is_eager
+            * logfile
+            * loglevel
+            * delivery_info
+
+        Additional standard keyword arguments may be added in the future.
+        To take these default arguments, the task can either list the ones
+        it wants explicitly or just take an arbitrary list of keyword
+        arguments (\*\*kwargs).
 
 
         """
         """
-        __metaclass__ = TaskType
+        raise NotImplementedError("Tasks must define the run method.")
 
-        name = None
-        abstract = True
-        autoregister = True
-        type = "regular"
+    @classmethod
+    def get_logger(self, loglevel=None, logfile=None, **kwargs):
+        """Get task-aware logger object.
 
-        queue = None
-        routing_key = None
-        exchange = None
-        exchange_type = app.conf.CELERY_DEFAULT_EXCHANGE_TYPE
-        delivery_mode = app.conf.CELERY_DEFAULT_DELIVERY_MODE
-        immediate = False
-        mandatory = False
-        priority = None
+        See :func:`celery.log.setup_task_logger`.
 
-        ignore_result = app.conf.CELERY_IGNORE_RESULT
-        store_errors_even_if_ignored = \
-                app.conf.CELERY_STORE_ERRORS_EVEN_IF_IGNORED
-        send_error_emails = app.conf.CELERY_SEND_TASK_ERROR_EMAILS
-        error_whitelist = app.conf.CELERY_TASK_ERROR_WHITELIST
-        disable_error_emails = False # FIXME
-        max_retries = 3
-        default_retry_delay = 3 * 60
-        serializer = app.conf.CELERY_TASK_SERIALIZER
-        rate_limit = app.conf.CELERY_DEFAULT_RATE_LIMIT
-        backend = app.backend
-        track_started = app.conf.CELERY_TRACK_STARTED
-        acks_late = app.conf.CELERY_ACKS_LATE
+        """
+        return self.app.log.setup_task_logger(loglevel=loglevel,
+                                              logfile=logfile,
+                                              task_kwargs=kwargs)
 
-        MaxRetriesExceededError = MaxRetriesExceededError
-
-        def __call__(self, *args, **kwargs):
-            return self.run(*args, **kwargs)
+    @classmethod
+    def establish_connection(self, connect_timeout=None):
+        """Establish a connection to the message broker."""
+        return self.app.broker_connection(connect_timeout=connect_timeout)
 
-        def __reduce__(self):
-            return (_unpickle_task, (self.name, ), None)
+    @classmethod
+    def get_publisher(self, connection=None, exchange=None,
+            connect_timeout=None, exchange_type=None):
+        """Get a celery task message publisher.
 
-        def run(self, *args, **kwargs):
-            """The body of the task executed by the worker.
-
-            The following standard keyword arguments are reserved and is
-            automatically passed by the worker if the function/method
-            supports them:
-
-                * task_id
-                * task_name
-                * task_retries
-                * task_is_eager
-                * logfile
-                * loglevel
-                * delivery_info
+        :rtype: :class:`~celery.app.amqp.TaskPublisher`
 
-            Additional standard keyword arguments may be added in the future.
-            To take these default arguments, the task can either list the ones
-            it wants explicitly or just take an arbitrary list of keyword
-            arguments (\*\*kwargs).
-
-            """
-            raise NotImplementedError("Tasks must define the run method.")
-
-        @classmethod
-        def get_logger(self, loglevel=None, logfile=None, **kwargs):
-            """Get task-aware logger object.
-
-            See :func:`celery.log.setup_task_logger`.
-
-            """
-            return setup_task_logger(loglevel=loglevel, logfile=logfile,
-                                     task_kwargs=kwargs)
-
-        @classmethod
-        def establish_connection(self,
-            connect_timeout=app.conf.BROKER_CONNECTION_TIMEOUT):
-            """Establish a connection to the message broker."""
-            return app.broker_connection(connect_timeout=connect_timeout)
-
-        @classmethod
-        def get_publisher(self, connection=None, exchange=None,
-                connect_timeout=app.conf.BROKER_CONNECTION_TIMEOUT,
-                exchange_type=None):
-            """Get a celery task message publisher.
-
-            :rtype :class:`celery.messaging.TaskPublisher`:
-
-            Please be sure to close the AMQP connection when you're done
-            with this object, i.e.:
-
-                >>> publisher = self.get_publisher()
-                >>> # do something with publisher
-                >>> publisher.connection.close()
-
-            """
-            if exchange is None:
-                exchange = self.exchange
-            if exchange_type is None:
-                exchange_type = self.exchange_type
-            connection = connection or \
-                    self.establish_connection(connect_timeout)
-            return TaskPublisher(connection=connection,
-                                 exchange=exchange,
-                                 exchange_type=exchange_type,
-                                 routing_key=self.routing_key)
-
-        @classmethod
-        def get_consumer(self, connection=None,
-            connect_timeout=app.conf.BROKER_CONNECTION_TIMEOUT):
-            """Get a celery task message consumer.
-
-            :rtype :class:`celery.messaging.TaskConsumer`:
-
-            Please be sure to close the AMQP connection when you're done
-            with this object. i.e.:
-
-                >>> consumer = self.get_consumer()
-                >>> # do something with consumer
-                >>> consumer.connection.close()
-
-            """
-            connection = connection or \
-                         self.establish_connection(connect_timeout)
-            return TaskConsumer(connection=connection,
-                                exchange=self.exchange,
-                                routing_key=self.routing_key)
-
-        @classmethod
-        def delay(self, *args, **kwargs):
-            """Shortcut to :meth:`apply_async`, with star arguments,
-            but doesn't support the extra options.
-
-            :param \*args: positional arguments passed on to the task.
-            :param \*\*kwargs: keyword arguments passed on to the task.
-
-            :returns :class:`celery.result.AsyncResult`:
-
-            """
-            return self.apply_async(args, kwargs)
-
-        @classmethod
-        def apply_async(self, args=None, kwargs=None, **options):
-            """Delay this task for execution by the ``celery`` daemon(s).
-
-            :param args: positional arguments passed on to the task.
-            :param kwargs: keyword arguments passed on to the task.
-            :keyword \*\*options: Any keyword arguments to pass on to
-                :func:`celery.execute.apply_async`.
-
-            See :func:`celery.execute.apply_async` for more information.
-
-            :returns :class:`celery.result.AsyncResult`:
-
-            """
-            conn = None
-            if not options.get("connection") or options.get("publisher"):
-                conn = options["connection"] = self.establish_connection(
-                            connect_timeout=options.get("connect_timeout"))
-            try:
-                return apply_async(self, args, kwargs, **options)
-            finally:
-                if conn:
-                    conn.close()
-
-        @classmethod
-        def retry(self, args=None, kwargs=None, exc=None, throw=True,
-                **options):
-            """Retry the task.
-
-            :param args: Positional arguments to retry with.
-            :param kwargs: Keyword arguments to retry with.
-            :keyword exc: Optional exception to raise instead of
-                :exc:`~celery.exceptions.MaxRetriesExceededError` when the max
-                restart limit has been exceeded.
-            :keyword countdown: Time in seconds to delay the retry for.
-            :keyword eta: Explicit time and date to run the retry at
-            (must be a :class:`~datetime.datetime` instance).
-            :keyword \*\*options: Any extra options to pass on to
-                meth:`apply_async`. See :func:`celery.execute.apply_async`.
-            :keyword throw: If this is ``False``, do not raise the
-                :exc:`~celery.exceptions.RetryTaskError` exception,
-                that tells the worker to mark the task as being retried.
-                Note that this means the task will be marked as failed
-                if the task raises an exception, or successful if it
-                returns.
-
-            :raises celery.exceptions.RetryTaskError: To tell the worker that
-                the task has been re-sent for retry. This always happens,
-                unless the ``throw`` keyword argument has been explicitly set
-                to ``False``, and is considered normal operation.
-
-            Example
-
-                >>> class TwitterPostStatusTask(Task):
-                ...
-                ...     def run(self, username, password, message, **kwargs):
-                ...         twitter = Twitter(username, password)
-                ...         try:
-                ...             twitter.post_status(message)
-                ...         except twitter.FailWhale, exc:
-                ...             # Retry in 5 minutes.
-                ...             self.retry([username, password, message],
-                ...                        kwargs,
-                ...                        countdown=60 * 5, exc=exc)
-
-            """
-            if not kwargs:
-                raise TypeError(
-                        "kwargs argument to retries can't be empty. "
-                        "Task must accept **kwargs, see http://bit.ly/cAx3Bg")
-
-            delivery_info = kwargs.pop("delivery_info", {})
-            options.setdefault("exchange", delivery_info.get("exchange"))
-            options.setdefault("routing_key", delivery_info.get("routing_key"))
-
-            options["retries"] = kwargs.pop("task_retries", 0) + 1
-            options["task_id"] = kwargs.pop("task_id", None)
-            options["countdown"] = options.get("countdown",
-                                            self.default_retry_delay)
-            max_exc = exc or self.MaxRetriesExceededError(
-                    "Can't retry %s[%s] args:%s kwargs:%s" % (
-                        self.name, options["task_id"], args, kwargs))
-            max_retries = self.max_retries
-            if max_retries is not None and options["retries"] > max_retries:
-                raise max_exc
-
-            # If task was executed eagerly using apply(),
-            # then the retry must also be executed eagerly.
-            if kwargs.get("task_is_eager", False):
-                result = self.apply(args=args, kwargs=kwargs, **options)
-                if isinstance(result, EagerResult):
-                    return result.get() # propogates exceptions.
-                return result
-
-            self.apply_async(args=args, kwargs=kwargs, **options)
+        Please be sure to close the AMQP connection when you're done
+        with this object, i.e.:
 
-            if throw:
-                message = "Retry in %d seconds." % options["countdown"]
-                raise RetryTaskError(message, exc)
+            >>> publisher = self.get_publisher()
+            >>> # do something with publisher
+            >>> publisher.connection.close()
 
-        @classmethod
-        def apply(self, args=None, kwargs=None, **options):
-            """Execute this task locally, by blocking until the task
-            has finished executing.
+        """
+        if exchange is None:
+            exchange = self.exchange
+        if exchange_type is None:
+            exchange_type = self.exchange_type
+        connection = connection or self.establish_connection(connect_timeout)
+        return self.app.amqp.TaskPublisher(connection=connection,
+                                           exchange=exchange,
+                                           exchange_type=exchange_type,
+                                           routing_key=self.routing_key)
 
-            :param args: positional arguments passed on to the task.
-            :param kwargs: keyword arguments passed on to the task.
-            :keyword throw: Re-raise task exceptions. Defaults to
-                the ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` setting.
+    @classmethod
+    def get_consumer(self, connection=None, connect_timeout=None):
+        """Get a celery task message consumer.
 
-            :rtype :class:`celery.result.EagerResult`:
+        :rtype: :class:`~celery.app.amqp.TaskConsumer`
 
-            See :func:`celery.execute.apply`.
+        Please be sure to close the AMQP connection when you're done
+        with this object. i.e.:
 
 
-            """
-            return apply(self, args, kwargs, **options)
+            >>> consumer = self.get_consumer()
+            >>> # do something with consumer
+            >>> consumer.connection.close()
 
 
-        @classmethod
-        def AsyncResult(self, task_id):
-            """Get AsyncResult instance for this kind of task.
+        """
+        connection = connection or self.establish_connection(connect_timeout)
+        return self.app.amqp.TaskConsumer(connection=connection,
+                                          exchange=self.exchange,
+                                          routing_key=self.routing_key)
 
-            :param task_id: Task id to get result for.
+    @classmethod
+    def delay(self, *args, **kwargs):
+        """Shortcut to :meth:`apply_async`, with star arguments,
+        but doesn't support the extra options.
 
-            """
-            return BaseAsyncResult(task_id,
-                                   backend=self.backend, app=self.app)
+        :param \*args: positional arguments passed on to the task.
+        :param \*\*kwargs: keyword arguments passed on to the task.
 
-        def on_retry(self, exc, task_id, args, kwargs, einfo=None):
-            """Retry handler.
+        :returns: :class:`celery.result.AsyncResult`
 
-            This is run by the worker when the task is to be retried.
+        """
+        return self.apply_async(args, kwargs)
+
+    @classmethod
+    def apply_async(self, args=None, kwargs=None, countdown=None,
+            eta=None, task_id=None, publisher=None, connection=None,
+            connect_timeout=None, router=None, expires=None, queues=None,
+            **options):
+        """Run a task asynchronously by the celery daemon(s).
+
+        :keyword args: The positional arguments to pass on to the
+            task (a :class:`list` or :class:`tuple`).
+
+        :keyword kwargs: The keyword arguments to pass on to the
+            task (a :class:`dict`)
+
+        :keyword countdown: Number of seconds into the future that the
+            task should execute. Defaults to immediate delivery (Do not
+            confuse that with the ``immediate`` setting, they are
+            unrelated).
+
+        :keyword eta: A :class:`~datetime.datetime` object that describes
+            the absolute time and date of when the task should execute.
+            May not be specified if ``countdown`` is also supplied. (Do
+            not confuse this with the ``immediate`` setting, they are
+            unrelated).
+
+        :keyword expires: Either a :class:`int`, describing the number of
+            seconds, or a :class:`~datetime.datetime` object that
+            describes the absolute time and date of when the task should
+            expire. The task will not be executed after the
+            expiration time.
+
+        :keyword connection: Re-use existing broker connection instead
+            of establishing a new one. The ``connect_timeout`` argument
+            is not respected if this is set.
+
+        :keyword connect_timeout: The timeout in seconds, before we give
+            up on establishing a connection to the AMQP server.
+
+        :keyword routing_key: The routing key used to route the task to a
+            worker server. Defaults to the tasks
+            :attr:`routing_key` attribute.
+
+        :keyword exchange: The named exchange to send the task to.
+            Defaults to the tasks :attr:`exchange` attribute.
+
+        :keyword exchange_type: The exchange type to initialize the
+            exchange if not already declared. Defaults to the tasks
+            :attr:`exchange_type` attribute.
+
+        :keyword immediate: Request immediate delivery. Will raise an
+            exception if the task cannot be routed to a worker
+            immediately.  (Do not confuse this parameter with
+            the ``countdown`` and ``eta`` settings, as they are
+            unrelated). Defaults to the tasks :attr:`immediate` attribute.
+
+        :keyword mandatory: Mandatory routing. Raises an exception if
+            there's no running workers able to take on this task.
+            Defaults to the tasks :attr:`mandatory` attribute.
+
+        :keyword priority: The task priority, a number between 0 and 9.
+            Defaults to the tasks :attr:`priority` attribute.
+
+        :keyword serializer: A string identifying the default
+            serialization method to use. Defaults to the
+            ``CELERY_TASK_SERIALIZER`` setting. Can be ``pickle``,
+            ``json``, ``yaml``, or any custom serialization method
+            that has been registered with
+            :mod:`carrot.serialization.registry`. Defaults to the tasks
+            :attr:`serializer` attribute.
+
+        **Note**: If the ``CELERY_ALWAYS_EAGER`` setting is set, it will
+            be replaced by a local :func:`apply` call instead.
 
 
-            :param exc: The exception sent to :meth:`retry`.
-            :param task_id: Unique id of the retried task.
-            :param args: Original arguments for the retried task.
-            :param kwargs: Original keyword arguments for the retried task.
+        """
+        router = router or self.app.amqp.Router(queues)
+
+        if self.app.conf.CELERY_ALWAYS_EAGER:
+            return self.apply(args, kwargs, task_id=task_id)
+
+        options = dict(extract_exec_options(self), **options)
+        options = router.route(options, self.name, args, kwargs)
+        exchange = options.get("exchange")
+        exchange_type = options.get("exchange_type")
+
+        publish = publisher or self.get_publisher(connection,
+                                                  exchange=exchange,
+                                                  exchange_type=exchange_type)
+        try:
+            task_id = publish.delay_task(self.name, args, kwargs,
+                                         task_id=task_id,
+                                         countdown=countdown,
+                                         eta=eta, expires=expires,
+                                         **options)
+        finally:
+            publisher or publish.close()
+            if not connection:
+                # close automatically created connection
+                publish.connection.close()
+
+        return self.AsyncResult(task_id)
+
+    @classmethod
+    def retry(self, args=None, kwargs=None, exc=None, throw=True,
+            **options):
+        """Retry the task.
+
+        :param args: Positional arguments to retry with.
+        :param kwargs: Keyword arguments to retry with.
+        :keyword exc: Optional exception to raise instead of
+            :exc:`~celery.exceptions.MaxRetriesExceededError` when the max
+            restart limit has been exceeded.
+        :keyword countdown: Time in seconds to delay the retry for.
+        :keyword eta: Explicit time and date to run the retry at
+            (must be a :class:`~datetime.datetime` instance).
+        :keyword \*\*options: Any extra options to pass on to
+            :meth:`apply_async`. See :func:`celery.execute.apply_async`.
+        :keyword throw: If this is ``False``, do not raise the
+            :exc:`~celery.exceptions.RetryTaskError` exception,
+            that tells the worker to mark the task as being retried.
+            Note that this means the task will be marked as failed
+            if the task raises an exception, or successful if it
+            returns.
+
+        :raises celery.exceptions.RetryTaskError: To tell the worker that
+            the task has been re-sent for retry. This always happens,
+            unless the ``throw`` keyword argument has been explicitly set
+            to ``False``, and is considered normal operation.
+
+        Example
+
+            >>> class TwitterPostStatusTask(Task):
+            ...
+            ...     def run(self, username, password, message, **kwargs):
+            ...         twitter = Twitter(username, password)
+            ...         try:
+            ...             twitter.post_status(message)
+            ...         except twitter.FailWhale, exc:
+            ...             # Retry in 5 minutes.
+            ...             self.retry([username, password, message],
+            ...                        kwargs,
+            ...                        countdown=60 * 5, exc=exc)
 
-            :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
-            instance, containing the traceback.
+        """
+        if not kwargs:
+            raise TypeError(
+                    "kwargs argument to retries can't be empty. "
+                    "Task must accept **kwargs, see http://bit.ly/cAx3Bg")
+
+        delivery_info = kwargs.pop("delivery_info", {})
+        options.setdefault("exchange", delivery_info.get("exchange"))
+        options.setdefault("routing_key", delivery_info.get("routing_key"))
+
+        options["retries"] = kwargs.pop("task_retries", 0) + 1
+        options["task_id"] = kwargs.pop("task_id", None)
+        options["countdown"] = options.get("countdown",
+                                        self.default_retry_delay)
+        max_exc = exc or self.MaxRetriesExceededError(
+                "Can't retry %s[%s] args:%s kwargs:%s" % (
+                    self.name, options["task_id"], args, kwargs))
+        max_retries = self.max_retries
+        if max_retries is not None and options["retries"] > max_retries:
+            raise max_exc
+
+        # If task was executed eagerly using apply(),
+        # then the retry must also be executed eagerly.
+        if kwargs.get("task_is_eager", False):
+            result = self.apply(args=args, kwargs=kwargs, **options)
+            if isinstance(result, EagerResult):
+                return result.get() # propagates exceptions.
+            return result
+
+        self.apply_async(args=args, kwargs=kwargs, **options)
+
+        if throw:
+            message = "Retry in %d seconds." % options["countdown"]
+            raise RetryTaskError(message, exc)
+
+    @classmethod
+    def apply(self, args=None, kwargs=None, **options):
+        """Execute this task locally, by blocking until the task
+        returns.
+
+        :param args: positional arguments passed on to the task.
+        :param kwargs: keyword arguments passed on to the task.
+        :keyword throw: Re-raise task exceptions. Defaults to
+            the ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` setting.
+
+        :rtype: :class:`celery.result.EagerResult`
+
+        See :func:`celery.execute.apply`.
 
-            The return value of this handler is ignored.
+        """
+        args = args or []
+        kwargs = kwargs or {}
+        task_id = options.get("task_id") or gen_unique_id()
+        retries = options.get("retries", 0)
+        throw = self.app.either("CELERY_EAGER_PROPAGATES_EXCEPTIONS",
+                                options.pop("throw", None))
+
+        task = tasks[self.name] # Make sure we get the instance, not class.
+
+        default_kwargs = {"task_name": task.name,
+                          "task_id": task_id,
+                          "task_retries": retries,
+                          "task_is_eager": True,
+                          "logfile": options.get("logfile"),
+                          "delivery_info": {"is_eager": True},
+                          "loglevel": options.get("loglevel", 0)}
+        supported_keys = fun_takes_kwargs(task.run, default_kwargs)
+        extend_with = dict((key, val) for key, val in default_kwargs.items()
+                            if key in supported_keys)
+        kwargs.update(extend_with)
+
+        trace = TaskTrace(task.name, task_id, args, kwargs, task=task)
+        retval = trace.execute()
+        if isinstance(retval, ExceptionInfo):
+            if throw:
+                raise retval.exception
+            retval = retval.exception
+        return EagerResult(task_id, retval, trace.status,
+                           traceback=trace.strtb)
 
 
-            """
-            pass
+    @classmethod
+    def AsyncResult(self, task_id):
+        """Get AsyncResult instance for this kind of task.
 
-        def after_return(self, status, retval, task_id, args,
-                kwargs, einfo=None):
-            """Handler called after the task returns.
+        :param task_id: Task id to get result for.
 
-            :param status: Current task state.
-            :param retval: Task return value/exception.
-            :param task_id: Unique id of the task.
-            :param args: Original arguments for the task that failed.
-            :param kwargs: Original keyword arguments for the task
-              that failed.
+        """
+        return self.app.AsyncResult(task_id, backend=self.backend)
 
-            :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
-            instance, containing the traceback (if any).
+    def on_retry(self, exc, task_id, args, kwargs, einfo=None):
+        """Retry handler.
 
-            The return value of this handler is ignored.
+        This is run by the worker when the task is to be retried.
 
-            """
-            pass
+        :param exc: The exception sent to :meth:`retry`.
+        :param task_id: Unique id of the retried task.
+        :param args: Original arguments for the retried task.
+        :param kwargs: Original keyword arguments for the retried task.
 
-        def on_failure(self, exc, task_id, args, kwargs, einfo=None):
-            """Error handler.
+        :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
+            instance, containing the traceback.
 
-            This is run by the worker when the task fails.
+        The return value of this handler is ignored.
 
 
-            :param exc: The exception raised by the task.
-            :param task_id: Unique id of the failed task.
-            :param args: Original arguments for the task that failed.
-            :param kwargs: Original keyword arguments for the task
-              that failed.
+        """
+        pass
 
 
-            :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
-                instance, containing the traceback.
+    def after_return(self, status, retval, task_id, args,
+            kwargs, einfo=None):
+        """Handler called after the task returns.
 
 
-            The return value of this handler is ignored.
+        :param status: Current task state.
+        :param retval: Task return value/exception.
+        :param task_id: Unique id of the task.
+        :param args: Original arguments for the task that failed.
+        :param kwargs: Original keyword arguments for the task
+            that failed.
 
 
-            """
-            pass
+        :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
+        instance, containing the traceback (if any).
 
 
-        def on_success(self, retval, task_id, args, kwargs):
-            """Success handler.
+        The return value of this handler is ignored.

-            Run by the worker if the task executes successfully.
+        """
+        pass
+
+    def on_failure(self, exc, task_id, args, kwargs, einfo=None):
+        """Error handler.
+
+        This is run by the worker when the task fails.
+
+        :param exc: The exception raised by the task.
+        :param task_id: Unique id of the failed task.
+        :param args: Original arguments for the task that failed.
+        :param kwargs: Original keyword arguments for the task
+            that failed.
+
+        :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
+            instance, containing the traceback.
+
+        The return value of this handler is ignored.
+
+        """
+        pass
+
+    def on_success(self, retval, task_id, args, kwargs):
+        """Success handler.
+
+        Run by the worker if the task executes successfully.
+
+        :param retval: The return value of the task.
+        :param task_id: Unique id of the executed task.
+        :param args: Original arguments for the executed task.
+        :param kwargs: Original keyword arguments for the executed task.
+
+        The return value of this handler is ignored.
+
+        """
+        pass

-            :param retval: The return value of the task.
-            :param task_id: Unique id of the executed task.
-            :param args: Original arguments for the executed task.
-            :param kwargs: Original keyword arguments for the executed task.
+    def execute(self, wrapper, pool, loglevel, logfile):
+        """The method the worker calls to execute the task.

-            The return value of this handler is ignored.
+        :param wrapper: A :class:`~celery.worker.job.TaskRequest`.
+        :param pool: A task pool.
+        :param loglevel: Current loglevel.
+        :param logfile: Name of the currently used logfile.

-            """
-            pass
+        """
+        wrapper.execute_using_pool(pool, loglevel, logfile)
 
 
-        def execute(self, wrapper, pool, loglevel, logfile):
-            """The method the worker calls to execute the task.
+    def __repr__(self):
+        """repr(task)"""
+        try:
+            kind = self.__class__.mro()[1].__name__
+        except (AttributeError, IndexError):
+            kind = "%s(Task)" % self.__class__.__name__
+        return "<%s: %s (%s)>" % (kind, self.name, self.type)

-            :param wrapper: A :class:`~celery.worker.job.TaskRequest`.
-            :param pool: A task pool.
-            :param loglevel: Current loglevel.
-            :param logfile: Name of the currently used logfile.
+    @classmethod
+    def subtask(cls, *args, **kwargs):
+        """Returns a :class:`~celery.task.sets.subtask` object for
+        this task that wraps arguments and execution options
+        for a single task invocation."""
+        return subtask(cls, *args, **kwargs)

-            """
-            wrapper.execute_using_pool(pool, loglevel, logfile)
+    @property
+    def __name__(self):
+        return self.__class__.__name__
 
 
-        def __repr__(self):
-            """repr(task)"""
-            try:
-                kind = self.__class__.mro()[1].__name__
-            except (AttributeError, IndexError):
-                kind = "%s(Task)" % self.__class__.__name__
-            return "<%s: %s (%s)>" % (kind, self.name, self.type)

-        @classmethod
-        def subtask(cls, *args, **kwargs):
-            """Returns a :class:`~celery.task.sets.subtask` object for
-            this task that wraps arguments and execution options
-            for a single task invocation."""
-            return subtask(cls, *args, **kwargs)
+def create_task_cls(app):
+    apps = [app]

-        @property
-        def __name__(self):
-            return self.__class__.__name__
+    class Task(BaseTask):
+        app = apps[0]
+        backend = app.backend
+        exchange_type = app.conf.CELERY_DEFAULT_EXCHANGE_TYPE
+        delivery_mode = app.conf.CELERY_DEFAULT_DELIVERY_MODE
+        send_error_emails = app.conf.CELERY_SEND_TASK_ERROR_EMAILS
+        error_whitelist = app.conf.CELERY_TASK_ERROR_WHITELIST
+        serializer = app.conf.CELERY_TASK_SERIALIZER
+        rate_limit = app.conf.CELERY_DEFAULT_RATE_LIMIT
+        track_started = app.conf.CELERY_TRACK_STARTED
+        acks_late = app.conf.CELERY_ACKS_LATE
+        ignore_result = app.conf.CELERY_IGNORE_RESULT
+        store_errors_even_if_ignored = \
+                app.conf.CELERY_STORE_ERRORS_EVEN_IF_IGNORED

-    Task.app = app
     return Task

+
 Task = create_task_cls(default_app)

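The factory above is the heart of the change: a task class is no longer configured from module globals, but from whatever app.conf the factory closed over. A minimal usage sketch — the Celery() call, the import path and the AddTask body are illustrative, not part of this commit:

    from celery import Celery
    from celery.task.base import create_task_cls   # assumed location, per this hunk

    app = Celery()                        # hypothetical app instance
    Task = create_task_cls(app)           # serializer, backend, ... read from app.conf

    class AddTask(Task):
        name = "tasks.add"

        def run(self, x, y, **kwargs):
            return x + y
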
+ 4 - 4
celery/task/control.py

@@ -68,7 +68,7 @@ class Inspect(object):

 class Control(object):

-    def __init__(self, app=None):
+    def __init__(self, app):
         self.app = app

     def inspect(self, destination=None, timeout=1, callback=None):
@@ -86,7 +86,7 @@ class Control(object):
         """

         def _do_discard(connection=None, connect_timeout=None):
-            consumers = self.app.get_consumer_set(connection=connection)
+            consumers = self.app.amqp.get_consumer_set(connection=connection)
             try:
                 return consumers.discard_all()
             finally:
@@ -196,10 +196,10 @@ class Control(object):

         def _do_broadcast(connection=None, connect_timeout=None):

-            broadcaster = BroadcastPublisher(connection)
+            broadcaster = BroadcastPublisher(connection, app=self.app)
             try:
                 broadcaster.send(command, arguments, destination=destination,
-                               reply_ticket=reply_ticket)
+                                 reply_ticket=reply_ticket)
             finally:
                 broadcaster.close()

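With the app threaded through, purges and broadcasts no longer reach for module state. A sketch, assuming the method wrapping _do_discard is discard_all() — the hunk does not show its signature:

    from celery.app import default_app
    from celery.task.control import Control

    control = Control(default_app)        # the app argument is now required
    discarded = control.discard_all()     # assumed method name around _do_discard
    print("%d messages discarded" % discarded)
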
+ 2 - 2
celery/task/sets.py

@@ -183,7 +183,7 @@ class TaskSet(UserList):
             return self.apply()

         taskset_id = gen_unique_id()
-        publisher = TaskPublisher(connection=connection)
+        publisher = self.app.amqp.TaskPublisher(connection=connection)
         try:
             results = [task.apply_async(taskset_id=taskset_id,
                                         publisher=publisher)
@@ -191,7 +191,7 @@ class TaskSet(UserList):
         finally:
             publisher.close()

-        return TaskSetResult(taskset_id, results, app=self.app)
+        return self.app.TaskSetResult(taskset_id, results)

     def apply(self):
         """Applies the taskset locally."""

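TaskSet now borrows both its publisher and its result class from the app. A hedged usage sketch ("tasks.add" is a placeholder task name, not defined in this commit):

    from celery.task.sets import TaskSet, subtask

    ts = TaskSet(tasks=[subtask("tasks.add", args=(i, i)) for i in range(10)])
    result = ts.apply_async()    # all subtasks share one app.amqp.TaskPublisher
    print(result.join())         # result object built by self.app.TaskSetResult
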
+ 1 - 1
celery/tests/test_messaging.py → celery/tests/test_app_amqp.py

@@ -1,6 +1,6 @@
 import unittest2 as unittest

-from celery.messaging import MSG_OPTIONS, extract_msg_options
+from celery.app.amqp import MSG_OPTIONS, extract_msg_options


 class TestMsgOptions(unittest.TestCase):

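Only the import moves; behaviour is unchanged. As a reminder of what these helpers do — the printed result assumes mitemgetter yields None for missing keys:

    from celery.app.amqp import MSG_OPTIONS, extract_msg_options

    opts = {"routing_key": "video", "priority": 9, "not_an_option": True}
    print(extract_msg_options(opts))
    # {'mandatory': None, 'priority': 9, 'immediate': None,
    #  'routing_key': 'video', 'serializer': None, 'delivery_mode': None}
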
+ 12 - 6
celery/tests/test_task.py

@@ -6,7 +6,6 @@ from pyparsing import ParseException


 from celery import task
-from celery import messaging
 from celery.app import default_app
 from celery.task.schedules import crontab, crontab_parser
 from celery.utils import timeutils
@@ -200,9 +199,14 @@ class TestTaskRetries(unittest.TestCase):


 class MockPublisher(object):
+    _declared = False

     def __init__(self, *args, **kwargs):
         self.kwargs = kwargs
+        self.connection = default_app.broker_connection()
+
+    def declare(self):
+        self._declared = True


 class TestCeleryTasks(unittest.TestCase):
@@ -330,18 +334,20 @@ class TestCeleryTasks(unittest.TestCase):
         self.assertTrue(presult.successful())

         publisher = t1.get_publisher()
-        self.assertIsInstance(publisher, messaging.TaskPublisher)
+        print("EXCHANGE IS: %r" % (task.Task.exchange, ))
+        self.assertTrue(publisher.exchange)

     def test_get_publisher(self):
-        from celery.task import base
-        old_pub = base.TaskPublisher
-        base.TaskPublisher = MockPublisher
+        from celery.app import amqp
+        old_pub = amqp.TaskPublisher
+        amqp.TaskPublisher = MockPublisher
         try:
             p = IncrementCounterTask.get_publisher(exchange="foo",
                                                    connection="bar")
             self.assertEqual(p.kwargs["exchange"], "foo")
+            self.assertTrue(p._declared)
         finally:
-            base.TaskPublisher = old_pub
+            amqp.TaskPublisher = old_pub

     def test_get_logger(self):
         T1 = self.createTaskCls("T1", "c.unittest.t.t1")

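The patching pattern in test_get_publisher generalizes to anything that moved into the app namespace. A sketch — MyTask is a stand-in for any registered task class:

    from celery.app import amqp

    old_pub, amqp.TaskPublisher = amqp.TaskPublisher, MockPublisher
    try:
        p = MyTask.get_publisher(exchange="foo")   # now builds a MockPublisher
        assert p.kwargs["exchange"] == "foo"
    finally:
        amqp.TaskPublisher = old_pub               # always restore the real class
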
+ 5 - 5
celery/worker/__init__.py

@@ -9,11 +9,11 @@ import traceback
 from multiprocessing.util import Finalize

 from celery import beat
-from celery import log
 from celery import registry
 from celery import platform
 from celery import signals
 from celery.app import app_or_default
+from celery.log import SilenceRepeated
 from celery.utils import noop, instantiate

 from celery.worker import state
@@ -66,13 +66,12 @@ class WorkController(object):

         self.app = app_or_default(app)
         conf = self.app.conf
-        queues = queues or self.app.get_queues()

         # Options
         self.loglevel = loglevel or self.loglevel
         self.concurrency = concurrency or conf.CELERYD_CONCURRENCY
         self.logfile = logfile or conf.CELERYD_LOG_FILE
-        self.logger = log.get_default_logger()
+        self.logger = self.app.log.get_default_logger()
         if send_events is None:
             send_events = conf.CELERY_SEND_EVENTS
         self.send_events = send_events
@@ -98,8 +97,8 @@ class WorkController(object):
                                 conf.CELERYD_ETA_SCHEDULER_PRECISION
         self.prefetch_multiplier = prefetch_multiplier or \
                                 conf.CELERYD_PREFETCH_MULTIPLIER
-        self.timer_debug = log.SilenceRepeated(self.logger.debug,
-                                               max_iterations=10)
+        self.timer_debug = SilenceRepeated(self.logger.debug,
+                                           max_iterations=10)
         self.db = db or conf.CELERYD_STATE_DB
         self.disable_rate_limits = disable_rate_limits or \
                                 conf.CELERY_DISABLE_RATE_LIMITS
@@ -128,6 +127,7 @@ class WorkController(object):
                                 soft_timeout=self.task_soft_time_limit,
                                 putlocks=self.pool_putlocks)
         self.mediator = instantiate(self.mediator_cls, self.ready_queue,
+                                    app=self.app,
                                     callback=self.process_task,
                                     logger=self.logger)
         self.scheduler = instantiate(self.eta_scheduler_cls,

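The same substitution — app.log.get_default_logger() in place of the module-level celery.log helpers — repeats through the remaining worker files. A minimal sketch:

    from celery.app import app_or_default

    app = app_or_default(None)              # None falls back to the default app
    logger = app.log.get_default_logger()
    logger.info("logging is now configured per app, not per module")
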
+ 3 - 3
celery/worker/control/__init__.py

@@ -1,4 +1,3 @@
-from celery import log
 from celery.app import app_or_default
 from celery.pidbox import ControlReplyPublisher
 from celery.utils import kwdict
@@ -13,10 +12,11 @@ class ControlDispatch(object):

     def __init__(self, logger=None, hostname=None, listener=None, app=None):
         self.app = app_or_default(app)
-        self.logger = logger or log.get_default_logger()
+        self.logger = logger or self.app.log.get_default_logger()
         self.hostname = hostname
         self.listener = listener
-        self.panel = self.Panel(self.logger, self.listener, self.hostname)
+        self.panel = self.Panel(self.logger, self.listener, self.hostname,
+                                app=self.app)

     def reply(self, data, exchange, routing_key, **kwargs):

+ 2 - 3
celery/worker/control/builtins.py

@@ -1,6 +1,5 @@
 from datetime import datetime

-from celery import log
 from celery.registry import tasks
 from celery.utils import timeutils, LOG_LEVELS
 from celery.worker import state
@@ -25,7 +24,7 @@ def diagnose(panel, timeout=None, **kwargs):
 @Panel.register
 def revoke(panel, task_id, task_name=None, **kwargs):
     """Revoke task by task id."""
-    app = panel.listener.app
+    app = panel.app
     revoked.add(task_id)
     backend = app.backend
     if task_name: # Use custom task backend (if any)
@@ -65,7 +64,7 @@ def set_loglevel(panel, loglevel=None):
    if loglevel is not None:
         if not isinstance(loglevel, int):
             loglevel = LOG_LEVELS[loglevel.upper()]
-        log.get_default_logger(loglevel=loglevel)
+        panel.app.log.get_default_logger(loglevel=loglevel)
     return {"ok": loglevel}

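Because the Panel now carries the app (see registry.py below), custom remote-control commands can use panel.app directly instead of going through panel.listener. A hypothetical command, registered the same way as the built-ins above:

    from celery.worker.control.registry import Panel

    @Panel.register
    def report_backend(panel, **kwargs):
        """Reply with the class name of the app's result backend."""
        return {"backend": panel.app.backend.__class__.__name__}
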
+ 4 - 1
celery/worker/control/registry.py

@@ -1,10 +1,13 @@
 from UserDict import UserDict

+from celery.app import app_or_default
+

 class Panel(UserDict):
     data = dict() # Global registry.

-    def __init__(self, logger, listener, hostname=None):
+    def __init__(self, logger, listener, hostname=None, app=None):
+        self.app = app_or_default(app)
         self.logger = logger
         self.hostname = hostname
         self.listener = listener

+ 5 - 3
celery/worker/controllers.py

@@ -7,7 +7,7 @@ import time
 import threading
 from Queue import Empty as QueueEmpty

-from celery import log
+from celery.app import app_or_default


 class Mediator(threading.Thread):
@@ -24,9 +24,11 @@ class Mediator(threading.Thread):

     """

-    def __init__(self, ready_queue, callback, logger=None):
+    def __init__(self, ready_queue, callback, logger=None,
+            app=None):
         threading.Thread.__init__(self)
-        self.logger = logger or log.get_default_logger()
+        self.app = app_or_default(app)
+        self.logger = logger or self.app.log.get_default_logger()
         self.ready_queue = ready_queue
         self.callback = callback
         self._shutdown = threading.Event()

+ 2 - 3
celery/worker/job.py

@@ -6,7 +6,6 @@ import warnings

 from datetime import datetime

-from celery import log
 from celery import platform
 from celery.app import app_or_default, default_app
 from celery.datastructures import ExceptionInfo
@@ -224,7 +223,7 @@ class TaskRequest(object):
         self.on_ack = on_ack
         self.delivery_info = delivery_info or {}
         self.hostname = hostname or socket.gethostname()
-        self.logger = logger or log.get_default_logger()
+        self.logger = logger or self.app.log.get_default_logger()
         self.eventer = eventer
         self.email_subject = email_subject or self.email_subject
         self.email_body = email_body or self.email_body
@@ -254,7 +253,7 @@ class TaskRequest(object):
     def from_message(cls, message, message_data, logger=None, eventer=None,
             hostname=None, app=None):
         """Create a :class:`TaskRequest` from a task message sent by
-        :class:`celery.messaging.TaskPublisher`.
+        :class:`celery.app.amqp.TaskPublisher`.

         :raises UnknownTaskError: if the message does not describe a task,
             the message is also rejected.

+ 1 - 1
celery/worker/listener.py

@@ -381,7 +381,7 @@ class CarrotListener(object):

         self.connection = self._open_connection()
         self.logger.debug("CarrotListener: Connection Established.")
-        self.task_consumer = self.app.get_consumer_set(
+        self.task_consumer = self.app.amqp.get_consumer_set(
                                         connection=self.connection,
                                         queues=self.queues)
         # QoS: Reset prefetch window.
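With this last hunk, every former celery.messaging entry point hangs off the app. A closing sketch of the consumer-side API, assuming a reachable broker:

    from celery.app import default_app

    connection = default_app.broker_connection()
    try:
        consumers = default_app.amqp.get_consumer_set(connection=connection)
        consumers.discard_all()      # e.g. purge everything currently queued
    finally:
        connection.close()
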