Selaa lähdekoodia

Global settings removed. Still a little bit of refactoring to be done, but the tests passes now.

Ask Solem 14 vuotta sitten
vanhempi
commit
b442d34c8e
43 muutettua tiedostoa jossa 1122 lisäystä ja 1032 poistoa
  1. 10 10
      celery/apps/beat.py
  2. 20 25
      celery/apps/worker.py
  3. 4 31
      celery/backends/__init__.py
  4. 10 15
      celery/backends/amqp.py
  5. 3 3
      celery/backends/base.py
  6. 7 5
      celery/backends/cache.py
  7. 11 14
      celery/backends/cassandra.py
  8. 5 5
      celery/backends/database.py
  9. 2 6
      celery/backends/mongodb.py
  10. 15 14
      celery/backends/pyredis.py
  11. 8 8
      celery/backends/tyrant.py
  12. 14 10
      celery/beat.py
  13. 3 6
      celery/bin/base.py
  14. 2 2
      celery/bin/celerybeat.py
  15. 3 3
      celery/bin/celeryd.py
  16. 111 262
      celery/conf.py
  17. 1 2
      celery/db/session.py
  18. 134 0
      celery/defaults/__init__.py
  19. 93 0
      celery/defaults/conf.py
  20. 20 5
      celery/events/__init__.py
  21. 2 3
      celery/events/snapshot.py
  22. 11 9
      celery/execute/__init__.py
  23. 4 0
      celery/loaders/base.py
  24. 28 10
      celery/log.py
  25. 56 54
      celery/messaging.py
  26. 9 4
      celery/result.py
  27. 419 403
      celery/task/base.py
  28. 3 5
      celery/task/control.py
  29. 3 4
      celery/task/sets.py
  30. 4 3
      celery/tests/test_backends/test_database.py
  31. 15 11
      celery/tests/test_bin/test_celeryd.py
  32. 4 4
      celery/tests/test_result.py
  33. 1 1
      celery/tests/test_serialization.py
  34. 4 4
      celery/tests/test_task.py
  35. 3 2
      celery/tests/test_task_sets.py
  36. 2 0
      celery/tests/test_worker_control.py
  37. 9 9
      celery/tests/test_worker_job.py
  38. 8 8
      celery/tests/utils.py
  39. 17 30
      celery/utils/mail.py
  40. 22 21
      celery/worker/__init__.py
  41. 2 2
      celery/worker/control/builtins.py
  42. 9 8
      celery/worker/job.py
  43. 11 11
      celery/worker/listener.py

+ 10 - 10
celery/apps/beat.py

@@ -5,6 +5,7 @@ import traceback
 from celery import __version__
 from celery import beat
 from celery import platform
+from celery.defaults import app_or_default
 from celery.log import emergency_error
 from celery.utils import info, LOG_LEVELS
 
@@ -20,16 +21,13 @@ class Beat(object):
     Service = beat.Service
 
     def __init__(self, loglevel=None, logfile=None, schedule=None,
-            max_interval=None, scheduler_cls=None, defaults=None, **kwargs):
+            max_interval=None, scheduler_cls=None, app=None, **kwargs):
         """Starts the celerybeat task scheduler."""
+        self.app = app = app_or_default(app)
 
-        if defaults is None:
-            from celery import conf as defaults
-        self.defaults = defaults
-
-        self.loglevel = loglevel or defaults.CELERYBEAT_LOG_LEVEL
-        self.logfile = logfile or defaults.CELERYBEAT_LOG_FILE
-        self.schedule = schedule or defaults.CELERYBEAT_SCHEDULE_FILENAME
+        self.loglevel = loglevel or app.conf.CELERYBEAT_LOG_LEVEL
+        self.logfile = logfile or app.conf.CELERYBEAT_LOG_FILE
+        self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
         self.scheduler_cls = scheduler_cls
         self.max_interval = max_interval
 
@@ -48,14 +46,16 @@ class Beat(object):
     def setup_logging(self):
         from celery import log
         handled = log.setup_logging_subsystem(loglevel=self.loglevel,
-                                              logfile=self.logfile)
+                                              logfile=self.logfile,
+                                              app=self.app)
         if not handled:
             logger = log.get_default_logger(name="celery.beat")
             log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
         return logger
 
     def start_scheduler(self, logger=None):
-        beat = self.Service(logger=logger,
+        beat = self.Service(app=self.app,
+                            logger=logger,
                             max_interval=self.max_interval,
                             scheduler_cls=self.scheduler_cls,
                             schedule_filename=self.schedule)

+ 20 - 25
celery/apps/worker.py

@@ -9,6 +9,7 @@ import warnings
 from celery import __version__
 from celery import platform
 from celery import signals
+from celery.defaults import app_or_default
 from celery.exceptions import ImproperlyConfigured
 from celery.routes import Router
 from celery.task import discard_all
@@ -42,29 +43,27 @@ class Worker(object):
             hostname=None, discard=False, run_clockservice=False,
             schedule=None, task_time_limit=None, task_soft_time_limit=None,
             max_tasks_per_child=None, queues=None, events=False, db=None,
-            include=None, defaults=None, **kwargs):
-        if defaults is None:
-            from celery import conf
-            defaults = conf
-        self.defaults = defaults
+            include=None, app=None, **kwargs):
+        self.app = app = app_or_default(app)
         self.concurrency = (concurrency or
-                            defaults.CELERYD_CONCURRENCY or
+                            app.conf.CELERYD_CONCURRENCY or
                             multiprocessing.cpu_count())
-        self.loglevel = loglevel or defaults.CELERYD_LOG_LEVEL
-        self.logfile = logfile or defaults.CELERYD_LOG_FILE
+        self.loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
+        self.logfile = logfile or app.conf.CELERYD_LOG_FILE
         self.hostname = hostname or socket.gethostname()
         self.discard = discard
         self.run_clockservice = run_clockservice
-        self.schedule = schedule or defaults.CELERYBEAT_SCHEDULE_FILENAME
+        self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
         self.events = events
         self.task_time_limit = (task_time_limit or
-                                defaults.CELERYD_TASK_TIME_LIMIT)
+                                app.conf.CELERYD_TASK_TIME_LIMIT)
         self.task_soft_time_limit = (task_soft_time_limit or
-                                     defaults.CELERYD_TASK_SOFT_TIME_LIMIT)
+                                     app.conf.CELERYD_TASK_SOFT_TIME_LIMIT)
         self.max_tasks_per_child = (max_tasks_per_child or
-                                    defaults.CELERYD_MAX_TASKS_PER_CHILD)
+                                    app.conf.CELERYD_MAX_TASKS_PER_CHILD)
         self.db = db
         self.use_queues = queues or []
+        self.queues = None
         self.include = include or []
         self._isatty = sys.stdout.isatty()
 
@@ -83,7 +82,7 @@ class Worker(object):
         self.redirect_stdouts_to_logger()
         print("celery@%s v%s is starting." % (self.hostname, __version__))
 
-        if getattr(self.settings, "DEBUG", False):
+        if getattr(self.settings, "DEBUG", False): # XXX
             warnings.warn("Using settings.DEBUG leads to a memory leak, "
                     "never use this setting in a production environment!")
 
@@ -102,16 +101,14 @@ class Worker(object):
         print("celery@%s has started." % self.hostname)
 
     def init_queues(self):
-        conf = self.defaults
-        from celery.conf import prepare_queues
-        queues = prepare_queues(conf.QUEUES, conf)
+        queues = self.app.get_queues()
         if self.use_queues:
             queues = dict((queue, options)
                                 for queue, options in queues.items()
                                     if queue in self.use_queues)
             for queue in self.use_queues:
                 if queue not in queues:
-                    if conf.CREATE_MISSING_QUEUES:
+                    if self.app.conf.CELERY_CREATE_MISSING_QUEUES:
                         Router(queues=queues).add_queue(queue)
                     else:
                         raise ImproperlyConfigured(
@@ -119,18 +116,15 @@ class Worker(object):
         self.queues = queues
 
     def init_loader(self):
-        from celery.loaders import current_loader, load_settings
-        self.loader = current_loader()
-        self.settings = load_settings()
-        if not self.loader.configured:
-            raise ImproperlyConfigured(
-                    "Celery needs to be configured to run celeryd.")
+        self.loader = self.app.loader
+        self.settings = self.app.conf
         map(self.loader.import_module, self.include)
 
     def redirect_stdouts_to_logger(self):
         from celery import log
         handled = log.setup_logging_subsystem(loglevel=self.loglevel,
-                                              logfile=self.logfile)
+                                              logfile=self.logfile,
+                                              app=self.app)
         # Redirect stdout/stderr to our logger.
         if not handled:
             logger = log.get_default_logger()
@@ -175,7 +169,8 @@ class Worker(object):
         }
 
     def run_worker(self):
-        worker = self.WorkController(concurrency=self.concurrency,
+        worker = self.WorkController(app=self.app,
+                                concurrency=self.concurrency,
                                 loglevel=self.loglevel,
                                 logfile=self.logfile,
                                 hostname=self.hostname,

+ 4 - 31
celery/backends/__init__.py

@@ -1,7 +1,6 @@
-from celery import conf
+from celery.defaults import default_app
 from celery.utils import get_cls_by_name
 from celery.utils.functional import curry
-from celery.loaders import current_loader
 
 BACKEND_ALIASES = {
     "amqp": "celery.backends.amqp.AMQPBackend",
@@ -16,36 +15,10 @@ BACKEND_ALIASES = {
 _backend_cache = {}
 
 
-def get_backend_cls(backend):
+def get_backend_cls(backend, loader=None):
     """Get backend class by name/alias"""
+    loader = loader or default_app.loader
     if backend not in _backend_cache:
-        aliases = dict(BACKEND_ALIASES, **current_loader().override_backends)
+        aliases = dict(BACKEND_ALIASES, **loader.override_backends)
         _backend_cache[backend] = get_cls_by_name(backend, aliases)
     return _backend_cache[backend]
-
-
-"""
-.. function:: get_default_backend_cls()
-
-    Get the backend class specified in the ``CELERY_RESULT_BACKEND`` setting.
-
-"""
-get_default_backend_cls = curry(get_backend_cls, conf.RESULT_BACKEND)
-
-
-"""
-.. class:: DefaultBackend
-
-    The default backend class used for storing task results and status,
-    specified in the ``CELERY_RESULT_BACKEND`` setting.
-
-"""
-DefaultBackend = get_default_backend_cls()
-
-"""
-.. data:: default_backend
-
-    An instance of :class:`DefaultBackend`.
-
-"""
-default_backend = DefaultBackend()

+ 10 - 15
celery/backends/amqp.py

@@ -7,7 +7,6 @@ from datetime import timedelta
 
 from carrot.messaging import Consumer, Publisher
 
-from celery import conf
 from celery import states
 from celery.backends.base import BaseDictBackend
 from celery.exceptions import TimeoutError
@@ -20,11 +19,6 @@ class AMQResultWarning(UserWarning):
 
 
 class ResultPublisher(Publisher):
-    exchange = conf.RESULT_EXCHANGE
-    exchange_type = conf.RESULT_EXCHANGE_TYPE
-    delivery_mode = conf.RESULT_PERSISTENT and 2 or 1
-    serializer = conf.RESULT_SERIALIZER
-    durable = conf.RESULT_PERSISTENT
     auto_delete = False
 
     def __init__(self, connection, task_id, **kwargs):
@@ -34,9 +28,6 @@ class ResultPublisher(Publisher):
 
 
 class ResultConsumer(Consumer):
-    exchange = conf.RESULT_EXCHANGE
-    exchange_type = conf.RESULT_EXCHANGE_TYPE
-    durable = conf.RESULT_PERSISTENT
     no_ack = True
     auto_delete = False
 
@@ -62,22 +53,25 @@ class AMQPBackend(BaseDictBackend):
     _connection = None
 
     def __init__(self, connection=None, exchange=None, exchange_type=None,
-            persistent=None, serializer=None, auto_delete=None,
+            persistent=None, serializer=None, auto_delete=False,
             expires=None, **kwargs):
+        super(AMQPBackend, self).__init__(**kwargs)
+        conf = self.app.conf
         self._connection = connection
-        self.exchange = exchange
-        self.exchange_type = exchange_type
+        self.exchange = exchange or conf.CELERY_RESULT_EXCHANGE
+        self.exchange_type = exchange_type or conf.CELERY_RESULT_EXCHANGE_TYPE
+        if persistent is None:
+            persistent = conf.CELERY_RESULT_PERSISTENT
         self.persistent = persistent
-        self.serializer = serializer
+        self.serializer = serializer or conf.CELERY_RESULT_SERIALIZER
         self.auto_delete = auto_delete
         self.expires = expires
         if self.expires is None:
-            self.expires = conf.TASK_RESULT_EXPIRES
+            self.expires = conf.CELERY_TASK_RESULT_EXPIRES
         if isinstance(self.expires, timedelta):
             self.expires = timeutils.timedelta_seconds(self.expires)
         if self.expires is not None:
             self.expires = int(self.expires)
-        super(AMQPBackend, self).__init__(**kwargs)
 
     def _create_publisher(self, task_id, connection):
         delivery_mode = self.persistent and 2 or 1
@@ -89,6 +83,7 @@ class AMQPBackend(BaseDictBackend):
                                exchange=self.exchange,
                                exchange_type=self.exchange_type,
                                delivery_mode=delivery_mode,
+                               durable=self.persistent,
                                serializer=self.serializer,
                                auto_delete=self.auto_delete)
 

+ 3 - 3
celery/backends/base.py

@@ -1,7 +1,6 @@
 """celery.backends.base"""
 import time
 
-from celery import conf
 from celery import states
 from celery.exceptions import TimeoutError, TaskRevokedError
 from celery.serialization import pickle, get_pickled_exception
@@ -19,7 +18,8 @@ class BaseBackend(object):
     TimeoutError = TimeoutError
 
     def __init__(self, *args, **kwargs):
-        pass
+        from celery.defaults import app_or_default
+        self.app = app_or_default(kwargs.get("app"))
 
     def encode_result(self, result, status):
         if status in self.EXCEPTION_STATES:
@@ -144,7 +144,7 @@ class BaseDictBackend(BaseBackend):
     def __init__(self, *args, **kwargs):
         super(BaseDictBackend, self).__init__(*args, **kwargs)
         self._cache = LocalCache(limit=kwargs.get("max_cached_results") or
-                                 conf.MAX_CACHED_RESULTS)
+                                 self.app.conf.CELERY_MAX_CACHED_RESULTS)
 
     def store_result(self, task_id, result, status, traceback=None):
         """Store task result and status."""

+ 7 - 5
celery/backends/cache.py

@@ -2,7 +2,6 @@ from datetime import timedelta
 
 from carrot.utils import partition
 
-from celery import conf
 from celery.backends.base import KeyValueStoreBackend
 from celery.exceptions import ImproperlyConfigured
 from celery.utils import timeutils
@@ -47,13 +46,16 @@ backends = {"memcache": get_best_memcache,
 class CacheBackend(KeyValueStoreBackend):
     _client = None
 
-    def __init__(self, expires=conf.TASK_RESULT_EXPIRES,
-            backend=conf.CACHE_BACKEND, options={}, **kwargs):
+    def __init__(self, expires=None, backend=None, options={}, **kwargs):
         super(CacheBackend, self).__init__(self, **kwargs)
+
         if isinstance(expires, timedelta):
             expires = timeutils.timedelta_seconds(expires)
-        self.expires = expires
-        self.options = dict(conf.CACHE_BACKEND_OPTIONS, **options)
+        self.expires = expires or self.app.conf.CELERY_TASK_RESULT_EXPIRES
+        self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
+                            **options)
+
+        backend = backend or self.app.conf.CELERY_CACHE_BACKEND
         self.backend, _, servers = partition(backend, "://")
         self.servers = servers.split(";")
         try:

+ 11 - 14
celery/backends/cassandra.py

@@ -14,9 +14,7 @@ import time
 from datetime import datetime
 
 from celery.backends.base import BaseDictBackend
-from celery import conf
 from celery.exceptions import ImproperlyConfigured
-from celery.loaders import load_settings
 from celery.log import setup_logger
 from celery.serialization import pickle
 from celery import states
@@ -50,34 +48,33 @@ class CassandraBackend(BaseDictBackend):
         the ``CASSANDRA_SERVERS`` setting is not set.
 
         """
-        self.logger = setup_logger("celery.backends.cassandra")
+        super(CassandraBackend, self).__init__(**kwargs)
+        self.logger = setup_logger(name="celery.backends.cassandra",
+                                   app=self.app)
 
         self.result_expires = kwargs.get("result_expires") or \
-                                conf.TASK_RESULT_EXPIRES
+                                self.app.conf.CELERY_TASK_RESULT_EXPIRES
 
         if not pycassa:
             raise ImproperlyConfigured(
                     "You need to install the pycassa library to use the "
                     "Cassandra backend. See http://github.com/vomjom/pycassa")
 
-        settings = load_settings()
-
         self.servers = servers or \
-                         getattr(settings, "CASSANDRA_SERVERS", self.servers)
+                        self.app.conf.get("CASSANDRA_SERVERS", self.servers)
         self.keyspace = keyspace or \
-                          getattr(settings, "CASSANDRA_KEYSPACE",
-                                  self.keyspace)
+                            self.app.conf.get("CASSANDRA_KEYSPACE",
+                                              self.keyspace)
         self.column_family = column_family or \
-                               getattr(settings, "CASSANDRA_COLUMN_FAMILY",
-                                       self.column_family)
+                                self.app.conf.get("CASSANDRA_COLUMN_FAMILY",
+                                                  self.column_family)
         self.cassandra_options = dict(cassandra_options or {},
-                                   **getattr(settings,
-                                             "CASSANDRA_OPTIONS", {}))
+                                   **self.app.conf.get("CASSANDRA_OPTIONS",
+                                                       {}))
         if not self.servers or not self.keyspace or not self.column_family:
             raise ImproperlyConfigured(
                     "Cassandra backend not configured.")
 
-        super(CassandraBackend, self).__init__()
         self._column_family = None
 
     def _retry_on_error(func):

+ 5 - 5
celery/backends/database.py

@@ -1,6 +1,5 @@
 from datetime import datetime
 
-from celery import conf
 from celery.backends.base import BaseDictBackend
 from celery.db.models import Task, TaskSet
 from celery.db.session import ResultSession
@@ -20,16 +19,17 @@ class DatabaseBackend(BaseDictBackend):
 
     def __init__(self, dburi=None, result_expires=None,
             engine_options=None, **kwargs):
-        self.result_expires = result_expires or conf.TASK_RESULT_EXPIRES
-        self.dburi = dburi or conf.RESULT_DBURI
+        super(DatabaseBackend, self).__init__(**kwargs)
+        self.result_expires = result_expires or \
+                                self.app.conf.CELERY_TASK_RESULT_EXPIRES
+        self.dburi = dburi or self.app.conf.CELERY_RESULT_DBURI
         self.engine_options = dict(engine_options or {},
-                                   **conf.RESULT_ENGINE_OPTIONS or {})
+                        **self.app.conf.CELERY_RESULT_ENGINE_OPTIONS or {})
         if not self.dburi:
             raise ImproperlyConfigured(
                     "Missing connection string! Do you have "
                     "CELERY_RESULT_DBURI set to a real value?")
 
-        super(DatabaseBackend, self).__init__(**kwargs)
 
     def ResultSession(self):
         return ResultSession(dburi=self.dburi, **self.engine_options)

+ 2 - 6
celery/backends/mongodb.py

@@ -6,9 +6,7 @@ try:
 except ImportError:
     pymongo = None
 
-from celery import conf
 from celery import states
-from celery.loaders import load_settings
 from celery.backends.base import BaseDictBackend
 from celery.exceptions import ImproperlyConfigured
 from celery.serialization import pickle
@@ -36,16 +34,14 @@ class MongoBackend(BaseDictBackend):
 
         """
         self.result_expires = kwargs.get("result_expires") or \
-                                conf.TASK_RESULT_EXPIRES
+                                self.app.conf.CELERY_TASK_RESULT_EXPIRES
 
         if not pymongo:
             raise ImproperlyConfigured(
                 "You need to install the pymongo library to use the "
                 "MongoDB backend.")
 
-        settings = load_settings()
-
-        config = getattr(settings, "CELERY_MONGODB_BACKEND_SETTINGS", None)
+        config = self.app.conf.get("CELERY_MONGODB_BACKEND_SETTINGS", None)
         if config is not None:
             if not isinstance(config, dict):
                 raise ImproperlyConfigured(

+ 15 - 14
celery/backends/pyredis.py

@@ -1,7 +1,6 @@
 import warnings
 
 
-from celery.loaders import load_settings
 from celery.backends.base import KeyValueStoreBackend
 from celery.exceptions import ImproperlyConfigured
 
@@ -42,25 +41,28 @@ class RedisBackend(KeyValueStoreBackend):
             redis_timeout=None,
             redis_password=None,
             redis_connect_retry=None,
-            redis_connect_timeout=None):
+            redis_connect_timeout=None, **kwargs):
+        super(RedisBackend, self).__init__(**kwargs)
         if redis is None:
             raise ImproperlyConfigured(
                     "You need to install the redis library in order to use "
                   + "Redis result store backend.")
 
-        settings = load_settings()
-        self.redis_host = redis_host or \
-                            getattr(settings, "REDIS_HOST", self.redis_host)
-        self.redis_port = redis_port or \
-                            getattr(settings, "REDIS_PORT", self.redis_port)
-        self.redis_db = redis_db or \
-                            getattr(settings, "REDIS_DB", self.redis_db)
-        self.redis_password = redis_password or \
-                            getattr(settings, "REDIS_PASSWORD",
-                                    self.redis_password)
+        self.redis_host = (redis_host or
+                           self.app.conf.get("REDIS_HOST") or
+                           self.redis_host)
+        self.redis_port = (redis_port or
+                           self.app.conf.get("REDIS_PORT") or
+                           self.redis_port)
+        self.redis_db = (redis_db or
+                         self.app.conf.get("REDIS_DB") or
+                         self.redis_db)
+        self.redis_password = (redis_password or
+                               self.app.conf.get("REDIS_PASSWORD") or
+                               self.redis_password)
 
         for setting_name in self.deprecated_settings:
-            if getattr(settings, setting_name, None) is not None:
+            if self.app.conf.get(setting_name) is not None:
                 warnings.warn(
                     "The setting '%s' is no longer supported by the "
                     "python Redis client!" % setting_name.upper(),
@@ -72,7 +74,6 @@ class RedisBackend(KeyValueStoreBackend):
             raise ImproperlyConfigured(
                 "In order to use the Redis result store backend, you have to "
                 "set the REDIS_HOST and REDIS_PORT settings")
-        super(RedisBackend, self).__init__()
         self._connection = None
 
     def open(self):

+ 8 - 8
celery/backends/tyrant.py

@@ -4,7 +4,6 @@ try:
 except ImportError:
     pytyrant = None
 
-from celery.loaders import load_settings
 from celery.backends.base import KeyValueStoreBackend
 from celery.exceptions import ImproperlyConfigured
 
@@ -24,30 +23,31 @@ class TyrantBackend(KeyValueStoreBackend):
     tyrant_host = None
     tyrant_port = None
 
-    def __init__(self, tyrant_host=None, tyrant_port=None):
+    def __init__(self, tyrant_host=None, tyrant_port=None, **kwargs):
         """Initialize Tokyo Tyrant backend instance.
 
         Raises :class:`celery.exceptions.ImproperlyConfigured` if
         :setting:`TT_HOST` or :setting:`TT_PORT` is not set.
 
         """
+        super(TyrantBackend, self).__init__(**kwargs)
 
         if not pytyrant:
             raise ImproperlyConfigured(
                     "You need to install the pytyrant library to use the "
                   + "Tokyo Tyrant backend.")
-        settings = load_settings()
-        self.tyrant_host = tyrant_host or \
-                            getattr(settings, "TT_HOST", self.tyrant_host)
-        self.tyrant_port = tyrant_port or \
-                            getattr(settings, "TT_PORT", self.tyrant_port)
+        self.tyrant_host = (tyrant_host or
+                            self.app.conf.get("TT_HOST") or
+                            self.tyrant_host)
+        self.tyrant_port = (tyrant_port or
+                            self.app.conf.get("TT_PORT") or
+                            self.tyrant_port)
         if self.tyrant_port:
             self.tyrant_port = int(self.tyrant_port)
         if not self.tyrant_host or not self.tyrant_port:
             raise ImproperlyConfigured(
                 "To use the Tokyo Tyrant backend, you have to "
                 "set the TT_HOST and TT_PORT settings in your settings.py")
-        super(TyrantBackend, self).__init__()
         self._connection = None
 
     def open(self):

+ 14 - 10
celery/beat.py

@@ -11,8 +11,8 @@ from datetime import datetime
 from UserDict import UserDict
 
 from celery import log
-from celery import conf
 from celery import platform
+from celery.defaults import app_or_default
 from celery.execute import send_task
 from celery.schedules import maybe_schedule
 from celery.messaging import establish_connection
@@ -132,10 +132,12 @@ class Scheduler(UserDict):
     Entry = ScheduleEntry
 
     def __init__(self, schedule=None, logger=None, max_interval=None,
-            **kwargs):
+            app=None, **kwargs):
         UserDict.__init__(self)
         if schedule is None:
             schedule = {}
+        self.app = app_or_default(app)
+        conf = self.app.conf
         self.data = schedule
         self.logger = logger or log.get_default_logger(name="celery.beat")
         self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
@@ -250,7 +252,7 @@ class PersistentScheduler(Scheduler):
     def setup_schedule(self):
         self._store = self.persistence.open(self.schedule_filename)
         self.data = self._store
-        self.merge_inplace(conf.CELERYBEAT_SCHEDULE)
+        self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
         self.sync()
         self.data = self._store
 
@@ -268,15 +270,16 @@ class Service(object):
     scheduler_cls = PersistentScheduler
 
     def __init__(self, logger=None,
-            max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
-            schedule=conf.CELERYBEAT_SCHEDULE,
-            schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
-            scheduler_cls=None):
-        self.max_interval = max_interval
+            max_interval=None, schedule=None, schedule_filename=None,
+            scheduler_cls=None, app=None):
+        self.app = app_or_default(app)
+        self.max_interval = max_interval or \
+                            self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
         self.scheduler_cls = scheduler_cls or self.scheduler_cls
         self.logger = logger or log.get_default_logger(name="celery.beat")
-        self.schedule = schedule
-        self.schedule_filename = schedule_filename
+        self.schedule = schedule or self.app.conf.CELERYBEAT_SCHEDULE
+        self.schedule_filename = schedule_filename or \
+                                    self.app.conf.CELERYBEAT_SCHEDULE_FILENAME
 
         self._scheduler = None
         self._shutdown = threading.Event()
@@ -320,6 +323,7 @@ class Service(object):
         if self._scheduler is None:
             filename = self.schedule_filename
             self._scheduler = instantiate(self.scheduler_cls,
+                                          app=self.app,
                                           schedule_filename=filename,
                                           logger=self.logger,
                                           max_interval=self.max_interval)

+ 3 - 6
celery/bin/base.py

@@ -4,6 +4,7 @@ import sys
 from optparse import OptionParser, make_option as Option
 
 from celery import __version__
+from celery.defaults import app_or_default
 
 
 class Command(object):
@@ -13,12 +14,8 @@ class Command(object):
 
     Parser = OptionParser
 
-    def __init__(self, defaults=None):
-        self.defaults = defaults
-
-        if self.defaults is None:
-            from celery import conf
-            self.defaults = conf
+    def __init__(self, app=None):
+        self.app = app_or_default(app)
 
     def parse_options(self, prog_name, arguments):
         """Parse the available options."""

+ 2 - 2
celery/bin/celerybeat.py

@@ -29,11 +29,11 @@ class BeatCommand(Command):
 
     def run(self, *args, **kwargs):
         from celery.apps.beat import Beat
-        kwargs["defaults"] = self.defaults
+        kwargs["app"] = self.app
         return Beat(*args, **kwargs).run()
 
     def get_options(self):
-        conf = self.defaults
+        conf = self.app.conf
 
         return (
             Option('-s', '--schedule',

+ 3 - 3
celery/bin/celeryd.py

@@ -77,11 +77,11 @@ class WorkerCommand(Command):
 
     def run(self, *args, **kwargs):
         from celery.apps.worker import Worker
-        kwargs["defaults"] = self.defaults
+        kwargs["app"] = self.app
         return Worker(*args, **kwargs).run()
 
     def get_options(self):
-        conf = self.defaults
+        conf = self.app.conf
         return (
             Option('-c', '--concurrency',
                 default=conf.CELERYD_CONCURRENCY,
@@ -119,7 +119,7 @@ class WorkerCommand(Command):
                 help="Path to the state database. The extension '.db' will "
                      "be appended to the filename. Default: %s" % (
                         conf.CELERYD_STATE_DB, )),
-            Option('-E', '--events', default=conf.SEND_EVENTS,
+            Option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
                 action="store_true", dest="events",
                 help="Send events so the worker can be monitored by "
                      "celeryev, celerymon and other monitors.."),

+ 111 - 262
celery/conf.py

@@ -1,272 +1,121 @@
-import sys
-import logging
-import warnings
-from datetime import timedelta
-
-from celery import routes
-from celery.loaders import load_settings
-from celery.utils import LOG_LEVELS
-
-DEFAULT_PROCESS_LOG_FMT = """
-    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
-""".strip()
-DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
-DEFAULT_TASK_LOG_FMT = " ".join("""
-    [%(asctime)s: %(levelname)s/%(processName)s]
-    [%(task_name)s(%(task_id)s)] %(message)s
-""".strip().split())
-
-settings = load_settings()
-
-_DEFAULTS = {
-    "BROKER_CONNECTION_TIMEOUT": 4,
-    "BROKER_CONNECTION_RETRY": True,
-    "BROKER_CONNECTION_MAX_RETRIES": 100,
-    "BROKER_HOST": "localhost",
-    "BROKER_PORT": None,
-    "BROKER_USER": "guest",
-    "BROKER_PASSWORD": "guest",
-    "BROKER_VHOST": "/",
-    "CELERY_RESULT_BACKEND": "database",
-    "CELERY_ALWAYS_EAGER": False,
-    "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
-    "CELERY_TASK_RESULT_EXPIRES": timedelta(days=1),
-    "CELERY_SEND_EVENTS": False,
-    "CELERY_IGNORE_RESULT": False,
-    "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
-    "CELERY_TASK_SERIALIZER": "pickle",
-    "CELERY_DISABLE_RATE_LIMITS": False,
-    "CELERYD_TASK_TIME_LIMIT": None,
-    "CELERYD_TASK_SOFT_TIME_LIMIT": None,
-    "CELERYD_MAX_TASKS_PER_CHILD": None,
-    "CELERY_ROUTES": None,
-    "CELERY_CREATE_MISSING_QUEUES": True,
-    "CELERY_DEFAULT_ROUTING_KEY": "celery",
-    "CELERY_DEFAULT_QUEUE": "celery",
-    "CELERY_DEFAULT_EXCHANGE": "celery",
-    "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
-    "CELERY_DEFAULT_DELIVERY_MODE": 2, # persistent
-    "CELERY_ACKS_LATE": False,
-    "CELERYD_POOL_PUTLOCKS": True,
-    "CELERYD_POOL": "celery.concurrency.processes.TaskPool",
-    "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
-    "CELERYD_ETA_SCHEDULER": "celery.utils.timer2.Timer",
-    "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
-    "CELERYD_CONCURRENCY": 0, # defaults to cpu count
-    "CELERYD_PREFETCH_MULTIPLIER": 4,
-    "CELERYD_LOG_FORMAT": DEFAULT_PROCESS_LOG_FMT,
-    "CELERYD_TASK_LOG_FORMAT": DEFAULT_TASK_LOG_FMT,
-    "CELERYD_LOG_COLOR": False,
-    "CELERYD_LOG_LEVEL": "WARN",
-    "CELERYD_LOG_FILE": None, # stderr
-    "CELERYBEAT_SCHEDULE": {},
-    "CELERYD_STATE_DB": None,
-    "CELERYD_ETA_SCHEDULER_PRECISION": 1,
-    "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
-    "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
-    "CELERYBEAT_LOG_LEVEL": "INFO",
-    "CELERYBEAT_LOG_FILE": None, # stderr
-    "CELERYMON_LOG_LEVEL": "INFO",
-    "CELERYMON_LOG_FILE": None, # stderr
-    "CELERYMON_LOG_FORMAT": DEFAULT_LOG_FMT,
-    "CELERY_BROADCAST_QUEUE": "celeryctl",
-    "CELERY_BROADCAST_EXCHANGE": "celeryctl",
-    "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
-    "CELERY_EVENT_QUEUE": "celeryevent",
-    "CELERY_EVENT_EXCHANGE": "celeryevent",
-    "CELERY_EVENT_EXCHANGE_TYPE": "direct",
-    "CELERY_EVENT_ROUTING_KEY": "celeryevent",
-    "CELERY_EVENT_SERIALIZER": "json",
-    "CELERY_RESULT_EXCHANGE": "celeryresults",
-    "CELERY_RESULT_EXCHANGE_TYPE": "direct",
-    "CELERY_RESULT_SERIALIZER": "pickle",
-    "CELERY_RESULT_PERSISTENT": False,
-    "CELERY_MAX_CACHED_RESULTS": 5000,
-    "CELERY_TRACK_STARTED": False,
-
-    # Default e-mail settings.
-    "SERVER_EMAIL": "celery@localhost",
-    "EMAIL_HOST": "localhost",
-    "EMAIL_PORT": 25,
-    "ADMINS": (),
-}
-
-
-def isatty(fh):
-    # Fixes bug with mod_wsgi:
-    #   mod_wsgi.Log object has no attribute isatty.
-    return getattr(fh, "isatty", None) and fh.isatty()
-
-
-_DEPRECATION_FMT = """
-%s is deprecated in favor of %s and is scheduled for removal in celery v1.4.
-""".strip()
-
-def prepare(m, source=settings, defaults=_DEFAULTS):
-
-    def _get(name, default=None, compat=None):
-        compat = compat or []
-        if default is None:
-            default = defaults.get(name)
-        compat = [name] + compat
-        for i, alias in enumerate(compat):
-            try:
-                value = getattr(source, alias)
-                i > 0 and warnings.warn(
-                        DeprecationWarning(_DEPRECATION_FMT % (alias, name)))
-                return value
-            except AttributeError:
-                pass
-        return default
-
-    # <--- Task                                        <-   --   --- - ----- -- #
-    m.ALWAYS_EAGER = _get("CELERY_ALWAYS_EAGER")
-    m.EAGER_PROPAGATES_EXCEPTIONS = _get("CELERY_EAGER_PROPAGATES_EXCEPTIONS")
-    m.RESULT_BACKEND = _get("CELERY_RESULT_BACKEND", compat=["CELERY_BACKEND"])
-    m.CELERY_BACKEND = RESULT_BACKEND # FIXME Remove in 1.4
-    m.CACHE_BACKEND = _get("CELERY_CACHE_BACKEND") or _get("CACHE_BACKEND")
-    m.CACHE_BACKEND_OPTIONS = _get("CELERY_CACHE_BACKEND_OPTIONS") or {}
-    m.TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
-    m.TASK_RESULT_EXPIRES = _get("CELERY_TASK_RESULT_EXPIRES")
-    m.IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
-    m.TRACK_STARTED = _get("CELERY_TRACK_STARTED")
-    m.ACKS_LATE = _get("CELERY_ACKS_LATE")
-
-    # Make sure TASK_RESULT_EXPIRES is a timedelta.
-    if isinstance(m.TASK_RESULT_EXPIRES, int):
-        m.TASK_RESULT_EXPIRES = timedelta(seconds=m.TASK_RESULT_EXPIRES)
-
-    # <--- SQLAlchemy                                  <-   --   --- - ----- -- #
-    m.RESULT_DBURI = _get("CELERY_RESULT_DBURI")
-    m.RESULT_ENGINE_OPTIONS = _get("CELERY_RESULT_ENGINE_OPTIONS")
-
-    # <--- Client                                      <-   --   --- - ----- -- #
-
-    m.MAX_CACHED_RESULTS = _get("CELERY_MAX_CACHED_RESULTS")
-
-    # <--- Worker                                      <-   --   --- - ----- -- #
-
-    m.SEND_EVENTS = _get("CELERY_SEND_EVENTS")
-    m.DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
-    m.DISABLE_RATE_LIMITS = _get("CELERY_DISABLE_RATE_LIMITS")
-    m.CELERYD_TASK_TIME_LIMIT = _get("CELERYD_TASK_TIME_LIMIT")
-    m.CELERYD_TASK_SOFT_TIME_LIMIT = _get("CELERYD_TASK_SOFT_TIME_LIMIT")
-    m.CELERYD_MAX_TASKS_PER_CHILD = _get("CELERYD_MAX_TASKS_PER_CHILD")
-    m.STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
-    m.CELERY_SEND_TASK_ERROR_EMAILS = _get("CELERY_SEND_TASK_ERROR_EMAILS",
-                                           False,
-                                    compat=["SEND_CELERY_TASK_ERROR_EMAILS"])
-    m.CELERY_TASK_ERROR_WHITELIST = _get("CELERY_TASK_ERROR_WHITELIST")
-    m.CELERYD_LOG_FORMAT = _get("CELERYD_LOG_FORMAT",
-                          compat=["CELERYD_DAEMON_LOG_FORMAT"])
-    m.CELERYD_TASK_LOG_FORMAT = _get("CELERYD_TASK_LOG_FORMAT")
-    m.CELERYD_LOG_FILE = _get("CELERYD_LOG_FILE")
-    m.CELERYD_LOG_COLOR = _get("CELERYD_LOG_COLOR",
-                       CELERYD_LOG_FILE is None and isatty(sys.stderr))
-    m.CELERYD_LOG_LEVEL = _get("CELERYD_LOG_LEVEL",
-                            compat=["CELERYD_DAEMON_LOG_LEVEL"])
-    if not isinstance(m.CELERYD_LOG_LEVEL, int):
-        m.CELERYD_LOG_LEVEL = LOG_LEVELS[m.CELERYD_LOG_LEVEL.upper()]
-    m.CELERYD_STATE_DB = _get("CELERYD_STATE_DB")
-    m.CELERYD_CONCURRENCY = _get("CELERYD_CONCURRENCY")
-    m.CELERYD_PREFETCH_MULTIPLIER = _get("CELERYD_PREFETCH_MULTIPLIER")
-    m.CELERYD_POOL_PUTLOCKS = _get("CELERYD_POOL_PUTLOCKS")
-
-    m.CELERYD_POOL = _get("CELERYD_POOL")
-    m.CELERYD_LISTENER = _get("CELERYD_LISTENER")
-    m.CELERYD_MEDIATOR = _get("CELERYD_MEDIATOR")
-    m.CELERYD_ETA_SCHEDULER = _get("CELERYD_ETA_SCHEDULER")
-    m.CELERYD_ETA_SCHEDULER_PRECISION = _get("CELERYD_ETA_SCHEDULER_PRECISION")
-
-    # :--- Email settings                               <-   --   --- - ----- -- #
-    m.ADMINS = _get("ADMINS")
-    m.SERVER_EMAIL = _get("SERVER_EMAIL")
-    m.EMAIL_HOST = _get("EMAIL_HOST")
-    m.EMAIL_HOST_USER = _get("EMAIL_HOST_USER")
-    m.EMAIL_HOST_PASSWORD = _get("EMAIL_HOST_PASSWORD")
-    m.EMAIL_PORT = _get("EMAIL_PORT")
-
-    # :--- Broker connections                           <-   --   --- - ----- -- #
-    m.BROKER_HOST = _get("BROKER_HOST")
-    m.BROKER_PORT = _get("BROKER_PORT")
-    m.BROKER_USER = _get("BROKER_USER")
-    m.BROKER_PASSWORD = _get("BROKER_PASSWORD")
-    m.BROKER_VHOST = _get("BROKER_VHOST")
-    m.BROKER_USE_SSL = _get("BROKER_USE_SSL")
-    m.BROKER_INSIST = _get("BROKER_INSIST")
-    m.BROKER_CONNECTION_TIMEOUT = _get("BROKER_CONNECTION_TIMEOUT",
-                                compat=["CELERY_BROKER_CONNECTION_TIMEOUT"])
-    m.BROKER_CONNECTION_RETRY = _get("BROKER_CONNECTION_RETRY",
-                                compat=["CELERY_BROKER_CONNECTION_RETRY"])
-    m.BROKER_CONNECTION_MAX_RETRIES = _get("BROKER_CONNECTION_MAX_RETRIES",
-                            compat=["CELERY_BROKER_CONNECTION_MAX_RETRIES"])
-    m.BROKER_BACKEND = _get("BROKER_TRANSPORT") or \
-                            _get("BROKER_BACKEND") or \
-                                _get("CARROT_BACKEND")
-
-    # <--- Message routing                             <-   --   --- - ----- -- #
-    m.DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")
-    m.DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
-    m.DEFAULT_EXCHANGE = _get("CELERY_DEFAULT_EXCHANGE")
-    m.DEFAULT_EXCHANGE_TYPE = _get("CELERY_DEFAULT_EXCHANGE_TYPE")
-    m.DEFAULT_DELIVERY_MODE = _get("CELERY_DEFAULT_DELIVERY_MODE")
-    m.QUEUES = _get("CELERY_QUEUES") or {DEFAULT_QUEUE: {
-                                       "exchange": DEFAULT_EXCHANGE,
-                                       "exchange_type": DEFAULT_EXCHANGE_TYPE,
-                                       "binding_key": DEFAULT_ROUTING_KEY}}
-    m.CREATE_MISSING_QUEUES = _get("CELERY_CREATE_MISSING_QUEUES")
-    m.ROUTES = routes.prepare(_get("CELERY_ROUTES") or [])
-    # :--- Broadcast queue settings                     <-   --   --- - ----- -- #
-
-    m.BROADCAST_QUEUE = _get("CELERY_BROADCAST_QUEUE")
-    m.BROADCAST_EXCHANGE = _get("CELERY_BROADCAST_EXCHANGE")
-    m.BROADCAST_EXCHANGE_TYPE = _get("CELERY_BROADCAST_EXCHANGE_TYPE")
+"""
+
+**DEPRECATED**
+
+Use :mod:`celery.defaults` instead.
+
+
+"""
+from celery.defaults.conf import DEFAULTS as _DEFAULTS
+from celery.defaults import default_app
+
+conf = default_app.conf
+
+ALWAYS_EAGER = conf.CELERY_ALWAYS_EAGER
+EAGER_PROPAGATES_EXCEPTIONS = conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS
+RESULT_BACKEND = conf.CELERY_RESULT_BACKEND
+CACHE_BACKEND = conf.CELERY_CACHE_BACKEND
+CACHE_BACKEND_OPTIONS = conf.CELERY_CACHE_BACKEND_OPTIONS
+TASK_SERIALIZER = conf.CELERY_TASK_SERIALIZER
+TASK_RESULT_EXPIRES = conf.CELERY_TASK_RESULT_EXPIRES
+IGNORE_RESULT = conf.CELERY_IGNORE_RESULT
+TRACK_STARTED = conf.CELERY_TRACK_STARTED
+ACKS_LATE = conf.CELERY_ACKS_LATE
+
+# <--- SQLAlchemy                                  <-   --   --- - ----- -- #
+
+RESULT_DBURI = conf.CELERY_RESULT_DBURI
+RESULT_ENGINE_OPTIONS = conf.CELERY_RESULT_ENGINE_OPTIONS
+
+# <--- Client                                      <-   --   --- - ----- -- #
+
+MAX_CACHED_RESULTS = conf.CELERY_MAX_CACHED_RESULTS
+
+# <--- Worker                                      <-   --   --- - ----- -- #
+
+SEND_EVENTS = conf.CELERY_SEND_EVENTS
+DEFAULT_RATE_LIMIT = conf.CELERY_DEFAULT_RATE_LIMIT
+DISABLE_RATE_LIMITS = conf.CELERY_DISABLE_RATE_LIMITS
+CELERYD_TASK_TIME_LIMIT = conf.CELERYD_TASK_TIME_LIMIT
+CELERYD_TASK_SOFT_TIME_LIMIT = conf.CELERYD_TASK_SOFT_TIME_LIMIT
+CELERYD_MAX_TASKS_PER_CHILD = conf.CELERYD_MAX_TASKS_PER_CHILD
+STORE_ERRORS_EVEN_IF_IGNORED = conf.CELERY_STORE_ERRORS_EVEN_IF_IGNORED
+CELERY_SEND_TASK_ERROR_EMAILS = conf.CELERY_SEND_TASK_ERROR_EMAILS
+CELERY_TASK_ERROR_WHITELIST = conf.CELERY_TASK_ERROR_WHITELIST
+CELERYD_LOG_FORMAT = conf.CELERYD_LOG_FORMAT
+CELERYD_TASK_LOG_FORMAT = conf.CELERYD_TASK_LOG_FORMAT
+CELERYD_LOG_FILE = conf.CELERYD_LOG_FILE
+CELERYD_LOG_COLOR = conf.CELERYD_LOG_COLOR
+CELERYD_LOG_LEVEL = conf.CELERYD_LOG_LEVEL
+CELERYD_STATE_DB = conf.CELERYD_STATE_DB
+CELERYD_CONCURRENCY = conf.CELERYD_CONCURRENCY
+CELERYD_PREFETCH_MULTIPLIER = conf.CELERYD_PREFETCH_MULTIPLIER
+CELERYD_POOL_PUTLOCKS = conf.CELERYD_POOL_PUTLOCKS
+
+CELERYD_POOL = conf.CELERYD_POOL
+CELERYD_LISTENER = conf.CELERYD_LISTENER
+CELERYD_MEDIATOR = conf.CELERYD_MEDIATOR
+CELERYD_ETA_SCHEDULER = conf.CELERYD_ETA_SCHEDULER
+CELERYD_ETA_SCHEDULER_PRECISION = conf.CELERYD_ETA_SCHEDULER_PRECISION
+
+# :--- Email settings                               <-   --   --- - ----- -- #
+ADMINS = conf.ADMINS
+SERVER_EMAIL = conf.SERVER_EMAIL
+EMAIL_HOST = conf.EMAIL_HOST
+EMAIL_HOST_USER = conf.EMAIL_HOST_USER
+EMAIL_HOST_PASSWORD = conf.EMAIL_HOST_PASSWORD
+EMAIL_PORT = conf.EMAIL_PORT
+
+# :--- Broker connections                           <-   --   --- - ----- -- #
+BROKER_HOST = conf.BROKER_HOST
+BROKER_PORT = conf.BROKER_PORT
+BROKER_USER = conf.BROKER_USER
+BROKER_PASSWORD = conf.BROKER_PASSWORD
+BROKER_VHOST = conf.BROKER_VHOST
+BROKER_USE_SSL = conf.BROKER_USE_SSL
+BROKER_INSIST = conf.BROKER_INSIST
+BROKER_CONNECTION_TIMEOUT = conf.BROKER_CONNECTION_TIMEOUT
+BROKER_CONNECTION_RETRY = conf.BROKER_CONNECTION_RETRY
+BROKER_CONNECTION_MAX_RETRIES = conf.BROKER_CONNECTION_MAX_RETRIES
+BROKER_BACKEND = conf.BROKER_BACKEND
+
+# <--- Message routing                             <-   --   --- - ----- -- #
+DEFAULT_QUEUE = conf.CELERY_DEFAULT_QUEUE
+DEFAULT_ROUTING_KEY = conf.CELERY_DEFAULT_ROUTING_KEY
+DEFAULT_EXCHANGE = conf.CELERY_DEFAULT_EXCHANGE
+DEFAULT_EXCHANGE_TYPE = conf.CELERY_DEFAULT_EXCHANGE_TYPE
+DEFAULT_DELIVERY_MODE = conf.CELERY_DEFAULT_DELIVERY_MODE
+QUEUES = conf.CELERY_QUEUES
+CREATE_MISSING_QUEUES = conf.CELERY_CREATE_MISSING_QUEUES
+ROUTES = conf.CELERY_ROUTES
+# :--- Broadcast queue settings                     <-   --   --- - ----- -- #
+
+BROADCAST_QUEUE = conf.CELERY_BROADCAST_QUEUE
+BROADCAST_EXCHANGE = conf.CELERY_BROADCAST_EXCHANGE
+BROADCAST_EXCHANGE_TYPE = conf.CELERY_BROADCAST_EXCHANGE_TYPE
 
 # :--- Event queue settings                         <-   --   --- - ----- -- #
 
-    m.EVENT_QUEUE = _get("CELERY_EVENT_QUEUE")
-    m.EVENT_EXCHANGE = _get("CELERY_EVENT_EXCHANGE")
-    m.EVENT_EXCHANGE_TYPE = _get("CELERY_EVENT_EXCHANGE_TYPE")
-    m.EVENT_ROUTING_KEY = _get("CELERY_EVENT_ROUTING_KEY")
-    m.EVENT_SERIALIZER = _get("CELERY_EVENT_SERIALIZER")
+EVENT_QUEUE = conf.CELERY_EVENT_QUEUE
+EVENT_EXCHANGE = conf.CELERY_EVENT_EXCHANGE
+EVENT_EXCHANGE_TYPE = conf.CELERY_EVENT_EXCHANGE_TYPE
+EVENT_ROUTING_KEY = conf.CELERY_EVENT_ROUTING_KEY
+EVENT_SERIALIZER = conf.CELERY_EVENT_SERIALIZER
 
 # :--- AMQP Backend settings                        <-   --   --- - ----- -- #
 
-    m.RESULT_EXCHANGE = _get("CELERY_RESULT_EXCHANGE")
-    m.RESULT_EXCHANGE_TYPE = _get("CELERY_RESULT_EXCHANGE_TYPE")
-    m.RESULT_SERIALIZER = _get("CELERY_RESULT_SERIALIZER")
-    m.RESULT_PERSISTENT = _get("CELERY_RESULT_PERSISTENT")
+RESULT_EXCHANGE = conf.CELERY_RESULT_EXCHANGE
+RESULT_EXCHANGE_TYPE = conf.CELERY_RESULT_EXCHANGE_TYPE
+RESULT_SERIALIZER = conf.CELERY_RESULT_SERIALIZER
+RESULT_PERSISTENT = conf.CELERY_RESULT_PERSISTENT
 
 # :--- Celery Beat                                  <-   --   --- - ----- -- #
-    m.CELERYBEAT_LOG_LEVEL = _get("CELERYBEAT_LOG_LEVEL")
-    m.CELERYBEAT_LOG_FILE = _get("CELERYBEAT_LOG_FILE")
-    m.CELERYBEAT_SCHEDULE = _get("CELERYBEAT_SCHEDULE")
-    m.CELERYBEAT_SCHEDULE_FILENAME = _get("CELERYBEAT_SCHEDULE_FILENAME")
-    m.CELERYBEAT_MAX_LOOP_INTERVAL = _get("CELERYBEAT_MAX_LOOP_INTERVAL")
+CELERYBEAT_LOG_LEVEL = conf.CELERYBEAT_LOG_LEVEL
+CELERYBEAT_LOG_FILE = conf.CELERYBEAT_LOG_FILE
+CELERYBEAT_SCHEDULE = conf.CELERYBEAT_SCHEDULE
+CELERYBEAT_SCHEDULE_FILENAME = conf.CELERYBEAT_SCHEDULE_FILENAME
+CELERYBEAT_MAX_LOOP_INTERVAL = conf.CELERYBEAT_MAX_LOOP_INTERVAL
 
 # :--- Celery Monitor                               <-   --   --- - ----- -- #
-    m.CELERYMON_LOG_LEVEL = _get("CELERYMON_LOG_LEVEL")
-    m.CELERYMON_LOG_FILE = _get("CELERYMON_LOG_FILE")
-
-prepare(sys.modules[__name__])
-
-def _init_queues(queues, default_exchange=None, default_exchange_type=None):
-    """Convert configuration mapping to a table of queues digestible
-    by a :class:`carrot.messaging.ConsumerSet`."""
-
-    def _defaults(opts):
-        opts.setdefault("exchange", DEFAULT_EXCHANGE),
-        opts.setdefault("exchange_type", DEFAULT_EXCHANGE_TYPE)
-        opts.setdefault("binding_key", DEFAULT_EXCHANGE)
-        opts.setdefault("routing_key", opts.get("binding_key"))
-        return opts
-
-    return dict((queue, _defaults(opts)) for queue, opts in queues.items())
-
-
-def get_queues(): # TODO deprecate
-    return _init_queues(QUEUES, DEFAULT_EXCHANGE, DEFAULT_EXCHANGE_TYPE)
-
-
-def prepare_queues(queues, defaults):
-    return _init_queues(queues, defaults.DEFAULT_EXCHANGE,
+CELERYMON_LOG_LEVEL = conf.CELERYMON_LOG_LEVEL
+CELERYMON_LOG_FILE = conf.CELERYMON_LOG_FILE

+ 1 - 2
celery/db/session.py

@@ -2,7 +2,6 @@ from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.ext.declarative import declarative_base
 
-from celery import conf
 from celery.utils.compat import defaultdict
 
 ResultModelBase = declarative_base()
@@ -28,7 +27,7 @@ def setup_results(engine):
         _SETUP["results"] = True
 
 
-def ResultSession(dburi=conf.RESULT_DBURI, **kwargs):
+def ResultSession(dburi, **kwargs):
     engine, session = create_session(dburi, **kwargs)
     setup_results(engine)
     return session()

+ 134 - 0
celery/defaults/__init__.py

@@ -0,0 +1,134 @@
+import os
+import sys
+
+from datetime import timedelta
+
+from celery import routes
+from celery.datastructures import AttributeDict
+from celery.defaults.conf import DEFAULTS
+
+def isatty(fh):
+    # Fixes bug with mod_wsgi:
+    #   mod_wsgi.Log object has no attribute isatty.
+    return getattr(fh, "isatty", None) and fh.isatty()
+
+
+class DefaultApp(object):
+    _backend = None
+    _conf = None
+    _loader = None
+
+    def __init__(self, loader=None, backend_cls=None):
+        self.loader_cls = loader or os.environ.get("CELERY_LOADER", "default")
+        self.backend_cls = backend_cls
+
+    def get_queues(self):
+        c = self.conf
+        queues = c.CELERY_QUEUES
+
+        def _defaults(opts):
+            opts.setdefault("exchange", c.CELERY_DEFAULT_EXCHANGE),
+            opts.setdefault("exchange_type", c.CELERY_DEFAULT_EXCHANGE_TYPE)
+            opts.setdefault("binding_key", c.CELERY_DEFAULT_EXCHANGE)
+            opts.setdefault("routing_key", opts.get("binding_key"))
+            return opts
+
+        return dict((queue, _defaults(opts))
+                    for queue, opts in queues.items())
+
+    def get_default_queue(self):
+        q = self.conf.CELERY_DEFAULT_QUEUE
+        return q, self.get_queues()[q]
+
+    def broker_connection(self, **kwargs):
+        from celery.messaging import establish_connection
+        return establish_connection(app=self, **kwargs)
+
+    def pre_config_merge(self, c):
+        if not c.get("CELERY_RESULT_BACKEND"):
+            c["CELERY_RESULT_BACKEND"] = c.get("CELERY_BACKEND")
+        if not c.get("BROKER_BACKEND"):
+            c["BROKER_BACKEND"] = c.get("BROKER_TRANSPORT")  or \
+                                    c.get("CARROT_BACKEND")
+        c.setdefault("CELERY_SEND_TASK_ERROR_EMAILS",
+                     c.get("SEND_CELERY_TASK_ERROR_EMAILS"))
+        return c
+
+    def get_consumer_set(self, connection, queues=None, **options):
+        from celery.messaging import ConsumerSet, Consumer
+
+        queues = queues or self.get_queues()
+
+        cset = ConsumerSet(connection)
+        for queue_name, queue_options in queues.items():
+            queue_options = dict(queue_options)
+            queue_options["routing_key"] = queue_options.pop("binding_key",
+                                                             None)
+            consumer = Consumer(connection, queue=queue_name,
+                                backend=cset.backend, **queue_options)
+            cset.consumers.append(consumer)
+        return cset
+
+    def post_config_merge(self, c):
+        if not c.get("CELERY_QUEUES"):
+            c["CELERY_QUEUES"] = {c.CELERY_DEFAULT_QUEUE: {
+                "exchange": c.CELERY_DEFAULT_EXCHANGE,
+                "exchange_type": c.CELERY_DEFAULT_EXCHANGE_TYPE,
+                "binding_key": c.CELERY_DEFAULT_ROUTING_KEY}}
+        c["CELERY_ROUTES"] = routes.prepare(c.get("CELERY_ROUTES") or {})
+        if c.get("CELERYD_LOG_COLOR") is None:
+            c["CELERYD_LOG_COLOR"] = not c.CELERYD_LOG_FILE and \
+                                        isatty(sys.stderr)
+        if isinstance(c.CELERY_TASK_RESULT_EXPIRES, int):
+            c["CELERY_TASK_RESULT_EXPIRES"] = timedelta(
+                    seconds=c.CELERY_TASK_RESULT_EXPIRES)
+        return c
+
+    def mail_admins(self, subject, message, fail_silently=False):
+        """Send an e-mail to the admins in conf.ADMINS."""
+        from celery.utils import mail
+
+        if not self.conf.ADMINS:
+            return
+
+        to = [admin_email for _, admin_email in self.conf.ADMINS]
+        message = mail.Message(sender=self.conf.SERVER_EMAIL,
+                               to=to, subject=subject, body=message)
+
+        mailer = mail.Mailer(self.conf.EMAIL_HOST,
+                             self.conf.EMAIL_PORT,
+                             self.conf.EMAIL_HOST_USER,
+                             self.conf.EMAIL_HOST_PASSWORD)
+        mailer.send(message, fail_silently=fail_silently)
+
+    @property
+    def backend(self):
+        if self._backend is None:
+            from celery.backends import get_backend_cls
+            backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
+            backend_cls = get_backend_cls(backend_cls, loader=self.loader)
+            self._backend = backend_cls(app=self)
+        return self._backend
+
+    @property
+    def loader(self):
+        if self._loader is None:
+            from celery.loaders import get_loader_cls
+            self._loader = get_loader_cls(self.loader_cls)(app=self)
+        return self._loader
+
+    @property
+    def conf(self):
+        if self._conf is None:
+            config = self.pre_config_merge(self.loader.conf)
+            self._conf = self.post_config_merge(
+                            AttributeDict(DEFAULTS, **config))
+        return self._conf
+
+default_app = DefaultApp()
+
+
+def app_or_default(app=None):
+    if app is None:
+        return default_app
+    return app

+ 93 - 0
celery/defaults/conf.py

@@ -0,0 +1,93 @@
+from datetime import timedelta
+
+DEFAULT_PROCESS_LOG_FMT = """
+    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
+""".strip()
+DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
+DEFAULT_TASK_LOG_FMT = " ".join("""
+    [%(asctime)s: %(levelname)s/%(processName)s]
+    [%(task_name)s(%(task_id)s)] %(message)s
+""".strip().split())
+
+
+DEFAULTS = {
+    "BROKER_CONNECTION_TIMEOUT": 4,
+    "BROKER_CONNECTION_RETRY": True,
+    "BROKER_CONNECTION_MAX_RETRIES": 100,
+    "BROKER_HOST": "localhost",
+    "BROKER_PORT": None,
+    "BROKER_USER": "guest",
+    "BROKER_PASSWORD": "guest",
+    "BROKER_INSIST": False,
+    "BROKER_USE_SSL": False,
+    "BROKER_VHOST": "/",
+    "CELERY_RESULT_BACKEND": "database",
+    "CELERY_ALWAYS_EAGER": False,
+    "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
+    "CELERY_TASK_RESULT_EXPIRES": timedelta(days=1),
+    "CELERY_SEND_EVENTS": False,
+    "CELERY_IGNORE_RESULT": False,
+    "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
+    "CELERY_TASK_SERIALIZER": "pickle",
+    "CELERY_DEFAULT_RATE_LIMIT": None,
+    "CELERY_DISABLE_RATE_LIMITS": False,
+    "CELERYD_TASK_TIME_LIMIT": None,
+    "CELERYD_TASK_SOFT_TIME_LIMIT": None,
+    "CELERYD_MAX_TASKS_PER_CHILD": None,
+    "CELERY_ROUTES": None,
+    "CELERY_CREATE_MISSING_QUEUES": True,
+    "CELERY_DEFAULT_ROUTING_KEY": "celery",
+    "CELERY_DEFAULT_QUEUE": "celery",
+    "CELERY_DEFAULT_EXCHANGE": "celery",
+    "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
+    "CELERY_DEFAULT_DELIVERY_MODE": 2, # persistent
+    "CELERY_ACKS_LATE": False,
+    "CELERY_CACHE_BACKEND": None,
+    "CELERY_CACHE_BACKEND_OPTIONS": {},
+    "CELERYD_POOL_PUTLOCKS": True,
+    "CELERYD_POOL": "celery.concurrency.processes.TaskPool",
+    "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
+    "CELERYD_ETA_SCHEDULER": "celery.utils.timer2.Timer",
+    "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
+    "CELERYD_CONCURRENCY": 0, # defaults to cpu count
+    "CELERYD_PREFETCH_MULTIPLIER": 4,
+    "CELERYD_LOG_FORMAT": DEFAULT_PROCESS_LOG_FMT,
+    "CELERYD_TASK_LOG_FORMAT": DEFAULT_TASK_LOG_FMT,
+    "CELERYD_LOG_COLOR": None,
+    "CELERYD_LOG_LEVEL": "WARN",
+    "CELERYD_LOG_FILE": None, # stderr
+    "CELERYBEAT_SCHEDULE": {},
+    "CELERYD_STATE_DB": None,
+    "CELERYD_ETA_SCHEDULER_PRECISION": 1,
+    "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
+    "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
+    "CELERYBEAT_LOG_LEVEL": "INFO",
+    "CELERYBEAT_LOG_FILE": None, # stderr
+    "CELERYMON_LOG_LEVEL": "INFO",
+    "CELERYMON_LOG_FILE": None, # stderr
+    "CELERYMON_LOG_FORMAT": DEFAULT_LOG_FMT,
+    "CELERY_BROADCAST_QUEUE": "celeryctl",
+    "CELERY_BROADCAST_EXCHANGE": "celeryctl",
+    "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
+    "CELERY_EVENT_QUEUE": "celeryevent",
+    "CELERY_EVENT_EXCHANGE": "celeryevent",
+    "CELERY_EVENT_EXCHANGE_TYPE": "direct",
+    "CELERY_EVENT_ROUTING_KEY": "celeryevent",
+    "CELERY_EVENT_SERIALIZER": "json",
+    "CELERY_RESULT_DBURI": None,
+    "CELERY_RESULT_ENGINE_OPTIONS": None,
+    "CELERY_RESULT_EXCHANGE": "celeryresults",
+    "CELERY_RESULT_EXCHANGE_TYPE": "direct",
+    "CELERY_RESULT_SERIALIZER": "pickle",
+    "CELERY_RESULT_PERSISTENT": False,
+    "CELERY_MAX_CACHED_RESULTS": 5000,
+    "CELERY_TRACK_STARTED": False,
+
+    # Default e-mail settings.
+    "SERVER_EMAIL": "celery@localhost",
+    "EMAIL_HOST": "localhost",
+    "EMAIL_PORT": 25,
+    "EMAIL_HOST_USER": None,
+    "EMAIL_HOST_PASSWORD": None,
+    "ADMINS": (),
+}

+ 20 - 5
celery/events/__init__.py

@@ -5,7 +5,9 @@ import threading
 from collections import deque
 from itertools import count
 
-from celery.messaging import EventPublisher, EventConsumer
+from carrot.messaging import Publisher, Consumer
+
+from celery.defaults import app_or_default
 
 
 def create_event(type, fields):
@@ -38,7 +40,8 @@ class EventDispatcher(object):
 
     """
 
-    def __init__(self, connection, hostname=None, enabled=True):
+    def __init__(self, connection, hostname=None, enabled=True, app=None):
+        self.app = app_or_default(app)
         self.connection = connection
         self.hostname = hostname or socket.gethostname()
         self.enabled = enabled
@@ -50,8 +53,13 @@ class EventDispatcher(object):
             self.enable()
 
     def enable(self):
+        conf = self.app.conf
         self.enabled = True
-        self.publisher = EventPublisher(self.connection)
+        self.publisher = Publisher(self.connection,
+                                exchange=conf.CELERY_EVENT_EXCHANGE,
+                                exchange_type=conf.CELERY_EVENT_EXCHANGE_TYPE,
+                                routing_key=conf.CELERY_EVENT_ROUTING_KEY,
+                                serializer=conf.CELERY_EVENT_SERIALIZER)
 
     def disable(self):
         self.enabled = False
@@ -103,7 +111,8 @@ class EventReceiver(object):
     """
     handlers = {}
 
-    def __init__(self, connection, handlers=None):
+    def __init__(self, connection, handlers=None, app=None):
+        self.app = app_or_default(app)
         self.connection = connection
         if handlers is not None:
             self.handlers = handlers
@@ -115,7 +124,13 @@ class EventReceiver(object):
         handler and handler(event)
 
     def consumer(self):
-        consumer = EventConsumer(self.connection)
+        conf = self.app.conf
+        consumer = Consumer(self.connection,
+                            queue=conf.CELERY_EVENT_QUEUE,
+                            exchange=conf.CELERY_EVENT_EXCHANGE,
+                            exchange_type=conf.CELERY_EVENT_EXCHANGE_TYPE,
+                            routing_key=conf.CELERY_EVENT_ROUTING_KEY,
+                            no_ack=True)
         consumer.register_callback(self._receive)
         return consumer
 

+ 2 - 3
celery/events/snapshot.py

@@ -1,12 +1,11 @@
 from celery.utils import timer2
 
-from celery import conf
 from celery import log
 from celery.datastructures import TokenBucket
 from celery.events import EventReceiver
 from celery.events.state import State
 from celery.messaging import establish_connection
-from celery.utils import instantiate
+from celery.utils import instantiate, LOG_LEVELS
 from celery.utils.dispatch import Signal
 from celery.utils.timeutils import rate
 
@@ -74,7 +73,7 @@ class Polaroid(object):
 def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
         logfile=None):
     if not isinstance(loglevel, int):
-        loglevel = conf.LOG_LEVELS[loglevel.upper()]
+        loglevel = LOG_LEVELS[loglevel.upper()]
     logger = log.setup_logger(loglevel=loglevel,
                               logfile=logfile,
                               name="celery.evcam")

+ 11 - 9
celery/execute/__init__.py

@@ -1,5 +1,5 @@
-from celery import conf
 from celery.datastructures import ExceptionInfo
+from celery.defaults import app_or_default
 from celery.execute.trace import TaskTrace
 from celery.messaging import with_connection
 from celery.messaging import TaskPublisher
@@ -17,7 +17,7 @@ extract_exec_options = mattrgetter("queue", "routing_key", "exchange",
 @with_connection
 def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
         task_id=None, publisher=None, connection=None, connect_timeout=None,
-        router=None, expires=None, queues=None, **options):
+        router=None, expires=None, queues=None, app=None, **options):
     """Run a task asynchronously by the celery daemon(s).
 
     :param task: The :class:`~celery.task.base.Task` to run.
@@ -84,11 +84,12 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     replaced by a local :func:`apply` call instead.
 
     """
-    queues = conf.prepare_queues(queues or conf.QUEUES, conf)
-    router = router or Router(conf.ROUTES, queues,
-                              conf.CREATE_MISSING_QUEUES)
+    app = app_or_default(app)
+    queues = queues or app.get_queues()
+    router = router or Router(app.conf.CELERY_ROUTES, queues,
+                              app.conf.CELERY_CREATE_MISSING_QUEUES)
 
-    if conf.ALWAYS_EAGER:
+    if app.conf.CELERY_ALWAYS_EAGER:
         return apply(task, args, kwargs, task_id=task_id)
 
     task = tasks[task.name] # get instance from registry
@@ -103,7 +104,7 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     try:
         task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
                                      countdown=countdown, eta=eta,
-                                     expires=expires, **options)
+                                     expires=expires, app=app, **options)
     finally:
         publisher or publish.close()
 
@@ -163,7 +164,7 @@ def delay_task(task_name, *args, **kwargs):
     return apply_async(tasks[task_name], args, kwargs)
 
 
-def apply(task, args, kwargs, **options):
+def apply(task, args, kwargs, app=None, **options):
     """Apply the task locally.
 
     :keyword throw: Re-raise task exceptions. Defaults to
@@ -173,11 +174,12 @@ def apply(task, args, kwargs, **options):
     :class:`celery.result.EagerResult` instance.
 
     """
+    app = app_or_default(app)
     args = args or []
     kwargs = kwargs or {}
     task_id = options.get("task_id") or gen_unique_id()
     retries = options.get("retries", 0)
-    throw = options.pop("throw", conf.EAGER_PROPAGATES_EXCEPTIONS)
+    throw = options.pop("throw", app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS)
 
     task = tasks[task.name] # Make sure we get the instance, not class.
 

+ 4 - 0
celery/loaders/base.py

@@ -24,6 +24,10 @@ class BaseLoader(object):
     override_backends = {}
     configured = False
 
+    def __init__(self, app=None, **kwargs):
+        from celery.defaults import app_or_default
+        self.app = app_or_default(app)
+
     def on_task_init(self, task_id, task):
         """This method is called before a task is executed."""
         pass

+ 28 - 10
celery/log.py

@@ -9,8 +9,8 @@ import traceback
 from multiprocessing import current_process
 from multiprocessing import util as mputil
 
-from celery import conf
 from celery import signals
+from celery.defaults import app_or_default
 from celery.utils import noop
 from celery.utils.compat import LoggerAdapter
 from celery.utils.patch import ensure_process_aware_logger
@@ -54,9 +54,16 @@ def get_task_logger(loglevel=None, name=None):
     return logger
 
 
-def setup_logging_subsystem(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
-        format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
-        **kwargs):
+def setup_logging_subsystem(loglevel=None, logfile=None,
+        format=None, colorize=None, **kwargs):
+    app = app_or_default(kwargs.get("app"))
+    loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
+    format = format or app.conf.CELERYD_LOG_FORMAT
+    if colorize is None:
+        colorize = app.conf.CELERYD_LOG_COLOR
+
+    print("COLORIZE: %s" % (app.conf.CELERYD_LOG_COLOR, ))
+
     global _setup
     if not _setup:
         try:
@@ -100,15 +107,21 @@ def get_default_logger(loglevel=None, name="celery"):
     return logger
 
 
-def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
-        format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
-        name="celery", root=True, **kwargs):
+def setup_logger(loglevel=None, logfile=None,
+        format=None, colorize=None, name="celery", root=True,
+        app=None, **kwargs):
     """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
     then ``stderr`` is used.
 
     Returns logger object.
 
     """
+    app = app_or_default(app)
+    loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
+    format = format or app.conf.CELERYD_LOG_FORMAT
+    if colorize is None:
+        colorize = app.conf.CELERYD_LOG_COLOR
+
     if not root:
         return _setup_logger(get_default_logger(loglevel, name),
                              logfile, format, colorize, **kwargs)
@@ -116,15 +129,20 @@ def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
     return get_default_logger(name=name)
 
 
-def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
-        format=conf.CELERYD_TASK_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
-        task_kwargs=None, **kwargs):
+def setup_task_logger(loglevel=None, logfile=None, format=None, colorize=None,
+        task_kwargs=None, app=None, **kwargs):
     """Setup the task logger. If ``logfile`` is not specified, then
     ``stderr`` is used.
 
     Returns logger object.
 
     """
+    app = app_or_default(app)
+    loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
+    format = format or app.conf.CELERYD_LOG_FORMAT
+    if colorize is None:
+        colorize = app.conf.CELERYD_LOG_COLOR
+
     if task_kwargs is None:
         task_kwargs = {}
     task_kwargs.setdefault("task_id", "-?-")

+ 56 - 54
celery/messaging.py

@@ -12,8 +12,8 @@ from itertools import count
 from carrot.connection import BrokerConnection
 from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
 
-from celery import conf
 from celery import signals
+from celery.defaults import app_or_default, default_app
 from celery.utils import gen_unique_id, mitemgetter, noop
 from celery.utils.functional import wraps
 
@@ -23,7 +23,6 @@ MSG_OPTIONS = ("mandatory", "priority", "immediate",
 
 get_msg_options = mitemgetter(*MSG_OPTIONS)
 extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
-default_queue = conf.get_queues()[conf.DEFAULT_QUEUE]
 
 _queues_declared = False
 _exchanges_declared = set()
@@ -31,19 +30,26 @@ _exchanges_declared = set()
 
 class TaskPublisher(Publisher):
     """Publish tasks."""
-    exchange = default_queue["exchange"]
-    exchange_type = default_queue["exchange_type"]
-    routing_key = conf.DEFAULT_ROUTING_KEY
-    serializer = conf.TASK_SERIALIZER
     auto_declare = False
 
     def __init__(self, *args, **kwargs):
+        self.app = app = app_or_default(kwargs.get("app"))
+        _, default_queue = app.get_default_queue()
+        kwargs["exchange"] = kwargs.get("exchange") or \
+                                    default_queue["exchange"]
+        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
+                                    default_queue["exchange_type"]
+        kwargs["routing_key"] = kwargs.get("routing_key") or \
+                                    app.conf.CELERY_DEFAULT_ROUTING_KEY
+        kwargs["serializer"] = kwargs.get("serializer") or \
+                                    app.conf.CELERY_TASK_SERIALIZER
         super(TaskPublisher, self).__init__(*args, **kwargs)
 
         # Make sure all queues are declared.
         global _queues_declared
         if not _queues_declared:
-            consumers = get_consumer_set(self.connection)
+            consumers = self.app.get_consumer_set(self.connection,
+                                                  self.app.get_queues())
             consumers.close()
             _queues_declared = True
         self.declare()
@@ -125,27 +131,18 @@ class ConsumerSet(_ConsumerSet):
 
 class TaskConsumer(Consumer):
     """Consume tasks"""
-    queue = conf.DEFAULT_QUEUE
-    exchange = default_queue["exchange"]
-    routing_key = default_queue["binding_key"]
-    exchange_type = default_queue["exchange_type"]
-
-
-class EventPublisher(Publisher):
-    """Publish events"""
-    exchange = conf.EVENT_EXCHANGE
-    exchange_type = conf.EVENT_EXCHANGE_TYPE
-    routing_key = conf.EVENT_ROUTING_KEY
-    serializer = conf.EVENT_SERIALIZER
-
-
-class EventConsumer(Consumer):
-    """Consume events"""
-    queue = conf.EVENT_QUEUE
-    exchange = conf.EVENT_EXCHANGE
-    exchange_type = conf.EVENT_EXCHANGE_TYPE
-    routing_key = conf.EVENT_ROUTING_KEY
-    no_ack = True
+
+    def __init__(self, *args, **kwargs):
+        app = app_or_default(kwargs.get("app"))
+        default_queue_name, default_queue = app.get_default_queue()
+        kwargs["queue"] = kwargs.get("queue") or default_queue_name
+        kwargs["exchange"] = kwargs.get("exchange") or \
+                                default_queue["exchange"]
+        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
+                                default_queue["exchange_type"]
+        kwargs["routing_key"] = kwargs.get("routing_key") or \
+                                    default_queue["binding_key"]
+        super(TaskConsumer, self).__init__(*args, **kwargs)
 
 
 class ControlReplyConsumer(Consumer):
@@ -196,13 +193,20 @@ class BroadcastPublisher(Publisher):
 
     ReplyTo = ControlReplyConsumer
 
-    exchange = conf.BROADCAST_EXCHANGE
-    exchange_type = conf.BROADCAST_EXCHANGE_TYPE
+    def __init__(self, *args, **kwargs):
+        app = self.app = app_or_default(kwargs.get("app"))
+        kwargs["exchange"] = kwargs.get("exchange") or \
+                                app.conf.CELERY_BROADCAST_EXCHANGE
+        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
+                                app.conf.CELERY_BROADCAST_EXCHANGE_TYPE
+        super(BroadcastPublisher, self).__init__(*args, **kwargs)
 
     def send(self, type, arguments, destination=None, reply_ticket=None):
         """Send broadcast command."""
         arguments["command"] = type
         arguments["destination"] = destination
+        reply_to = self.ReplyTo(self.connection, None, app=self.app,
+                                auto_declare=False)
         if reply_ticket:
             arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
                                      "routing_key": reply_ticket}
@@ -211,12 +215,16 @@ class BroadcastPublisher(Publisher):
 
 class BroadcastConsumer(Consumer):
     """Consume broadcast commands"""
-    queue = conf.BROADCAST_QUEUE
-    exchange = conf.BROADCAST_EXCHANGE
-    exchange_type = conf.BROADCAST_EXCHANGE_TYPE
     no_ack = True
 
     def __init__(self, *args, **kwargs):
+        self.app = app = app_or_default(kwargs.get("app"))
+        kwargs["queue"] = kwargs.get("queue") or \
+                            app.conf.CELERY_BROADCAST_QUEUE
+        kwargs["exchange"] = kwargs.get("exchange") or \
+                            app.conf.CELERY_BROADCAST_EXCHANGE
+        kwargs["exchange_type"] = kwargs.get("exchange_type") or \
+                            app.conf.CELERY_BROADCAST_EXCHANGE_TYPE
         self.hostname = kwargs.pop("hostname", None) or socket.gethostname()
         self.queue = "%s_%s" % (self.queue, self.hostname)
         super(BroadcastConsumer, self).__init__(*args, **kwargs)
@@ -244,21 +252,22 @@ class BroadcastConsumer(Consumer):
 
 def establish_connection(hostname=None, userid=None, password=None,
         virtual_host=None, port=None, ssl=None, insist=None,
-        connect_timeout=None, backend_cls=None, defaults=conf):
+        connect_timeout=None, backend_cls=None, app=None):
     """Establish a connection to the message broker."""
+    app = app_or_default(app)
     if insist is None:
-        insist = defaults.BROKER_INSIST
+        insist = app.conf.get("BROKER_INSIST")
     if ssl is None:
-        ssl = defaults.BROKER_USE_SSL
+        ssl = app.conf.get("BROKER_USE_SSL")
     if connect_timeout is None:
-        connect_timeout = defaults.BROKER_CONNECTION_TIMEOUT
-
-    return BrokerConnection(hostname or defaults.BROKER_HOST,
-                            userid or defaults.BROKER_USER,
-                            password or defaults.BROKER_PASSWORD,
-                            virtual_host or defaults.BROKER_VHOST,
-                            port or defaults.BROKER_PORT,
-                            backend_cls=backend_cls or defaults.BROKER_BACKEND,
+        connect_timeout = app.conf.get("BROKER_CONNECTION_TIMEOUT")
+
+    return BrokerConnection(hostname or app.conf.BROKER_HOST,
+                            userid or app.conf.BROKER_USER,
+                            password or app.conf.BROKER_PASSWORD,
+                            virtual_host or app.conf.BROKER_VHOST,
+                            port or app.conf.BROKER_PORT,
+                            backend_cls=backend_cls or app.conf.BROKER_BACKEND,
                             insist=insist, ssl=ssl,
                             connect_timeout=connect_timeout)
 
@@ -271,7 +280,8 @@ def with_connection(fun):
     @wraps(fun)
     def _inner(*args, **kwargs):
         connection = kwargs.get("connection")
-        timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
+        timeout = kwargs.get("connect_timeout",
+                    default_app.conf.get("BROKER_CONNECTION_TIMEOUT"))
         kwargs["connection"] = conn = connection or \
                 establish_connection(connect_timeout=timeout)
         close_connection = not connection and conn.close or noop
@@ -291,12 +301,4 @@ def get_consumer_set(connection, queues=None, **options):
     Defaults to the queues in ``CELERY_QUEUES``.
 
     """
-    queues = conf.prepare_queues(queues, conf)
-    cset = ConsumerSet(connection)
-    for queue_name, queue_options in queues.items():
-        queue_options = dict(queue_options)
-        queue_options["routing_key"] = queue_options.pop("binding_key", None)
-        consumer = Consumer(connection, queue=queue_name,
-                            backend=cset.backend, **queue_options)
-        cset.consumers.append(consumer)
-    return cset
+    return default_app.get_consumer_set(connection, queues, **options)

+ 9 - 4
celery/result.py

@@ -6,7 +6,7 @@ from copy import copy
 from itertools import imap
 
 from celery import states
-from celery.backends import default_backend
+from celery.defaults import default_app
 from celery.datastructures import PositionQueue
 from celery.exceptions import TimeoutError
 from celery.messaging import with_connection
@@ -165,7 +165,8 @@ class AsyncResult(BaseAsyncResult):
     """
 
     def __init__(self, task_id, backend=None):
-        super(AsyncResult, self).__init__(task_id, backend or default_backend)
+        backend = backend or default_app.backend
+        super(AsyncResult, self).__init__(task_id, backend)
 
 
 class TaskSetResult(object):
@@ -324,7 +325,7 @@ class TaskSetResult(object):
                         time.time() >= time_start + timeout:
                     on_timeout()
 
-    def save(self, backend=default_backend):
+    def save(self, backend=None):
         """Save taskset result for later retrieval using :meth:`restore`.
 
         Example:
@@ -333,11 +334,15 @@ class TaskSetResult(object):
             >>> result = TaskSetResult.restore(task_id)
 
         """
+        if backend is None:
+            backend = default_app.backend
         backend.save_taskset(self.taskset_id, self)
 
     @classmethod
-    def restore(self, taskset_id, backend=default_backend):
+    def restore(self, taskset_id, backend=None):
         """Restore previously saved taskset result."""
+        if backend is None:
+            backend = default_app.backend
         return backend.restore_taskset(taskset_id)
 
     @property

+ 419 - 403
celery/task/base.py

@@ -1,8 +1,7 @@
 import sys
 import warnings
 
-from celery import conf
-from celery.backends import default_backend
+from celery.defaults import default_app
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from celery.execute import apply_async, apply
 from celery.log import setup_task_logger
@@ -71,493 +70,510 @@ class TaskType(type):
         return tasks[task_name].__class__
 
 
-class Task(object):
-    """A celery task.
+def create_task_cls(app):
 
-    All subclasses of :class:`Task` must define the :meth:`run` method,
-    which is the actual method the ``celery`` daemon executes.
 
-    The :meth:`run` method can take use of the default keyword arguments,
-    as listed in the :meth:`run` documentation.
+    class Task(object):
+        """A celery task.
 
-    The resulting class is callable, which if called will apply the
-    :meth:`run` method.
+        All subclasses of :class:`Task` must define the :meth:`run` method,
+        which is the actual method the ``celery`` daemon executes.
 
-    .. attribute:: name
+        The :meth:`run` method can take use of the default keyword arguments,
+        as listed in the :meth:`run` documentation.
 
-        Name of the task.
+        The resulting class is callable, which if called will apply the
+        :meth:`run` method.
 
-    .. attribute:: abstract
+        .. attribute:: name
 
-        If ``True`` the task is an abstract base class.
+            Name of the task.
 
-    .. attribute:: type
+        .. attribute:: abstract
 
-        The type of task, currently this can be ``regular``, or ``periodic``,
-        however if you want a periodic task, you should subclass
-        :class:`PeriodicTask` instead.
+            If ``True`` the task is an abstract base class.
 
-    .. attribute:: queue
+        .. attribute:: type
 
-        Select a destination queue for this task. The queue needs to exist
-        in ``CELERY_QUEUES``. The ``routing_key``, ``exchange`` and
-        ``exchange_type`` attributes will be ignored if this is set.
+            The type of task, currently unused.
 
-    .. attribute:: routing_key
+        .. attribute:: queue
 
-        Override the global default ``routing_key`` for this task.
+            Select a destination queue for this task. The queue needs to exist
+            in ``CELERY_QUEUES``. The ``routing_key``, ``exchange`` and
+            ``exchange_type`` attributes will be ignored if this is set.
 
-    .. attribute:: exchange
+        .. attribute:: routing_key
 
-        Override the global default ``exchange`` for this task.
+            Override the global default ``routing_key`` for this task.
 
-    .. attribute:: exchange_type
+        .. attribute:: exchange
 
-        Override the global default exchange type for this task.
+            Override the global default ``exchange`` for this task.
 
-    .. attribute:: delivery_mode
+        .. attribute:: exchange_type
 
-        Override the global default delivery mode for this task.
-        By default this is set to ``2`` (persistent). You can change this
-        to ``1`` to get non-persistent behavior, which means the messages
-        are lost if the broker is restarted.
+            Override the global default exchange type for this task.
 
-    .. attribute:: mandatory
+        .. attribute:: delivery_mode
 
-        Mandatory message routing. An exception will be raised if the task
-        can't be routed to a queue.
+            Override the global default delivery mode for this task.
+            By default this is set to ``2`` (persistent). You can change this
+            to ``1`` to get non-persistent behavior, which means the messages
+            are lost if the broker is restarted.
 
-    .. attribute:: immediate:
+        .. attribute:: mandatory
 
-        Request immediate delivery. An exception will be raised if the task
-        can't be routed to a worker immediately.
+            Mandatory message routing. An exception will be raised if the task
+            can't be routed to a queue.
 
-    .. attribute:: priority:
+        .. attribute:: immediate:
 
-        The message priority. A number from ``0`` to ``9``, where ``0`` is the
-        highest. Note that RabbitMQ doesn't support priorities yet.
+            Request immediate delivery. An exception will be raised if the task
+            can't be routed to a worker immediately.
 
-    .. attribute:: max_retries
+        .. attribute:: priority:
 
-        Maximum number of retries before giving up.
-        If set to ``None``, it will never stop retrying.
+            The message priority. A number from ``0`` to ``9``, where ``0``
+            is the highest. Note that RabbitMQ doesn't support priorities yet.
 
-    .. attribute:: default_retry_delay
+        .. attribute:: max_retries
 
-        Default time in seconds before a retry of the task should be
-        executed. Default is a 3 minute delay.
+            Maximum number of retries before giving up.
+            If set to ``None``, it will never stop retrying.
 
-    .. attribute:: rate_limit
+        .. attribute:: default_retry_delay
 
-        Set the rate limit for this task type, Examples: ``None`` (no rate
-        limit), ``"100/s"`` (hundred tasks a second), ``"100/m"`` (hundred
-        tasks a minute), ``"100/h"`` (hundred tasks an hour)
+            Default time in seconds before a retry of the task should be
+            executed. Default is a 3 minute delay.
 
-    .. attribute:: ignore_result
+        .. attribute:: rate_limit
 
-        Don't store the return value of this task.
+            Set the rate limit for this task type, Examples: ``None`` (no rate
+            limit), ``"100/s"`` (hundred tasks a second), ``"100/m"`` (hundred
+            tasks a minute), ``"100/h"`` (hundred tasks an hour)
 
-    .. attribute:: store_errors_even_if_ignored
+        .. attribute:: ignore_result
 
-        If true, errors will be stored even if the task is configured
-        to ignore results.
+            Don't store the return value of this task.
 
-    .. attribute:: send_error_emails
+        .. attribute:: store_errors_even_if_ignored
 
-        If true, an e-mail will be sent to the admins whenever
-        a task of this type raises an exception.
+            If true, errors will be stored even if the task is configured
+            to ignore results.
 
-    .. attribute:: error_whitelist
+        .. attribute:: send_error_emails
 
-        List of exception types to send error e-mails for.
+            If true, an e-mail will be sent to the admins whenever
+            a task of this type raises an exception.
 
-    .. attribute:: serializer
+        .. attribute:: error_whitelist
 
-        The name of a serializer that has been registered with
-        :mod:`carrot.serialization.registry`. Example: ``"json"``.
+            List of exception types to send error e-mails for.
 
-    .. attribute:: backend
+        .. attribute:: serializer
 
-        The result store backend used for this task.
+            The name of a serializer that has been registered with
+            :mod:`carrot.serialization.registry`. Example: ``"json"``.
 
-    .. attribute:: autoregister
+        .. attribute:: backend
 
-        If ``True`` the task is automatically registered in the task
-        registry, which is the default behaviour.
+            The result store backend used for this task.
 
-    .. attribute:: track_started
+        .. attribute:: autoregister
 
-        If ``True`` the task will report its status as "started"
-        when the task is executed by a worker.
-        The default value is ``False`` as the normal behaviour is to not
-        report that level of granularity. Tasks are either pending, finished,
-        or waiting to be retried. Having a "started" status can be useful for
-        when there are long running tasks and there is a need to report which
-        task is currently running.
+            If ``True`` the task is automatically registered in the task
+            registry, which is the default behaviour.
 
-        The global default can be overridden by the ``CELERY_TRACK_STARTED``
-        setting.
+        .. attribute:: track_started
 
-    .. attribute:: acks_late
+            If ``True`` the task will report its status as "started"
+            when the task is executed by a worker.
+            The default value is ``False`` as the normal behaviour is to not
+            report that level of granularity. Tasks are either pending,
+            finished, or waiting to be retried.
 
-        If set to ``True`` messages for this task will be acknowledged
-        **after** the task has been executed, not *just before*, which is
-        the default behavior.
+            Having a "started" status can be useful for when there are long
+            running tasks and there is a need to report which task is
+            currently running.
 
-        Note that this means the task may be executed twice if the worker
-        crashes in the middle of execution, which may be acceptable for some
-        applications.
+            The global default can be overridden with the
+            ``CELERY_TRACK_STARTED`` setting.
 
-        The global default can be overriden by the ``CELERY_ACKS_LATE``
-        setting.
+        .. attribute:: acks_late
 
-    """
-    __metaclass__ = TaskType
-
-    name = None
-    abstract = True
-    autoregister = True
-    type = "regular"
-
-    queue = None
-    routing_key = None
-    exchange = None
-    exchange_type = conf.DEFAULT_EXCHANGE_TYPE
-    delivery_mode = conf.DEFAULT_DELIVERY_MODE
-    immediate = False
-    mandatory = False
-    priority = None
-
-    ignore_result = conf.IGNORE_RESULT
-    store_errors_even_if_ignored = conf.STORE_ERRORS_EVEN_IF_IGNORED
-    send_error_emails = conf.CELERY_SEND_TASK_ERROR_EMAILS
-    error_whitelist = conf.CELERY_TASK_ERROR_WHITELIST
-    disable_error_emails = False # FIXME
-    max_retries = 3
-    default_retry_delay = 3 * 60
-    serializer = conf.TASK_SERIALIZER
-    rate_limit = conf.DEFAULT_RATE_LIMIT
-    backend = default_backend
-    track_started = conf.TRACK_STARTED
-    acks_late = conf.ACKS_LATE
-
-    MaxRetriesExceededError = MaxRetriesExceededError
-
-    def __call__(self, *args, **kwargs):
-        return self.run(*args, **kwargs)
-
-    def __reduce__(self):
-        return (_unpickle_task, (self.name, ), None)
-
-    def run(self, *args, **kwargs):
-        """The body of the task executed by the worker.
-
-        The following standard keyword arguments are reserved and is passed
-        by the worker if the function/method supports them:
-
-            * task_id
-            * task_name
-            * task_retries
-            * task_is_eager
-            * logfile
-            * loglevel
-            * delivery_info
-
-        Additional standard keyword arguments may be added in the future.
-        To take these default arguments, the task can either list the ones
-        it wants explicitly or just take an arbitrary list of keyword
-        arguments (\*\*kwargs).
-
-        """
-        raise NotImplementedError("Tasks must define the run method.")
-
-    @classmethod
-    def get_logger(self, loglevel=None, logfile=None, **kwargs):
-        """Get task-aware logger object.
-
-        See :func:`celery.log.setup_task_logger`.
-
-        """
-        return setup_task_logger(loglevel=loglevel, logfile=logfile,
-                                 task_kwargs=kwargs)
-
-    @classmethod
-    def establish_connection(self,
-            connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
-        """Establish a connection to the message broker."""
-        return _establish_connection(connect_timeout=connect_timeout)
-
-    @classmethod
-    def get_publisher(self, connection=None, exchange=None,
-            connect_timeout=conf.BROKER_CONNECTION_TIMEOUT,
-            exchange_type=None):
-        """Get a celery task message publisher.
-
-        :rtype :class:`celery.messaging.TaskPublisher`:
-
-        Please be sure to close the AMQP connection when you're done
-        with this object, i.e.:
-
-            >>> publisher = self.get_publisher()
-            >>> # do something with publisher
-            >>> publisher.connection.close()
-
-        """
-        if exchange is None:
-            exchange = self.exchange
-        if exchange_type is None:
-            exchange_type = self.exchange_type
-        connection = connection or self.establish_connection(connect_timeout)
-        return TaskPublisher(connection=connection,
-                             exchange=exchange,
-                             exchange_type=exchange_type,
-                             routing_key=self.routing_key)
-
-    @classmethod
-    def get_consumer(self, connection=None,
-            connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
-        """Get a celery task message consumer.
-
-        :rtype :class:`celery.messaging.TaskConsumer`:
-
-        Please be sure to close the AMQP connection when you're done
-        with this object. i.e.:
-
-            >>> consumer = self.get_consumer()
-            >>> # do something with consumer
-            >>> consumer.connection.close()
-
-        """
-        connection = connection or self.establish_connection(connect_timeout)
-        return TaskConsumer(connection=connection, exchange=self.exchange,
-                            routing_key=self.routing_key)
+            If set to ``True`` messages for this task will be acknowledged
+            **after** the task has been executed, not *just before*, which is
+            the default behavior.
 
-    @classmethod
-    def delay(self, *args, **kwargs):
-        """Shortcut to :meth:`apply_async`, with star arguments,
-        but doesn't support the extra options.
+            Note that this means the task may be executed twice if the worker
+            crashes in the middle of execution, which may be acceptable for some
+            applications.
 
-        :param \*args: positional arguments passed on to the task.
-        :param \*\*kwargs: keyword arguments passed on to the task.
-
-        :returns :class:`celery.result.AsyncResult`:
-
-        """
-        return self.apply_async(args, kwargs)
-
-    @classmethod
-    def apply_async(self, args=None, kwargs=None, **options):
-        """Delay this task for execution by the ``celery`` daemon(s).
-
-        :param args: positional arguments passed on to the task.
-        :param kwargs: keyword arguments passed on to the task.
-        :keyword \*\*options: Any keyword arguments to pass on to
-            :func:`celery.execute.apply_async`.
-
-        See :func:`celery.execute.apply_async` for more information.
-
-        :returns :class:`celery.result.AsyncResult`:
-
-        """
-        return apply_async(self, args, kwargs, **options)
-
-    @classmethod
-    def retry(self, args=None, kwargs=None, exc=None, throw=True, **options):
-        """Retry the task.
-
-        :param args: Positional arguments to retry with.
-        :param kwargs: Keyword arguments to retry with.
-        :keyword exc: Optional exception to raise instead of
-            :exc:`~celery.exceptions.MaxRetriesExceededError` when the max
-            restart limit has been exceeded.
-        :keyword countdown: Time in seconds to delay the retry for.
-        :keyword eta: Explicit time and date to run the retry at (must be a
-            :class:`datetime.datetime` instance).
-        :keyword \*\*options: Any extra options to pass on to
-            meth:`apply_async`. See :func:`celery.execute.apply_async`.
-        :keyword throw: If this is ``False``, do not raise the
-            :exc:`~celery.exceptions.RetryTaskError` exception,
-            that tells the worker to mark the task as being retried.
-            Note that this means the task will be marked as failed
-            if the task raises an exception, or successful if it
-            returns.
-
-        :raises celery.exceptions.RetryTaskError: To tell the worker that the
-            task has been re-sent for retry. This always happens, unless
-            the ``throw`` keyword argument has been explicitly set
-            to ``False``, and is considered normal operation.
-
-        Example
-
-            >>> class TwitterPostStatusTask(Task):
-            ...
-            ...     def run(self, username, password, message, **kwargs):
-            ...         twitter = Twitter(username, password)
-            ...         try:
-            ...             twitter.post_status(message)
-            ...         except twitter.FailWhale, exc:
-            ...             # Retry in 5 minutes.
-            ...             self.retry([username, password, message], kwargs,
-            ...                        countdown=60 * 5, exc=exc)
+            The global default can be overriden by the ``CELERY_ACKS_LATE``
+            setting.
 
         """
-        if not kwargs:
-            raise TypeError(
-                    "kwargs argument to retries can't be empty. "
-                    "Task must accept **kwargs, see http://bit.ly/cAx3Bg")
-
-        delivery_info = kwargs.pop("delivery_info", {})
-        options.setdefault("exchange", delivery_info.get("exchange"))
-        options.setdefault("routing_key", delivery_info.get("routing_key"))
-
-        options["retries"] = kwargs.pop("task_retries", 0) + 1
-        options["task_id"] = kwargs.pop("task_id", None)
-        options["countdown"] = options.get("countdown",
-                                           self.default_retry_delay)
-        max_exc = exc or self.MaxRetriesExceededError(
-                "Can't retry %s[%s] args:%s kwargs:%s" % (
-                    self.name, options["task_id"], args, kwargs))
-        max_retries = self.max_retries
-        if max_retries is not None and options["retries"] > max_retries:
-            raise max_exc
-
-        # If task was executed eagerly using apply(),
-        # then the retry must also be executed eagerly.
-        if kwargs.get("task_is_eager", False):
-            result = self.apply(args=args, kwargs=kwargs, **options)
-            if isinstance(result, EagerResult):
-                return result.get() # propogates exceptions.
-            return result
-
-        self.apply_async(args=args, kwargs=kwargs, **options)
-
-        if throw:
-            message = "Retry in %d seconds." % options["countdown"]
-            raise RetryTaskError(message, exc)
-
-    @classmethod
-    def apply(self, args=None, kwargs=None, **options):
-        """Execute this task locally, by blocking until the task
-        has finished executing.
-
-        :param args: positional arguments passed on to the task.
-        :param kwargs: keyword arguments passed on to the task.
-        :keyword throw: Re-raise task exceptions. Defaults to
-            the ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` setting.
-
-        :rtype :class:`celery.result.EagerResult`:
-
-        See :func:`celery.execute.apply`.
+        __metaclass__ = TaskType
+
+        name = None
+        abstract = True
+        autoregister = True
+        type = "regular"
+
+        queue = None
+        routing_key = None
+        exchange = None
+        exchange_type = app.conf.CELERY_DEFAULT_EXCHANGE_TYPE
+        delivery_mode = app.conf.CELERY_DEFAULT_DELIVERY_MODE
+        immediate = False
+        mandatory = False
+        priority = None
+
+        ignore_result = app.conf.CELERY_IGNORE_RESULT
+        store_errors_even_if_ignored = \
+                app.conf.CELERY_STORE_ERRORS_EVEN_IF_IGNORED
+        send_error_emails = app.conf.CELERY_SEND_TASK_ERROR_EMAILS
+        error_whitelist = app.conf.CELERY_TASK_ERROR_WHITELIST
+        disable_error_emails = False # FIXME
+        max_retries = 3
+        default_retry_delay = 3 * 60
+        serializer = app.conf.CELERY_TASK_SERIALIZER
+        rate_limit = app.conf.CELERY_DEFAULT_RATE_LIMIT
+        backend = app.backend
+        track_started = app.conf.CELERY_TRACK_STARTED
+        acks_late = app.conf.CELERY_ACKS_LATE
+
+        MaxRetriesExceededError = MaxRetriesExceededError
+
+        def __call__(self, *args, **kwargs):
+            return self.run(*args, **kwargs)
+
+        def __reduce__(self):
+            return (_unpickle_task, (self.name, ), None)
+
+        def run(self, *args, **kwargs):
+            """The body of the task executed by the worker.
+
+            The following standard keyword arguments are reserved and is
+            automatically passed by the worker if the function/method
+            supports them:
+
+                * task_id
+                * task_name
+                * task_retries
+                * task_is_eager
+                * logfile
+                * loglevel
+                * delivery_info
+
+            Additional standard keyword arguments may be added in the future.
+            To take these default arguments, the task can either list the ones
+            it wants explicitly or just take an arbitrary list of keyword
+            arguments (\*\*kwargs).
+
+            """
+            raise NotImplementedError("Tasks must define the run method.")
+
+        @classmethod
+        def get_logger(self, loglevel=None, logfile=None, **kwargs):
+            """Get task-aware logger object.
+
+            See :func:`celery.log.setup_task_logger`.
+
+            """
+            return setup_task_logger(loglevel=loglevel, logfile=logfile,
+                                     task_kwargs=kwargs)
+
+        @classmethod
+        def establish_connection(self,
+            connect_timeout=app.conf.BROKER_CONNECTION_TIMEOUT):
+            """Establish a connection to the message broker."""
+            return _establish_connection(connect_timeout=connect_timeout)
+
+        @classmethod
+        def get_publisher(self, connection=None, exchange=None,
+                connect_timeout=app.conf.BROKER_CONNECTION_TIMEOUT,
+                exchange_type=None):
+            """Get a celery task message publisher.
+
+            :rtype :class:`celery.messaging.TaskPublisher`:
+
+            Please be sure to close the AMQP connection when you're done
+            with this object, i.e.:
+
+                >>> publisher = self.get_publisher()
+                >>> # do something with publisher
+                >>> publisher.connection.close()
+
+            """
+            if exchange is None:
+                exchange = self.exchange
+            if exchange_type is None:
+                exchange_type = self.exchange_type
+            connection = connection or \
+                    self.establish_connection(connect_timeout)
+            return TaskPublisher(connection=connection,
+                                 exchange=exchange,
+                                 exchange_type=exchange_type,
+                                 routing_key=self.routing_key)
+
+        @classmethod
+        def get_consumer(self, connection=None,
+            connect_timeout=app.conf.BROKER_CONNECTION_TIMEOUT):
+            """Get a celery task message consumer.
+
+            :rtype :class:`celery.messaging.TaskConsumer`:
+
+            Please be sure to close the AMQP connection when you're done
+            with this object. i.e.:
+
+                >>> consumer = self.get_consumer()
+                >>> # do something with consumer
+                >>> consumer.connection.close()
+
+            """
+            connection = connection or \
+                         self.establish_connection(connect_timeout)
+            return TaskConsumer(connection=connection,
+                                exchange=self.exchange,
+                                routing_key=self.routing_key)
+
+        @classmethod
+        def delay(self, *args, **kwargs):
+            """Shortcut to :meth:`apply_async`, with star arguments,
+            but doesn't support the extra options.
+
+            :param \*args: positional arguments passed on to the task.
+            :param \*\*kwargs: keyword arguments passed on to the task.
+
+            :returns :class:`celery.result.AsyncResult`:
+
+            """
+            return self.apply_async(args, kwargs)
+
+        @classmethod
+        def apply_async(self, args=None, kwargs=None, **options):
+            """Delay this task for execution by the ``celery`` daemon(s).
+
+            :param args: positional arguments passed on to the task.
+            :param kwargs: keyword arguments passed on to the task.
+            :keyword \*\*options: Any keyword arguments to pass on to
+                :func:`celery.execute.apply_async`.
+
+            See :func:`celery.execute.apply_async` for more information.
+
+            :returns :class:`celery.result.AsyncResult`:
+
+            """
+            return apply_async(self, args, kwargs, **options)
+
+        @classmethod
+        def retry(self, args=None, kwargs=None, exc=None, throw=True,
+                **options):
+            """Retry the task.
+
+            :param args: Positional arguments to retry with.
+            :param kwargs: Keyword arguments to retry with.
+            :keyword exc: Optional exception to raise instead of
+                :exc:`~celery.exceptions.MaxRetriesExceededError` when the max
+                restart limit has been exceeded.
+            :keyword countdown: Time in seconds to delay the retry for.
+            :keyword eta: Explicit time and date to run the retry at
+            (must be a :class:`~datetime.datetime` instance).
+            :keyword \*\*options: Any extra options to pass on to
+                meth:`apply_async`. See :func:`celery.execute.apply_async`.
+            :keyword throw: If this is ``False``, do not raise the
+                :exc:`~celery.exceptions.RetryTaskError` exception,
+                that tells the worker to mark the task as being retried.
+                Note that this means the task will be marked as failed
+                if the task raises an exception, or successful if it
+                returns.
+
+            :raises celery.exceptions.RetryTaskError: To tell the worker that
+                the task has been re-sent for retry. This always happens,
+                unless the ``throw`` keyword argument has been explicitly set
+                to ``False``, and is considered normal operation.
+
+            Example
+
+                >>> class TwitterPostStatusTask(Task):
+                ...
+                ...     def run(self, username, password, message, **kwargs):
+                ...         twitter = Twitter(username, password)
+                ...         try:
+                ...             twitter.post_status(message)
+                ...         except twitter.FailWhale, exc:
+                ...             # Retry in 5 minutes.
+                ...             self.retry([username, password, message],
+                ...                        kwargs,
+                ...                        countdown=60 * 5, exc=exc)
+
+            """
+            if not kwargs:
+                raise TypeError(
+                        "kwargs argument to retries can't be empty. "
+                        "Task must accept **kwargs, see http://bit.ly/cAx3Bg")
+
+            delivery_info = kwargs.pop("delivery_info", {})
+            options.setdefault("exchange", delivery_info.get("exchange"))
+            options.setdefault("routing_key", delivery_info.get("routing_key"))
+
+            options["retries"] = kwargs.pop("task_retries", 0) + 1
+            options["task_id"] = kwargs.pop("task_id", None)
+            options["countdown"] = options.get("countdown",
+                                            self.default_retry_delay)
+            max_exc = exc or self.MaxRetriesExceededError(
+                    "Can't retry %s[%s] args:%s kwargs:%s" % (
+                        self.name, options["task_id"], args, kwargs))
+            max_retries = self.max_retries
+            if max_retries is not None and options["retries"] > max_retries:
+                raise max_exc
+
+            # If task was executed eagerly using apply(),
+            # then the retry must also be executed eagerly.
+            if kwargs.get("task_is_eager", False):
+                result = self.apply(args=args, kwargs=kwargs, **options)
+                if isinstance(result, EagerResult):
+                    return result.get() # propogates exceptions.
+                return result
 
-        """
-        return apply(self, args, kwargs, **options)
+            self.apply_async(args=args, kwargs=kwargs, **options)
+
+            if throw:
+                message = "Retry in %d seconds." % options["countdown"]
+                raise RetryTaskError(message, exc)
 
-    @classmethod
-    def AsyncResult(self, task_id):
-        """Get AsyncResult instance for this kind of task.
+        @classmethod
+        def apply(self, args=None, kwargs=None, **options):
+            """Execute this task locally, by blocking until the task
+            has finished executing.
 
-        :param task_id: Task id to get result for.
+            :param args: positional arguments passed on to the task.
+            :param kwargs: keyword arguments passed on to the task.
+            :keyword throw: Re-raise task exceptions. Defaults to
+                the ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` setting.
+
+            :rtype :class:`celery.result.EagerResult`:
+
+            See :func:`celery.execute.apply`.
+
+            """
+            return apply(self, args, kwargs, **options)
+
+        @classmethod
+        def AsyncResult(self, task_id):
+            """Get AsyncResult instance for this kind of task.
+
+            :param task_id: Task id to get result for.
+
+            """
+            return BaseAsyncResult(task_id, backend=self.backend)
+
+        def on_retry(self, exc, task_id, args, kwargs, einfo=None):
+            """Retry handler.
+
+            This is run by the worker when the task is to be retried.
+
+            :param exc: The exception sent to :meth:`retry`.
+            :param task_id: Unique id of the retried task.
+            :param args: Original arguments for the retried task.
+            :param kwargs: Original keyword arguments for the retried task.
 
-        """
-        return BaseAsyncResult(task_id, backend=self.backend)
+            :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
+            instance, containing the traceback.
 
-    def on_retry(self, exc, task_id, args, kwargs, einfo=None):
-        """Retry handler.
+            The return value of this handler is ignored.
 
-        This is run by the worker when the task is to be retried.
+            """
+            pass
 
-        :param exc: The exception sent to :meth:`retry`.
-        :param task_id: Unique id of the retried task.
-        :param args: Original arguments for the retried task.
-        :param kwargs: Original keyword arguments for the retried task.
+        def after_return(self, status, retval, task_id, args,
+                kwargs, einfo=None):
+            """Handler called after the task returns.
 
-        :keyword einfo: :class:`~celery.datastructures.ExceptionInfo` instance,
-           containing the traceback.
+            :param status: Current task state.
+            :param retval: Task return value/exception.
+            :param task_id: Unique id of the task.
+            :param args: Original arguments for the task that failed.
+            :param kwargs: Original keyword arguments for the task
+              that failed.
 
-        The return value of this handler is ignored.
+            :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
+            instance, containing the traceback (if any).
 
-        """
-        pass
+            The return value of this handler is ignored.
 
-    def after_return(self, status, retval, task_id, args, kwargs, einfo=None):
-        """Handler called after the task returns.
+            """
+            pass
 
-        :param status: Current task state.
-        :param retval: Task return value/exception.
-        :param task_id: Unique id of the task.
-        :param args: Original arguments for the task that failed.
-        :param kwargs: Original keyword arguments for the task that failed.
+        def on_failure(self, exc, task_id, args, kwargs, einfo=None):
+            """Error handler.
 
-        :keyword einfo: :class:`~celery.datastructures.ExceptionInfo` instance,
-           containing the traceback (if any).
+            This is run by the worker when the task fails.
 
-        The return value of this handler is ignored.
+            :param exc: The exception raised by the task.
+            :param task_id: Unique id of the failed task.
+            :param args: Original arguments for the task that failed.
+            :param kwargs: Original keyword arguments for the task
+              that failed.
 
-        """
-        pass
+            :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
+                instance, containing the traceback.
 
-    def on_failure(self, exc, task_id, args, kwargs, einfo=None):
-        """Error handler.
+            The return value of this handler is ignored.
 
-        This is run by the worker when the task fails.
+            """
+            pass
 
-        :param exc: The exception raised by the task.
-        :param task_id: Unique id of the failed task.
-        :param args: Original arguments for the task that failed.
-        :param kwargs: Original keyword arguments for the task that failed.
+        def on_success(self, retval, task_id, args, kwargs):
+            """Success handler.
 
-        :keyword einfo: :class:`~celery.datastructures.ExceptionInfo` instance,
-           containing the traceback.
+            Run by the worker if the task executes successfully.
 
-        The return value of this handler is ignored.
+            :param retval: The return value of the task.
+            :param task_id: Unique id of the executed task.
+            :param args: Original arguments for the executed task.
+            :param kwargs: Original keyword arguments for the executed task.
 
-        """
-        pass
+            The return value of this handler is ignored.
 
-    def on_success(self, retval, task_id, args, kwargs):
-        """Success handler.
+            """
+            pass
 
-        Run by the worker if the task executes successfully.
+        def execute(self, wrapper, pool, loglevel, logfile):
+            """The method the worker calls to execute the task.
 
-        :param retval: The return value of the task.
-        :param task_id: Unique id of the executed task.
-        :param args: Original arguments for the executed task.
-        :param kwargs: Original keyword arguments for the executed task.
+            :param wrapper: A :class:`~celery.worker.job.TaskRequest`.
+            :param pool: A task pool.
+            :param loglevel: Current loglevel.
+            :param logfile: Name of the currently used logfile.
 
-        The return value of this handler is ignored.
+            """
+            wrapper.execute_using_pool(pool, loglevel, logfile)
 
-        """
-        pass
-
-    def execute(self, wrapper, pool, loglevel, logfile):
-        """The method the worker calls to execute the task.
+        def __repr__(self):
+            """repr(task)"""
+            try:
+                kind = self.__class__.mro()[1].__name__
+            except (AttributeError, IndexError):
+                kind = "%s(Task)" % self.__class__.__name__
+            return "<%s: %s (%s)>" % (kind, self.name, self.type)
 
-        :param wrapper: A :class:`~celery.worker.job.TaskRequest`.
-        :param pool: A task pool.
-        :param loglevel: Current loglevel.
-        :param logfile: Name of the currently used logfile.
-
-        """
-        wrapper.execute_using_pool(pool, loglevel, logfile)
+        @classmethod
+        def subtask(cls, *args, **kwargs):
+            """Returns a :class:`~celery.task.sets.subtask` object for
+            this task that wraps arguments and execution options
+            for a single task invocation."""
+            return subtask(cls, *args, **kwargs)
 
-    def __repr__(self):
-        """repr(task)"""
-        try:
-            kind = self.__class__.mro()[1].__name__
-        except (AttributeError, IndexError):
-            kind = "%s(Task)" % self.__class__.__name__
-        return "<%s: %s (%s)>" % (kind, self.name, self.type)
+        @property
+        def __name__(self):
+            return self.__class__.__name__
 
-    @classmethod
-    def subtask(cls, *args, **kwargs):
-        """Returns a :class:`~celery.task.sets.subtask` object for
-        this task that wraps arguments and execution options
-        for a single task invocation."""
-        return subtask(cls, *args, **kwargs)
+    return Task
 
-    @property
-    def __name__(self):
-        return self.__class__.__name__
+Task = create_task_cls(default_app)
 
 
 class PeriodicTask(Task):
@@ -633,7 +649,7 @@ class PeriodicTask(Task):
 
         # For backward compatibility, add the periodic task to the
         # configuration schedule instead.
-        conf.CELERYBEAT_SCHEDULE[self.name] = {
+        default_app.conf.CELERYBEAT_SCHEDULE[self.name] = {
                 "task": self.name,
                 "schedule": self.run_every,
                 "args": (),

+ 3 - 5
celery/task/control.py

@@ -1,12 +1,10 @@
-from celery import conf
 from celery.utils import gen_unique_id
 from celery.messaging import BroadcastPublisher, ControlReplyConsumer
 from celery.messaging import with_connection, get_consumer_set
 
 
 @with_connection
-def discard_all(connection=None,
-        connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
+def discard_all(connection=None, connect_timeout=None):
     """Discard all waiting tasks.
 
     This will ignore all tasks waiting for execution, and they will
@@ -151,8 +149,8 @@ class inspect(object):
 
 @with_connection
 def broadcast(command, arguments=None, destination=None, connection=None,
-        connect_timeout=conf.BROKER_CONNECTION_TIMEOUT, reply=False,
-        timeout=1, limit=None, callback=None):
+        connect_timeout=None, reply=False, timeout=1, limit=None,
+        callback=None):
     """Broadcast a control command to the celery workers.
 
     :param command: Name of command to send.

+ 3 - 4
celery/task/sets.py

@@ -2,9 +2,9 @@ import warnings
 
 from UserList import UserList
 
-from celery import conf
 from celery import registry
 from celery.datastructures import AttributeDict
+from celery.defaults import app_or_default
 from celery.messaging import with_connection
 from celery.messaging import TaskPublisher
 from celery.result import TaskSetResult
@@ -146,8 +146,7 @@ class TaskSet(UserList):
         self.total = len(self.tasks)
 
     @with_connection
-    def apply_async(self, connection=None,
-            connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
+    def apply_async(self, connection=None, connect_timeout=None, app=None):
         """Run all tasks in the taskset.
 
         Returns a :class:`celery.result.TaskSetResult` instance.
@@ -177,7 +176,7 @@ class TaskSet(UserList):
             [True, True]
 
         """
-        if conf.ALWAYS_EAGER:
+        if app_or_default(app).conf.CELERY_ALWAYS_EAGER:
             return self.apply()
 
         taskset_id = gen_unique_id()

+ 4 - 3
celery/tests/test_backends/test_database.py

@@ -4,9 +4,9 @@ from datetime import datetime
 
 from celery.exceptions import ImproperlyConfigured
 
-from celery import conf
 from celery import states
 from celery.db.models import Task, TaskSet
+from celery.defaults import default_app
 from celery.utils import gen_unique_id
 from celery.backends.database import DatabaseBackend
 
@@ -20,11 +20,12 @@ class SomeClass(object):
 class test_DatabaseBackend(unittest.TestCase):
 
     def test_missing_dburi_raises_ImproperlyConfigured(self):
-        prev, conf.RESULT_DBURI = conf.RESULT_DBURI, None
+        conf = default_app.conf
+        prev, conf.CELERY_RESULT_DBURI = conf.CELERY_RESULT_DBURI, None
         try:
             self.assertRaises(ImproperlyConfigured, DatabaseBackend)
         finally:
-            conf.RESULT_DBURI = prev
+            conf.CELERY_RESULT_DBURI = prev
 
     def test_missing_task_id_is_PENDING(self):
         tb = DatabaseBackend()

+ 15 - 11
celery/tests/test_bin/test_celeryd.py

@@ -6,9 +6,9 @@ import unittest2 as unittest
 from multiprocessing import get_logger, current_process
 from StringIO import StringIO
 
-from celery import conf
 from celery import platform
 from celery import signals
+from celery.defaults import default_app
 from celery.apps import worker as cd
 from celery.bin.celeryd import WorkerCommand, main as celeryd_main
 from celery.exceptions import ImproperlyConfigured
@@ -54,7 +54,8 @@ class test_Worker(unittest.TestCase):
     @disable_stdouts
     def test_queues_string(self):
         worker = self.Worker(queues="foo,bar,baz")
-        self.assertEqual(worker.queues, ["foo", "bar", "baz"])
+        worker.init_queues()
+        self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
 
     @disable_stdouts
     def test_loglevel_string(self):
@@ -119,24 +120,27 @@ class test_Worker(unittest.TestCase):
 
     @disable_stdouts
     def test_init_queues(self):
-        p, conf.QUEUES = conf.QUEUES, {
+        c = default_app.conf
+        p, c.CELERY_QUEUES = c.CELERY_QUEUES, {
                 "celery": {"exchange": "celery",
                            "binding_key": "celery"},
                 "video": {"exchange": "video",
                            "binding_key": "video"}}
         try:
-            self.Worker(queues=["video"]).init_queues()
-            self.assertIn("video", conf.QUEUES)
-            self.assertNotIn("celery", conf.QUEUES)
+            worker = self.Worker(queues=["video"])
+            worker.init_queues()
+            self.assertIn("video", worker.queues)
+            self.assertNotIn("celery", worker.queues)
 
-            conf.CREATE_MISSING_QUEUES = False
+            c.CELERY_CREATE_MISSING_QUEUES = False
             self.assertRaises(ImproperlyConfigured,
                     self.Worker(queues=["image"]).init_queues)
-            conf.CREATE_MISSING_QUEUES = True
-            self.Worker(queues=["image"]).init_queues()
-            self.assertIn("image", conf.QUEUES)
+            c.CELERY_CREATE_MISSING_QUEUES = True
+            worker = self.Worker(queues=["image"])
+            worker.init_queues()
+            self.assertIn("image", worker.queues)
         finally:
-            conf.QUEUES = p
+            c.CELERY_QUEUES = p
 
     @disable_stdouts
     def test_on_listener_ready(self):

+ 4 - 4
celery/tests/test_result.py

@@ -3,10 +3,10 @@ from __future__ import generators
 import unittest2 as unittest
 
 from celery import states
+from celery.defaults import default_app
 from celery.utils import gen_unique_id
 from celery.utils.compat import all
 from celery.result import AsyncResult, TaskSetResult
-from celery.backends import default_backend
 from celery.exceptions import TimeoutError
 from celery.task.base import Task
 
@@ -20,12 +20,12 @@ def mock_task(name, status, result):
 def save_result(task):
     traceback = "Some traceback"
     if task["status"] == states.SUCCESS:
-        default_backend.mark_as_done(task["id"], task["result"])
+        default_app.backend.mark_as_done(task["id"], task["result"])
     elif task["status"] == states.RETRY:
-        default_backend.mark_as_retry(task["id"], task["result"],
+        default_app.backend.mark_as_retry(task["id"], task["result"],
                 traceback=traceback)
     else:
-        default_backend.mark_as_failure(task["id"], task["result"],
+        default_app.backend.mark_as_failure(task["id"], task["result"],
                 traceback=traceback)
 
 

+ 1 - 1
celery/tests/test_serialization.py

@@ -7,7 +7,7 @@ from celery.tests.utils import execute_context, mask_modules
 class TestAAPickle(unittest.TestCase):
 
     def test_no_cpickle(self):
-        prev = sys.modules.pop("celery.serialization")
+        prev = sys.modules.pop("celery.serialization", None)
         try:
             def with_cPickle_masked(_val):
                 from celery.serialization import pickle

+ 4 - 4
celery/tests/test_task.py

@@ -8,13 +8,13 @@ from pyparsing import ParseException
 from celery import conf
 from celery import task
 from celery import messaging
+from celery.defaults import default_app
 from celery.task.schedules import crontab, crontab_parser
 from celery.utils import timeutils
 from celery.utils import gen_unique_id, parse_iso8601
 from celery.utils.functional import wraps
 from celery.result import EagerResult
 from celery.execute import send_task
-from celery.backends import default_backend
 from celery.decorators import task as task_dec
 from celery.exceptions import RetryTaskError
 
@@ -327,7 +327,7 @@ class TestCeleryTasks(unittest.TestCase):
         self.assertIsNone(consumer.fetch())
 
         self.assertFalse(presult.successful())
-        default_backend.mark_as_done(presult.task_id, result=None)
+        t1.backend.mark_as_done(presult.task_id, result=None)
         self.assertTrue(presult.successful())
 
         publisher = t1.get_publisher()
@@ -398,11 +398,11 @@ class TestTaskApply(unittest.TestCase):
         self.assertRaises(KeyError, RaisingTask.apply, throw=True)
 
     def test_apply_with_CELERY_EAGER_PROPAGATES_EXCEPTIONS(self):
-        conf.EAGER_PROPAGATES_EXCEPTIONS = True
+        default_app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
         try:
             self.assertRaises(KeyError, RaisingTask.apply)
         finally:
-            conf.EAGER_PROPAGATES_EXCEPTIONS = False
+            default_app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
 
     def test_apply(self):
         IncrementCounterTask.count = 0

+ 3 - 2
celery/tests/test_task_sets.py

@@ -3,6 +3,7 @@ import unittest2 as unittest
 import simplejson
 
 from celery import conf
+from celery.defaults import default_app
 from celery.task import Task
 from celery.task.sets import subtask, TaskSet
 
@@ -139,11 +140,11 @@ class test_TaskSet(unittest.TestCase):
 
         ts = MockTaskSet([MockTask.subtask((i, i))
                         for i in (2, 4, 8)])
-        conf.ALWAYS_EAGER = True
+        default_app.conf.CELERY_ALWAYS_EAGER = True
         try:
             ts.apply_async()
         finally:
-            conf.ALWAYS_EAGER = False
+            default_app.conf.CELERY_ALWAYS_EAGER = False
         self.assertEqual(ts.applied, 1)
 
     def test_apply_async(self):

+ 2 - 0
celery/tests/test_worker_control.py

@@ -5,6 +5,7 @@ from celery.utils.timer2 import Timer
 
 from celery import conf
 from celery.decorators import task
+from celery.defaults import default_app
 from celery.registry import tasks
 from celery.task.builtins import PingTask
 from celery.utils import gen_unique_id
@@ -46,6 +47,7 @@ class Listener(object):
                                          args=(2, 2),
                                          kwargs={}))
         self.eta_schedule = Timer()
+        self.app = default_app
         self.event_dispatcher = Dispatcher()
 
 

+ 9 - 9
celery/tests/test_worker_job.py

@@ -9,8 +9,8 @@ from StringIO import StringIO
 from carrot.backends.base import BaseMessage
 
 from celery import states
-from celery.backends import default_backend
 from celery.datastructures import ExceptionInfo
+from celery.defaults import default_app
 from celery.decorators import task as task_dec
 from celery.exceptions import RetryTaskError, NotRegistered
 from celery.log import setup_logger
@@ -127,14 +127,14 @@ class test_TaskRequest(unittest.TestCase):
     def test_send_email(self):
         from celery import conf
         from celery.worker import job
-        old_mail_admins = job.mail_admins
+        old_mail_admins = default_app.mail_admins
         old_enable_mails = mytask.send_error_emails
         mail_sent = [False]
 
         def mock_mail_admins(*args, **kwargs):
             mail_sent[0] = True
 
-        job.mail_admins = mock_mail_admins
+        default_app.mail_admins = mock_mail_admins
         mytask.send_error_emails = True
         try:
             tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
@@ -152,7 +152,7 @@ class test_TaskRequest(unittest.TestCase):
             self.assertFalse(mail_sent[0])
 
         finally:
-            job.mail_admins = old_mail_admins
+            default_app.mail_admins = old_mail_admins
             mytask.send_error_emails = old_enable_mails
 
     def test_already_revoked(self):
@@ -365,7 +365,7 @@ class test_TaskRequest(unittest.TestCase):
         tid = gen_unique_id()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         self.assertEqual(tw.execute(), 256)
-        meta = default_backend.get_task_meta(tid)
+        meta = mytask.backend.get_task_meta(tid)
         self.assertEqual(meta["result"], 256)
         self.assertEqual(meta["status"], states.SUCCESS)
 
@@ -373,7 +373,7 @@ class test_TaskRequest(unittest.TestCase):
         tid = gen_unique_id()
         tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
         self.assertEqual(tw.execute(), 256)
-        meta = default_backend.get_task_meta(tid)
+        meta = mytask_no_kwargs.backend.get_task_meta(tid)
         self.assertEqual(meta["result"], 256)
         self.assertEqual(meta["status"], states.SUCCESS)
 
@@ -381,7 +381,7 @@ class test_TaskRequest(unittest.TestCase):
         tid = gen_unique_id()
         tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
         self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
-        meta = default_backend.get_task_meta(tid)
+        meta = mytask_some_kwargs.backend.get_task_meta(tid)
         self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
         self.assertEqual(meta["result"], 256)
         self.assertEqual(meta["status"], states.SUCCESS)
@@ -391,7 +391,7 @@ class test_TaskRequest(unittest.TestCase):
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
                         on_ack=on_ack)
         self.assertEqual(tw.execute(), 256)
-        meta = default_backend.get_task_meta(tid)
+        meta = mytask.backend.get_task_meta(tid)
         self.assertTrue(scratch["ACK"])
         self.assertEqual(meta["result"], 256)
         self.assertEqual(meta["status"], states.SUCCESS)
@@ -400,7 +400,7 @@ class test_TaskRequest(unittest.TestCase):
         tid = gen_unique_id()
         tw = TaskRequest(mytask_raising.name, tid, [4], {"f": "x"})
         self.assertIsInstance(tw.execute(), ExceptionInfo)
-        meta = default_backend.get_task_meta(tid)
+        meta = mytask_raising.backend.get_task_meta(tid)
         self.assertEqual(meta["status"], states.FAILURE)
         self.assertIsInstance(meta["result"], KeyError)
 

+ 8 - 8
celery/tests/utils.py

@@ -70,26 +70,26 @@ from celery.utils import noop
 @contextmanager
 def eager_tasks():
 
-    from celery import conf
-    prev = conf.ALWAYS_EAGER
-    conf.ALWAYS_EAGER = True
+    from celery.defaults import default_app
+    prev = default_app.conf.CELERY_ALWAYS_EAGER
+    default_app.conf.CELERY_ALWAYS_EAGER = True
 
     yield True
 
-    conf.ALWAYS_EAGER = prev
+    default_app.conf.CELERY_ALWAYS_EAGER = prev
 
 
 def with_eager_tasks(fun):
 
     @wraps(fun)
     def _inner(*args, **kwargs):
-        from celery import conf
-        prev = conf.ALWAYS_EAGER
-        conf.ALWAYS_EAGER = True
+        from celery.defaults import default_app
+        prev = default_app.conf.CELERY_ALWAYS_EAGER
+        default_app.conf.CELERY_ALWAYS_EAGER = True
         try:
             return fun(*args, **kwargs)
         finally:
-            conf.ALWAYS_EAGER = prev
+            default_app.conf.CELERY_ALWAYS_EAGER = prev
 
 
 def with_environ(env_name, env_value):

+ 17 - 30
celery/utils/mail.py

@@ -6,6 +6,8 @@ try:
 except ImportError:
     from email.MIMEText import MIMEText
 
+from celery.defaults import app_or_default
+
 
 class SendmailWarning(UserWarning):
     """Problem happened while sending the e-mail message."""
@@ -24,6 +26,9 @@ class Message(object):
         if not isinstance(self.to, (list, tuple)):
             self.to = [self.to]
 
+    def __repr__(self):
+        return "<E-mail: To:%r Subject:%r>" % (self.to, self.subject)
+
     def __str__(self):
         msg = MIMEText(self.body, "plain", self.charset)
         msg["Subject"] = self.subject
@@ -40,35 +45,17 @@ class Mailer(object):
         self.user = user
         self.password = password
 
-    def send(self, message):
-        client = smtplib.SMTP(self.host, self.port)
-
-        if self.user and self.password:
-            client.login(self.user, self.password)
-
-        client.sendmail(message.sender, message.to, str(message))
-        client.quit()
-
-
-def mail_admins(subject, message, fail_silently=False):
-    """Send a message to the admins in conf.ADMINS."""
-    from celery import conf
-
-    if not conf.ADMINS:
-        return
+    def send(self, message, fail_silently=False):
+        try:
+            client = smtplib.SMTP(self.host, self.port)
 
-    to = [admin_email for _, admin_email in conf.ADMINS]
-    message = Message(sender=conf.SERVER_EMAIL, to=to,
-                      subject=subject, body=message)
+            if self.user and self.password:
+                client.login(self.user, self.password)
 
-    try:
-        mailer = Mailer(conf.EMAIL_HOST, conf.EMAIL_PORT,
-                        conf.EMAIL_HOST_USER,
-                        conf.EMAIL_HOST_PASSWORD)
-        mailer.send(message)
-    except Exception, exc:
-        if not fail_silently:
-            raise
-        warnings.warn(SendmailWarning(
-            "Mail could not be sent: %r %r" % (
-                exc, {"To": to, "Subject": subject})))
+            client.sendmail(message.sender, message.to, str(message))
+            client.quit()
+        except Exception, exc:
+            if not fail_silently:
+                raise
+            warnings.warn(SendmailWarning(
+                "E-mail could not be sent: %r %r" % (exc, message)))

+ 22 - 21
celery/worker/__init__.py

@@ -13,6 +13,7 @@ from celery import log
 from celery import registry
 from celery import platform
 from celery import signals
+from celery.defaults import app_or_default
 from celery.utils import noop, instantiate
 
 from celery.worker import state
@@ -61,47 +62,47 @@ class WorkController(object):
             task_soft_time_limit=None, max_tasks_per_child=None,
             pool_putlocks=None, db=None, prefetch_multiplier=None,
             eta_scheduler_precision=None, queues=None,
-            disable_rate_limits=None, defaults=None):
+            disable_rate_limits=None, app=None):
 
-        if defaults is None:
-            from celery import conf as defaults
-        self.defaults = defaults
+        self.app = app_or_default(app)
+        conf = self.app.conf
+        queues = queues or self.app.get_queues()
 
         # Options
         self.loglevel = loglevel or self.loglevel
-        self.concurrency = concurrency or defaults.CELERYD_CONCURRENCY
-        self.logfile = logfile or defaults.CELERYD_LOG_FILE
+        self.concurrency = concurrency or conf.CELERYD_CONCURRENCY
+        self.logfile = logfile or conf.CELERYD_LOG_FILE
         self.logger = log.get_default_logger()
         if send_events is None:
-            send_events = defaults.SEND_EVENTS
+            send_events = conf.CELERY_SEND_EVENTS
         self.send_events = send_events
-        self.pool_cls = pool_cls or defaults.CELERYD_POOL
-        self.listener_cls = listener_cls or defaults.CELERYD_LISTENER
-        self.mediator_cls = mediator_cls or defaults.CELERYD_MEDIATOR
+        self.pool_cls = pool_cls or conf.CELERYD_POOL
+        self.listener_cls = listener_cls or conf.CELERYD_LISTENER
+        self.mediator_cls = mediator_cls or conf.CELERYD_MEDIATOR
         self.eta_scheduler_cls = eta_scheduler_cls or \
-                                    defaults.CELERYD_ETA_SCHEDULER
+                                    conf.CELERYD_ETA_SCHEDULER
         self.schedule_filename = schedule_filename or \
-                                    defaults.CELERYBEAT_SCHEDULE_FILENAME
+                                    conf.CELERYBEAT_SCHEDULE_FILENAME
         self.hostname = hostname or socket.gethostname()
         self.embed_clockservice = embed_clockservice
         self.ready_callback = ready_callback
         self.task_time_limit = task_time_limit or \
-                                defaults.CELERYD_TASK_TIME_LIMIT
+                                conf.CELERYD_TASK_TIME_LIMIT
         self.task_soft_time_limit = task_soft_time_limit or \
-                                defaults.CELERYD_TASK_SOFT_TIME_LIMIT
+                                conf.CELERYD_TASK_SOFT_TIME_LIMIT
         self.max_tasks_per_child = max_tasks_per_child or \
-                                defaults.CELERYD_MAX_TASKS_PER_CHILD
+                                conf.CELERYD_MAX_TASKS_PER_CHILD
         self.pool_putlocks = pool_putlocks or \
-                                defaults.CELERYD_POOL_PUTLOCKS
+                                conf.CELERYD_POOL_PUTLOCKS
         self.eta_scheduler_precision = eta_scheduler_precision or \
-                                defaults.CELERYD_ETA_SCHEDULER_PRECISION
+                                conf.CELERYD_ETA_SCHEDULER_PRECISION
         self.prefetch_multiplier = prefetch_multiplier or \
-                                defaults.CELERYD_PREFETCH_MULTIPLIER
+                                conf.CELERYD_PREFETCH_MULTIPLIER
         self.timer_debug = log.SilenceRepeated(self.logger.debug,
                                                max_iterations=10)
-        self.db = db or defaults.CELERYD_STATE_DB
+        self.db = db or conf.CELERYD_STATE_DB
         self.disable_rate_limits = disable_rate_limits or \
-                                defaults.DISABLE_RATE_LIMITS
+                                conf.CELERY_DISABLE_RATE_LIMITS
         self.queues = queues
 
         self._finalize = Finalize(self, self.stop, exitpriority=1)
@@ -150,7 +151,7 @@ class WorkController(object):
                                     initial_prefetch_count=prefetch_count,
                                     pool=self.pool,
                                     queues=self.queues,
-                                    defaults=self.defaults)
+                                    app=self.app)
 
         # The order is important here;
         #   the first in the list is the first to start,

+ 2 - 2
celery/worker/control/builtins.py

@@ -1,7 +1,6 @@
 from datetime import datetime
 
 from celery import log
-from celery.backends import default_backend
 from celery.registry import tasks
 from celery.utils import timeutils, LOG_LEVELS
 from celery.worker import state
@@ -26,8 +25,9 @@ def diagnose(panel, timeout=None, **kwargs):
 @Panel.register
 def revoke(panel, task_id, task_name=None, **kwargs):
     """Revoke task by task id."""
+    app = panel.listener.app
     revoked.add(task_id)
-    backend = default_backend
+    backend = app.backend
     if task_name: # Use custom task backend (if any)
         try:
             backend = tasks[task_name].backend

+ 9 - 8
celery/worker/job.py

@@ -9,12 +9,11 @@ from datetime import datetime
 from celery import log
 from celery import platform
 from celery.datastructures import ExceptionInfo
+from celery.defaults import app_or_default, default_app
 from celery.execute.trace import TaskTrace
-from celery.loaders import current_loader
 from celery.registry import tasks
 from celery.utils import noop, kwdict, fun_takes_kwargs
 from celery.utils import truncate_text, maybe_iso8601
-from celery.utils.mail import mail_admins
 from celery.worker import state
 
 # pep8.py borks on a inline signature separator and
@@ -74,7 +73,7 @@ class WorkerTaskTrace(TaskTrace):
     """
 
     def __init__(self, *args, **kwargs):
-        self.loader = kwargs.get("loader") or current_loader()
+        self.loader = kwargs.get("loader") or default_app.loader
         self.hostname = kwargs.get("hostname") or socket.gethostname()
         super(WorkerTaskTrace, self).__init__(*args, **kwargs)
 
@@ -213,7 +212,8 @@ class TaskRequest(object):
     def __init__(self, task_name, task_id, args, kwargs,
             on_ack=noop, retries=0, delivery_info=None, hostname=None,
             email_subject=None, email_body=None, logger=None,
-            eventer=None, eta=None, expires=None, **opts):
+            eventer=None, eta=None, expires=None, app=None, **opts):
+        self.app = app_or_default(app)
         self.task_name = task_name
         self.task_id = task_id
         self.retries = retries
@@ -252,7 +252,7 @@ class TaskRequest(object):
 
     @classmethod
     def from_message(cls, message, message_data, logger=None, eventer=None,
-            hostname=None):
+            hostname=None, app=None):
         """Create a :class:`TaskRequest` from a task message sent by
         :class:`celery.messaging.TaskPublisher`.
 
@@ -282,7 +282,7 @@ class TaskRequest(object):
                    retries=retries, on_ack=message.ack,
                    delivery_info=delivery_info, logger=logger,
                    eventer=eventer, hostname=hostname,
-                   eta=eta, expires=expires)
+                   eta=eta, expires=expires, app=app)
 
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
@@ -339,7 +339,8 @@ class TaskRequest(object):
             self.acknowledge()
 
         tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile),
-                                 **{"hostname": self.hostname})
+                                 **{"hostname": self.hostname,
+                                    "loader": self.app.loader})
         retval = tracer.execute()
         self.acknowledge()
         return retval
@@ -452,7 +453,7 @@ class TaskRequest(object):
                     return
             subject = self.email_subject.strip() % context
             body = self.email_body.strip() % context
-            return mail_admins(subject, body, fail_silently=fail_silently)
+            self.app.mail_admins(subject, body, fail_silently=fail_silently)
 
     def __repr__(self):
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (

+ 11 - 11
celery/worker/listener.py

@@ -80,6 +80,7 @@ import warnings
 
 from carrot.connection import AMQPConnectionException
 
+from celery.defaults import app_or_default
 from celery.utils import noop, retry_over_time
 from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.control import ControlDispatch
@@ -199,12 +200,9 @@ class CarrotListener(object):
 
     def __init__(self, ready_queue, eta_schedule, logger,
             init_callback=noop, send_events=False, hostname=None,
-            initial_prefetch_count=2, pool=None, queues=None, defaults=None):
-
-        if defaults is None:
-            from celery import conf as defaults
-        self.defaults = defaults
+            initial_prefetch_count=2, pool=None, queues=None, app=None):
 
+        self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
         self.ready_queue = ready_queue
@@ -220,7 +218,7 @@ class CarrotListener(object):
         self.control_dispatch = ControlDispatch(logger=logger,
                                                 hostname=self.hostname,
                                                 listener=self)
-        self.queues = queues
+        self.queues = queues or self.app.get_queues()
 
     def start(self):
         """Start the consumer.
@@ -292,6 +290,7 @@ class CarrotListener(object):
         if message_data.get("task"):
             try:
                 task = TaskRequest.from_message(message, message_data,
+                                                app=self.app,
                                                 logger=self.logger,
                                                 hostname=self.hostname,
                                                 eventer=self.event_dispatcher)
@@ -381,8 +380,9 @@ class CarrotListener(object):
 
         self.connection = self._open_connection()
         self.logger.debug("CarrotListener: Connection Established.")
-        self.task_consumer = get_consumer_set(connection=self.connection,
-                                              queues=self.queues)
+        self.task_consumer = self.app.get_consumer_set(
+                                        connection=self.connection,
+                                        queues=self.queues)
         # QoS: Reset prefetch window.
         self.qos = QoS(self.task_consumer,
                        self.initial_prefetch_count, self.logger)
@@ -432,16 +432,16 @@ class CarrotListener(object):
 
         def _establish_connection():
             """Establish a connection to the broker."""
-            conn = establish_connection(defaults=self.defaults)
+            conn = self.app.broker_connection()
             conn.connect() # Connection is established lazily, so connect.
             return conn
 
-        if not self.defaults.BROKER_CONNECTION_RETRY:
+        if not self.app.conf.BROKER_CONNECTION_RETRY:
             return _establish_connection()
 
         conn = retry_over_time(_establish_connection, (socket.error, IOError),
                     errback=_connection_error_handler,
-                    max_retries=self.defaults.BROKER_CONNECTION_MAX_RETRIES)
+                    max_retries=self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
         return conn
 
     def stop(self):