Intense refactoring

Ask Solem · committed 14 years ago · parent commit 7f082a8ea8

+ 5 - 0
celery/__init__.py

@@ -7,3 +7,8 @@ __author__ = "Ask Solem"
 __contact__ = "ask@celeryproject.org"
 __homepage__ = "http://github.com/ask/celery/"
 __docformat__ = "restructuredtext"
+
+
+def Celery(*args, **kwargs):
+    from celery import app
+    return app.Celery(*args, **kwargs)
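
The new top-level shim defers importing celery.app until the factory is actually called, so ``import celery`` stays cheap. A minimal caller sketch, assuming celery.app exposes a ``Celery`` callable as the shim expects (that attribute is not visible in this diff):

    # hypothetical caller code, not part of this commit
    import celery

    app = celery.Celery()   # resolved lazily through celery.app.Celery(...)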

+ 90 - 0
celery/app/__init__.py

@@ -0,0 +1,90 @@
+from celery.app import base
+
+
+class App(base.BaseApp):
+
+    def create_task_cls(self):
+        from celery.task.base import create_task_cls
+        return create_task_cls(app=self)
+
+    def Worker(self, **kwargs):
+        from celery.apps.worker import Worker
+        return Worker(app=self, **kwargs)
+
+    def Beat(self, **kwargs):
+        from celery.apps.beat import Beat
+        return Beat(app=self, **kwargs)
+
+    def TaskSet(self, *args, **kwargs):
+        from celery.task.sets import TaskSet
+        kwargs["app"] = self
+        return TaskSet(*args, **kwargs)
+
+    def task(self, *args, **options):
+        """Decorator to create a task class out of any callable.
+
+        Examples:
+
+        .. code-block:: python
+
+            @task()
+            def refresh_feed(url):
+                return Feed.objects.get(url=url).refresh()
+
+        With setting extra options and using retry.
+
+        .. code-block:: python
+
+            @task(exchange="feeds")
+            def refresh_feed(url, **kwargs):
+                try:
+                    return Feed.objects.get(url=url).refresh()
+                except socket.error, exc:
+                    refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
+
+        Calling the resulting task:
+
+            >>> refresh_feed("http://example.com/rss") # Regular
+            <Feed: http://example.com/rss>
+            >>> refresh_feed.delay("http://example.com/rss") # Async
+            <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
+
+        """
+
+        def inner_create_task_cls(**options):
+
+            def _create_task_cls(fun):
+                base = options.pop("base", None) or self.create_task_cls()
+
+                @wraps(fun, assigned=("__module__", "__name__"))
+                def run(self, *args, **kwargs):
+                    return fun(*args, **kwargs)
+
+                # Save the argspec for this task so we can recognize
+                # which default task kwargs we're going to pass to it later.
+                # (this happens in celery.utils.fun_takes_kwargs)
+                run.argspec = getargspec(fun)
+
+                cls_dict = dict(options, run=run,
+                                __module__=fun.__module__,
+                                __doc__=fun.__doc__)
+                T = type(fun.__name__, (base, ), cls_dict)()
+                return registry.tasks[T.name] # global instance.
+
+            return _create_task_cls
+
+        if len(args) == 1 and callable(args[0]):
+            return inner_create_task_cls()(*args)
+        return inner_create_task_cls(**options)
+
+default_app = App()
+
+counts = [0]
+from multiprocessing import current_process
+def app_or_default(app=None):
+    if app is None:
+        if counts[0] >= 1:
+            print("RETURNING TO DEFAULT APP")
+        counts[0] += 1
+        return default_app
+    return app
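
A sketch of the new application object in use, restricted to names visible in this diff. Note that the decorator body references ``wraps``, ``getargspec`` and ``registry``, which the hunk above does not import, so a working version presumably pulls them in (e.g. from celery.utils.functional, inspect and celery respectively):

    from celery.app import App, app_or_default, default_app

    app = App()                          # explicit application instance

    @app.task(exchange="feeds")          # same decorator shown in the docstring above
    def refresh_feed(url, **kwargs):     # illustrative task body
        return url

    assert app_or_default(app) is app            # explicit apps pass straight through
    assert app_or_default(None) is default_app   # None falls back to the module default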

+ 60 - 27
celery/defaults/__init__.py → celery/app/base.py

@@ -3,25 +3,31 @@ import sys
 
 from datetime import timedelta
 
+from carrot.connection import BrokerConnection
+
 from celery import routes
+from celery.app.defaults import DEFAULTS
 from celery.datastructures import AttributeDict
-from celery.defaults.conf import DEFAULTS
-
-def isatty(fh):
-    # Fixes bug with mod_wsgi:
-    #   mod_wsgi.Log object has no attribute isatty.
-    return getattr(fh, "isatty", None) and fh.isatty()
+from celery.utils import noop, isatty
+from celery.utils.functional import wraps
 
 
-class DefaultApp(object):
+class BaseApp(object):
     _backend = None
     _conf = None
+    _control = None
     _loader = None
 
     def __init__(self, loader=None, backend_cls=None):
         self.loader_cls = loader or os.environ.get("CELERY_LOADER", "default")
         self.backend_cls = backend_cls
 
+    def either(self, default_key, *values):
+        for value in values:
+            if value is not None:
+                return value
+        return self.conf.get(default_key)
+
     def get_queues(self):
         c = self.conf
         queues = c.CELERY_QUEUES
@@ -40,19 +46,37 @@ class DefaultApp(object):
         q = self.conf.CELERY_DEFAULT_QUEUE
         return q, self.get_queues()[q]
 
-    def broker_connection(self, **kwargs):
-        from celery.messaging import establish_connection
-        return establish_connection(app=self, **kwargs)
-
-    def pre_config_merge(self, c):
-        if not c.get("CELERY_RESULT_BACKEND"):
-            c["CELERY_RESULT_BACKEND"] = c.get("CELERY_BACKEND")
-        if not c.get("BROKER_BACKEND"):
-            c["BROKER_BACKEND"] = c.get("BROKER_TRANSPORT")  or \
-                                    c.get("CARROT_BACKEND")
-        c.setdefault("CELERY_SEND_TASK_ERROR_EMAILS",
-                     c.get("SEND_CELERY_TASK_ERROR_EMAILS"))
-        return c
+    def broker_connection(self, hostname=None, userid=None,
+            password=None, virtual_host=None, port=None, ssl=None,
+            insist=None, connect_timeout=None, backend_cls=None):
+        """Establish a connection to the message broker."""
+        return BrokerConnection(
+                    hostname or self.conf.BROKER_HOST,
+                    userid or self.conf.BROKER_USER,
+                    password or self.conf.BROKER_PASSWORD,
+                    virtual_host or self.conf.BROKER_VHOST,
+                    port or self.conf.BROKER_PORT,
+                    backend_cls=backend_cls or self.conf.BROKER_BACKEND,
+                    insist=self.either("BROKER_INSIST", insist),
+                    ssl=self.either("BROKER_USE_SSL", ssl),
+                    connect_timeout=self.either(
+                                "BROKER_CONNECTION_TIMEOUT", connect_timeout))
+
+    def with_default_connection(self, fun):
+
+        @wraps(fun)
+        def _inner(*args, **kwargs):
+            connection = kwargs.get("connection")
+            timeout = kwargs.get("connect_timeout")
+            kwargs["connection"] = conn = connection or \
+                    self.broker_connection(connect_timeout=timeout)
+            close_connection = not connection and conn.close or noop
+
+            try:
+                return fun(*args, **kwargs)
+            finally:
+                close_connection()
+        return _inner
 
     def get_consumer_set(self, connection, queues=None, **options):
         from celery.messaging import ConsumerSet, Consumer
@@ -69,6 +93,16 @@ class DefaultApp(object):
             cset.consumers.append(consumer)
         return cset
 
+    def pre_config_merge(self, c):
+        if not c.get("CELERY_RESULT_BACKEND"):
+            c["CELERY_RESULT_BACKEND"] = c.get("CELERY_BACKEND")
+        if not c.get("BROKER_BACKEND"):
+            c["BROKER_BACKEND"] = c.get("BROKER_TRANSPORT")  or \
+                                    c.get("CARROT_BACKEND")
+        c.setdefault("CELERY_SEND_TASK_ERROR_EMAILS",
+                     c.get("SEND_CELERY_TASK_ERROR_EMAILS"))
+        return c
+
     def post_config_merge(self, c):
         if not c.get("CELERY_QUEUES"):
             c["CELERY_QUEUES"] = {c.CELERY_DEFAULT_QUEUE: {
@@ -125,10 +159,9 @@ class DefaultApp(object):
                             AttributeDict(DEFAULTS, **config))
         return self._conf
 
-default_app = DefaultApp()
-
-
-def app_or_default(app=None):
-    if app is None:
-        return default_app
-    return app
+    @property
+    def control(self):
+        if self._control is None:
+            from celery.task.control import Control
+            self._control = Control(app=self)
+        return self._control
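
The connection handling that used to live in celery.messaging is now a method pair on BaseApp: broker_connection() reads the BROKER_* settings (falling back through either(), which returns the first non-None value or the configured default), and with_default_connection() wraps a function so that a missing ``connection`` kwarg is filled in and closed afterwards. A sketch, with an illustrative ``ping_broker`` function:

    from celery.app import App

    app = App()
    conn = app.broker_connection()          # BrokerConnection built from BROKER_* settings

    @app.with_default_connection
    def ping_broker(connection=None, connect_timeout=None):
        # the wrapper guarantees ``connection`` is set here
        return connection is not None

    ping_broker()                  # implicit connection, closed by the wrapper
    ping_broker(connection=conn)   # caller-owned connection, left open
    conn.close()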

+ 0 - 0
celery/defaults/conf.py → celery/app/defaults.py


+ 2 - 2
celery/apps/beat.py

@@ -5,7 +5,7 @@ import traceback
 from celery import __version__
 from celery import beat
 from celery import platform
-from celery.defaults import app_or_default
+from celery.app import app_or_default
 from celery.log import emergency_error
 from celery.utils import info, LOG_LEVELS
 
@@ -76,7 +76,7 @@ class Beat(object):
 
     def startup_info(self):
         return STARTUP_INFO_FMT % {
-            "conninfo": info.format_broker_info(),
+            "conninfo": info.format_broker_info(app=self.app),
             "logfile": self.logfile or "@stderr",
             "loglevel": LOG_LEVELS[self.loglevel],
             "schedule": self.schedule,

+ 6 - 8
celery/apps/worker.py

@@ -9,11 +9,10 @@ import warnings
 from celery import __version__
 from celery import platform
 from celery import signals
-from celery.defaults import app_or_default
+from celery.app import app_or_default
 from celery.exceptions import ImproperlyConfigured
 from celery.routes import Router
-from celery.task import discard_all
-from celery.utils import info,get_full_cls_name, LOG_LEVELS
+from celery.utils import info, get_full_cls_name, LOG_LEVELS
 from celery.worker import WorkController
 
 
@@ -131,10 +130,9 @@ class Worker(object):
             log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
 
     def purge_messages(self):
-        discarded_count = discard_all()
-        what = discarded_count > 1 and "messages" or "message"
-        print("discard: Erased %d %s from the queue.\n" % (
-            discarded_count, what))
+        count = self.app.control.discard_all()
+        what = (not count or count > 1) and "messages" or "message"
+        print("discard: Erased %d %s from the queue.\n" % (count, what))
 
     def worker_init(self):
         # Run the worker init handler.
@@ -157,7 +155,7 @@ class Worker(object):
             tasklist = self.tasklist(include_builtins=include_builtins)
 
         return STARTUP_INFO_FMT % {
-            "conninfo": info.format_broker_info(),
+            "conninfo": info.format_broker_info(app=self.app),
             "queues": info.format_queues(self.queues, indent=8),
             "concurrency": self.concurrency,
             "loglevel": LOG_LEVELS[self.loglevel],

+ 1 - 1
celery/backends/__init__.py

@@ -1,4 +1,4 @@
-from celery.defaults import default_app
+from celery.app import default_app
 from celery.utils import get_cls_by_name
 from celery.utils.functional import curry
 

+ 1 - 1
celery/backends/base.py

@@ -18,7 +18,7 @@ class BaseBackend(object):
     TimeoutError = TimeoutError
 
     def __init__(self, *args, **kwargs):
-        from celery.defaults import app_or_default
+        from celery.app import app_or_default
         self.app = app_or_default(kwargs.get("app"))
 
     def encode_result(self, result, status):

+ 1 - 1
celery/beat.py

@@ -12,7 +12,7 @@ from UserDict import UserDict
 
 from celery import log
 from celery import platform
-from celery.defaults import app_or_default
+from celery.app import app_or_default
 from celery.execute import send_task
 from celery.schedules import maybe_schedule
 from celery.messaging import establish_connection

+ 1 - 1
celery/bin/base.py

@@ -4,7 +4,7 @@ import sys
 from optparse import OptionParser, make_option as Option
 
 from celery import __version__
-from celery.defaults import app_or_default
+from celery.app import app_or_default
 
 
 class Command(object):

+ 2 - 2
celery/conf.py

@@ -6,8 +6,8 @@ Use :mod:`celery.defaults` instead.
 
 
 """
-from celery.defaults.conf import DEFAULTS as _DEFAULTS
-from celery.defaults import default_app
+from celery.app import default_app
+from celery.app.defaults import DEFAULTS as _DEFAULTS
 
 conf = default_app.conf
 

+ 1 - 1
celery/events/__init__.py

@@ -7,7 +7,7 @@ from itertools import count
 
 from carrot.messaging import Publisher, Consumer
 
-from celery.defaults import app_or_default
+from celery.app import app_or_default
 
 
 def create_event(type, fields):

+ 3 - 4
celery/execute/__init__.py

@@ -1,7 +1,6 @@
+from celery.app import app_or_default, default_app
 from celery.datastructures import ExceptionInfo
-from celery.defaults import app_or_default
 from celery.execute.trace import TaskTrace
-from celery.messaging import with_connection
 from celery.messaging import TaskPublisher
 from celery.registry import tasks
 from celery.result import AsyncResult, EagerResult
@@ -14,7 +13,7 @@ extract_exec_options = mattrgetter("queue", "routing_key", "exchange",
                                    "delivery_mode")
 
 
-@with_connection
+@default_app.with_default_connection
 def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
         task_id=None, publisher=None, connection=None, connect_timeout=None,
         router=None, expires=None, queues=None, app=None, **options):
@@ -111,7 +110,7 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     return task.AsyncResult(task_id)
 
 
-@with_connection
+@default_app.with_default_connection
 def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
         task_id=None, publisher=None, connection=None, connect_timeout=None,
         result_cls=AsyncResult, expires=None, **options):
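
apply_async() and send_task() keep their signatures, but their default connection now comes from default_app.with_default_connection rather than the old messaging.with_connection decorator. A call-site sketch (the task name is illustrative):

    from celery.execute import send_task

    # No connection passed: the decorator opens one from
    # default_app.broker_connection() and closes it when the call returns.
    result = send_task("tasks.refresh_feed", args=["http://example.com/rss"])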

+ 1 - 1
celery/loaders/base.py

@@ -25,7 +25,7 @@ class BaseLoader(object):
     configured = False
 
     def __init__(self, app=None, **kwargs):
-        from celery.defaults import app_or_default
+        from celery.app import app_or_default
         self.app = app_or_default(app)
 
     def on_task_init(self, task_id, task):

+ 1 - 1
celery/log.py

@@ -10,7 +10,7 @@ from multiprocessing import current_process
 from multiprocessing import util as mputil
 
 from celery import signals
-from celery.defaults import app_or_default
+from celery.app import app_or_default
 from celery.utils import noop
 from celery.utils.compat import LoggerAdapter
 from celery.utils.patch import ensure_process_aware_logger

+ 5 - 146
celery/messaging.py

@@ -3,17 +3,12 @@
 Sending and Receiving Messages
 
 """
-import socket
-import warnings
-
 from datetime import datetime, timedelta
-from itertools import count
 
-from carrot.connection import BrokerConnection
 from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
 
 from celery import signals
-from celery.defaults import app_or_default, default_app
+from celery.app import app_or_default, default_app
 from celery.utils import gen_unique_id, mitemgetter, noop
 from celery.utils.functional import wraps
 
@@ -145,153 +140,17 @@ class TaskConsumer(Consumer):
         super(TaskConsumer, self).__init__(*args, **kwargs)
 
 
-class ControlReplyConsumer(Consumer):
-    exchange = "celerycrq"
-    exchange_type = "direct"
-    durable = False
-    exclusive = False
-    auto_delete = True
-    no_ack = True
-
-    def __init__(self, connection, ticket, **kwargs):
-        self.ticket = ticket
-        queue = "%s.%s" % (self.exchange, ticket)
-        super(ControlReplyConsumer, self).__init__(connection,
-                                                   queue=queue,
-                                                   routing_key=ticket,
-                                                   **kwargs)
-
-    def collect(self, limit=None, timeout=1, callback=None):
-        responses = []
-
-        def on_message(message_data, message):
-            if callback:
-                callback(message_data)
-            responses.append(message_data)
-
-        self.callbacks = [on_message]
-        self.consume()
-        for i in limit and range(limit) or count():
-            try:
-                self.connection.drain_events(timeout=timeout)
-            except socket.timeout:
-                break
-
-        return responses
-
-
-class ControlReplyPublisher(Publisher):
-    exchange = "celerycrq"
-    exchange_type = "direct"
-    delivery_mode = "non-persistent"
-    durable = False
-    auto_delete = True
-
-
-class BroadcastPublisher(Publisher):
-    """Publish broadcast commands"""
-
-    ReplyTo = ControlReplyConsumer
-
-    def __init__(self, *args, **kwargs):
-        app = self.app = app_or_default(kwargs.get("app"))
-        kwargs["exchange"] = kwargs.get("exchange") or \
-                                app.conf.CELERY_BROADCAST_EXCHANGE
-        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
-                                app.conf.CELERY_BROADCAST_EXCHANGE_TYPE
-        super(BroadcastPublisher, self).__init__(*args, **kwargs)
-
-    def send(self, type, arguments, destination=None, reply_ticket=None):
-        """Send broadcast command."""
-        arguments["command"] = type
-        arguments["destination"] = destination
-        reply_to = self.ReplyTo(self.connection, None, app=self.app,
-                                auto_declare=False)
-        if reply_ticket:
-            arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
-                                     "routing_key": reply_ticket}
-        super(BroadcastPublisher, self).send({"control": arguments})
-
-
-class BroadcastConsumer(Consumer):
-    """Consume broadcast commands"""
-    no_ack = True
-
-    def __init__(self, *args, **kwargs):
-        self.app = app = app_or_default(kwargs.get("app"))
-        kwargs["queue"] = kwargs.get("queue") or \
-                            app.conf.CELERY_BROADCAST_QUEUE
-        kwargs["exchange"] = kwargs.get("exchange") or \
-                            app.conf.CELERY_BROADCAST_EXCHANGE
-        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
-                            app.conf.CELERY_BROADCAST_EXCHANGE_TYPE
-        self.hostname = kwargs.pop("hostname", None) or socket.gethostname()
-        self.queue = "%s_%s" % (self.queue, self.hostname)
-        super(BroadcastConsumer, self).__init__(*args, **kwargs)
-
-    def verify_exclusive(self):
-        # XXX Kombu material
-        channel = getattr(self.backend, "channel")
-        if channel and hasattr(channel, "queue_declare"):
-            try:
-                _, _, consumers = channel.queue_declare(self.queue,
-                                                        passive=True)
-            except ValueError:
-                pass
-            else:
-                if consumers:
-                    warnings.warn(UserWarning(
-                        "A node named %s is already using this process "
-                        "mailbox. Maybe you should specify a custom name "
-                        "for this node with the -n argument?" % self.hostname))
-
-    def consume(self, *args, **kwargs):
-        self.verify_exclusive()
-        return super(BroadcastConsumer, self).consume(*args, **kwargs)
-
-
-def establish_connection(hostname=None, userid=None, password=None,
-        virtual_host=None, port=None, ssl=None, insist=None,
-        connect_timeout=None, backend_cls=None, app=None):
+def establish_connection(**kwargs):
     """Establish a connection to the message broker."""
-    app = app_or_default(app)
-    if insist is None:
-        insist = app.conf.get("BROKER_INSIST")
-    if ssl is None:
-        ssl = app.conf.get("BROKER_USE_SSL")
-    if connect_timeout is None:
-        connect_timeout = app.conf.get("BROKER_CONNECTION_TIMEOUT")
-
-    return BrokerConnection(hostname or app.conf.BROKER_HOST,
-                            userid or app.conf.BROKER_USER,
-                            password or app.conf.BROKER_PASSWORD,
-                            virtual_host or app.conf.BROKER_VHOST,
-                            port or app.conf.BROKER_PORT,
-                            backend_cls=backend_cls or app.conf.BROKER_BACKEND,
-                            insist=insist, ssl=ssl,
-                            connect_timeout=connect_timeout)
+    app = app_or_default(kwargs.pop("app", None))
+    return app.broker_connection(**kwargs)
 
 
 def with_connection(fun):
     """Decorator for providing default message broker connection for functions
     supporting the ``connection`` and ``connect_timeout`` keyword
     arguments."""
-
-    @wraps(fun)
-    def _inner(*args, **kwargs):
-        connection = kwargs.get("connection")
-        timeout = kwargs.get("connect_timeout",
-                    default_app.conf.get("BROKER_CONNECTION_TIMEOUT"))
-        kwargs["connection"] = conn = connection or \
-                establish_connection(connect_timeout=timeout)
-        close_connection = not connection and conn.close or noop
-
-        try:
-            return fun(*args, **kwargs)
-        finally:
-            close_connection()
-
-    return _inner
+    return default_app.with_default_connection(fun)
 
 
 def get_consumer_set(connection, queues=None, **options):
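
establish_connection() and with_connection() survive only as thin compatibility wrappers, so existing call sites keep working while the real logic lives on the app. Roughly:

    from celery.app import app_or_default, default_app
    from celery.messaging import establish_connection, with_connection

    # establish_connection(app=some_app, **kw) is now
    # app_or_default(some_app).broker_connection(**kw):
    conn = establish_connection()      # default_app's BROKER_* settings
    conn.close()

    # with_connection(fun) is now simply default_app.with_default_connection(fun).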

+ 113 - 0
celery/pidbox.py

@@ -0,0 +1,113 @@
+import socket
+import warnings
+
+from itertools import count
+
+from carrot.messaging import Consumer, Publisher
+
+from celery.app import app_or_default
+
+
+class ControlReplyConsumer(Consumer):
+    exchange = "celerycrq"
+    exchange_type = "direct"
+    durable = False
+    exclusive = False
+    auto_delete = True
+    no_ack = True
+
+    def __init__(self, connection, ticket, **kwargs):
+        self.ticket = ticket
+        queue = "%s.%s" % (self.exchange, ticket)
+        super(ControlReplyConsumer, self).__init__(connection,
+                                                   queue=queue,
+                                                   routing_key=ticket,
+                                                   **kwargs)
+
+    def collect(self, limit=None, timeout=1, callback=None):
+        responses = []
+
+        def on_message(message_data, message):
+            if callback:
+                callback(message_data)
+            responses.append(message_data)
+
+        self.callbacks = [on_message]
+        self.consume()
+        for i in limit and range(limit) or count():
+            try:
+                self.connection.drain_events(timeout=timeout)
+            except socket.timeout:
+                break
+
+        return responses
+
+
+class ControlReplyPublisher(Publisher):
+    exchange = "celerycrq"
+    exchange_type = "direct"
+    delivery_mode = "non-persistent"
+    durable = False
+    auto_delete = True
+
+
+class BroadcastPublisher(Publisher):
+    """Publish broadcast commands"""
+
+    ReplyTo = ControlReplyConsumer
+
+    def __init__(self, *args, **kwargs):
+        app = self.app = app_or_default(kwargs.get("app"))
+        kwargs["exchange"] = kwargs.get("exchange") or \
+                                app.conf.CELERY_BROADCAST_EXCHANGE
+        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
+                                app.conf.CELERY_BROADCAST_EXCHANGE_TYPE
+        super(BroadcastPublisher, self).__init__(*args, **kwargs)
+
+    def send(self, type, arguments, destination=None, reply_ticket=None):
+        """Send broadcast command."""
+        arguments["command"] = type
+        arguments["destination"] = destination
+        reply_to = self.ReplyTo(self.connection, None, app=self.app,
+                                auto_declare=False)
+        if reply_ticket:
+            arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
+                                     "routing_key": reply_ticket}
+        super(BroadcastPublisher, self).send({"control": arguments})
+
+
+class BroadcastConsumer(Consumer):
+    """Consume broadcast commands"""
+    no_ack = True
+
+    def __init__(self, *args, **kwargs):
+        self.app = app = app_or_default(kwargs.get("app"))
+        kwargs["queue"] = kwargs.get("queue") or \
+                            app.conf.CELERY_BROADCAST_QUEUE
+        kwargs["exchange"] = kwargs.get("exchange") or \
+                            app.conf.CELERY_BROADCAST_EXCHANGE
+        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
+                            app.conf.CELERY_BROADCAST_EXCHANGE_TYPE
+        self.hostname = kwargs.pop("hostname", None) or socket.gethostname()
+        self.queue = "%s_%s" % (self.queue, self.hostname)
+        super(BroadcastConsumer, self).__init__(*args, **kwargs)
+
+    def verify_exclusive(self):
+        # XXX Kombu material
+        channel = getattr(self.backend, "channel")
+        if channel and hasattr(channel, "queue_declare"):
+            try:
+                _, _, consumers = channel.queue_declare(self.queue,
+                                                        passive=True)
+            except ValueError:
+                pass
+            else:
+                if consumers:
+                    warnings.warn(UserWarning(
+                        "A node named %s is already using this process "
+                        "mailbox. Maybe you should specify a custom name "
+                        "for this node with the -n argument?" % self.hostname))
+
+    def consume(self, *args, **kwargs):
+        self.verify_exclusive()
+        return super(BroadcastConsumer, self).consume(*args, **kwargs)
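
The broadcast/reply machinery moves verbatim out of celery.messaging into this new module; Control.broadcast() below is its main consumer. A low-level sketch of the same round trip it performs (illustrative only):

    from celery.app import default_app
    from celery.pidbox import BroadcastPublisher, ControlReplyConsumer
    from celery.utils import gen_unique_id

    conn = default_app.broker_connection()
    ticket = gen_unique_id()
    try:
        pub = BroadcastPublisher(conn)
        try:
            pub.send("ping", {}, reply_ticket=ticket)
        finally:
            pub.close()
        replies = ControlReplyConsumer(conn, ticket).collect(timeout=1)
    finally:
        conn.close()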

+ 25 - 15
celery/result.py

@@ -6,10 +6,9 @@ from copy import copy
 from itertools import imap
 
 from celery import states
-from celery.defaults import default_app
+from celery.app import app_or_default
 from celery.datastructures import PositionQueue
 from celery.exceptions import TimeoutError
-from celery.messaging import with_connection
 from celery.utils import any, all
 
 
@@ -31,9 +30,10 @@ class BaseAsyncResult(object):
 
     TimeoutError = TimeoutError
 
-    def __init__(self, task_id, backend):
+    def __init__(self, task_id, backend, app=None):
         self.task_id = task_id
         self.backend = backend
+        self.app = app_or_default(app)
 
     def revoke(self, connection=None, connect_timeout=None):
         """Send revoke signal to all workers.
@@ -41,9 +41,13 @@ class BaseAsyncResult(object):
         The workers will ignore the task if received.
 
         """
-        from celery.task import control
-        control.revoke(self.task_id, connection=connection,
-                       connect_timeout=connect_timeout)
+
+        def _do_revoke(connection=None, connect_timeout=None):
+            from celery.task import control
+            control.revoke(self.task_id, connection=connection,
+                           connect_timeout=connect_timeout)
+        self.app.with_default_connection(_do_revoke)(
+                connection=connection, connect_timeout=connect_timeout)
 
     def wait(self, timeout=None):
         """Wait for task, and return the result when it arrives.
@@ -164,9 +168,10 @@ class AsyncResult(BaseAsyncResult):
 
     """
 
-    def __init__(self, task_id, backend=None):
-        backend = backend or default_app.backend
-        super(AsyncResult, self).__init__(task_id, backend)
+    def __init__(self, task_id, backend=None, app=None):
+        app = app_or_default(app)
+        backend = backend or app.backend
+        super(AsyncResult, self).__init__(task_id, backend, app=app)
 
 
 class TaskSetResult(object):
@@ -189,9 +194,10 @@ class TaskSetResult(object):
 
     """
 
-    def __init__(self, taskset_id, subtasks):
+    def __init__(self, taskset_id, subtasks, app=None):
         self.taskset_id = taskset_id
         self.subtasks = subtasks
+        self.app = app_or_default(app)
 
     def itersubtasks(self):
         """Taskset subtask iterator.
@@ -251,10 +257,14 @@ class TaskSetResult(object):
         return sum(imap(int, (subtask.successful()
                                 for subtask in self.itersubtasks())))
 
-    @with_connection
     def revoke(self, connection=None, connect_timeout=None):
-        for subtask in self.subtasks:
-            subtask.revoke(connection=connection)
+
+        def _do_revoke(connection=None, connect_timeout=None):
+            for subtask in self.subtasks:
+                subtask.revoke(connection=connection)
+
+        return self.app.with_default_connection(_do_revoke)(
+                connection=connection, connect_timeout=connect_timeout)
 
     def __iter__(self):
         """``iter(res)`` -> ``res.iterate()``."""
@@ -335,14 +345,14 @@ class TaskSetResult(object):
 
         """
         if backend is None:
-            backend = default_app.backend
+            backend = self.app.backend
         backend.save_taskset(self.taskset_id, self)
 
     @classmethod
     def restore(self, taskset_id, backend=None):
         """Restore previously saved taskset result."""
         if backend is None:
-            backend = default_app.backend
+            backend = self.app.backend
         return backend.restore_taskset(taskset_id)
 
     @property
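
AsyncResult and TaskSetResult now remember which app they belong to, so revoke() and save()/restore() use that app's connection and backend instead of module-level state. A sketch (the ids are illustrative):

    from celery.app import App
    from celery.result import AsyncResult, TaskSetResult

    app = App()
    res = AsyncResult("8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d", app=app)
    res.revoke()                        # connection opened and closed via app

    ts = TaskSetResult("my-taskset-id", [res], app=app)
    ts.save()                           # stored in app.backend by default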

+ 14 - 6
celery/task/base.py

@@ -1,12 +1,11 @@
 import sys
 import warnings
 
-from celery.defaults import default_app
+from celery.app import default_app
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from celery.execute import apply_async, apply
 from celery.log import setup_task_logger
 from celery.messaging import TaskPublisher, TaskConsumer
-from celery.messaging import establish_connection as _establish_connection
 from celery.registry import tasks
 from celery.result import BaseAsyncResult, EagerResult
 from celery.schedules import maybe_schedule
@@ -72,7 +71,6 @@ class TaskType(type):
 
 def create_task_cls(app):
 
-
     class Task(object):
         """A celery task.
 
@@ -289,7 +287,7 @@ def create_task_cls(app):
         def establish_connection(self,
             connect_timeout=app.conf.BROKER_CONNECTION_TIMEOUT):
             """Establish a connection to the message broker."""
-            return _establish_connection(connect_timeout=connect_timeout)
+            return app.broker_connection(connect_timeout=connect_timeout)
 
         @classmethod
         def get_publisher(self, connection=None, exchange=None,
@@ -366,7 +364,15 @@ def create_task_cls(app):
             :returns :class:`celery.result.AsyncResult`:
 
             """
-            return apply_async(self, args, kwargs, **options)
+            conn = None
+            if not options.get("connection") or options.get("publisher"):
+                conn = options["connection"] = self.establish_connection(
+                            connect_timeout=options.get("connect_timeout"))
+            try:
+                return apply_async(self, args, kwargs, **options)
+            finally:
+                if conn:
+                    conn.close()
 
         @classmethod
         def retry(self, args=None, kwargs=None, exc=None, throw=True,
@@ -468,7 +474,8 @@ def create_task_cls(app):
             :param task_id: Task id to get result for.
 
             """
-            return BaseAsyncResult(task_id, backend=self.backend)
+            return BaseAsyncResult(task_id,
+                                   backend=self.backend, app=self.app)
 
         def on_retry(self, exc, task_id, args, kwargs, einfo=None):
             """Retry handler.
@@ -571,6 +578,7 @@ def create_task_cls(app):
         def __name__(self):
             return self.__class__.__name__
 
+    Task.app = app
     return Task
 
 Task = create_task_cls(default_app)
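
Task classes are now manufactured per app: create_task_cls(app) closes over the app for connections and configuration and stamps it onto the class, while the module-level Task stays bound to default_app for backwards compatibility. Sketch:

    from celery.app import App, default_app
    from celery.task.base import Task, create_task_cls

    app = App()
    AppTask = create_task_cls(app)      # what app.create_task_cls() returns

    assert AppTask.app is app           # bound to the explicit app
    assert Task.app is default_app      # the compat base uses the default app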

+ 163 - 134
celery/task/control.py

@@ -1,89 +1,6 @@
+from celery.app import default_app
+from celery.pidbox import BroadcastPublisher, ControlReplyConsumer
 from celery.utils import gen_unique_id
-from celery.messaging import BroadcastPublisher, ControlReplyConsumer
-from celery.messaging import with_connection, get_consumer_set
-
-
-@with_connection
-def discard_all(connection=None, connect_timeout=None):
-    """Discard all waiting tasks.
-
-    This will ignore all tasks waiting for execution, and they will
-    be deleted from the messaging server.
-
-    :returns: the number of tasks discarded.
-
-    """
-    consumers = get_consumer_set(connection=connection)
-    try:
-        return consumers.discard_all()
-    finally:
-        consumers.close()
-
-
-def revoke(task_id, destination=None, **kwargs):
-    """Revoke a task by id.
-
-    If a task is revoked, the workers will ignore the task and not execute
-    it after all.
-
-    :param task_id: Id of the task to revoke.
-    :keyword destination: If set, a list of the hosts to send the command to,
-        when empty broadcast to all workers.
-    :keyword connection: Custom broker connection to use, if not set,
-        a connection will be established automatically.
-    :keyword connect_timeout: Timeout for new connection if a custom
-        connection is not provided.
-    :keyword reply: Wait for and return the reply.
-    :keyword timeout: Timeout in seconds to wait for the reply.
-    :keyword limit: Limit number of replies.
-
-    """
-    return broadcast("revoke", destination=destination,
-                               arguments={"task_id": task_id}, **kwargs)
-
-
-def ping(destination=None, timeout=1, **kwargs):
-    """Ping workers.
-
-    Returns answer from alive workers.
-
-    :keyword destination: If set, a list of the hosts to send the command to,
-        when empty broadcast to all workers.
-    :keyword connection: Custom broker connection to use, if not set,
-        a connection will be established automatically.
-    :keyword connect_timeout: Timeout for new connection if a custom
-        connection is not provided.
-    :keyword reply: Wait for and return the reply.
-    :keyword timeout: Timeout in seconds to wait for the reply.
-    :keyword limit: Limit number of replies.
-
-    """
-    return broadcast("ping", reply=True, destination=destination,
-                     timeout=timeout, **kwargs)
-
-
-def rate_limit(task_name, rate_limit, destination=None, **kwargs):
-    """Set rate limit for task by type.
-
-    :param task_name: Type of task to change rate limit for.
-    :param rate_limit: The rate limit as tasks per second, or a rate limit
-      string (``"100/m"``, etc. see :attr:`celery.task.base.Task.rate_limit`
-      for more information).
-    :keyword destination: If set, a list of the hosts to send the command to,
-        when empty broadcast to all workers.
-    :keyword connection: Custom broker connection to use, if not set,
-        a connection will be established automatically.
-    :keyword connect_timeout: Timeout for new connection if a custom
-        connection is not provided.
-    :keyword reply: Wait for and return the reply.
-    :keyword timeout: Timeout in seconds to wait for the reply.
-    :keyword limit: Limit number of replies.
-
-    """
-    return broadcast("rate_limit", destination=destination,
-                                   arguments={"task_name": task_name,
-                                              "rate_limit": rate_limit},
-                                   **kwargs)
 
 
 def flatten_reply(reply):
@@ -93,12 +10,13 @@ def flatten_reply(reply):
     return nodes
 
 
-class inspect(object):
+class Inspect(object):
 
-    def __init__(self, destination=None, timeout=1, callback=None):
+    def __init__(self, control, destination=None, timeout=1, callback=None,):
         self.destination = destination
         self.timeout = timeout
         self.callback = callback
+        self.control = control
 
     def _prepare(self, reply):
         if not reply:
@@ -110,7 +28,8 @@ class inspect(object):
         return by_node
 
     def _request(self, command, **kwargs):
-        return self._prepare(broadcast(command, arguments=kwargs,
+        return self._prepare(self.control.broadcast(command,
+                                      arguments=kwargs,
                                       destination=self.destination,
                                       callback=self.callback,
                                       timeout=self.timeout, reply=True))
@@ -147,49 +66,159 @@ class inspect(object):
         return self._request("ping")
 
 
-@with_connection
-def broadcast(command, arguments=None, destination=None, connection=None,
-        connect_timeout=None, reply=False, timeout=1, limit=None,
-        callback=None):
-    """Broadcast a control command to the celery workers.
-
-    :param command: Name of command to send.
-    :param arguments: Keyword arguments for the command.
-    :keyword destination: If set, a list of the hosts to send the command to,
-        when empty broadcast to all workers.
-    :keyword connection: Custom broker connection to use, if not set,
-        a connection will be established automatically.
-    :keyword connect_timeout: Timeout for new connection if a custom
-        connection is not provided.
-    :keyword reply: Wait for and return the reply.
-    :keyword timeout: Timeout in seconds to wait for the reply.
-    :keyword limit: Limit number of replies.
-    :keyword callback: Callback called immediately for each reply
-        received.
-
-    """
-    arguments = arguments or {}
-    reply_ticket = reply and gen_unique_id() or None
-
-    if destination is not None and not isinstance(destination, (list, tuple)):
-        raise ValueError("destination must be a list/tuple not %s" % (
-                type(destination)))
-
-    # Set reply limit to number of destinations (if specificed)
-    if limit is None and destination:
-        limit = destination and len(destination) or None
-
-    broadcast = BroadcastPublisher(connection)
-    try:
-        broadcast.send(command, arguments, destination=destination,
-                       reply_ticket=reply_ticket)
-    finally:
-        broadcast.close()
-
-    if reply_ticket:
-        crq = ControlReplyConsumer(connection, reply_ticket)
-        try:
-            return crq.collect(limit=limit, timeout=timeout,
-                               callback=callback)
-        finally:
-            crq.close()
+class Control(object):
+
+    def __init__(self, app=None):
+        self.app = app
+
+    def inspect(self, destination=None, timeout=1, callback=None):
+        return Inspect(self, destination=destination, timeout=timeout,
+                             callback=callback)
+
+    def discard_all(self, connection=None, connect_timeout=None):
+        """Discard all waiting tasks.
+
+        This will ignore all tasks waiting for execution, and they will
+        be deleted from the messaging server.
+
+        :returns: the number of tasks discarded.
+
+        """
+
+        def _do_discard(connection=None, connect_timeout=None):
+            consumers = self.app.get_consumer_set(connection=connection)
+            try:
+                return consumers.discard_all()
+            finally:
+                consumers.close()
+
+        return self.app.with_default_connection(_do_discard)(
+                connection=connection, connect_timeout=connect_timeout)
+
+
+    def revoke(self, task_id, destination=None, **kwargs):
+        """Revoke a task by id.
+
+        If a task is revoked, the workers will ignore the task and
+        not execute it after all.
+
+        :param task_id: Id of the task to revoke.
+        :keyword destination: If set, a list of the hosts to send the
+            command to, when empty broadcast to all workers.
+        :keyword connection: Custom broker connection to use, if not set,
+            a connection will be established automatically.
+        :keyword connect_timeout: Timeout for new connection if a custom
+            connection is not provided.
+        :keyword reply: Wait for and return the reply.
+        :keyword timeout: Timeout in seconds to wait for the reply.
+        :keyword limit: Limit number of replies.
+
+        """
+        return self.broadcast("revoke", destination=destination,
+                              arguments={"task_id": task_id}, **kwargs)
+
+
+    def ping(self, destination=None, timeout=1, **kwargs):
+        """Ping workers.
+
+        Returns answer from alive workers.
+
+        :keyword destination: If set, a list of the hosts to send the
+            command to, when empty broadcast to all workers.
+        :keyword connection: Custom broker connection to use, if not set,
+            a connection will be established automatically.
+        :keyword connect_timeout: Timeout for new connection if a custom
+            connection is not provided.
+        :keyword reply: Wait for and return the reply.
+        :keyword timeout: Timeout in seconds to wait for the reply.
+        :keyword limit: Limit number of replies.
+
+        """
+        return self.broadcast("ping", reply=True, destination=destination,
+                              timeout=timeout, **kwargs)
+
+
+    def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
+        """Set rate limit for task by type.
+
+        :param task_name: Type of task to change rate limit for.
+        :param rate_limit: The rate limit as tasks per second, or a rate limit
+            string (``"100/m"``, etc.
+            see :attr:`celery.task.base.Task.rate_limit` for
+            more information).
+        :keyword destination: If set, a list of the hosts to send the
+            command to, when empty broadcast to all workers.
+        :keyword connection: Custom broker connection to use, if not set,
+            a connection will be established automatically.
+        :keyword connect_timeout: Timeout for new connection if a custom
+            connection is not provided.
+        :keyword reply: Wait for and return the reply.
+        :keyword timeout: Timeout in seconds to wait for the reply.
+        :keyword limit: Limit number of replies.
+
+        """
+        return self.broadcast("rate_limit", destination=destination,
+                              arguments={"task_name": task_name,
+                                         "rate_limit": rate_limit},
+                              **kwargs)
+
+    def broadcast(self, command, arguments=None, destination=None,
+            connection=None, connect_timeout=None, reply=False, timeout=1,
+            limit=None, callback=None):
+        """Broadcast a control command to the celery workers.
+
+        :param command: Name of command to send.
+        :param arguments: Keyword arguments for the command.
+        :keyword destination: If set, a list of the hosts to send the
+            command to, when empty broadcast to all workers.
+        :keyword connection: Custom broker connection to use, if not set,
+            a connection will be established automatically.
+        :keyword connect_timeout: Timeout for new connection if a custom
+            connection is not provided.
+        :keyword reply: Wait for and return the reply.
+        :keyword timeout: Timeout in seconds to wait for the reply.
+        :keyword limit: Limit number of replies.
+        :keyword callback: Callback called immediately for each reply
+            received.
+
+        """
+        arguments = arguments or {}
+        reply_ticket = reply and gen_unique_id() or None
+
+        if destination is not None and \
+                not isinstance(destination, (list, tuple)):
+            raise ValueError("destination must be a list/tuple not %s" % (
+                    type(destination)))
+
+        # Set reply limit to number of destinations (if specificed)
+        if limit is None and destination:
+            limit = destination and len(destination) or None
+
+        def _do_broadcast(connection=None, connect_timeout=None):
+
+            broadcaster = BroadcastPublisher(connection)
+            try:
+                broadcaster.send(command, arguments, destination=destination,
+                               reply_ticket=reply_ticket)
+            finally:
+                broadcaster.close()
+
+            if reply_ticket:
+                crq = ControlReplyConsumer(connection, reply_ticket)
+                try:
+                    return crq.collect(limit=limit, timeout=timeout,
+                                       callback=callback)
+                finally:
+                    crq.close()
+
+        return self.app.with_default_connection(_do_broadcast)(
+                connection=connection, connect_timeout=connect_timeout)
+
+
+_default_control = Control(default_app)
+broadcast = _default_control.broadcast
+rate_limit = _default_control.rate_limit
+ping = _default_control.ping
+revoke = _default_control.revoke
+discard_all = _default_control.discard_all
+inspect = _default_control.inspect
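
The old module-level control functions are kept as bound methods of a Control instance built on default_app, and per-app control is reached through the new BaseApp.control property above. Sketch (the task name is illustrative):

    from celery.app import App
    from celery.task import control

    # Old call sites keep working, now routed through default_app:
    control.ping(timeout=1)
    control.rate_limit("tasks.refresh_feed", "100/m")

    # New style, per app:
    app = App()
    app.control.discard_all()
    app.control.inspect(timeout=1).ping()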

+ 12 - 8
celery/task/sets.py

@@ -3,9 +3,8 @@ import warnings
 from UserList import UserList
 
 from celery import registry
+from celery.app import app_or_default
 from celery.datastructures import AttributeDict
-from celery.defaults import app_or_default
-from celery.messaging import with_connection
 from celery.messaging import TaskPublisher
 from celery.result import TaskSetResult
 from celery.utils import gen_unique_id
@@ -127,7 +126,7 @@ class TaskSet(UserList):
     _task = None # compat
     _task_name = None # compat
 
-    def __init__(self, task=None, tasks=None):
+    def __init__(self, task=None, tasks=None, app=None):
         if task is not None:
             if hasattr(task, "__iter__"):
                 tasks = task
@@ -142,11 +141,15 @@ class TaskSet(UserList):
                                 "cls": task.__class__.__name__},
                               DeprecationWarning)
 
+        self.app = app_or_default(app)
         self.data = list(tasks)
         self.total = len(self.tasks)
 
-    @with_connection
-    def apply_async(self, connection=None, connect_timeout=None, app=None):
+    def apply_async(self, connection=None, connect_timeout=None):
+        return self.app.with_default_connection(self._apply_async)(
+                connection=connection, connect_timeout=connect_timeout)
+
+    def _apply_async(self, connection=None, connect_timeout=None):
         """Run all tasks in the taskset.
 
         Returns a :class:`celery.result.TaskSetResult` instance.
@@ -176,7 +179,7 @@ class TaskSet(UserList):
             [True, True]
 
         """
-        if app_or_default(app).conf.CELERY_ALWAYS_EAGER:
+        if self.app.conf.CELERY_ALWAYS_EAGER:
             return self.apply()
 
         taskset_id = gen_unique_id()
@@ -188,7 +191,7 @@ class TaskSet(UserList):
         finally:
             publisher.close()
 
-        return TaskSetResult(taskset_id, results)
+        return TaskSetResult(taskset_id, results, app=self.app)
 
     def apply(self):
         """Applies the taskset locally."""
@@ -196,7 +199,8 @@ class TaskSet(UserList):
 
         # This will be filled with EagerResults.
         return TaskSetResult(taskset_id, [task.apply(taskset_id=taskset_id)
-                                            for task in self.tasks])
+                                            for task in self.tasks],
+                             app=self.app)
 
     @property
     def tasks(self):
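
TaskSet also carries its app now and manages its own default connection inside apply_async(); the resulting TaskSetResult inherits the same app. A sketch, assuming the subtask helper accepts a task name plus args as it is used elsewhere in this code base (task names illustrative):

    from celery.app import App
    from celery.task.sets import TaskSet, subtask

    app = App()
    ts = TaskSet(tasks=[subtask("tasks.refresh_feed", args=["http://a/rss"]),
                        subtask("tasks.refresh_feed", args=["http://b/rss"])],
                 app=app)
    result = ts.apply_async()       # connection from app, closed afterwards
    assert result.app is app        # the TaskSetResult is bound to the same app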

+ 1 - 1
celery/tests/test_backends/test_database.py

@@ -5,8 +5,8 @@ from datetime import datetime
 from celery.exceptions import ImproperlyConfigured
 
 from celery import states
+from celery.app import default_app
 from celery.db.models import Task, TaskSet
-from celery.defaults import default_app
 from celery.utils import gen_unique_id
 from celery.backends.database import DatabaseBackend
 

+ 1 - 1
celery/tests/test_bin/test_celeryd.py

@@ -8,7 +8,7 @@ from StringIO import StringIO
 
 from celery import platform
 from celery import signals
-from celery.defaults import default_app
+from celery.app import default_app
 from celery.apps import worker as cd
 from celery.bin.celeryd import WorkerCommand, main as celeryd_main
 from celery.exceptions import ImproperlyConfigured

+ 1 - 1
celery/tests/test_result.py

@@ -3,7 +3,7 @@ from __future__ import generators
 import unittest2 as unittest
 
 from celery import states
-from celery.defaults import default_app
+from celery.app import default_app
 from celery.utils import gen_unique_id
 from celery.utils.compat import all
 from celery.result import AsyncResult, TaskSetResult

+ 9 - 10
celery/tests/test_routes.py

@@ -1,8 +1,7 @@
 import unittest2 as unittest
 
-
-from celery import conf
 from celery import routes
+from celery.app import default_app
 from celery.utils import maybe_promise
 from celery.utils.functional import wraps
 from celery.exceptions import QueueNotFound
@@ -19,12 +18,12 @@ def with_queues(**queues):
     def patch_fun(fun):
         @wraps(fun)
         def __inner(*args, **kwargs):
-            prev_queues = conf.QUEUES
-            conf.QUEUES = queues
+            prev_queues = default_app.conf.CELERY_QUEUES
+            default_app.conf.CELERY_QUEUES = queues
             try:
                 return fun(*args, **kwargs)
             finally:
-                conf.QUEUES = prev_queues
+                default_app.conf.CELERY_QUEUES = prev_queues
         return __inner
     return patch_fun
 
@@ -41,7 +40,7 @@ class test_MapRoute(unittest.TestCase):
 
     @with_queues(foo=a_queue, bar=b_queue)
     def test_route_for_task_expanded_route(self):
-        expand = E(conf.QUEUES)
+        expand = E(default_app.conf.CELERY_QUEUES)
         route = routes.MapRoute({"celery.ping": {"queue": "foo"}})
         self.assertDictContainsSubset(a_queue,
                              expand(route.route_for_task("celery.ping")))
@@ -49,14 +48,14 @@ class test_MapRoute(unittest.TestCase):
 
     @with_queues(foo=a_queue, bar=b_queue)
     def test_route_for_task(self):
-        expand = E(conf.QUEUES)
+        expand = E(default_app.conf.CELERY_QUEUES)
         route = routes.MapRoute({"celery.ping": b_queue})
         self.assertDictContainsSubset(b_queue,
                              expand(route.route_for_task("celery.ping")))
         self.assertIsNone(route.route_for_task("celery.awesome"))
 
     def test_expand_route_not_found(self):
-        expand = E(conf.QUEUES)
+        expand = E(default_app.conf.CELERY_QUEUES)
         route = routes.MapRoute({"a": {"queue": "x"}})
         self.assertRaises(QueueNotFound, expand, route.route_for_task("a"))
 
@@ -71,7 +70,7 @@ class test_lookup_route(unittest.TestCase):
     def test_lookup_takes_first(self):
         R = routes.prepare(({"celery.ping": {"queue": "bar"}},
                             {"celery.ping": {"queue": "foo"}}))
-        router = routes.Router(R, conf.QUEUES)
+        router = routes.Router(R, default_app.conf.CELERY_QUEUES)
         self.assertDictContainsSubset(b_queue,
                 router.route({}, "celery.ping",
                     args=[1, 2], kwargs={}))
@@ -80,7 +79,7 @@ class test_lookup_route(unittest.TestCase):
     def test_lookup_paths_traversed(self):
         R = routes.prepare(({"celery.xaza": {"queue": "bar"}},
                             {"celery.ping": {"queue": "foo"}}))
-        router = routes.Router(R, conf.QUEUES)
+        router = routes.Router(R, default_app.conf.CELERY_QUEUES)
         self.assertDictContainsSubset(a_queue,
                 router.route({}, "celery.ping",
                     args=[1, 2], kwargs={}))

+ 1 - 2
celery/tests/test_task.py

@@ -5,10 +5,9 @@ from datetime import datetime, timedelta
 from pyparsing import ParseException
 
 
-from celery import conf
 from celery import task
 from celery import messaging
-from celery.defaults import default_app
+from celery.app import default_app
 from celery.task.schedules import crontab, crontab_parser
 from celery.utils import timeutils
 from celery.utils import gen_unique_id, parse_iso8601

+ 1 - 2
celery/tests/test_task_sets.py

@@ -2,8 +2,7 @@ import unittest2 as unittest
 
 import simplejson
 
-from celery import conf
-from celery.defaults import default_app
+from celery.app import default_app
 from celery.task import Task
 from celery.task.sets import subtask, TaskSet
 

+ 3 - 3
celery/tests/test_worker.py

@@ -9,7 +9,6 @@ from carrot.backends.base import BaseMessage
 from carrot.connection import BrokerConnection
 from celery.utils.timer2 import Timer
 
-from celery import conf
 from celery.decorators import task as task_dec
 from celery.decorators import periodic_task as periodic_task_dec
 from celery.serialization import pickle
@@ -380,11 +379,12 @@ class test_CarrotListener(unittest.TestCase):
                                timedelta(days=1)).isoformat())
 
         l.reset_connection()
-        p, conf.BROKER_CONNECTION_RETRY = conf.BROKER_CONNECTION_RETRY, False
+        p = l.app.conf.BROKER_CONNECTION_RETRY
+        l.app.conf.BROKER_CONNECTION_RETRY = False
         try:
             l.reset_connection()
         finally:
-            conf.BROKER_CONNECTION_RETRY = p
+            l.app.conf.BROKER_CONNECTION_RETRY = p
         l.receive_message(m.decode(), m)
 
         in_hold = self.eta_schedule.queue[0]

+ 3 - 4
celery/tests/test_worker_control.py

@@ -3,9 +3,8 @@ import unittest2 as unittest
 
 from celery.utils.timer2 import Timer
 
-from celery import conf
+from celery.app import default_app
 from celery.decorators import task
-from celery.defaults import default_app
 from celery.registry import tasks
 from celery.task.builtins import PingTask
 from celery.utils import gen_unique_id
@@ -101,13 +100,13 @@ class test_ControlPanel(unittest.TestCase):
         self.assertFalse(panel.execute("dump_reserved"))
 
     def test_rate_limit_when_disabled(self):
-        conf.DISABLE_RATE_LIMITS = True
+        default_app.conf.CELERY_DISABLE_RATE_LIMITS = True
         try:
             e = self.panel.execute("rate_limit", kwargs=dict(
                  task_name=mytask.name, rate_limit="100/m"))
             self.assertIn("rate limits disabled", e.get("error"))
         finally:
-            conf.DISABLE_RATE_LIMITS = False
+            default_app.conf.CELERY_DISABLE_RATE_LIMITS = False
 
     def test_rate_limit_invalid_rate_limit_string(self):
         e = self.panel.execute("rate_limit", kwargs=dict(

+ 3 - 5
celery/tests/test_worker_job.py

@@ -9,8 +9,8 @@ from StringIO import StringIO
 from carrot.backends.base import BaseMessage
 
 from celery import states
+from celery.app import default_app
 from celery.datastructures import ExceptionInfo
-from celery.defaults import default_app
 from celery.decorators import task as task_dec
 from celery.exceptions import RetryTaskError, NotRegistered
 from celery.log import setup_logger
@@ -125,7 +125,6 @@ class test_TaskRequest(unittest.TestCase):
         self.assertIn("task-frobulated", tw.eventer.sent)
 
     def test_send_email(self):
-        from celery import conf
         from celery.worker import job
         old_mail_admins = default_app.mail_admins
         old_enable_mails = mytask.send_error_emails
@@ -458,8 +457,7 @@ class test_TaskRequest(unittest.TestCase):
         tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
                                  root=False)
 
-        from celery import conf
-        conf.CELERY_SEND_TASK_ERROR_EMAILS = True
+        default_app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
 
         tw.on_failure(exc_info)
         logvalue = logfh.getvalue()
@@ -467,7 +465,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertIn(tid, logvalue)
         self.assertIn("ERROR", logvalue)
 
-        conf.CELERY_SEND_TASK_ERROR_EMAILS = False
+        default_app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
 
     def test_on_failure(self):
         self._test_on_failure(Exception("Inside unit tests"))

+ 1 - 2
celery/tests/utils.py

@@ -7,6 +7,7 @@ from StringIO import StringIO
 
 from nose import SkipTest
 
+from celery.app import default_app
 from celery.utils.functional import wraps
 
 
@@ -70,7 +71,6 @@ from celery.utils import noop
 @contextmanager
 def eager_tasks():
 
-    from celery.defaults import default_app
     prev = default_app.conf.CELERY_ALWAYS_EAGER
     default_app.conf.CELERY_ALWAYS_EAGER = True
 
@@ -83,7 +83,6 @@ def with_eager_tasks(fun):
 
     @wraps(fun)
     def _inner(*args, **kwargs):
-        from celery.defaults import default_app
         prev = default_app.conf.CELERY_ALWAYS_EAGER
         default_app.conf.CELERY_ALWAYS_EAGER = True
         try:

+ 6 - 1
celery/utils/__init__.py

@@ -27,7 +27,6 @@ LOG_LEVELS["FATAL"] = logging.FATAL
 LOG_LEVELS[logging.FATAL] = "FATAL"
 
 
-
 class promise(object):
     """A promise.
 
@@ -386,3 +385,9 @@ def abbrtask(S, max):
         module = abbr(module, max - len(cls), False)
         return module + "[.]" + cls
     return S
+
+
+def isatty(fh):
+    # Fixes bug with mod_wsgi:
+    #   mod_wsgi.Log object has no attribute isatty.
+    return getattr(fh, "isatty", None) and fh.isatty()

+ 6 - 5
celery/utils/info.py

@@ -1,6 +1,6 @@
 import math
 
-from celery.messaging import establish_connection
+from celery.app import app_or_default
 
 QUEUE_FORMAT = """
 . %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
@@ -38,8 +38,9 @@ def format_queues(queues, indent=0):
     return textindent(info, indent=indent)
 
 
-def get_broker_info():
-    broker_connection = establish_connection()
+def get_broker_info(app=None):
+    app = app_or_default(app)
+    broker_connection = app.broker_connection()
 
     carrot_backend = broker_connection.backend_cls
     if carrot_backend and not isinstance(carrot_backend, str):
@@ -61,6 +62,6 @@ def get_broker_info():
             "vhost": vhost}
 
 
-def format_broker_info(info=None):
+def format_broker_info(info=None, app=None):
     """Get message broker connection info string for log dumps."""
-    return BROKER_FORMAT % get_broker_info()
+    return BROKER_FORMAT % get_broker_info(app=app)
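
The broker-info helpers take the app explicitly now, which is what the worker and beat startup banners above pass in. Sketch:

    from celery.app import App
    from celery.utils import info

    app = App()
    banner = info.format_broker_info(app=app)   # connection info string for log dumps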

+ 0 - 2
celery/utils/mail.py

@@ -6,8 +6,6 @@ try:
 except ImportError:
     from email.MIMEText import MIMEText
 
-from celery.defaults import app_or_default
-
 
 class SendmailWarning(UserWarning):
     """Problem happened while sending the e-mail message."""

+ 1 - 1
celery/worker/__init__.py

@@ -13,7 +13,7 @@ from celery import log
 from celery import registry
 from celery import platform
 from celery import signals
-from celery.defaults import app_or_default
+from celery.app import app_or_default
 from celery.utils import noop, instantiate
 
 from celery.worker import state

+ 14 - 10
celery/worker/control/__init__.py

@@ -1,5 +1,6 @@
 from celery import log
-from celery.messaging import ControlReplyPublisher, with_connection
+from celery.app import app_or_default
+from celery.pidbox import ControlReplyPublisher
 from celery.utils import kwdict
 from celery.worker.control.registry import Panel
 from celery.worker.control import builtins
@@ -10,20 +11,23 @@ class ControlDispatch(object):
     Panel = Panel
     ReplyPublisher = ControlReplyPublisher
 
-    def __init__(self, logger=None, hostname=None, listener=None):
+    def __init__(self, logger=None, hostname=None, listener=None, app=None):
+        self.app = app_or_default(app)
         self.logger = logger or log.get_default_logger()
         self.hostname = hostname
         self.listener = listener
         self.panel = self.Panel(self.logger, self.listener, self.hostname)
 
-    @with_connection
-    def reply(self, data, exchange, routing_key, connection=None,
-            connect_timeout=None):
-        crq = self.ReplyPublisher(connection, exchange=exchange)
-        try:
-            crq.send(data, routing_key=routing_key)
-        finally:
-            crq.close()
+    def reply(self, data, exchange, routing_key, **kwargs):
+
+        def _do_reply(connection=None, connect_timeout=None):
+            crq = self.ReplyPublisher(connection, exchange=exchange)
+            try:
+                crq.send(data, routing_key=routing_key)
+            finally:
+                crq.close()
+
+        self.app.with_default_connection(_do_reply)(**kwargs)
 
     def dispatch_from_message(self, message):
         """Dispatch by using message data received by the broker.

+ 1 - 1
celery/worker/job.py

@@ -8,8 +8,8 @@ from datetime import datetime
 
 from celery import log
 from celery import platform
+from celery.app import app_or_default, default_app
 from celery.datastructures import ExceptionInfo
-from celery.defaults import app_or_default, default_app
 from celery.execute.trace import TaskTrace
 from celery.registry import tasks
 from celery.utils import noop, kwdict, fun_takes_kwargs

+ 11 - 8
celery/worker/listener.py

@@ -80,16 +80,16 @@ import warnings
 
 from carrot.connection import AMQPConnectionException
 
-from celery.defaults import app_or_default
+from celery.app import app_or_default
+from celery.datastructures import SharedCounter
+from celery.events import EventDispatcher
+from celery.exceptions import NotRegistered
+from celery.pidbox import BroadcastConsumer
 from celery.utils import noop, retry_over_time
+
 from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.control import ControlDispatch
 from celery.worker.heartbeat import Heart
-from celery.events import EventDispatcher
-from celery.messaging import establish_connection
-from celery.messaging import get_consumer_set, BroadcastConsumer
-from celery.exceptions import NotRegistered
-from celery.datastructures import SharedCounter
 
 RUN = 0x1
 CLOSE = 0x2
@@ -215,10 +215,11 @@ class CarrotListener(object):
         self.event_dispatcher = None
         self.heart = None
         self.pool = pool
-        self.control_dispatch = ControlDispatch(logger=logger,
+        self.control_dispatch = ControlDispatch(app=self.app,
+                                                logger=logger,
                                                 hostname=self.hostname,
                                                 listener=self)
-        self.queues = queues or self.app.get_queues()
+        self.queues = queues
 
     def start(self):
         """Start the consumer.
@@ -390,6 +391,7 @@ class CarrotListener(object):
 
         self.task_consumer.on_decode_error = self.on_decode_error
         self.broadcast_consumer = BroadcastConsumer(self.connection,
+                                                    app=self.app,
                                                     hostname=self.hostname)
         self.task_consumer.register_callback(self.receive_message)
 
@@ -397,6 +399,7 @@ class CarrotListener(object):
         if self.event_dispatcher:
             self.event_dispatcher.flush()
         self.event_dispatcher = EventDispatcher(self.connection,
+                                                app=self.app,
                                                 hostname=self.hostname,
                                                 enabled=self.send_events)
         self.heart = Heart(self.event_dispatcher)