Jelajahi Sumber

Merge branch 'master' into code-reload

Ask Solem 13 tahun lalu
induk
melakukan
517aed7ad1
71 mengubah file dengan 1597 tambahan dan 859 penghapusan
  1. 1 0
      AUTHORS
  2. 17 0
      Changelog
  3. 37 4
      README.rst
  4. 2 0
      celery/actors.py
  5. 9 21
      celery/app/__init__.py
  6. 1 1
      celery/app/amqp.py
  7. 7 7
      celery/app/base.py
  8. 47 16
      celery/app/task/__init__.py
  9. 3 3
      celery/apps/beat.py
  10. 2 2
      celery/apps/worker.py
  11. 8 8
      celery/backends/__init__.py
  12. 0 2
      celery/bin/base.py
  13. 1 1
      celery/bin/celerybeat.py
  14. 1 1
      celery/bin/celeryd.py
  15. 5 5
      celery/concurrency/__init__.py
  16. 3 3
      celery/concurrency/base.py
  17. 2 5
      celery/concurrency/processes/pool.py
  18. 1 1
      celery/concurrency/solo.py
  19. 47 0
      celery/contrib/bundles.py
  20. 6 0
      celery/events/__init__.py
  21. 4 0
      celery/exceptions.py
  22. 190 98
      celery/execute/trace.py
  23. 3 3
      celery/loaders/__init__.py
  24. 14 10
      celery/log.py
  25. 28 13
      celery/platforms.py
  26. 0 7
      celery/registry.py
  27. 12 5
      celery/schedules.py
  28. 2 2
      celery/tests/functional/case.py
  29. 0 1
      celery/tests/test_concurrency/test_concurrency_solo.py
  30. 0 4
      celery/tests/test_task/__init__.py
  31. 20 12
      celery/tests/test_task/test_execute_trace.py
  32. 3 2
      celery/tests/test_utils/__init__.py
  33. 6 6
      celery/tests/test_utils/test_utils_timeutils.py
  34. 4 0
      celery/tests/test_worker/__init__.py
  35. 2 2
      celery/tests/test_worker/test_worker_control.py
  36. 3 0
      celery/tests/test_worker/test_worker_heartbeat.py
  37. 77 58
      celery/tests/test_worker/test_worker_job.py
  38. 23 8
      celery/utils/__init__.py
  39. 142 0
      celery/utils/coroutine.py
  40. 6 0
      celery/utils/encoding.py
  41. 8 9
      celery/utils/timeutils.py
  42. 4 1
      celery/worker/__init__.py
  43. 22 26
      celery/worker/consumer.py
  44. 11 4
      celery/worker/heartbeat.py
  45. 172 244
      celery/worker/job.py
  46. 6 3
      celery/worker/mediator.py
  47. 2 4
      celery/worker/state.py
  48. 19 0
      celery/worker/strategy.py
  49. 1 1
      contrib/generic-init.d/celerybeat
  50. 1 1
      contrib/generic-init.d/celeryd
  51. 1 1
      contrib/generic-init.d/celeryevcam
  52. 1 1
      contrib/release/doc4allmods
  53. 1 1
      docs/configuration.rst
  54. 21 4
      docs/contributing.rst
  55. 57 0
      docs/getting-started/brokers/beanstalk.rst
  56. 55 0
      docs/getting-started/brokers/couchdb.rst
  57. 58 0
      docs/getting-started/brokers/django.rst
  58. 21 0
      docs/getting-started/brokers/index.rst
  59. 56 0
      docs/getting-started/brokers/mongodb.rst
  60. 23 11
      docs/getting-started/brokers/rabbitmq.rst
  61. 52 0
      docs/getting-started/brokers/redis.rst
  62. 74 0
      docs/getting-started/brokers/sqlalchemy.rst
  63. 11 14
      docs/getting-started/first-steps-with-celery.rst
  64. 1 1
      docs/getting-started/index.rst
  65. 36 3
      docs/includes/introduction.txt
  66. 3 128
      docs/tutorials/otherqueues.rst
  67. 12 16
      docs/userguide/tasks.rst
  68. 63 29
      funtests/benchmarks/bench_worker.py
  69. 0 1
      requirements/docs.txt
  70. 2 0
      requirements/pkgutils.txt
  71. 64 45
      setup.py

+ 1 - 0
AUTHORS

@@ -57,6 +57,7 @@ Joshua Ginsberg <jag@flowtheory.net>
 Juan Ignacio Catalano <catalanojuan@gmail.com>
 Juarez Bochi <jbochi@gmail.com>
 Jude Nagurney <jude@pwan.org>
+Julien Poissonnier <julien@caffeine.lu>
 Kevin Tran <hekevintran@gmail.com>
 Kornelijus Survila <kornholijo@gmail.com>
 Leo Dirac <leo@banyanbranch.com>

+ 17 - 0
Changelog

@@ -184,6 +184,23 @@ News
 * Moved some common threading functionality to new module
   :mod:`celery.utils.threads`
 
+.. _version-2.4.5:
+
+2.4.5
+=====
+:release-date: 2011-12-02 05:00 P.M GMT
+:by: Ask Solem
+
+* Periodic task interval schedules were accidentally rounded down,
+  resulting in some periodic tasks being executed early.
+
+* Logging of humanized times in the celerybeat log is now more detailed.
+
+* New :ref:`brokers` section in the Getting Started part of the Documentation
+
+    This replaces the old :ref:`tut-otherqueues` tutorial, and adds
+    documentation for MongoDB, Beanstalk and CouchDB.
+
 .. _version-2.4.4:
 
 2.4.4

+ 37 - 4
README.rst

@@ -4,7 +4,7 @@
 
 .. image:: http://cloud.github.com/downloads/ask/celery/celery_128.png
 
-:Version: 2.5.0a1
+:Version: 2.5.0b1
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/ask/celery/
@@ -111,7 +111,6 @@ Features
     +-----------------+----------------------------------------------------+
     | Fault-tolerant  | Excellent configurable error recovery when using   |
     |                 | `RabbitMQ`, ensures your tasks are never lost.     |
-    |                 | scenarios, and your tasks will never be lost.      |
     +-----------------+----------------------------------------------------+
     | Distributed     | Runs on one or more machines. Supports             |
     |                 | broker `clustering`_ and `HA`_ when used in        |
@@ -215,11 +214,45 @@ or from source.
 
 To install using `pip`,::
 
-    $ pip install Celery
+    $ pip install -U Celery
 
 To install using `easy_install`,::
 
-    $ easy_install Celery
+    $ easy_install -U Celery
+
+Bundles
+-------
+
+Celery also defines a group of bundles that can be used
+to install Celery and the dependencies for a given feature.
+
+The following bundles are available:
+
+:`celery-with-redis`_:
+    for using Redis as a broker.
+
+:`celery-with-mongodb`_:
+    for using MongoDB as a broker.
+
+:`django-celery-with-redis`_:
+    for Django, and using Redis as a broker.
+
+:`django-celery-with-mongodb`_:
+    for Django, and using MongoDB as a broker.
+
+:`bundle-celery`_:
+    convenience bundle installing *Celery* and related packages.
+
+.. _`celery-with-redis`:
+    http://pypi.python.org/pypi/celery-with-redis/
+.. _`celery-with-mongodb`:
+    http://pypi.python.org/pypi/celery-with-mongdb/
+.. _`django-celery-with-redis`:
+    http://pypi.python.org/pypi/django-celery-with-redis/
+.. _`django-celery-with-mongodb`:
+    http://pypi.python.org/pypi/django-celery-with-mongdb/
+.. _`bundle-celery`:
+    http://pypi.python.org/pypi/bundle-celery/
 
 .. _celery-installing-from-source:
 

+ 2 - 0
celery/actors.py

@@ -1,3 +1,5 @@
+from __future__ import absolute_import
+
 from celery.app import app_or_default
 
 import cl

+ 9 - 21
celery/app/__init__.py

@@ -15,9 +15,6 @@ from __future__ import absolute_import
 import os
 import threading
 
-from functools import wraps
-from inspect import getargspec
-
 from .. import registry
 from ..utils import cached_property, instantiate
 
@@ -115,14 +112,14 @@ class App(base.BaseApp):
 
     def Worker(self, **kwargs):
         """Create new :class:`~celery.apps.worker.Worker` instance."""
-        return instantiate("celery.apps.worker.Worker", app=self, **kwargs)
+        return instantiate("celery.apps.worker:Worker", app=self, **kwargs)
 
     def WorkController(self, **kwargs):
-        return instantiate("celery.worker.WorkController", app=self, **kwargs)
+        return instantiate("celery.worker:WorkController", app=self, **kwargs)
 
     def Beat(self, **kwargs):
         """Create new :class:`~celery.apps.beat.Beat` instance."""
-        return instantiate("celery.apps.beat.Beat", app=self, **kwargs)
+        return instantiate("celery.apps.beat:Beat", app=self, **kwargs)
 
     def TaskSet(self, *args, **kwargs):
         """Create new :class:`~celery.task.sets.TaskSet`."""
@@ -170,23 +167,14 @@ class App(base.BaseApp):
         def inner_create_task_cls(**options):
 
             def _create_task_cls(fun):
-                options["app"] = self
-                options.setdefault("accept_magic_kwargs", False)
                 base = options.pop("base", None) or self.Task
 
-                @wraps(fun, assigned=("__module__", "__name__"))
-                def run(self, *args, **kwargs):
-                    return fun(*args, **kwargs)
-
-                # Save the argspec for this task so we can recognize
-                # which default task kwargs we're going to pass to it later.
-                # (this happens in celery.utils.fun_takes_kwargs)
-                run.argspec = getargspec(fun)
-
-                cls_dict = dict(options, run=run,
-                                __module__=fun.__module__,
-                                __doc__=fun.__doc__)
-                T = type(fun.__name__, (base, ), cls_dict)()
+                T = type(fun.__name__, (base, ), dict({
+                        "app": self,
+                        "accept_magic_kwargs": False,
+                        "run": staticmethod(fun),
+                        "__doc__": fun.__doc__,
+                        "__module__": fun.__module__}, **options))()
                 return registry.tasks[T.name]             # global instance.
 
             return _create_task_cls

+ 1 - 1
celery/app/amqp.py

@@ -145,7 +145,7 @@ class Queues(dict):
 
 
 class TaskPublisher(messaging.Publisher):
-    auto_declare = True
+    auto_declare = False
     retry = False
     retry_policy = None
 

+ 7 - 7
celery/app/base.py

@@ -72,12 +72,12 @@ class BaseApp(object):
     IS_OSX = platforms.IS_OSX
     IS_WINDOWS = platforms.IS_WINDOWS
 
-    amqp_cls = "celery.app.amqp.AMQP"
+    amqp_cls = "celery.app.amqp:AMQP"
     backend_cls = None
-    events_cls = "celery.events.Events"
-    loader_cls = "celery.loaders.app.AppLoader"
-    log_cls = "celery.log.Logging"
-    control_cls = "celery.task.control.Control"
+    events_cls = "celery.events:Events"
+    loader_cls = "celery.loaders.app:AppLoader"
+    log_cls = "celery.log:Logging"
+    control_cls = "celery.task.control:Control"
 
     _pool = None
 
@@ -274,9 +274,9 @@ class BaseApp(object):
                                        use_tls=self.conf.EMAIL_USE_TLS)
 
     def select_queues(self, queues=None):
-        if queues is not None:
+        if queues:
             return self.amqp.queues.select_subset(queues,
-                  self.conf.CELERY_CREATE_MISSING_QUEUES)
+                                    self.conf.CELERY_CREATE_MISSING_QUEUES)
 
     def either(self, default_key, *values):
         """Fallback to the value of a configuration key if none of the

+ 47 - 16
celery/app/task/__init__.py

@@ -15,12 +15,13 @@ from __future__ import absolute_import
 import sys
 import threading
 
+from ... import states
 from ...datastructures import ExceptionInfo
 from ...exceptions import MaxRetriesExceededError, RetryTaskError
-from ...execute.trace import TaskTrace
+from ...execute.trace import eager_trace_task
 from ...registry import tasks, _unpickle_task
 from ...result import EagerResult
-from ...utils import fun_takes_kwargs, mattrgetter, uuid
+from ...utils import fun_takes_kwargs, instantiate, mattrgetter, uuid
 from ...utils.mail import ErrorMail
 
 extract_exec_options = mattrgetter("queue", "routing_key",
@@ -57,6 +58,9 @@ class Context(threading.local):
         except AttributeError:
             return default
 
+    def __repr__(self):
+        return "<Context: %r>" % (vars(self, ))
+
 
 class TaskType(type):
     """Meta class for tasks.
@@ -73,11 +77,15 @@ class TaskType(type):
         new = super(TaskType, cls).__new__
         task_module = attrs.get("__module__") or "__main__"
 
-        # Abstract class: abstract attribute should not be inherited.
+        if "__call__" in attrs:
+            # see note about __call__ below.
+            attrs["__defines_call__"] = True
+
+        # - Abstract class: abstract attribute should not be inherited.
         if attrs.pop("abstract", None) or not attrs.get("autoregister", True):
             return new(cls, name, bases, attrs)
 
-        # Automatically generate missing/empty name.
+        # - Automatically generate missing/empty name.
         autoname = False
         if not attrs.get("name"):
             try:
@@ -88,6 +96,23 @@ class TaskType(type):
             attrs["name"] = '.'.join([module_name, name])
             autoname = True
 
+        # - Automatically generate __call__.
+        # If this or none of its bases define __call__, we simply
+        # alias it to the ``run`` method, as
+        # this means we can skip a stacktrace frame :)
+        if not (attrs.get("__call__")
+                or any(getattr(b, "__defines_call__", False) for b in bases)):
+            try:
+                attrs["__call__"] = attrs["run"]
+            except KeyError:
+
+                # the class does not yet define run,
+                # so we can't optimize this case.
+                def __call__(self, *args, **kwargs):
+                    return self.run(*args, **kwargs)
+                attrs["__call__"] = __call__
+
+        # - Create and register class.
         # Because of the way import happens (recursively)
         # we may or may not be the first time the task tries to register
         # with the framework.  There should only be one class for each task
@@ -117,6 +142,7 @@ class BaseTask(object):
 
     """
     __metaclass__ = TaskType
+    __tracer__ = None
 
     ErrorMail = ErrorMail
     MaxRetriesExceededError = MaxRetriesExceededError
@@ -246,8 +272,8 @@ class BaseTask(object):
     #: The type of task *(no longer used)*.
     type = "regular"
 
-    def __call__(self, *args, **kwargs):
-        return self.run(*args, **kwargs)
+    #: Execution strategy used, or the qualified name of one.
+    Strategy = "celery.worker.strategy:default"
 
     def __reduce__(self):
         return (_unpickle_task, (self.name, ), None)
@@ -256,6 +282,9 @@ class BaseTask(object):
         """The body of the task executed by workers."""
         raise NotImplementedError("Tasks must define the run method.")
 
+    def start_strategy(self, app, consumer):
+        return instantiate(self.Strategy, self, app, consumer)
+
     @classmethod
     def get_logger(self, loglevel=None, logfile=None, propagate=False,
             **kwargs):
@@ -533,9 +562,11 @@ class BaseTask(object):
                         "eta": eta})
 
         if max_retries is not None and options["retries"] > max_retries:
-            raise exc or self.MaxRetriesExceededError(
-                            "Can't retry %s[%s] args:%s kwargs:%s" % (
-                                self.name, options["task_id"], args, kwargs))
+            if exc:
+                raise
+            raise self.MaxRetriesExceededError(
+                    "Can't retry %s[%s] args:%s kwargs:%s" % (
+                        self.name, options["task_id"], args, kwargs))
 
         # If task was executed eagerly using apply(),
         # then the retry must also be executed eagerly.
@@ -591,13 +622,14 @@ class BaseTask(object):
                                         if key in supported_keys)
             kwargs.update(extend_with)
 
-        trace = TaskTrace(task.name, task_id, args, kwargs,
-                          task=task, request=request, propagate=throw)
-        retval = trace.execute()
+        retval, info = eager_trace_task(task, task_id, args, kwargs,
+                                        request=request, propagate=throw)
         if isinstance(retval, ExceptionInfo):
             retval = retval.exception
-        return EagerResult(task_id, retval, trace.status,
-                           traceback=trace.strtb)
+        state, tb = states.SUCCESS, ''
+        if info is not None:
+            state, tb = info.state, info.strtb
+        return EagerResult(task_id, retval, state, traceback=tb)
 
     @classmethod
     def AsyncResult(self, task_id):
@@ -655,8 +687,7 @@ class BaseTask(object):
         The return value of this handler is ignored.
 
         """
-        if self.request.chord:
-            self.backend.on_chord_part_return(self)
+        pass
 
     def on_failure(self, exc, task_id, args, kwargs, einfo):
         """Error handler.

+ 3 - 3
celery/apps/beat.py

@@ -9,7 +9,7 @@ import traceback
 from .. import __version__, platforms
 from .. import beat
 from ..app import app_or_default
-from ..utils import get_full_cls_name, LOG_LEVELS
+from ..utils import LOG_LEVELS, qualname
 from ..utils.timeutils import humanize_seconds
 
 STARTUP_INFO_FMT = """
@@ -104,8 +104,8 @@ class Beat(object):
             "conninfo": self.app.broker_connection().as_uri(),
             "logfile": self.logfile or "[stderr]",
             "loglevel": LOG_LEVELS[self.loglevel],
-            "loader": get_full_cls_name(self.app.loader.__class__),
-            "scheduler": get_full_cls_name(scheduler.__class__),
+            "loader": qualname(self.app.loader),
+            "scheduler": qualname(scheduler),
             "scheduler_info": scheduler.info,
             "hmax_interval": humanize_seconds(beat.max_interval),
             "max_interval": beat.max_interval,

+ 2 - 2
celery/apps/worker.py

@@ -15,7 +15,7 @@ import warnings
 from .. import __version__, platforms, signals
 from ..app import app_or_default
 from ..exceptions import ImproperlyConfigured, SystemTerminate
-from ..utils import get_full_cls_name, isatty, LOG_LEVELS, cry
+from ..utils import isatty, LOG_LEVELS, cry, qualname
 from ..worker import WorkController
 
 try:
@@ -222,7 +222,7 @@ class Worker(object):
             "logfile": self.logfile or "[stderr]",
             "celerybeat": "ON" if self.run_clockservice else "OFF",
             "events": "ON" if self.events else "OFF",
-            "loader": get_full_cls_name(self.loader.__class__),
+            "loader": qualname(self.loader),
             "queues": app.amqp.queues.format(indent=18, indent_first=False),
         }
 

+ 8 - 8
celery/backends/__init__.py

@@ -7,14 +7,14 @@ from ..utils import get_cls_by_name
 from ..utils.functional import memoize
 
 BACKEND_ALIASES = {
-    "amqp": "celery.backends.amqp.AMQPBackend",
-    "cache": "celery.backends.cache.CacheBackend",
-    "redis": "celery.backends.redis.RedisBackend",
-    "mongodb": "celery.backends.mongodb.MongoBackend",
-    "tyrant": "celery.backends.tyrant.TyrantBackend",
-    "database": "celery.backends.database.DatabaseBackend",
-    "cassandra": "celery.backends.cassandra.CassandraBackend",
-    "disabled": "celery.backends.base.DisabledBackend",
+    "amqp": "celery.backends.amqp:AMQPBackend",
+    "cache": "celery.backends.cache:CacheBackend",
+    "redis": "celery.backends.redis:RedisBackend",
+    "mongodb": "celery.backends.mongodb:MongoBackend",
+    "tyrant": "celery.backends.tyrant:TyrantBackend",
+    "database": "celery.backends.database:DatabaseBackend",
+    "cassandra": "celery.backends.cassandra:CassandraBackend",
+    "disabled": "celery.backends.base:DisabledBackend",
 }
 
 

+ 0 - 2
celery/bin/base.py

@@ -116,8 +116,6 @@ class Command(object):
             sys.stderr.write(
                 "\nUnrecognized command line arguments: %s\n" % (
                     ", ".join(args), ))
-            import traceback
-            traceback.print_stack(file=sys.stderr)
             sys.stderr.write("\nTry --help?\n")
             sys.exit(1)
         return self.run(*args, **vars(options))

+ 1 - 1
celery/bin/celerybeat.py

@@ -77,7 +77,7 @@ class BeatCommand(Command):
                 default=None,
                 action="store", dest="scheduler_cls",
                 help="Scheduler class. Default is "
-                     "celery.beat.PersistentScheduler"),
+                     "celery.beat:PersistentScheduler"),
             Option('-l', '--loglevel',
                 default=conf.CELERYBEAT_LOG_LEVEL,
                 action="store", dest="loglevel",

+ 1 - 1
celery/bin/celeryd.py

@@ -140,7 +140,7 @@ class WorkerCommand(Command):
                 default=None,
                 action="store", dest="scheduler_cls",
                 help="Scheduler class. Default is "
-                     "celery.beat.PersistentScheduler"),
+                     "celery.beat:PersistentScheduler"),
             Option('-S', '--statedb', default=conf.CELERYD_STATE_DB,
                 action="store", dest="db",
                 help="Path to the state database. The extension '.db' will "

+ 5 - 5
celery/concurrency/__init__.py

@@ -4,11 +4,11 @@ from __future__ import absolute_import
 from ..utils import get_cls_by_name
 
 ALIASES = {
-    "processes": "celery.concurrency.processes.TaskPool",
-    "eventlet": "celery.concurrency.eventlet.TaskPool",
-    "gevent": "celery.concurrency.gevent.TaskPool",
-    "threads": "celery.concurrency.threads.TaskPool",
-    "solo": "celery.concurrency.solo.TaskPool",
+    "processes": "celery.concurrency.processes:TaskPool",
+    "eventlet": "celery.concurrency.eventlet:TaskPool",
+    "gevent": "celery.concurrency.gevent:TaskPool",
+    "threads": "celery.concurrency.threads:TaskPool",
+    "solo": "celery.concurrency.solo:TaskPool",
 }
 
 

+ 3 - 3
celery/concurrency/base.py

@@ -16,7 +16,7 @@ from ..utils.encoding import safe_repr
 
 
 def apply_target(target, args=(), kwargs={}, callback=None,
-        accept_callback=None, pid=None):
+        accept_callback=None, pid=None, **_):
     if accept_callback:
         accept_callback(pid or os.getpid(), time.time())
     callback(target(*args, **kwargs))
@@ -41,7 +41,7 @@ class BasePool(object):
         self.putlocks = putlocks
         self.logger = logger or log.get_default_logger()
         self.options = options
-        self.does_debug = self.logger.isEnabledFor(logging.DEBUG)
+        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
 
     def on_start(self):
         pass
@@ -91,7 +91,7 @@ class BasePool(object):
         on_ready = partial(self.on_ready, callback, errback)
         on_worker_error = partial(self.on_worker_error, errback)
 
-        if self.does_debug:
+        if self._does_debug:
             self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)",
                             target, safe_repr(args), safe_repr(kwargs))
 

+ 2 - 5
celery/concurrency/processes/pool.py

@@ -807,11 +807,8 @@ class Pool(object):
             warnings.warn(UserWarning("Soft timeouts are not supported: "
                     "on this platform: It does not have the SIGUSR1 signal."))
             soft_timeout = None
-        if waitforslot and self._putlock is not None:
-            while 1:
-                if self._state != RUN or self._putlock.acquire(False):
-                    break
-                time.sleep(1.0)
+        if waitforslot and self._putlock is not None and self._state == RUN:
+            self._putlock.acquire()
         if self._state == RUN:
             result = ApplyResult(self._cache, callback,
                                  accept_callback, timeout_callback,

+ 1 - 1
celery/concurrency/solo.py

@@ -15,7 +15,7 @@ class TaskPool(BasePool):
 
     def _get_info(self):
         return {"max-concurrency": 1,
-                "processes": [self.pid],
+                "processes": [os.getpid()],
                 "max-tasks-per-child": None,
                 "put-guarded-by-semaphore": True,
                 "timeouts": ()}

+ 47 - 0
celery/contrib/bundles.py

@@ -0,0 +1,47 @@
+from __future__ import absolute_import
+
+from celery import VERSION
+from bundle.extensions import Dist
+
+
+defaults = {"author": "Celery Project",
+            "author_email": "bundles@celeryproject.org",
+            "url": "http://celeryproject.org",
+            "license": "BSD"}
+celery = Dist("celery", VERSION, **defaults)
+django_celery = Dist("django-celery", VERSION, **defaults)
+flask_celery = Dist("Flask-Celery", VERSION, **defaults)
+
+bundles = [
+    celery.Bundle("celery-with-redis",
+        "Bundle installing the dependencies for Celery and Redis",
+        requires=["redis>=2.4.4"]),
+    celery.Bundle("celery-with-mongodb",
+        "Bundle installing the dependencies for Celery and MongoDB",
+        requires=["pymongo"]),
+    celery.Bundle("celery-with-couchdb",
+        "Bundle installing the dependencies for Celery and CouchDB",
+        requires=["couchdb"]),
+    celery.Bundle("celery-with-beanstalk",
+        "Bundle installing the dependencies for Celery and Beanstalk",
+        requires=["beanstalkc"]),
+
+    django_celery.Bundle("django-celery-with-redis",
+        "Bundle installing the dependencies for Django-Celery and Redis",
+        requires=["redis>=2.4.4"]),
+    django_celery.Bundle("django-celery-with-mongodb",
+        "Bundle installing the dependencies for Django-Celery and MongoDB",
+        requires=["pymongo"]),
+    django_celery.Bundle("django-celery-with-couchdb",
+        "Bundle installing the dependencies for Django-Celery and CouchDB",
+        requires=["couchdb"]),
+    django_celery.Bundle("django-celery-with-beanstalk",
+        "Bundle installing the dependencies for Django-Celery and Beanstalk",
+        requires=["beanstalkc"]),
+
+    celery.Bundle("bundle-celery",
+        "Bundle that installs Celery related modules",
+        requires=[django_celery, flask_celery,
+                  "django", "setproctitle", "celerymon",
+                  "cyme", "kombu-sqlalchemy", "django-kombu"]),
+]

+ 6 - 0
celery/events/__init__.py

@@ -77,6 +77,8 @@ class EventDispatcher(object):
         self.publisher = None
         self._outbound_buffer = deque()
         self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
+        self.on_enabled = set()
+        self.on_disabled = set()
 
         self.enabled = enabled
         if self.enabled:
@@ -93,11 +95,15 @@ class EventDispatcher(object):
                                   exchange=event_exchange,
                                   serializer=self.serializer)
         self.enabled = True
+        for callback in self.on_enabled:
+            callback()
 
     def disable(self):
         if self.enabled:
             self.enabled = False
             self.close()
+            for callback in self.on_disabled:
+                callback()
 
     def send(self, type, **fields):
         """Send event.

+ 4 - 0
celery/exceptions.py

@@ -84,6 +84,10 @@ class NotConfigured(UserWarning):
     """Celery has not been configured, as no config module has been found."""
 
 
+class InvalidTaskError(Exception):
+    """The task has invalid data or is not properly constructed."""
+
+
 class CPendingDeprecationWarning(PendingDeprecationWarning):
     pass
 

+ 190 - 98
celery/execute/trace.py

@@ -12,131 +12,223 @@
 """
 from __future__ import absolute_import
 
+# ## ---
+# BE WARNED: You are probably going to suffer a heartattack just
+#            by looking at this code!
+#
+# This is the heart of the worker, the inner loop so to speak.
+# It used to be split up into nice little classes and methods,
+# but in the end it only resulted in bad performance, and horrible tracebacks.
+
+import os
+import socket
 import sys
 import traceback
+import warnings
 
+from .. import current_app
 from .. import states, signals
 from ..datastructures import ExceptionInfo
 from ..exceptions import RetryTaskError
 from ..registry import tasks
+from ..utils.serialization import get_pickleable_exception
+
+send_prerun = signals.task_prerun.send
+prerun_receivers = signals.task_prerun.receivers
+send_postrun = signals.task_postrun.send
+postrun_receivers = signals.task_postrun.receivers
+STARTED = states.STARTED
+SUCCESS = states.SUCCESS
+RETRY = states.RETRY
+FAILURE = states.FAILURE
+EXCEPTION_STATES = states.EXCEPTION_STATES
+_pid = None
+
+
+def getpid():
+    global _pid
+    if _pid is None:
+        _pid = os.getpid()
+    return _pid
 
 
 class TraceInfo(object):
+    __slots__ = ("state", "retval", "exc_info",
+                 "exc_type", "exc_value", "tb", "strtb")
 
-    def __init__(self, status=states.PENDING, retval=None, exc_info=None):
-        self.status = status
+    def __init__(self, state, retval=None, exc_info=None):
+        self.state = state
         self.retval = retval
         self.exc_info = exc_info
-        self.exc_type = None
-        self.exc_value = None
-        self.tb = None
-        self.strtb = None
-        if self.exc_info:
+        if exc_info:
             self.exc_type, self.exc_value, self.tb = exc_info
-            self.strtb = "\n".join(traceback.format_exception(*exc_info))
+        else:
+            self.exc_type = self.exc_value = self.tb = None
 
-    @classmethod
-    def trace(cls, fun, args, kwargs, propagate=False):
-        """Trace the execution of a function, calling the appropiate callback
-        if the function raises retry, an failure or returned successfully.
+    def handle_error_state(self, task, eager=False):
+        store_errors = not eager
+        if task.ignore_result:
+            store_errors = task.store_errors_even_if_ignored
 
-        :keyword propagate: If true, errors will propagate to the caller.
+        return {
+            RETRY: self.handle_retry,
+            FAILURE: self.handle_failure,
+        }[self.state](task, store_errors=store_errors)
 
-        """
-        try:
-            return cls(states.SUCCESS, retval=fun(*args, **kwargs))
-        except RetryTaskError, exc:
-            return cls(states.RETRY, retval=exc, exc_info=sys.exc_info())
-        except Exception, exc:
-            if propagate:
-                raise
-            return cls(states.FAILURE, retval=exc, exc_info=sys.exc_info())
-        except BaseException, exc:
-            raise
-        except:  # pragma: no cover
-            # For Python2.5 where raising strings are still allowed
-            # (but deprecated)
-            if propagate:
-                raise
-            return cls(states.FAILURE, retval=None, exc_info=sys.exc_info())
-
-
-class TaskTrace(object):
-
-    def __init__(self, task_name, task_id, args, kwargs, task=None,
-            request=None, propagate=None, **_):
-        self.task_id = task_id
-        self.task_name = task_name
-        self.args = args
-        self.kwargs = kwargs
-        self.task = task or tasks[self.task_name]
-        self.request = request or {}
-        self.status = states.PENDING
-        self.strtb = None
-        self.propagate = propagate
-        self._trace_handlers = {states.FAILURE: self.handle_failure,
-                                states.RETRY: self.handle_retry,
-                                states.SUCCESS: self.handle_success}
-
-    def __call__(self):
-        return self.execute()
-
-    def execute(self):
-        self.task.request.update(self.request, args=self.args,
-                                 called_directly=False, kwargs=self.kwargs)
-        signals.task_prerun.send(sender=self.task, task_id=self.task_id,
-                                 task=self.task, args=self.args,
-                                 kwargs=self.kwargs)
-        retval = self._trace()
-
-        signals.task_postrun.send(sender=self.task, task_id=self.task_id,
-                                  task=self.task, args=self.args,
-                                  kwargs=self.kwargs, retval=retval)
-        self.task.request.clear()
-        return retval
-
-    def _trace(self):
-        trace = TraceInfo.trace(self.task, self.args, self.kwargs,
-                                propagate=self.propagate)
-        self.status = trace.status
-        self.strtb = trace.strtb
-        handler = self._trace_handlers[trace.status]
-        r = handler(trace.retval, trace.exc_type, trace.tb, trace.strtb)
-        self.handle_after_return(trace.status, trace.retval,
-                                 trace.exc_type, trace.tb, trace.strtb,
-                                 einfo=trace.exc_info)
-        return r
-
-    def handle_after_return(self, status, retval, type_, tb, strtb,
-            einfo=None):
-        if status in states.EXCEPTION_STATES:
-            einfo = ExceptionInfo(einfo)
-        self.task.after_return(status, retval, self.task_id,
-                               self.args, self.kwargs, einfo)
-
-    def handle_success(self, retval, *args):
-        """Handle successful execution."""
-        self.task.on_success(retval, self.task_id, self.args, self.kwargs)
-        return retval
-
-    def handle_retry(self, exc, type_, tb, strtb):
+    def handle_retry(self, task, store_errors=True):
         """Handle retry exception."""
         # Create a simpler version of the RetryTaskError that stringifies
         # the original exception instead of including the exception instance.
         # This is for reporting the retry in logs, email etc, while
         # guaranteeing pickleability.
-        message, orig_exc = exc.args
+        req = task.request
+        exc, type_, tb = self.retval, self.exc_type, self.tb
+        message, orig_exc = self.retval.args
+        if store_errors:
+            task.backend.mark_as_retry(req.id, orig_exc, self.strtb)
         expanded_msg = "%s: %s" % (message, str(orig_exc))
         einfo = ExceptionInfo((type_, type_(expanded_msg, None), tb))
-        self.task.on_retry(exc, self.task_id, self.args, self.kwargs, einfo)
+        task.on_retry(exc, req.id, req.args, req.kwargs, einfo)
         return einfo
 
-    def handle_failure(self, exc, type_, tb, strtb):
+    def handle_failure(self, task, store_errors=True):
         """Handle exception."""
+        req = task.request
+        exc, type_, tb = self.retval, self.exc_type, self.tb
+        if store_errors:
+            task.backend.mark_as_failure(req.id, exc, self.strtb)
+        exc = get_pickleable_exception(exc)
         einfo = ExceptionInfo((type_, exc, tb))
-        self.task.on_failure(exc, self.task_id, self.args, self.kwargs, einfo)
-        signals.task_failure.send(sender=self.task, task_id=self.task_id,
-                                  exception=exc, args=self.args,
-                                  kwargs=self.kwargs, traceback=tb,
+        task.on_failure(exc, req.id, req.args, req.kwargs, einfo)
+        signals.task_failure.send(sender=task, task_id=req.id,
+                                  exception=exc, args=req.args,
+                                  kwargs=req.kwargs, traceback=tb,
                                   einfo=einfo)
         return einfo
+
+    @property
+    def strtb(self):
+        if self.exc_info:
+            return '\n'.join(traceback.format_exception(*self.exc_info))
+        return ''
+
+
+def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
+        Info=TraceInfo, eager=False, propagate=False):
+    task = task or tasks[name]
+    loader = loader or current_app.loader
+    backend = task.backend
+    ignore_result = task.ignore_result
+    track_started = task.track_started
+    track_started = not eager and (task.track_started and not ignore_result)
+    publish_result = not eager and not ignore_result
+    hostname = hostname or socket.gethostname()
+
+    loader_task_init = loader.on_task_init
+    loader_cleanup = loader.on_process_cleanup
+
+    task_on_success = task.on_success
+    task_after_return = task.after_return
+    task_request = task.request
+
+    store_result = backend.store_result
+    backend_cleanup = backend.process_cleanup
+
+    pid = os.getpid()
+
+    update_request = task_request.update
+    clear_request = task_request.clear
+    on_chord_part_return = backend.on_chord_part_return
+
+    def trace_task(uuid, args, kwargs, request=None):
+        R = I = None
+        try:
+            update_request(request or {}, args=args,
+                           called_directly=False, kwargs=kwargs)
+            try:
+                # -*- PRE -*-
+                send_prerun(sender=task, task_id=uuid, task=task,
+                            args=args, kwargs=kwargs)
+                loader_task_init(uuid, task)
+                if track_started:
+                    store_result(uuid, {"pid": pid,
+                                        "hostname": hostname}, STARTED)
+
+                # -*- TRACE -*-
+                try:
+                    R = retval = task(*args, **kwargs)
+                    state, einfo = SUCCESS, None
+                    task_on_success(retval, uuid, args, kwargs)
+                    if publish_result:
+                        store_result(uuid, retval, SUCCESS)
+                except RetryTaskError, exc:
+                    I = Info(RETRY, exc, sys.exc_info())
+                    state, retval, einfo = I.state, I.retval, I.exc_info
+                    R = I.handle_error_state(task, eager=eager)
+                except Exception, exc:
+                    if propagate:
+                        raise
+                    I = Info(FAILURE, exc, sys.exc_info())
+                    state, retval, einfo = I.state, I.retval, I.exc_info
+                    R = I.handle_error_state(task, eager=eager)
+                except BaseException, exc:
+                    raise
+                except:
+                    # pragma: no cover
+                    # For Python2.5 where raising strings are still allowed
+                    # (but deprecated)
+                    if propagate:
+                        raise
+                    I = Info(FAILURE, None, sys.exc_info())
+                    state, retval, einfo = I.state, I.retval, I.exc_info
+                    R = I.handle_error_state(task, eager=eager)
+
+                # -* POST *-
+                if task_request.chord:
+                    on_chord_part_return(task)
+                task_after_return(state, retval, uuid, args, kwargs, einfo)
+                send_postrun(sender=task, task_id=uuid, task=task,
+                            args=args, kwargs=kwargs, retval=retval)
+            finally:
+                clear_request()
+                if not eager:
+                    try:
+                        backend_cleanup()
+                        loader_cleanup()
+                    except (KeyboardInterrupt, SystemExit, MemoryError):
+                        raise
+                    except Exception, exc:
+                        logger = current_app.log.get_default_logger()
+                        logger.error("Process cleanup failed: %r", exc,
+                                     exc_info=sys.exc_info())
+        except Exception, exc:
+            if eager:
+                raise
+            R = report_internal_error(task, exc)
+        return R, I
+
+    return trace_task
+
+
+def trace_task(task, uuid, args, kwargs, request=None, **opts):
+    try:
+        if task.__tracer__ is None:
+            task.__tracer__ = build_tracer(task.name, task, **opts)
+        return task.__tracer__(uuid, args, kwargs, request)
+    except Exception, exc:
+        return report_internal_error(task, exc), None
+
+
+def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):
+    opts.setdefault("eager", True)
+    return build_tracer(task.name, task, **opts)(
+            uuid, args, kwargs, request)
+
+
+def report_internal_error(task, exc):
+    _type, _value, _tb = sys.exc_info()
+    _value = task.backend.prepare_exception(exc)
+    exc_info = ExceptionInfo((_type, _value, _tb))
+    warnings.warn("Exception outside body: %s: %s\n%s" % tuple(
+        map(str, (exc.__class__, exc, exc_info.traceback))))
+    return exc_info

+ 3 - 3
celery/loaders/__init__.py

@@ -15,9 +15,9 @@ from __future__ import absolute_import
 from .. import current_app
 from ..utils import deprecated, get_cls_by_name
 
-LOADER_ALIASES = {"app": "celery.loaders.app.AppLoader",
-                  "default": "celery.loaders.default.Loader",
-                  "django": "djcelery.loaders.DjangoLoader"}
+LOADER_ALIASES = {"app": "celery.loaders.app:AppLoader",
+                  "default": "celery.loaders.default:Loader",
+                  "django": "djcelery.loaders:DjangoLoader"}
 
 
 def get_loader_cls(loader):

+ 14 - 10
celery/log.py

@@ -24,6 +24,12 @@ from .utils.term import colored
 is_py3k = sys.version_info >= (3, 0)
 
 
+def mlevel(level):
+    if level and not isinstance(level, int):
+        return LOG_LEVELS[level.upper()]
+    return level
+
+
 class ColorFormatter(logging.Formatter):
     #: Loglevel -> Color mapping.
     COLORS = colored().names
@@ -71,7 +77,7 @@ class Logging(object):
 
     def __init__(self, app):
         self.app = app
-        self.loglevel = self.app.conf.CELERYD_LOG_LEVEL
+        self.loglevel = mlevel(self.app.conf.CELERYD_LOG_LEVEL)
         self.format = self.app.conf.CELERYD_LOG_FORMAT
         self.task_format = self.app.conf.CELERYD_TASK_LOG_FORMAT
         self.colorize = self.app.conf.CELERYD_LOG_COLOR
@@ -92,14 +98,14 @@ class Logging(object):
     def get_task_logger(self, loglevel=None, name=None):
         logger = logging.getLogger(name or "celery.task.default")
         if loglevel is not None:
-            logger.setLevel(loglevel)
+            logger.setLevel(mlevel(loglevel))
         return logger
 
     def setup_logging_subsystem(self, loglevel=None, logfile=None,
             format=None, colorize=None, **kwargs):
         if Logging._setup:
             return
-        loglevel = loglevel or self.loglevel
+        loglevel = mlevel(loglevel or self.loglevel)
         format = format or self.format
         if colorize is None:
             colorize = self.supports_color(logfile)
@@ -120,7 +126,7 @@ class Logging(object):
             mp = mputil.get_logger() if mputil else None
             for logger in filter(None, (root, mp)):
                 self._setup_logger(logger, logfile, format, colorize, **kwargs)
-                logger.setLevel(loglevel)
+                logger.setLevel(mlevel(loglevel))
                 signals.after_setup_logger.send(sender=None, logger=logger,
                                         loglevel=loglevel, logfile=logfile,
                                         format=format, colorize=colorize)
@@ -144,7 +150,7 @@ class Logging(object):
         """
         logger = logging.getLogger(name)
         if loglevel is not None:
-            logger.setLevel(loglevel)
+            logger.setLevel(mlevel(loglevel))
         return logger
 
     def setup_logger(self, loglevel=None, logfile=None,
@@ -157,7 +163,7 @@ class Logging(object):
         Returns logger object.
 
         """
-        loglevel = loglevel or self.loglevel
+        loglevel = mlevel(loglevel or self.loglevel)
         format = format or self.format
         if colorize is None:
             colorize = self.supports_color(logfile)
@@ -179,7 +185,7 @@ class Logging(object):
         Returns logger object.
 
         """
-        loglevel = loglevel or self.loglevel
+        loglevel = mlevel(loglevel or self.loglevel)
         format = format or self.task_format
         if colorize is None:
             colorize = self.supports_color(logfile)
@@ -247,9 +253,7 @@ class LoggingProxy(object):
 
     def __init__(self, logger, loglevel=None):
         self.logger = logger
-        self.loglevel = loglevel or self.logger.level or self.loglevel
-        if not isinstance(self.loglevel, int):
-            self.loglevel = LOG_LEVELS[self.loglevel.upper()]
+        self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
         self._safewrap_handlers()
 
     def _safewrap_handlers(self):

+ 28 - 13
celery/platforms.py

@@ -21,11 +21,18 @@ import sys
 
 from .local import try_import
 
+from kombu.utils.limits import TokenBucket
+
 _setproctitle = try_import("setproctitle")
 resource = try_import("resource")
 pwd = try_import("pwd")
 grp = try_import("grp")
 
+try:
+    from multiprocessing.process import current_process
+except ImportError:
+    current_process = None  # noqa
+
 SYSTEM = _platform.system()
 IS_OSX = SYSTEM == "Darwin"
 IS_WINDOWS = SYSTEM == "Windows"
@@ -34,6 +41,8 @@ DAEMON_UMASK = 0
 DAEMON_WORKDIR = "/"
 DAEMON_REDIRECT_TO = getattr(os, "devnull", "/dev/null")
 
+_setps_bucket = TokenBucket(0.5)  # 30/m, every 2 seconds
+
 
 def pyimplementation():
     if hasattr(_platform, "python_implementation"):
@@ -552,21 +561,27 @@ def set_process_title(progname, info=None):
     return proctitle
 
 
-def set_mp_process_title(progname, info=None, hostname=None):
-    """Set the ps name using the multiprocessing process name.
+if os.environ.get("NOSETPS"):
 
-    Only works if :mod:`setproctitle` is installed.
+    def set_mp_process_title(*a, **k):
+        pass
+else:
 
-    """
-    if hostname:
-        progname = "%s@%s" % (progname, hostname.split(".")[0])
-    try:
-        from multiprocessing.process import current_process
-    except ImportError:
-        return set_process_title(progname, info=info)
-    else:
-        return set_process_title("%s:%s" % (progname,
-                                            current_process().name), info=info)
+    def set_mp_process_title(progname, info=None, hostname=None,  # noqa
+            rate_limit=False):
+        """Set the ps name using the multiprocessing process name.
+
+        Only works if :mod:`setproctitle` is installed.
+
+        """
+        if not rate_limit or _setps_bucket.can_consume(1):
+            if hostname:
+                progname = "%s@%s" % (progname, hostname.split(".")[0])
+            if current_process is not None:
+                return set_process_title(
+                    "%s:%s" % (progname, current_process().name), info=info)
+            else:
+                return set_process_title(progname, info=info)
 
 
 def shellsplit(s, posix=True):

+ 0 - 7
celery/registry.py

@@ -17,7 +17,6 @@ from .exceptions import NotRegistered
 
 
 class TaskRegistry(dict):
-
     NotRegistered = NotRegistered
 
     def regular(self):
@@ -59,12 +58,6 @@ class TaskRegistry(dict):
         return dict((name, task) for name, task in self.iteritems()
                                     if task.type == type)
 
-    def __getitem__(self, key):
-        try:
-            return dict.__getitem__(self, key)
-        except KeyError:
-            raise self.NotRegistered(key)
-
     def pop(self, key, *args):
         try:
             return dict.pop(self, key, *args)

+ 12 - 5
celery/schedules.py

@@ -18,7 +18,7 @@ from datetime import datetime, timedelta
 from dateutil.relativedelta import relativedelta
 
 from .utils import is_iterable
-from .utils.timeutils import (timedelta_seconds, weekday,
+from .utils.timeutils import (timedelta_seconds, weekday, maybe_timedelta,
                               remaining, humanize_seconds)
 
 
@@ -30,7 +30,7 @@ class schedule(object):
     relative = False
 
     def __init__(self, run_every=None, relative=False):
-        self.run_every = run_every
+        self.run_every = maybe_timedelta(run_every)
         self.relative = relative
 
     def remaining_estimate(self, last_run_at):
@@ -62,18 +62,25 @@ class schedule(object):
         rem_delta = self.remaining_estimate(last_run_at)
         rem = timedelta_seconds(rem_delta)
         if rem == 0:
-            return True, timedelta_seconds(self.run_every)
+            return True, self.seconds
         return False, rem
 
     def __repr__(self):
-        return "<freq: %s>" % (
-                    humanize_seconds(timedelta_seconds(self.run_every)), )
+        return "<freq: %s>" % self.human_seconds
 
     def __eq__(self, other):
         if isinstance(other, schedule):
             return self.run_every == other.run_every
         return self.run_every == other
 
+    @property
+    def seconds(self):
+        return timedelta_seconds(self.run_every)
+
+    @property
+    def human_seconds(self):
+        return humanize_seconds(self.seconds)
+
 
 class crontab_parser(object):
     """Parser for crontab expressions. Any expression of the form 'groups'

+ 2 - 2
celery/tests/functional/case.py

@@ -13,7 +13,7 @@ from time import time
 
 from celery.exceptions import TimeoutError
 from celery.task.control import ping, flatten_reply, inspect
-from celery.utils import get_full_cls_name
+from celery.utils import qualname
 
 from celery.tests.utils import unittest
 
@@ -84,7 +84,7 @@ class Worker(object):
     def managed(cls, hostname=None, caller=None):
         hostname = hostname or socket.gethostname()
         if caller:
-            hostname = ".".join([get_full_cls_name(caller), hostname])
+            hostname = ".".join([qualname(caller), hostname])
         else:
             hostname += str(cls.next_worker_id())
         worker = cls(hostname)

+ 0 - 1
celery/tests/test_concurrency/test_concurrency_solo.py

@@ -12,7 +12,6 @@ class test_solo_TaskPool(unittest.TestCase):
     def test_on_start(self):
         x = solo.TaskPool()
         x.on_start()
-        self.assertTrue(x.pid)
 
     def test_on_apply(self):
         x = solo.TaskPool()

+ 0 - 4
celery/tests/test_task/__init__.py

@@ -4,8 +4,6 @@ from __future__ import with_statement
 from datetime import datetime, timedelta
 from functools import wraps
 
-from mock import Mock
-
 from celery import task
 from celery.app import app_or_default
 from celery.task import task as task_dec
@@ -343,10 +341,8 @@ class TestCeleryTasks(unittest.TestCase):
 
     def test_after_return(self):
         task = self.createTaskCls("T1", "c.unittest.t.after_return")()
-        task.backend = Mock()
         task.request.chord = return_True_task.subtask()
         task.after_return("SUCCESS", 1.0, "foobar", (), {}, None)
-        task.backend.on_chord_part_return.assert_called_with(task)
         task.request.clear()
 
     def test_send_task_sent_event(self):

+ 20 - 12
celery/tests/test_task/test_execute_trace.py

@@ -1,26 +1,34 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
-import operator
-
+from celery import current_app
 from celery import states
 from celery.exceptions import RetryTaskError
-from celery.execute.trace import TraceInfo
+from celery.execute.trace import eager_trace_task
 from celery.tests.utils import unittest
 
-trace = TraceInfo.trace
+
+@current_app.task
+def add(x, y):
+    return x + y
 
 
+@current_app.task
 def raises(exc):
     raise exc
 
 
-class test_TraceInfo(unittest.TestCase):
+def trace(task, args=(), kwargs={}, propagate=False):
+    return eager_trace_task(task, "id-1", args, kwargs,
+                      propagate=propagate)
+
+
+class test_trace(unittest.TestCase):
 
     def test_trace_successful(self):
-        info = trace(operator.add, (2, 2), {})
-        self.assertEqual(info.status, states.SUCCESS)
-        self.assertEqual(info.retval, 4)
+        retval, info = trace(add, (2, 2), {})
+        self.assertIsNone(info)
+        self.assertEqual(retval, 4)
 
     def test_trace_SystemExit(self):
         with self.assertRaises(SystemExit):
@@ -28,14 +36,14 @@ class test_TraceInfo(unittest.TestCase):
 
     def test_trace_RetryTaskError(self):
         exc = RetryTaskError("foo", "bar")
-        info = trace(raises, (exc, ), {})
-        self.assertEqual(info.status, states.RETRY)
+        _, info = trace(raises, (exc, ), {})
+        self.assertEqual(info.state, states.RETRY)
         self.assertIs(info.retval, exc)
 
     def test_trace_exception(self):
         exc = KeyError("foo")
-        info = trace(raises, (exc, ), {})
-        self.assertEqual(info.status, states.FAILURE)
+        _, info = trace(raises, (exc, ), {})
+        self.assertEqual(info.state, states.FAILURE)
         self.assertIs(info.retval, exc)
 
     def test_trace_exception_propagate(self):

+ 3 - 2
celery/tests/test_utils/__init__.py

@@ -33,9 +33,10 @@ class test_chunks(unittest.TestCase):
 
 class test_utils(unittest.TestCase):
 
-    def test_get_full_cls_name(self):
+    def test_qualname(self):
         Class = type("Fox", (object, ), {"__module__": "quick.brown"})
-        self.assertEqual(utils.get_full_cls_name(Class), "quick.brown.Fox")
+        self.assertEqual(utils.qualname(Class), "quick.brown.Fox")
+        self.assertEqual(utils.qualname(Class()), "quick.brown.Fox")
 
     def test_is_iterable(self):
         for a in "f", ["f"], ("f", ), {"f": "f"}:

+ 6 - 6
celery/tests/test_utils/test_utils_timeutils.py

@@ -33,12 +33,12 @@ class test_timeutils(unittest.TestCase):
         self.assertEqual(timeutils.timedelta_seconds(delta), 0)
 
     def test_humanize_seconds(self):
-        t = ((4 * 60 * 60 * 24, "4 days"),
-             (1 * 60 * 60 * 24, "1 day"),
-             (4 * 60 * 60, "4 hours"),
-             (1 * 60 * 60, "1 hour"),
-             (4 * 60, "4 minutes"),
-             (1 * 60, "1 minute"),
+        t = ((4 * 60 * 60 * 24, "4.00 days"),
+             (1 * 60 * 60 * 24, "1.00 day"),
+             (4 * 60 * 60, "4.00 hours"),
+             (1 * 60 * 60, "1.00 hour"),
+             (4 * 60, "4.00 minutes"),
+             (1 * 60, "1.00 minute"),
              (4, "4.00 seconds"),
              (1, "1.00 second"),
              (4.3567631221, "4.36 seconds"),

+ 4 - 0
celery/tests/test_worker/__init__.py

@@ -297,6 +297,7 @@ class test_Consumer(unittest.TestCase):
                                    eta=datetime.now().isoformat())
         l.event_dispatcher = Mock()
         l.pidbox_node = MockNode()
+        l.update_strategies()
 
         l.receive_message(m.decode(), m)
         self.assertTrue(m.acknowledged)
@@ -308,6 +309,7 @@ class test_Consumer(unittest.TestCase):
                            send_events=False)
         m = create_message(Mock(), task=foo_task.name,
                            args=(1, 2), kwargs="foobarbaz", id=1)
+        l.update_strategies()
         l.event_dispatcher = Mock()
         l.pidbox_node = MockNode()
 
@@ -336,6 +338,7 @@ class test_Consumer(unittest.TestCase):
                            send_events=False)
         m = create_message(Mock(), task=foo_task.name,
                            args=[2, 4, 8], kwargs={})
+        l.update_strategies()
 
         l.event_dispatcher = Mock()
         l.receive_message(m.decode(), m)
@@ -463,6 +466,7 @@ class test_Consumer(unittest.TestCase):
         l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
         l.event_dispatcher = Mock()
         l.enabled = False
+        l.update_strategies()
         l.receive_message(m.decode(), m)
         l.eta_schedule.stop()
 

+ 2 - 2
celery/tests/test_worker/test_worker_control.py

@@ -40,8 +40,8 @@ class Consumer(object):
 
     def __init__(self):
         self.ready_queue = FastQueue()
-        self.ready_queue.put(TaskRequest(task_name=mytask.name,
-                                         task_id=uuid(),
+        self.ready_queue.put(TaskRequest(mytask.name,
+                                         uuid(),
                                          args=(2, 2),
                                          kwargs={}))
         self.eta_schedule = Timer()

+ 3 - 0
celery/tests/test_worker/test_worker_heartbeat.py

@@ -10,6 +10,9 @@ class MockDispatcher(object):
 
     def __init__(self):
         self.sent = []
+        self.on_enabled = set()
+        self.on_disabled = set()
+        self.enabled = True
 
     def send(self, msg, **_fields):
         self.sent.append(msg)

+ 77 - 58
celery/tests/test_worker/test_worker_job.py

@@ -7,6 +7,7 @@ import logging
 import os
 import sys
 import time
+import warnings
 
 from datetime import datetime, timedelta
 
@@ -18,16 +19,17 @@ from celery import states
 from celery.app import app_or_default
 from celery.concurrency.base import BasePool
 from celery.datastructures import ExceptionInfo
-from celery.task import task as task_dec
-from celery.exceptions import RetryTaskError, NotRegistered, WorkerLostError
+from celery.exceptions import (RetryTaskError,
+                               WorkerLostError, InvalidTaskError)
+from celery.execute.trace import eager_trace_task, TraceInfo
 from celery.log import setup_logger
+from celery.registry import tasks
 from celery.result import AsyncResult
+from celery.task import task as task_dec
 from celery.task.base import Task
 from celery.utils import uuid
-from celery.utils.encoding import from_utf8
-from celery.worker.job import (WorkerTaskTrace, TaskRequest,
-                               InvalidTaskError, execute_and_trace,
-                               default_encode)
+from celery.utils.encoding import from_utf8, default_encode
+from celery.worker.job import TaskRequest, execute_and_trace
 from celery.worker.state import revoked
 
 from celery.tests.compat import catch_warnings
@@ -39,11 +41,11 @@ scratch = {"ACK": False}
 some_kwargs_scratchpad = {}
 
 
-def jail(task_id, task_name, args, kwargs):
-    return WorkerTaskTrace(task_name, task_id, args, kwargs)()
+def jail(task_id, name, args, kwargs):
+    return eager_trace_task(tasks[name], task_id, args, kwargs, eager=False)[0]
 
 
-def on_ack():
+def on_ack(*args, **kwargs):
     scratch["ACK"] = True
 
 
@@ -109,7 +111,7 @@ class test_RetryTaskError(unittest.TestCase):
             self.assertEqual(ret.exc, exc)
 
 
-class test_WorkerTaskTrace(unittest.TestCase):
+class test_trace_task(unittest.TestCase):
 
     def test_process_cleanup_fails(self):
         backend = mytask.backend
@@ -122,7 +124,8 @@ class test_WorkerTaskTrace(unittest.TestCase):
                 tid = uuid()
                 ret = jail(tid, mytask.name, [2], {})
                 self.assertEqual(ret, 4)
-                mytask.backend.mark_as_done.assert_called_with(tid, 4)
+                mytask.backend.store_result.assert_called_with(tid, 4,
+                                                               states.SUCCESS)
                 logs = sio.getvalue().strip()
                 self.assertIn("Process cleanup failed", logs)
         finally:
@@ -143,15 +146,16 @@ class test_WorkerTaskTrace(unittest.TestCase):
         self.assertEqual(ret, 4)
 
     def test_marked_as_started(self):
-        mytask.track_started = True
 
         class Backend(mytask.backend.__class__):
             _started = []
 
-            def mark_as_started(self, tid, *args, **kwargs):
-                self._started.append(tid)
+            def store_result(self, tid, meta, state):
+                if state == states.STARTED:
+                    self._started.append(tid)
 
         prev, mytask.backend = mytask.backend, Backend()
+        mytask.track_started = True
 
         try:
             tid = uuid()
@@ -168,17 +172,26 @@ class test_WorkerTaskTrace(unittest.TestCase):
             mytask.ignore_result = False
 
     def test_execute_jail_failure(self):
-        ret = jail(uuid(), mytask_raising.name,
-                   [4], {})
-        self.assertIsInstance(ret, ExceptionInfo)
-        self.assertTupleEqual(ret.exception.args, (4, ))
+        u = uuid()
+        mytask_raising.request.update({"id": u})
+        try:
+            ret = jail(u, mytask_raising.name,
+                    [4], {})
+            self.assertIsInstance(ret, ExceptionInfo)
+            self.assertTupleEqual(ret.exception.args, (4, ))
+        finally:
+            mytask_raising.request.clear()
 
     def test_execute_ignore_result(self):
         task_id = uuid()
-        ret = jail(id, MyTaskIgnoreResult.name,
-                   [4], {})
-        self.assertEqual(ret, 256)
-        self.assertFalse(AsyncResult(task_id).ready())
+        MyTaskIgnoreResult.request.update({"id": task_id})
+        try:
+            ret = jail(task_id, MyTaskIgnoreResult.name,
+                       [4], {})
+            self.assertEqual(ret, 256)
+            self.assertFalse(AsyncResult(task_id).ready())
+        finally:
+            MyTaskIgnoreResult.request.clear()
 
 
 class MockEventDispatcher(object):
@@ -200,10 +213,10 @@ class test_TaskRequest(unittest.TestCase):
         mytask.ignore_result = True
         try:
             tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-            self.assertFalse(tw._store_errors)
+            self.assertFalse(tw.store_errors)
             mytask.store_errors_even_if_ignored = True
             tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-            self.assertTrue(tw._store_errors)
+            self.assertTrue(tw.store_errors)
         finally:
             mytask.ignore_result = False
             mytask.store_errors_even_if_ignored = False
@@ -449,32 +462,33 @@ class test_TaskRequest(unittest.TestCase):
             tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
             tw.logger = MockLogger()
         finally:
-            mytask.ignore_result = False
             tw.on_timeout(soft=True, timeout=1336)
             self.assertEqual(mytask.backend.get_status(tw.task_id),
                              states.PENDING)
+            mytask.ignore_result = False
 
     def test_execute_and_trace(self):
         res = execute_and_trace(mytask.name, uuid(), [4], {})
         self.assertEqual(res, 4 ** 4)
 
     def test_execute_safe_catches_exception(self):
-        old_exec = WorkerTaskTrace.execute
+        warnings.resetwarnings()
 
         def _error_exec(self, *args, **kwargs):
             raise KeyError("baz")
 
-        WorkerTaskTrace.execute = _error_exec
-        try:
-            with catch_warnings(record=True) as log:
-                res = execute_and_trace(mytask.name, uuid(),
-                                        [4], {})
-                self.assertIsInstance(res, ExceptionInfo)
-                self.assertTrue(log)
-                self.assertIn("Exception outside", log[0].message.args[0])
-                self.assertIn("KeyError", log[0].message.args[0])
-        finally:
-            WorkerTaskTrace.execute = old_exec
+        @task_dec
+        def raising():
+            raise KeyError("baz")
+        raising.request = None
+
+        with catch_warnings(record=True) as log:
+            res = execute_and_trace(raising.name, uuid(),
+                                    [], {})
+            self.assertIsInstance(res, ExceptionInfo)
+            self.assertTrue(log)
+            self.assertIn("Exception outside", log[0].message.args[0])
+            self.assertIn("AttributeError", log[0].message.args[0])
 
     def create_exception(self, exc):
         try:
@@ -485,27 +499,31 @@ class test_TaskRequest(unittest.TestCase):
     def test_worker_task_trace_handle_retry(self):
         from celery.exceptions import RetryTaskError
         tid = uuid()
-        w = WorkerTaskTrace(mytask.name, tid, [4], {})
-        type_, value_, tb_ = self.create_exception(ValueError("foo"))
-        type_, value_, tb_ = self.create_exception(RetryTaskError(str(value_),
-                                                                  exc=value_))
-        w._store_errors = False
-        w.handle_retry(value_, type_, tb_, "")
-        self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
-        w._store_errors = True
-        w.handle_retry(value_, type_, tb_, "")
-        self.assertEqual(mytask.backend.get_status(tid), states.RETRY)
+        mytask.request.update({"id": tid})
+        try:
+            _, value_, _ = self.create_exception(ValueError("foo"))
+            einfo = self.create_exception(RetryTaskError(str(value_),
+                                          exc=value_))
+            w = TraceInfo(states.RETRY, einfo[1], einfo)
+            w.handle_retry(mytask, store_errors=False)
+            self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
+            w.handle_retry(mytask, store_errors=True)
+            self.assertEqual(mytask.backend.get_status(tid), states.RETRY)
+        finally:
+            mytask.request.clear()
 
     def test_worker_task_trace_handle_failure(self):
         tid = uuid()
-        w = WorkerTaskTrace(mytask.name, tid, [4], {})
-        type_, value_, tb_ = self.create_exception(ValueError("foo"))
-        w._store_errors = False
-        w.handle_failure(value_, type_, tb_, "")
-        self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
-        w._store_errors = True
-        w.handle_failure(value_, type_, tb_, "")
-        self.assertEqual(mytask.backend.get_status(tid), states.FAILURE)
+        mytask.request.update({"id": tid})
+        try:
+            einfo = self.create_exception(ValueError("foo"))
+            w = TraceInfo(states.FAILURE, einfo[1], einfo)
+            w.handle_failure(mytask, store_errors=False)
+            self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
+            w.handle_failure(mytask, store_errors=True)
+            self.assertEqual(mytask.backend.get_status(tid), states.FAILURE)
+        finally:
+            mytask.request.clear()
 
     def test_task_wrapper_mail_attrs(self):
         tw = TaskRequest(mytask.name, uuid(), [], {})
@@ -533,8 +551,9 @@ class test_TaskRequest(unittest.TestCase):
         self.assertEqual(tw.task_id, body["id"])
         self.assertEqual(tw.args, body["args"])
         us = from_utf8(us)
-        self.assertEqual(tw.kwargs.keys()[0], us)
-        self.assertIsInstance(tw.kwargs.keys()[0], str)
+        if sys.version_info < (2, 6):
+            self.assertEqual(tw.kwargs.keys()[0], us)
+            self.assertIsInstance(tw.kwargs.keys()[0], str)
         self.assertTrue(tw.logger)
 
     def test_from_message_empty_args(self):
@@ -561,7 +580,7 @@ class test_TaskRequest(unittest.TestCase):
         m = Message(None, body=anyjson.serialize(body), backend="foo",
                           content_type="application/json",
                           content_encoding="utf-8")
-        with self.assertRaises(NotRegistered):
+        with self.assertRaises(KeyError):
             TaskRequest.from_message(m, m.decode())
 
     def test_execute(self):

+ 23 - 8
celery/utils/__init__.py

@@ -71,7 +71,7 @@ def deprecated(description=None, deprecation=None, removal=None,
 
         @wraps(fun)
         def __inner(*args, **kwargs):
-            warn_deprecated(description=description or get_full_cls_name(fun),
+            warn_deprecated(description=description or qualname(fun),
                             deprecation=deprecation,
                             removal=removal,
                             alternative=alternative)
@@ -169,7 +169,7 @@ def noop(*args, **kwargs):
     pass
 
 
-if sys.version_info >= (3, 0):
+if sys.version_info >= (2, 6):
 
     def kwdict(kwargs):
         return kwargs
@@ -264,10 +264,19 @@ def mattrgetter(*attrs):
                                 for attr in attrs)
 
 
-def get_full_cls_name(cls):
-    """With a class, get its full module and class name."""
-    return ".".join([cls.__module__,
-                     cls.__name__])
+if sys.version_info >= (3, 3):
+
+    def qualname(obj):
+        return obj.__qualname__
+
+else:
+
+    def qualname(obj):  # noqa
+        if not hasattr(obj, "__name__") and hasattr(obj, "__class__"):
+            return qualname(obj.__class__)
+
+        return '.'.join([obj.__module__, obj.__name__])
+get_full_cls_name = qualname  # XXX Compat
 
 
 def fun_takes_kwargs(fun, kwlist=[]):
@@ -299,7 +308,8 @@ def fun_takes_kwargs(fun, kwlist=[]):
     return filter(partial(operator.contains, args), kwlist)
 
 
-def get_cls_by_name(name, aliases={}, imp=None, package=None, **kwargs):
+def get_cls_by_name(name, aliases={}, imp=None, package=None,
+        sep='.', **kwargs):
     """Get class by name.
 
     The name should be the full dot-separated path to the class::
@@ -311,6 +321,10 @@ def get_cls_by_name(name, aliases={}, imp=None, package=None, **kwargs):
         celery.concurrency.processes.TaskPool
                                     ^- class name
 
+    or using ':' to separate module and symbol::
+
+        celery.concurrency.processes:TaskPool
+
     If `aliases` is provided, a dict containing short name/long name
     mappings, the name is looked up in the aliases first.
 
@@ -336,7 +350,8 @@ def get_cls_by_name(name, aliases={}, imp=None, package=None, **kwargs):
         return name                                 # already a class
 
     name = aliases.get(name) or name
-    module_name, _, cls_name = name.rpartition(".")
+    sep = ':' if ':' in name else sep
+    module_name, _, cls_name = name.rpartition(sep)
     if not module_name and package:
         module_name = package
     try:

+ 142 - 0
celery/utils/coroutine.py

@@ -0,0 +1,142 @@
+from __future__ import absolute_import
+
+from functools import wraps
+from Queue import Queue
+
+from celery.utils import cached_property
+
+
+def coroutine(fun):
+    """Decorator that turns a generator into a coroutine that is
+    started automatically, and that can send values back to the caller.
+
+    **Example coroutine that returns values to caller**::
+
+        @coroutine
+        def adder(self):
+            while 1:
+            x, y = (yield)
+            self.give(x + y)
+
+        >>> c = adder()
+
+        # call sends value and returns the result.
+        >>> c.call(4, 4)
+        8
+
+        # or you can send the value and get the result later.
+        >>> c.send(4, 4)
+        >>> c.get()
+        8
+
+
+    **Example sink (input-only coroutine)**::
+
+        @coroutine
+        def uniq():
+            seen = set()
+            while 1:
+                line = (yield)
+                if line not in seen:
+                    seen.add(line)
+                    print(line)
+
+        >>> u = uniq()
+        >>> [u.send(l) for l in [1, 2, 2, 3]]
+        [1, 2, 3]
+
+    **Example chaining coroutines**::
+
+        @coroutine
+        def uniq(callback):
+            seen = set()
+            while 1:
+                line = (yield)
+                if line not in seen:
+                    callback.send(line)
+                    seen.add(line)
+
+        @coroutine
+        def uppercaser(callback):
+            while 1:
+                line = (yield)
+                callback.send(str(line).upper())
+
+        @coroutine
+        def printer():
+            while 1:
+                line = (yield)
+                print(line)
+
+        >>> pipe = uniq(uppercaser(printer()))
+        >>> for line in file("AUTHORS").readlines():
+                pipe.send(line)
+
+    """
+    @wraps(fun)
+    def start(*args, **kwargs):
+        return Coroutine.start_from(fun, *args, **kwargs)
+    return start
+
+
+class Coroutine(object):
+    _gen = None
+    started = False
+
+    def bind(self, generator):
+        self._gen = generator
+
+    def _next(self):
+        return self._gen.next()
+    next = __next__ = _next
+
+    def start(self):
+        if self.started:
+            raise ValueError("coroutine already started")
+        self.next()
+        self.started = True
+        return self
+
+    def send1(self, value):
+        return self._gen.send(value)
+
+    def call1(self, value, timeout=None):
+        self.send1(value)
+        return self.get(timeout=timeout)
+
+    def send(self, *args):
+        return self._gen.send(args)
+
+    def call(self, *args, **opts):
+        self.send(*args)
+        return self.get(**opts)
+
+    @classmethod
+    def start_from(cls, fun, *args, **kwargs):
+        coro = cls()
+        coro.bind(fun(coro, *args, **kwargs))
+        return coro.start()
+
+    @cached_property
+    def __output__(self):
+        return Queue()
+
+    @property
+    def give(self):
+        return self.__output__.put_nowait
+
+    @property
+    def get(self):
+        return self.__output__.get
+
+if __name__ == "__main__":
+
+    @coroutine
+    def adder(self):
+        while 1:
+            x, y = (yield)
+            self.give(x + y)
+
+    x = adder()
+    for i in xrange(10):
+        print(x.call(i, i))

+ 6 - 0
celery/utils/encoding.py

@@ -39,6 +39,9 @@ if is_py3k:
             return str_to_bytes(s)
         return s
 
+    def default_encode(obj):
+        return obj
+
     str_t = str
     bytes_t = bytes
 
@@ -55,6 +58,9 @@ else:
     def from_utf8(s, *args, **kwargs):  # noqa
         return s.encode("utf-8", *args, **kwargs)
 
+    def default_encode(obj):            # noqa
+        return unicode(obj, default_encoding())
+
     str_t = unicode
     bytes_t = str
     ensure_bytes = str_to_bytes

+ 8 - 9
celery/utils/timeutils.py

@@ -11,8 +11,6 @@
 """
 from __future__ import absolute_import
 
-import math
-
 from kombu.utils import cached_property
 
 from datetime import datetime, timedelta
@@ -35,10 +33,10 @@ RATE_MODIFIER_MAP = {"s": lambda n: n,
 
 HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, "total_seconds")
 
-TIME_UNITS = (("day", 60 * 60 * 24, lambda n: int(math.ceil(n))),
-              ("hour", 60 * 60, lambda n: int(math.ceil(n))),
-              ("minute", 60, lambda n: int(math.ceil(n))),
-              ("second", 1, lambda n: "%.2f" % n))
+TIME_UNITS = (("day", 60 * 60 * 24.0, lambda n: "%.2f" % n),
+              ("hour", 60 * 60.0, lambda n: "%.2f" % n),
+              ("minute", 60.0, lambda n: "%.2f" % n),
+              ("second", 1.0, lambda n: "%.2f" % n))
 
 
 class UnknownTimezone(Exception):
@@ -134,14 +132,14 @@ def delta_resolution(dt, delta):
     return dt
 
 
-def remaining(start, ends_in, now=None, relative=True):
+def remaining(start, ends_in, now=None, relative=False):
     """Calculate the remaining time for a start date and a timedelta.
 
     e.g. "how many seconds left for 30 seconds after start?"
 
     :param start: Start :class:`~datetime.datetime`.
     :param ends_in: The end delta as a :class:`~datetime.timedelta`.
-    :keyword relative: If set to :const:`False`, the end time will be
+    :keyword relative: If enabled the end time will be
         calculated using :func:`delta_resolution` (i.e. rounded to the
         resolution of `ends_in`).
     :keyword now: Function returning the current time and date,
@@ -151,7 +149,7 @@ def remaining(start, ends_in, now=None, relative=True):
     now = now or datetime.utcnow()
 
     end_date = start + ends_in
-    if not relative:
+    if relative:
         end_date = delta_resolution(end_date, ends_in)
     return end_date - now
 
@@ -187,6 +185,7 @@ def weekday(name):
 def humanize_seconds(secs, prefix=""):
     """Show seconds in human form, e.g. 60 is "1 minute", 7200 is "2
     hours"."""
+    secs = float(secs)
     for unit, divider, formatter in TIME_UNITS:
         if secs >= divider:
             w = secs / divider

+ 4 - 1
celery/worker/__init__.py

@@ -252,7 +252,6 @@ class WorkController(object):
         # and they must be stopped in reverse order.
         self.components = filter(None, (self.pool,
                                         self.mediator,
-                                        self.scheduler,
                                         self.beat,
                                         self.autoscaler,
                                         self.consumer))
@@ -341,3 +340,7 @@ class WorkController(object):
 
     def on_timer_tick(self, delay):
         self.timer_debug("Scheduler wake-up! Next eta %s secs." % delay)
+
+    @property
+    def state(self):
+        return state

+ 22 - 26
celery/worker/consumer.py

@@ -76,6 +76,7 @@ up and running.
 from __future__ import absolute_import
 from __future__ import with_statement
 
+import logging
 import socket
 import sys
 import threading
@@ -84,12 +85,12 @@ import warnings
 
 from ..app import app_or_default
 from ..datastructures import AttributeDict
-from ..exceptions import NotRegistered
+from ..exceptions import InvalidTaskError
+from ..registry import tasks
 from ..utils import noop
 from ..utils import timer2
 from ..utils.encoding import safe_repr
 from . import state
-from .job import TaskRequest, InvalidTaskError
 from .control import Panel
 from .heartbeat import Heart
 
@@ -293,6 +294,14 @@ class Consumer(object):
         self.connection_errors = conninfo.connection_errors
         self.channel_errors = conninfo.channel_errors
 
+        self._does_info = self.logger.isEnabledFor(logging.INFO)
+        self.strategies = {}
+
+    def update_strategies(self):
+        S = self.strategies
+        for task in tasks.itervalues():
+            S[task.name] = task.start_strategy(self.app, self)
+
     def start(self):
         """Start the consumer.
 
@@ -341,7 +350,8 @@ class Consumer(object):
         if task.revoked():
             return
 
-        self.logger.info("Got task from broker: %s", task.shortinfo())
+        if self._does_info:
+            self.logger.info("Got task from broker: %s", task.shortinfo())
 
         if self.event_dispatcher.enabled:
             self.event_dispatcher.send("task-received", uuid=task.task_id,
@@ -399,43 +409,26 @@ class Consumer(object):
         :param message: The kombu message object.
 
         """
-        # need to guard against errors occurring while acking the message.
-        def ack():
-            try:
-                message.ack()
-            except self.connection_errors + (AttributeError, ), exc:
-                self.logger.critical(
-                    "Couldn't ack %r: %s reason:%r",
-                        message.delivery_tag,
-                        self._message_report(body, message), exc)
-
         try:
-            body["task"]
+            name = body["task"]
         except (KeyError, TypeError):
             warnings.warn(RuntimeWarning(
                 "Received and deleted unknown message. Wrong destination?!? \
                 the full contents of the message body was: %s" % (
                  self._message_report(body, message), )))
-            ack()
+            message.ack_log_error(self.logger, self.connection_errors)
             return
 
         try:
-            task = TaskRequest.from_message(message, body, ack,
-                                            app=self.app,
-                                            logger=self.logger,
-                                            hostname=self.hostname,
-                                            eventer=self.event_dispatcher)
-
-        except NotRegistered, exc:
+            self.strategies[name](message, body, message.ack_log_error)
+        except KeyError, exc:
             self.logger.error(UNKNOWN_TASK_ERROR, exc, safe_repr(body),
                               exc_info=sys.exc_info())
-            ack()
+            message.ack_log_error(self.logger, self.connection_errors)
         except InvalidTaskError, exc:
             self.logger.error(INVALID_TASK_ERROR, str(exc), safe_repr(body),
                               exc_info=sys.exc_info())
-            ack()
-        else:
-            self.on_task(task)
+            message.ack_log_error(self.logger, self.connection_errors)
 
     def maybe_conn_error(self, fun):
         """Applies function but ignores any connection or channel
@@ -603,6 +596,9 @@ class Consumer(object):
         # Restart heartbeat thread.
         self.restart_heartbeat()
 
+        # reload all task's execution strategies.
+        self.update_strategies()
+
         # We're back!
         self._state = RUN
 

+ 11 - 4
celery/worker/heartbeat.py

@@ -31,16 +31,23 @@ class Heart(object):
         self.interval = interval or 30
         self.tref = None
 
+        # Make event dispatcher start/stop us when it's
+        # enabled/disabled.
+        self.eventer.on_enabled.add(self.start)
+        self.eventer.on_disabled.add(self.stop)
+
     def _send(self, event):
         return self.eventer.send(event, **SOFTWARE_INFO)
 
     def start(self):
-        self._send("worker-online")
-        self.tref = self.timer.apply_interval(self.interval * 1000.0,
-                self._send, ("worker-heartbeat", ))
+        if self.eventer.enabled:
+            self._send("worker-online")
+            self.tref = self.timer.apply_interval(self.interval * 1000.0,
+                    self._send, ("worker-heartbeat", ))
 
     def stop(self):
         if self.tref is not None:
             self.timer.cancel(self.tref)
             self.tref = None
-        self._send("worker-offline")
+        if self.eventer.enabled:
+            self._send("worker-offline")

+ 172 - 244
celery/worker/job.py

@@ -12,162 +12,51 @@
 """
 from __future__ import absolute_import
 
-import os
-import sys
+import logging
 import time
 import socket
-import warnings
 
 from datetime import datetime
+from operator import itemgetter
 
-from .. import current_app
 from .. import exceptions
-from .. import platforms
-from .. import registry
+from ..registry import tasks
 from ..app import app_or_default
-from ..datastructures import ExceptionInfo
-from ..execute.trace import TaskTrace
-from ..utils import noop, kwdict, fun_takes_kwargs, truncate_text
-from ..utils.encoding import safe_repr, safe_str, default_encoding
+from ..execute.trace import build_tracer, trace_task, report_internal_error
+from ..platforms import set_mp_process_title as setps
+from ..utils import (noop, kwdict, fun_takes_kwargs,
+                     cached_property, truncate_text)
+from ..utils.encoding import safe_repr, safe_str
 from ..utils.timeutils import maybe_iso8601, timezone
-from ..utils.serialization import get_pickleable_exception
 
 from . import state
 
-#: Keys to keep from the message delivery info.  The values
-#: of these keys must be pickleable.
-WANTED_DELIVERY_INFO = ("exchange", "routing_key", "consumer_tag", )
+# Localize
+tz_to_local = timezone.to_local
+tz_or_local = timezone.tz_or_local
+tz_utc = timezone.utc
 
 
-class InvalidTaskError(Exception):
-    """The task has invalid data or is not properly constructed."""
-    pass
-
-
-if sys.version_info >= (3, 0):
-
-    def default_encode(obj):
-        return obj
-else:
-
-    def default_encode(obj):  # noqa
-        return unicode(obj, default_encoding())
-
-
-class WorkerTaskTrace(TaskTrace):
-    """Wraps the task in a jail, catches all exceptions, and
-    saves the status and result of the task execution to the task
-    meta backend.
-
-    If the call was successful, it saves the result to the task result
-    backend, and sets the task status to `"SUCCESS"`.
-
-    If the call raises :exc:`~celery.exceptions.RetryTaskError`, it extracts
-    the original exception, uses that as the result and sets the task status
-    to `"RETRY"`.
-
-    If the call results in an exception, it saves the exception as the task
-    result, and sets the task status to `"FAILURE"`.
-
-    :param task_name: The name of the task to execute.
-    :param task_id: The unique id of the task.
-    :param args: List of positional args to pass on to the function.
-    :param kwargs: Keyword arguments mapping to pass on to the function.
-
-    :keyword loader: Custom loader to use, if not specified the current app
-      loader will be used.
-    :keyword hostname: Custom hostname to use, if not specified the system
-      hostname will be used.
-
-    :returns: the evaluated functions return value on success, or
-        the exception instance on failure.
-
-    """
-
-    #: Current loader.
-    loader = None
-
-    #: Hostname to report as.
-    hostname = None
-
-    def __init__(self, *args, **kwargs):
-        self.loader = kwargs.get("loader") or current_app.loader
-        self.hostname = kwargs.get("hostname") or socket.gethostname()
-        super(WorkerTaskTrace, self).__init__(*args, **kwargs)
-
-        self._store_errors = True
-        if self.task.ignore_result:
-            self._store_errors = self.task.store_errors_even_if_ignored
-        self.super = super(WorkerTaskTrace, self)
-
-    def execute_safe(self, *args, **kwargs):
-        """Same as :meth:`execute`, but catches errors."""
-        try:
-            return self.execute(*args, **kwargs)
-        except Exception, exc:
-            _type, _value, _tb = sys.exc_info()
-            _value = self.task.backend.prepare_exception(exc)
-            exc_info = ExceptionInfo((_type, _value, _tb))
-            warnings.warn("Exception outside body: %s: %s\n%s" % tuple(
-                map(str, (exc.__class__, exc, exc_info.traceback))))
-            return exc_info
-
-    def execute(self):
-        """Execute, trace and store the result of the task."""
-        self.loader.on_task_init(self.task_id, self.task)
-        if self.task.track_started:
-            if not self.task.ignore_result:
-                self.task.backend.mark_as_started(self.task_id,
-                                                  pid=os.getpid(),
-                                                  hostname=self.hostname)
-        try:
-            return super(WorkerTaskTrace, self).execute()
-        finally:
-            try:
-                self.task.backend.process_cleanup()
-                self.loader.on_process_cleanup()
-            except (KeyboardInterrupt, SystemExit, MemoryError):
-                raise
-            except Exception, exc:
-                logger = current_app.log.get_default_logger()
-                logger.error("Process cleanup failed: %r", exc,
-                             exc_info=sys.exc_info())
-
-    def handle_success(self, retval, *args):
-        """Handle successful execution."""
-        if not self.task.ignore_result:
-            self.task.backend.mark_as_done(self.task_id, retval)
-        return self.super.handle_success(retval, *args)
-
-    def handle_retry(self, exc, type_, tb, strtb):
-        """Handle retry exception."""
-        message, orig_exc = exc.args
-        if self._store_errors:
-            self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
-        return self.super.handle_retry(exc, type_, tb, strtb)
-
-    def handle_failure(self, exc, type_, tb, strtb):
-        """Handle exception."""
-        if self._store_errors:
-            self.task.backend.mark_as_failure(self.task_id, exc, strtb)
-        exc = get_pickleable_exception(exc)
-        return self.super.handle_failure(exc, type_, tb, strtb)
-
-
-def execute_and_trace(task_name, *args, **kwargs):
+def execute_and_trace(name, uuid, args, kwargs, request=None, **opts):
     """This is a pickleable method used as a target when applying to pools.
 
     It's the same as::
 
-        >>> WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
+        >>> trace_task(name, *args, **kwargs)[0]
 
     """
-    hostname = kwargs.get("hostname")
-    platforms.set_mp_process_title("celeryd", task_name, hostname=hostname)
+    task = tasks[name]
     try:
-        return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
-    finally:
-        platforms.set_mp_process_title("celeryd", "-idle-", hostname)
+        hostname = opts.get("hostname")
+        setps("celeryd", name, hostname, rate_limit=True)
+        try:
+            if task.__tracer__ is None:
+                task.__tracer__ = build_tracer(name, task, **opts)
+            return task.__tracer__(uuid, args, kwargs, request)[0]
+        finally:
+            setps("celeryd", "-idle-", hostname, rate_limit=True)
+    except Exception, exc:
+        return report_internal_error(task, exc)
 
 
 class TaskRequest(object):
@@ -176,14 +65,14 @@ class TaskRequest(object):
     #: Kind of task.  Must be a name registered in the task registry.
     name = None
 
-    #: The task class (set by constructor using :attr:`task_name`).
+    #: The task class (set by constructor using :attr:`name`).
     task = None
 
     #: UUID of the task.
-    task_id = None
+    id = None
 
     #: UUID of the taskset that this task belongs to.
-    taskset_id = None
+    taskset = None
 
     #: List of positional arguments to apply to the task.
     args = None
@@ -238,80 +127,83 @@ class TaskRequest(object):
     _already_revoked = False
     _terminate_on_ack = None
 
-    def __init__(self, task_name, task_id, args, kwargs,
-            on_ack=noop, retries=0, delivery_info=None, hostname=None,
+    def __init__(self, task, id, args=[], kwargs={},
+            on_ack=noop, retries=0, delivery_info={}, hostname=None,
             logger=None, eventer=None, eta=None, expires=None, app=None,
-            taskset_id=None, chord=None, utc=False, **opts):
-        self.app = app_or_default(app)
-        self.task_name = task_name
-        self.task_id = task_id
-        self.taskset_id = taskset_id
-        self.retries = retries
+            taskset=None, chord=None, utc=False, connection_errors=None,
+            **opts):
+        try:
+            kwargs.items
+        except AttributeError:
+            raise exceptions.InvalidTaskError(
+                    "Task keyword arguments is not a mapping")
+        self.app = app or app_or_default(app)
+        self.name = task
+        self.id = id
         self.args = args
-        self.kwargs = kwargs
-        self.eta = eta
-        self.expires = expires
+        self.kwargs = kwdict(kwargs)
+        self.taskset = taskset
+        self.retries = retries
         self.chord = chord
         self.on_ack = on_ack
         self.delivery_info = {} if delivery_info is None else delivery_info
         self.hostname = hostname or socket.gethostname()
         self.logger = logger or self.app.log.get_default_logger()
         self.eventer = eventer
+        self.connection_errors = connection_errors or ()
 
-        self.task = registry.tasks[self.task_name]
-        self._store_errors = True
-        if self.task.ignore_result:
-            self._store_errors = self.task.store_errors_even_if_ignored
+        task = self.task = tasks[task]
 
         # timezone means the message is timezone-aware, and the only timezone
         # supported at this point is UTC.
-        self.tzlocal = timezone.tz_or_local(self.app.conf.CELERY_TIMEZONE)
-        tz = timezone.utc if utc else self.tzlocal
-        if self.eta is not None:
-            self.eta = timezone.to_local(self.eta, self.tzlocal, tz)
-        if self.expires is not None:
-            self.expires = timezone.to_local(self.expires, self.tzlocal, tz)
+        if eta is not None:
+            tz = tz_utc if utc else self.tzlocal
+            self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
+        if expires is not None:
+            tz = tz_utc if utc else self.tzlocal
+            self.expires = tz_to_local(maybe_iso8601(expires),
+                                       self.tzlocal, tz)
+
+        # shortcuts
+        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
+        self._does_info = self.logger.isEnabledFor(logging.INFO)
+
+        self.request_dict = {"hostname": self.hostname,
+                             "id": id, "taskset": taskset,
+                             "retries": retries, "is_eager": False,
+                             "delivery_info": delivery_info, "chord": chord}
 
     @classmethod
-    def from_message(cls, message, body, on_ack=noop, **kw):
+    def from_message(cls, message, body, on_ack=noop, delivery_info={},
+            logger=None, hostname=None, eventer=None, app=None,
+            connection_errors=None):
         """Create request from a task message.
 
         :raises UnknownTaskError: if the message does not describe a task,
             the message is also rejected.
 
         """
-        delivery_info = getattr(message, "delivery_info", {})
-        delivery_info = dict((key, delivery_info.get(key))
-                                for key in WANTED_DELIVERY_INFO)
+        try:
+            D = message.delivery_info
+            delivery_info = {"exchange": D.get("exchange"),
+                             "routing_key": D.get("routing_key")}
+        except (AttributeError, KeyError):
+            pass
 
-        kwargs = body.get("kwargs", {})
-        if not hasattr(kwargs, "items"):
-            raise InvalidTaskError("Task keyword arguments is not a mapping.")
         try:
-            task_name = body["task"]
-            task_id = body["id"]
-        except KeyError, exc:
-            raise InvalidTaskError(
-                "Task message is missing required field %r" % (exc, ))
-
-        return cls(task_name=task_name,
-                   task_id=task_id,
-                   taskset_id=body.get("taskset", None),
-                   args=body.get("args", []),
-                   kwargs=kwdict(kwargs),
-                   chord=body.get("chord"),
-                   retries=body.get("retries", 0),
-                   eta=maybe_iso8601(body.get("eta")),
-                   expires=maybe_iso8601(body.get("expires")),
-                   on_ack=on_ack,
-                   delivery_info=delivery_info,
-                   utc=body.get("utc", None),
-                   **kw)
+            return cls(on_ack=on_ack, logger=logger, eventer=eventer, app=app,
+                       delivery_info=delivery_info, hostname=hostname,
+                       connection_errors=connection_errors, **body)
+        except TypeError:
+            for f in ("task", "id"):
+                if f not in body:
+                    raise exceptions.InvalidTaskError(
+                        "Task message is missing required field %r" % (f, ))
 
     def get_instance_attrs(self, loglevel, logfile):
         return {"logfile": logfile, "loglevel": loglevel,
                 "hostname": self.hostname,
-                "id": self.task_id, "taskset": self.taskset_id,
+                "id": self.id, "taskset": self.taskset,
                 "retries": self.retries, "is_eager": False,
                 "delivery_info": self.delivery_info, "chord": self.chord}
 
@@ -327,13 +219,11 @@ class TaskRequest(object):
         in version 3.0.
 
         """
-        if not self.task.accept_magic_kwargs:
-            return self.kwargs
         kwargs = dict(self.kwargs)
         default_kwargs = {"logfile": logfile,
                           "loglevel": loglevel,
-                          "task_id": self.task_id,
-                          "task_name": self.task_name,
+                          "task_id": self.id,
+                          "task_name": self.name,
                           "task_retries": self.retries,
                           "task_is_eager": False,
                           "delivery_info": self.delivery_info}
@@ -356,13 +246,16 @@ class TaskRequest(object):
         """
         if self.revoked():
             return
+        request = self.request_dict
 
-        args = self._get_tracer_args(loglevel, logfile)
-        instance_attrs = self.get_instance_attrs(loglevel, logfile)
+        kwargs = self.kwargs
+        if self.task.accept_magic_kwargs:
+            kwargs = self.extend_with_default_kwargs(loglevel, logfile)
+        request.update({"loglevel": loglevel, "logfile": logfile})
         result = pool.apply_async(execute_and_trace,
-                                  args=args,
+                                  args=(self.name, self.id, self.args, kwargs),
                                   kwargs={"hostname": self.hostname,
-                                          "request": instance_attrs},
+                                          "request": request},
                                   accept_callback=self.on_accepted,
                                   timeout_callback=self.on_timeout,
                                   callback=self.on_success,
@@ -372,7 +265,7 @@ class TaskRequest(object):
         return result
 
     def execute(self, loglevel=None, logfile=None):
-        """Execute the task in a :class:`WorkerTaskTrace`.
+        """Execute the task in a :func:`~celery.execute.trace.trace_task`.
 
         :keyword loglevel: The loglevel used by the task.
 
@@ -387,20 +280,19 @@ class TaskRequest(object):
             self.acknowledge()
 
         instance_attrs = self.get_instance_attrs(loglevel, logfile)
-        tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile),
-                                 **{"hostname": self.hostname,
-                                    "loader": self.app.loader,
-                                    "request": instance_attrs})
-        retval = tracer.execute()
+        retval, _ = trace_task(*self._get_tracer_args(loglevel, logfile, True),
+                               **{"hostname": self.hostname,
+                                  "loader": self.app.loader,
+                                  "request": instance_attrs})
         self.acknowledge()
         return retval
 
     def maybe_expire(self):
         """If expired, mark the task as revoked."""
         if self.expires and datetime.now(self.tzlocal) > self.expires:
-            state.revoked.add(self.task_id)
-            if self._store_errors:
-                self.task.backend.mark_as_revoked(self.task_id)
+            state.revoked.add(self.id)
+            if self.store_errors:
+                self.task.backend.mark_as_revoked(self.id)
 
     def terminate(self, pool, signal=None):
         if self.time_start:
@@ -414,10 +306,10 @@ class TaskRequest(object):
             return True
         if self.expires:
             self.maybe_expire()
-        if self.task_id in state.revoked:
+        if self.id in state.revoked:
             self.logger.warn("Skipping revoked task: %s[%s]",
-                             self.task_name, self.task_id)
-            self.send_event("task-revoked", uuid=self.task_id)
+                             self.name, self.id)
+            self.send_event("task-revoked", uuid=self.id)
             self.acknowledge()
             self._already_revoked = True
             return True
@@ -434,9 +326,10 @@ class TaskRequest(object):
         state.task_accepted(self)
         if not self.task.acks_late:
             self.acknowledge()
-        self.send_event("task-started", uuid=self.task_id, pid=pid)
-        self.logger.debug("Task accepted: %s[%s] pid:%r",
-                          self.task_name, self.task_id, pid)
+        self.send_event("task-started", uuid=self.id, pid=pid)
+        if self._does_debug:
+            self.logger.debug("Task accepted: %s[%s] pid:%r",
+                              self.name, self.id, pid)
         if self._terminate_on_ack is not None:
             _, pool, signal = self._terminate_on_ack
             self.terminate(pool, signal)
@@ -446,15 +339,15 @@ class TaskRequest(object):
         state.task_ready(self)
         if soft:
             self.logger.warning("Soft time limit (%ss) exceeded for %s[%s]",
-                                timeout, self.task_name, self.task_id)
+                                timeout, self.name, self.id)
             exc = exceptions.SoftTimeLimitExceeded(timeout)
         else:
             self.logger.error("Hard time limit (%ss) exceeded for %s[%s]",
-                              timeout, self.task_name, self.task_id)
+                              timeout, self.name, self.id)
             exc = exceptions.TimeLimitExceeded(timeout)
 
-        if self._store_errors:
-            self.task.backend.mark_as_failure(self.task_id, exc)
+        if self.store_errors:
+            self.task.backend.mark_as_failure(self.id, exc)
 
     def on_success(self, ret_value):
         """Handler called if the task was successfully processed."""
@@ -464,26 +357,28 @@ class TaskRequest(object):
             self.acknowledge()
 
         runtime = self.time_start and (time.time() - self.time_start) or 0
-        self.send_event("task-succeeded", uuid=self.task_id,
+        self.send_event("task-succeeded", uuid=self.id,
                         result=safe_repr(ret_value), runtime=runtime)
 
-        self.logger.info(self.success_msg.strip(),
-                         {"id": self.task_id,
-                          "name": self.task_name,
-                          "return_value": self.repr_result(ret_value),
-                          "runtime": runtime})
+        if self._does_info:
+            self.logger.info(self.success_msg.strip(),
+                            {"id": self.id,
+                             "name": self.name,
+                             "return_value": self.repr_result(ret_value),
+                             "runtime": runtime})
 
     def on_retry(self, exc_info):
         """Handler called if the task should be retried."""
-        self.send_event("task-retried", uuid=self.task_id,
+        self.send_event("task-retried", uuid=self.id,
                          exception=safe_repr(exc_info.exception.exc),
                          traceback=safe_str(exc_info.traceback))
 
-        self.logger.info(self.retry_msg.strip(),
-                         {"id": self.task_id,
-                         "name": self.task_name,
-                         "exc": safe_repr(exc_info.exception.exc)},
-                         exc_info=exc_info)
+        if self._does_info:
+            self.logger.info(self.retry_msg.strip(),
+                            {"id": self.id,
+                             "name": self.name,
+                             "exc": safe_repr(exc_info.exception.exc)},
+                            exc_info=exc_info)
 
     def on_failure(self, exc_info):
         """Handler called if the task raised an exception."""
@@ -498,16 +393,16 @@ class TaskRequest(object):
         # This is a special case as the process would not have had
         # time to write the result.
         if isinstance(exc_info.exception, exceptions.WorkerLostError) and \
-                self._store_errors:
-            self.task.backend.mark_as_failure(self.task_id, exc_info.exception)
+                self.store_errors:
+            self.task.backend.mark_as_failure(self.id, exc_info.exception)
 
-        self.send_event("task-failed", uuid=self.task_id,
+        self.send_event("task-failed", uuid=self.id,
                          exception=safe_repr(exc_info.exception),
                          traceback=safe_str(exc_info.traceback))
 
         context = {"hostname": self.hostname,
-                   "id": self.task_id,
-                   "name": self.task_name,
+                   "id": self.id,
+                   "name": self.name,
                    "exc": safe_repr(exc_info.exception),
                    "traceback": safe_str(exc_info.traceback),
                    "args": safe_repr(self.args),
@@ -515,17 +410,17 @@ class TaskRequest(object):
 
         self.logger.error(self.error_msg.strip(), context,
                           exc_info=exc_info.exc_info,
-                          extra={"data": {"id": self.task_id,
-                                          "name": self.task_name,
+                          extra={"data": {"id": self.id,
+                                          "name": self.name,
                                           "hostname": self.hostname}})
 
-        task_obj = registry.tasks.get(self.task_name, object)
+        task_obj = tasks.get(self.name, object)
         task_obj.send_error_email(context, exc_info.exception)
 
     def acknowledge(self):
         """Acknowledge task."""
         if not self.acknowledged:
-            self.on_ack()
+            self.on_ack(self.logger, self.connection_errors)
             self.acknowledged = True
 
     def repr_result(self, result, maxlen=46):
@@ -534,8 +429,8 @@ class TaskRequest(object):
         return truncate_text(safe_repr(result), maxlen)
 
     def info(self, safe=False):
-        return {"id": self.task_id,
-                "name": self.task_name,
+        return {"id": self.id,
+                "name": self.name,
                 "args": self.args if safe else safe_repr(self.args),
                 "kwargs": self.kwargs if safe else safe_repr(self.kwargs),
                 "hostname": self.hostname,
@@ -546,8 +441,7 @@ class TaskRequest(object):
 
     def shortinfo(self):
         return "%s[%s]%s%s" % (
-                    self.task_name,
-                    self.task_id,
+                    self.name, self.id,
                     " eta:[%s]" % (self.eta, ) if self.eta else "",
                     " expires:[%s]" % (self.expires, ) if self.expires else "")
     __str__ = shortinfo
@@ -555,9 +449,43 @@ class TaskRequest(object):
     def __repr__(self):
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                 self.__class__.__name__,
-                self.task_name, self.task_id, self.args, self.kwargs)
+                self.name, self.id, self.args, self.kwargs)
+
+    def _get_tracer_args(self, loglevel=None, logfile=None, use_real=False):
+        """Get the task trace args for this task."""
+        kwargs = self.kwargs
+        if self.task.accept_magic_kwargs:
+            kwargs = self.extend_with_default_kwargs(loglevel, logfile)
+        first = self.task if use_real else self.name
+        return first, self.id, self.args, kwargs
+
+    @cached_property
+    def tzlocal(self):
+        return tz_or_local(self.app.conf.CELERY_TIMEZONE)
+
+    @property
+    def store_errors(self):
+        return (not self.task.ignore_result
+                or self.task.store_errors_even_if_ignored)
+
+    def _compat_get_task_id(self):
+        return self.id
+
+    def _compat_set_task_id(self, value):
+        self.id = value
+
+    def _compat_get_task_name(self):
+        return self.name
+
+    def _compat_set_task_name(self, value):
+        self.name = value
+
+    def _compat_get_taskset_id(self):
+        return self.taskset
+
+    def _compat_set_taskset_id(self, value):
+        self.taskset = value
 
-    def _get_tracer_args(self, loglevel=None, logfile=None):
-        """Get the :class:`WorkerTaskTrace` tracer for this task."""
-        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
-        return self.task_name, self.task_id, self.args, task_func_kwargs
+    task_id = property(_compat_get_task_id, _compat_set_task_id)
+    task_name = property(_compat_get_task_name, _compat_set_task_name)
+    taskset_id = property(_compat_get_taskset_id, _compat_set_taskset_id)

+ 6 - 3
celery/worker/mediator.py

@@ -18,6 +18,7 @@
 """
 from __future__ import absolute_import
 
+import logging
 import sys
 import traceback
 
@@ -40,6 +41,7 @@ class Mediator(bgThread):
         self.logger = logger or self.app.log.get_default_logger()
         self.ready_queue = ready_queue
         self.callback = callback
+        self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
         super(Mediator, self).__init__()
 
     def body(self):
@@ -51,9 +53,10 @@ class Mediator(bgThread):
         if task.revoked():
             return
 
-        self.logger.debug(
-            "Mediator: Running callback for task: %s[%s]" % (
-                task.task_name, task.task_id))
+        if self._does_debug:
+            self.logger.debug(
+                "Mediator: Running callback for task: %s[%s]" % (
+                    task.task_name, task.task_id))
 
         try:
             self.callback(task)

+ 2 - 4
celery/worker/state.py

@@ -48,10 +48,8 @@ total_count = defaultdict(lambda: 0)
 #: the list of currently revoked tasks.  Persistent if statedb set.
 revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
 
-
-def task_reserved(request):
-    """Updates global state when a task has been reserved."""
-    reserved_requests.add(request)
+#: Updates global state when a task has been reserved.
+task_reserved = reserved_requests.add
 
 
 def task_accepted(request):

+ 19 - 0
celery/worker/strategy.py

@@ -0,0 +1,19 @@
+from __future__ import absolute_import
+
+from .job import TaskRequest
+
+
+def default(task, app, consumer):
+    logger = consumer.logger
+    hostname = consumer.hostname
+    eventer = consumer.event_dispatcher
+    Request = TaskRequest.from_message
+    handle = consumer.on_task
+    connection_errors = consumer.connection_errors
+
+    def task_message_handler(M, B, A):
+        handle(Request(M, B, A, app=app, logger=logger,
+                                hostname=hostname, eventer=eventer,
+                                connection_errors=connection_errors))
+
+    return task_message_handler

+ 1 - 1
contrib/generic-init.d/celerybeat

@@ -65,7 +65,7 @@ if [ -n "$CELERYBEAT_USER" ]; then
 fi
 if [ -n "$CELERYBEAT_GROUP" ]; then
     DAEMON_OPTS="$DAEMON_OPTS --gid $CELERYBEAT_GROUP"
-    chown ":$CELERYBEAT_GROUP" $CELERYBEAT_LOG_DIR $CELERYBEAT_PID_DIR
+    chgrp "$CELERYBEAT_GROUP" $CELERYBEAT_LOG_DIR $CELERYBEAT_PID_DIR
 fi
 
 CELERYBEAT_CHDIR=${CELERYBEAT_CHDIR:-$CELERYD_CHDIR}

+ 1 - 1
contrib/generic-init.d/celeryd

@@ -65,7 +65,7 @@ if [ -n "$CELERYD_USER" ]; then
 fi
 if [ -n "$CELERYD_GROUP" ]; then
     DAEMON_OPTS="$DAEMON_OPTS --gid=$CELERYD_GROUP"
-    chown ":$CELERYD_GROUP" $CELERYD_LOG_DIR $CELERYD_PID_DIR
+    chgrp "$CELERYD_GROUP" $CELERYD_LOG_DIR $CELERYD_PID_DIR
 fi
 
 if [ -n "$CELERYD_CHDIR" ]; then

+ 1 - 1
contrib/generic-init.d/celeryevcam

@@ -137,7 +137,7 @@ if [ -n "$CELERYEV_USER" ]; then
 fi
 if [ -n "$CELERYEV_GROUP" ]; then
     DAEMON_OPTS="$DAEMON_OPTS --gid $CELERYEV_GROUP"
-    chown "$CELERYEV_GROUP" $CELERYBEAT_LOG_DIR $CELERYEV_PID_DIR
+    chgrp "$CELERYEV_GROUP" $CELERYBEAT_LOG_DIR $CELERYEV_PID_DIR
 fi
 
 CELERYEV_CHDIR=${CELERYEV_CHDIR:-$CELERYD_CHDIR}

+ 1 - 1
contrib/release/doc4allmods

@@ -7,7 +7,7 @@ SKIP_FILES="celery.backends.pyredis.rst
             celery.bin.celeryd_detach.rst
             celery.concurrency.processes._win.rst
             celery.contrib.rst
-            celery.contrib.batches.rst
+            celery.contrib.bundles.rst
             celery.worker.control.rst
             celery.worker.control.builtins.rst
             celery.worker.control.registry.rst

+ 1 - 1
docs/configuration.rst

@@ -610,7 +610,7 @@ CELERY_DEFAULT_DELIVERY_MODE
 Can be `transient` or `persistent`.  The default is to send
 persistent messages.
 
-.. _conf-broker-connection:
+.. _conf-broker-settings:
 
 Broker Settings
 ---------------

+ 21 - 4
docs/contributing.rst

@@ -553,15 +553,15 @@ is following the conventions.
 
 * Lines should not exceed 78 columns.
 
-  You can enforce this in :program:`vim` by setting the ``textwidth`` option::
+  You can enforce this in :program:`vim` by setting the ``textwidth`` option:
 
   .. code-block:: vim
 
         set textwidth=78
 
-    If adhering to this limit makes the code less readable, you have one more
-    character to go on, which means 78 is a soft limit, and 79 is the hard
-    limit :)
+  If adhering to this limit makes the code less readable, you have one more
+  character to go on, which means 78 is a soft limit, and 79 is the hard
+  limit :)
 
 * Import order
 
@@ -682,3 +682,20 @@ following:
     for series 2.4.
 
 * Also add the previous version under the "versions" tab.
+
+
+Updating bundles
+----------------
+
+First you need to make sure the bundle entrypoints have been installed,
+but either running `develop`, or `install`::
+
+    $ python setup.py develop
+
+Then make sure that you have your PyPI credentials stored in
+:file:`~/.pypirc`, and execute the command::
+
+    $ python setup.py upload_bundles
+
+If you broke something and need to update new versions of the bundles,
+then you can use ``upload_bundles_fix``.

+ 57 - 0
docs/getting-started/brokers/beanstalk.rst

@@ -0,0 +1,57 @@
+.. _broker-beanstalk:
+
+=================
+ Using Beanstalk
+=================
+
+.. _broker-beanstalk-installation:
+
+Installation
+============
+
+For the Beanstalk support you have to install additional dependencies.
+You can install both Celery and these dependencies in one go using
+either the `celery-with-beanstalk`_, or the `django-celery-with-beanstalk`
+bundles::
+
+    $ pip install -U celery-with-beanstalk
+
+.. _`celery-with-beanstalk`:
+    http://pypi.python.org/pypi/celery-with-beanstalk
+.. _`django-celery-with-beanstalk`:
+    http://pypi.python.org/pypi/django-celery-with-beanstalk
+
+.. _broker-beanstalk-configuration:
+
+Configuration
+=============
+
+Configuration is easy, set the transport, and configure the location of
+your CouchDB database::
+
+    BROKER_URL = "beanstalk://localhost:11300"
+
+Where the URL is in the format of::
+
+    beanstalk://hostname:port
+
+The host name will default to ``localhost`` and the port to 11300,
+and so they are optional.
+
+.. _beanstalk-results-configuration:
+
+Results
+-------
+
+Using Beanstalk to store task state and results is currently **not supported**.
+
+.. _broker-beanstalk-limitations:
+
+Limitations
+===========
+
+The Beanstalk message transport does not currently support:
+
+    * Remote control commands (celeryctl, broadcast)
+    * Authentication
+

+ 55 - 0
docs/getting-started/brokers/couchdb.rst

@@ -0,0 +1,55 @@
+.. _broker-couchdb:
+
+===============
+ Using CouchDB
+===============
+
+.. _broker-couchdb-installation:
+
+Installation
+============
+
+For the CouchDB support you have to install additional dependencies.
+You can install both Celery and these dependencies in one go using
+either the `celery-with-couchdb`_, or the `django-celery-with-couchdb` bundles::
+
+    $ pip install -U celery-with-couchdb
+
+.. _`celery-with-couchdb`:
+    http://pypi.python.org/pypi/celery-with-couchdb
+.. _`django-celery-with-couchdb`:
+    http://pypi.python.org/pypi/django-celery-with-couchdb
+
+.. _broker-couchdb-configuration:
+
+Configuration
+=============
+
+Configuration is easy, set the transport, and configure the location of
+your CouchDB database::
+
+    BROKER_URL = "couchdb://localhost:5984/database_name"
+
+Where the URL is in the format of::
+
+    couchdb://userid:password@hostname:port/database_name
+
+The host name will default to ``localhost`` and the port to 5984,
+and so they are optional.  userid and password are also optional,
+but needed if your CouchDB server requires authentication.
+
+.. _couchdb-results-configuration:
+
+Results
+-------
+
+Storing task state and results in CouchDB is currently **not supported**.
+
+.. _broker-couchdb-limitations:
+
+Limitations
+===========
+
+The Beanstalk message transport does not currently support:
+
+    * Remote control commands (celeryctl, broadcast)

+ 58 - 0
docs/getting-started/brokers/django.rst

@@ -0,0 +1,58 @@
+.. _broker-django:
+
+===========================
+ Using the Django Database
+===========================
+
+.. _broker-django-installation:
+
+Installation
+============
+
+For the Django database transport support you have to install the
+`django-kombu` library::
+
+    $ pip install -U django-kombu
+
+.. _broker-django-configuration:
+
+Configuration
+=============
+
+The database transport uses the Django `DATABASE_*` settings for database
+configuration values.
+
+#. Set your broker transport::
+
+    BROKER_URL = "django://"
+
+#. Add :mod:`djkombu` to `INSTALLED_APPS`::
+
+    INSTALLED_APPS = ("djkombu", )
+
+#. Verify your database settings::
+
+    DATABASE_ENGINE = "mysql"
+    DATABASE_NAME = "mydb"
+    DATABASE_USER = "myuser"
+    DATABASE_PASSWORD = "secret"
+
+  The above is just an example, if you haven't configured your database before
+  you should read the Django database settings reference:
+  http://docs.djangoproject.com/en/1.1/ref/settings/#database-engine
+
+#. Sync your database schema::
+
+    $ python manage.py syncdb
+
+.. _broker-django-limitations:
+
+Limitations
+===========
+
+The Django database transport does not currently support:
+
+    * Remote control commands (celeryev, broadcast)
+    * Events, including the Django Admin monitor.
+    * Using more than a few workers (can lead to messages being executed
+      multiple times).

+ 21 - 0
docs/getting-started/brokers/index.rst

@@ -0,0 +1,21 @@
+.. _brokers:
+
+=====================
+ Brokers
+=====================
+
+:Release: |version|
+:Date: |today|
+
+Celery supports several message transport alternatives.
+
+.. toctree::
+    :maxdepth: 1
+
+    rabbitmq
+    redis
+    sqlalchemy
+    django
+    mongodb
+    couchdb
+    beanstalk

+ 56 - 0
docs/getting-started/brokers/mongodb.rst

@@ -0,0 +1,56 @@
+.. _broker-mongodb:
+
+===============
+ Using MongoDB
+===============
+
+.. _broker-mongodb-installation:
+
+Installation
+============
+
+For the MongoDB support you have to install additional dependencies.
+You can install both Celery and these dependencies in one go using
+either the `celery-with-mongodb`_, or the `django-celery-with-mongodb` bundles::
+
+    $ pip install -U celery-with-mongodb
+
+.. _`celery-with-mongodb`:
+    http://pypi.python.org/pypi/celery-with-mongodb
+.. _`django-celery-with-mongodb`:
+    http://pypi.python.org/pypi/django-celery-with-mongodb
+
+.. _broker-mongodb-configuration:
+
+Configuration
+=============
+
+Configuration is easy, set the transport, and configure the location of
+your MongoDB database::
+
+    BROKER_URL = "mongodb://localhost:27017/database_name"
+
+Where the URL is in the format of::
+
+    mongodb://userid:password@hostname:port/database_name
+
+The host name will default to ``localhost`` and the port to 27017,
+and so they are optional.  userid and password are also optional,
+but needed if your MongoDB server requires authentication.
+
+.. _mongodb-results-configuration:
+
+Results
+-------
+
+If you also want to store the state and return values of tasks in MongoDB,
+you should see :ref:`conf-mongodb-result-backend`.
+
+.. _broker-mongodb-limitations:
+
+Limitations
+===========
+
+The mongodb message transport currently does not support:
+
+    * Remote control commands (celeryctl, broadcast)

+ 23 - 11
docs/getting-started/broker-installation.rst → docs/getting-started/brokers/rabbitmq.rst

@@ -1,16 +1,29 @@
-.. _broker-installation:
+.. _broker-rabbitmq:
 
-=====================
- Broker Installation
-=====================
+================
+ Using RabbitMQ
+================
 
 .. contents::
     :local:
 
+Installation & Configuration
+============================
+
+RabbitMQ is the default broker so it does not require any additional
+dependencies or initial configuration, other than the URL location of
+the broker instance you want to use::
+
+    >>> BROKER_URL = "amqp://guest:guest@localhost:5672//"
+
+For a description of broker URLs and a full list of the
+various broker configuration options available to Celery,
+see :ref:`conf-broker-settings`.
+
 .. _installing-rabbitmq:
 
-Installing RabbitMQ
-===================
+Installing the RabbitMQ Server
+==============================
 
 See `Installing RabbitMQ`_ over at RabbitMQ's website. For Mac OS X
 see `Installing RabbitMQ on OS X`_.
@@ -28,7 +41,7 @@ see `Installing RabbitMQ on OS X`_.
 .. _rabbitmq-configuration:
 
 Setting up RabbitMQ
-===================
+-------------------
 
 To use celery we need to create a RabbitMQ user, a virtual host and
 allow that user access to that virtual host::
@@ -48,7 +61,7 @@ See the RabbitMQ `Admin Guide`_ for more information about `access control`_.
 .. _rabbitmq-osx-installation:
 
 Installing RabbitMQ on OS X
-===========================
+---------------------------
 
 The easiest way to install RabbitMQ on Snow Leopard is using `Homebrew`_; the new
 and shiny package management system for OS X.
@@ -89,7 +102,7 @@ Finally, we can install rabbitmq using :program:`brew`::
 .. _rabbitmq-osx-system-hostname:
 
 Configuring the system host name
---------------------------------
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 If you're using a DHCP server that is giving you a random host name, you need
 to permanently configure the host name. This is because RabbitMQ uses the host name
@@ -126,7 +139,7 @@ then RabbitMQ will try to use `rabbit@23`, which is an illegal host name.
 .. _rabbitmq-osx-start-stop:
 
 Starting/Stopping the RabbitMQ server
--------------------------------------
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 To start the server::
 
@@ -143,4 +156,3 @@ Never use :program:`kill` to stop the RabbitMQ server, but rather use the
     $ sudo rabbitmqctl stop
 
 When the server is running, you can continue reading `Setting up RabbitMQ`_.
-

+ 52 - 0
docs/getting-started/brokers/redis.rst

@@ -0,0 +1,52 @@
+.. _broker-redis:
+
+=============
+ Using Redis
+=============
+
+.. _broker-redis-installation:
+
+Installation
+============
+
+For the Redis support you have to install additional dependencies.
+You can install both Celery and these dependencies in one go using
+ehter the `celery-with-redis`_, or the `django-celery-with-redis` bundles::
+
+    $ pip install -U celery-with-redis
+
+.. _`celery-with-redis`:
+    http://pypi.python.org/pypi/celery-with-redis
+.. _`django-celery-with-redis`:
+    http://pypi.python.org/pypi/django-celery-with-redis
+
+.. _broker-redis-configuration:
+
+Configuration
+=============
+
+Configuration is easy, set the transport, and configure the location of
+your Redis database::
+
+    BROKER_URL = "redis://localhost:6379/0"
+
+
+Where the URL is in the format of::
+
+    redis://userid:password@hostname:port/db_number
+
+.. _redis-results-configuration:
+
+Results
+-------
+
+If you also want to store the state and return values of tasks in Redis,
+you should configure these settings::
+
+    CELERY_RESULT_BACKEND = "redis"
+    CELERY_REDIS_HOST = "localhost"
+    CELERY_REDIS_PORT = 6379
+    CELERY_REDIS_DB = 0
+
+For a complete list of options supported by the Redis result backend see
+:ref:`conf-redis-result-backend`

+ 74 - 0
docs/getting-started/brokers/sqlalchemy.rst

@@ -0,0 +1,74 @@
+.. _broker-sqlalchemy:
+
+==================
+ Using SQLAlchemy
+==================
+
+.. _broker-sqlalchemy-installation:
+
+Installation
+============
+
+For the SQLAlchemy transport you have to install the
+`kombu-sqlalchemy` library::
+
+    $ pip install -U kombu-sqlalchemy
+
+.. _broker-sqlalchemy-configuration:
+
+Configuration
+=============
+
+This transport uses only the :setting:`BROKER_HOST` setting, which have to be
+an SQLAlchemy database URI.
+
+#. Set your broker transport::
+
+    BROKER_TRANSPORT = "sqlalchemy"
+
+#. Configure the database URI::
+
+    BROKER_HOST = "sqlite:///celerydb.sqlite"
+
+Please see `SQLAlchemy: Supported Databases`_ for a table of supported databases.
+Some other `SQLAlchemy Connection String`_, examples:
+
+.. code-block:: python
+
+    # sqlite (filename)
+    BROKER_HOST = "sqlite:///celerydb.sqlite"
+
+    # mysql
+    BROKER_HOST = "mysql://scott:tiger@localhost/foo"
+
+    # postgresql
+    BROKER_HOST = "postgresql://scott:tiger@localhost/mydatabase"
+
+    # oracle
+    BROKER_HOST = "oracle://scott:tiger@127.0.0.1:1521/sidname"
+
+.. _`SQLAlchemy: Supported Databases`:
+    http://www.sqlalchemy.org/docs/core/engines.html#supported-databases
+
+.. _`SQLAlchemy Connection String`:
+    http://www.sqlalchemy.org/docs/core/engines.html#database-urls
+
+.. _sqlalchemy-results-configuration:
+
+Results
+-------
+
+To store results in the database as well, you should configure the result
+backend.  See :ref:`conf-database-result-backend`.
+
+.. _broker-sqlalchemy-limitations:
+
+Limitations
+===========
+
+The SQLAlchemy database transport does not currently support:
+
+    * Remote control commands (celeryev, broadcast)
+    * Events, including the Django Admin monitor.
+    * Using more than a few workers (can lead to messages being executed
+      multiple times).

+ 11 - 14
docs/getting-started/first-steps-with-celery.rst

@@ -18,35 +18,32 @@ messages.
 
 There are several choices available, including:
 
-* `RabbitMQ`_
+* :ref:`broker-rabbitmq`
 
-Feature-complete, safe and durable. If not losing tasks
+`RabbitMQ`_ is feature-complete, safe and durable. If not losing tasks
 is important to you, then this is your best option.
 
-See :ref:`broker-installation` for more about installing and configuring
-RabbitMQ.
+* :ref:`broker-redis`
 
-* `Redis`_
-
-Also feature-complete, but power failures or abrubt termination
+`Redis`_ is also feature-complete, but power failures or abrupt termination
 may result in data loss.
 
-See :ref:`otherqueues-redis` for configuration details.
-
-* Databases
+* :ref:`broker-sqlalchemy`
+* :ref:`broker-django`
 
 Using a database as a message queue is not recommended, but can be sufficient
-for very small installations.  Celery can use the SQLAlchemy and Django ORMS.
-See :ref:`otherqueues-sqlalchemy` or :ref:`otherqueues-django`.
+for very small installations.  Celery can use the SQLAlchemy and Django ORM.
 
 * and more.
 
 In addition to the above, there are several other transport implementations
-to choose from, including CouchDB, Beanstalk, MongoDB, and SQS.  See the Kombu
-documentation for more information.
+to choose from, including :ref:`broker-couchdb`, :ref:`broker-beanstalk`,
+:ref:`broker-mongodb`, and SQS.  There is a `Transport Comparison`_
+in the Kombu documentation.
 
 .. _`RabbitMQ`: http://www.rabbitmq.com/
 .. _`Redis`: http://redis.io/
+.. _`Transport Comparison`: http://kombu.rtfd.org/transport-comparison
 
 .. _celerytut-simple-tasks:
 

+ 1 - 1
docs/getting-started/index.rst

@@ -9,6 +9,6 @@
     :maxdepth: 2
 
     introduction
-    broker-installation
+    brokers/index
     first-steps-with-celery
     resources

+ 36 - 3
docs/includes/introduction.txt

@@ -106,7 +106,6 @@ Features
     +-----------------+----------------------------------------------------+
     | Fault-tolerant  | Excellent configurable error recovery when using   |
     |                 | `RabbitMQ`, ensures your tasks are never lost.     |
-    |                 | scenarios, and your tasks will never be lost.      |
     +-----------------+----------------------------------------------------+
     | Distributed     | Runs on one or more machines. Supports             |
     |                 | broker `clustering`_ and `HA`_ when used in        |
@@ -210,11 +209,45 @@ or from source.
 
 To install using `pip`,::
 
-    $ pip install Celery
+    $ pip install -U Celery
 
 To install using `easy_install`,::
 
-    $ easy_install Celery
+    $ easy_install -U Celery
+
+Bundles
+-------
+
+Celery also defines a group of bundles that can be used
+to install Celery and the dependencies for a given feature.
+
+The following bundles are available:
+
+:`celery-with-redis`_:
+    for using Redis as a broker.
+
+:`celery-with-mongodb`_:
+    for using MongoDB as a broker.
+
+:`django-celery-with-redis`_:
+    for Django, and using Redis as a broker.
+
+:`django-celery-with-mongodb`_:
+    for Django, and using MongoDB as a broker.
+
+:`bundle-celery`_:
+    convenience bundle installing *Celery* and related packages.
+
+.. _`celery-with-redis`:
+    http://pypi.python.org/pypi/celery-with-redis/
+.. _`celery-with-mongodb`:
+    http://pypi.python.org/pypi/celery-with-mongdb/
+.. _`django-celery-with-redis`:
+    http://pypi.python.org/pypi/django-celery-with-redis/
+.. _`django-celery-with-mongodb`:
+    http://pypi.python.org/pypi/django-celery-with-mongdb/
+.. _`bundle-celery`:
+    http://pypi.python.org/pypi/bundle-celery/
 
 .. _celery-installing-from-source:
 

+ 3 - 128
docs/tutorials/otherqueues.rst

@@ -4,148 +4,23 @@
  Using Celery with Redis/Database as the messaging queue.
 ==========================================================
 
-.. contents::
-    :local:
-
 .. _otherqueues-redis:
 
 Redis
 =====
 
-For the Redis support you have to install the Python redis client::
-
-    $ pip install -U redis
-
-.. _otherqueues-redis-conf:
-
-Configuration
--------------
-
-Configuration is easy, set the transport, and configure the location of
-your Redis database::
-
-    BROKER_URL = "redis://localhost:6379/0"
-
-
-Where the URL is in the format of::
-
-    redis://userid:password@hostname:port/db_number
-
-
-Results
-~~~~~~~
-
-You probably also want to store results in Redis::
-
-    CELERY_RESULT_BACKEND = "redis"
-    CELERY_REDIS_HOST = "localhost"
-    CELERY_REDIS_PORT = 6379
-    CELERY_REDIS_DB = 0
-
-For a complete list of options supported by the Redis result backend see
-:ref:`conf-redis-result-backend`
-
-If you don't intend to consume results you should disable them::
-
-    CELERY_IGNORE_RESULT = True
+This section has been moved to :ref:`broker-redis`.
 
 .. _otherqueues-sqlalchemy:
 
 SQLAlchemy
 ==========
 
-.. _otherqueues-sqlalchemy-conf:
-
-For the SQLAlchemy transport you have to install the
-`kombu-sqlalchemy` library::
-
-    $ pip install -U kombu-sqlalchemy
-
-Configuration
--------------
-
-This transport uses only the :setting:`BROKER_HOST` setting, which have to be
-an SQLAlchemy database URI.
-
-#. Set your broker transport::
-
-    BROKER_TRANSPORT = "sqlalchemy"
-
-#. Configure the database URI::
-
-    BROKER_HOST = "sqlite:///celerydb.sqlite"
-
-Please see `SQLAlchemy: Supported Databases`_ for a table of supported databases.
-Some other `SQLAlchemy Connection String`_, examples:
-
-.. code-block:: python
-
-    # sqlite (filename)
-    BROKER_HOST = "sqlite:///celerydb.sqlite"
-
-    # mysql
-    BROKER_HOST = "mysql://scott:tiger@localhost/foo"
-
-    # postgresql
-    BROKER_HOST = "postgresql://scott:tiger@localhost/mydatabase"
-
-    # oracle
-    BROKER_HOST = "oracle://scott:tiger@127.0.0.1:1521/sidname"
-
-.. _`SQLAlchemy: Supported Databases`:
-    http://www.sqlalchemy.org/docs/core/engines.html#supported-databases
-
-.. _`SQLAlchemy Connection String`:
-    http://www.sqlalchemy.org/docs/core/engines.html#database-urls
-
-Results
-~~~~~~~
-
-To store results in the database as well, you should configure the result
-backend.  See :ref:`conf-database-result-backend`.
-
-If you don't intend to consume results you should disable them::
-
-    CELERY_IGNORE_RESULT = True
+This section has been moved to :ref:`broker-sqlalchemy`.
 
 .. _otherqueues-django:
 
 Django Database
 ===============
 
-.. _otherqueues-django-conf:
-
-For the Django database transport support you have to install the
-`django-kombu` library::
-
-    $ pip install -U django-kombu
-
-Configuration
--------------
-
-The database backend uses the Django `DATABASE_*` settings for database
-configuration values.
-
-#. Set your broker transport::
-
-    BROKER_TRANSPORT = "django"
-
-#. Add :mod:`djkombu` to `INSTALLED_APPS`::
-
-    INSTALLED_APPS = ("djkombu", )
-
-
-#. Verify you database settings::
-
-    DATABASE_ENGINE = "mysql"
-    DATABASE_NAME = "mydb"
-    DATABASE_USER = "myuser"
-    DATABASE_PASSWORD = "secret"
-
-  The above is just an example, if you haven't configured your database before
-  you should read the Django database settings reference:
-  http://docs.djangoproject.com/en/1.1/ref/settings/#database-engine
-
-#. Sync your database schema.
-
-    $ python manage.py syncdb
+This section has been moved to :ref:`broker-django`.

+ 12 - 16
docs/userguide/tasks.rst

@@ -74,7 +74,7 @@ attributes:
                 task.  Used by e.g. :meth:`~celery.task.base.BaseTask.retry`
                 to resend the task to the same destination queue.
 
-  **NOTE** As some messaging backends doesn't have advanced routing
+  **NOTE** As some messaging backends don't have advanced routing
   capabilities, you can't trust the availability of keys in this mapping.
 
 
@@ -483,12 +483,11 @@ in the :state:`FAILED` state, is implied to have been in the
 :state:`STARTED` state at some point).
 
 There are also sets of states, like the set of
-:state:`failure states <FAILURE_STATES>`, and the set of
-:state:`ready states <READY_STATES>`.
+:state:`FAILURE_STATES`, and the set of :state:`READY_STATES`.
 
 The client uses the membership of these sets to decide whether
 the exception should be re-raised (:state:`PROPAGATE_STATES`), or whether
-the result can be cached (it can if the task is ready).
+the state can be cached (it can if the task is ready).
 
 You can also define :ref:`custom-states`.
 
@@ -533,13 +532,10 @@ backend:
   may have to increase the Erlang process limit, and the maximum number of file
   descriptors your OS allows.
 
-* Old results will not be cleaned automatically, so you must make sure to
-  consume the results or else the number of queues will eventually go out of
-  control.  If you're running RabbitMQ 2.1.1 or higher you can take advantage
-  of the ``x-expires`` argument to queues, which will expire queues after a
-  certain time limit after they are unused.  The queue expiry can be set (in
-  seconds) by the :setting:`CELERY_TASK_RESULT_EXPIRES` setting (not
-  enabled by default).
+* Old results will be cleaned automatically, based on the
+  :setting:`CELERY_TASK_RESULT_EXPIRES` setting.  By default this is set to
+  expire after 1 day: if you have a very busy cluster you should lower
+  this value.
 
 For a list of options supported by the AMQP result backend, please see
 :ref:`conf-amqp-result-backend`.
@@ -556,7 +552,7 @@ limitations.
   increase the polling intervals of operations such as `result.wait()`, and
   `tasksetresult.join()`
 
-* Some databases uses a default transaction isolation level that
+* Some databases use a default transaction isolation level that
   is not suitable for polling tables for changes.
 
   In MySQL the default transaction isolation level is `REPEATABLE-READ`, which
@@ -576,7 +572,7 @@ PENDING
 ~~~~~~~
 
 Task is waiting for execution or unknown.
-Any task id that is not know is implied to be in the pending state.
+Any task id that is not known is implied to be in the pending state.
 
 .. state:: STARTED
 
@@ -644,7 +640,7 @@ you could have a look at :mod:`abortable tasks <~celery.contrib.abortable>`
 which defines its own custom :state:`ABORTED` state.
 
 Use :meth:`Task.update_state <celery.task.base.BaseTask.update_state>` to
-update a tasks state::
+update a task's state::
 
     @task
     def upload_files(filenames):
@@ -666,7 +662,7 @@ Creating pickleable exceptions
 A little known Python fact is that exceptions must behave a certain
 way to support being pickled.
 
-Tasks that raises exceptions that are not pickleable will not work
+Tasks that raise exceptions that are not pickleable will not work
 properly when Pickle is used as the serializer.
 
 To make sure that your exceptions are pickleable the exception
@@ -722,7 +718,7 @@ Creating custom task classes
 ============================
 
 All tasks inherit from the :class:`celery.task.Task` class.
-The tasks body is its :meth:`run` method.
+The task's body is its :meth:`run` method.
 
 The following code,
 

+ 63 - 29
funtests/benchmarks/bench_worker.py

@@ -2,70 +2,104 @@ import os
 import sys
 import time
 
+os.environ["NOSETPS"] = "yes"
+
+import anyjson
+JSONIMP = os.environ.get("JSONIMP")
+if JSONIMP:
+    anyjson.force_implementation(JSONIMP)
+
+print("anyjson implementation: %r" % (anyjson.implementation.name, ))
+
 from celery import Celery
 
+DEFAULT_ITS = 20000
+
 celery = Celery(__name__)
-celery.conf.update(BROKER_TRANSPORT="amqp",
+celery.conf.update(BROKER_TRANSPORT="librabbitmq",
                    BROKER_POOL_LIMIT=10,
+                   CELERYD_POOL="solo",
                    CELERY_PREFETCH_MULTIPLIER=0,
                    CELERY_DISABLE_RATE_LIMITS=True,
-                   CELERY_BACKEND=None)
-
-
-@celery.task()
-def shutdown_worker():
-    print("SHUTTING DOWN")
-    raise SystemExit()
+                   CELERY_DEFAULT_DELIVERY_MODE=1,
+                   CELERY_QUEUES = {
+                       "bench.worker": {
+                           "exchange": "bench.worker",
+                           "routing_key": "bench.worker",
+                           "no_ack": True,
+                           "exchange_durable": False,
+                           "queue_durable": False,
+                        }
+                   },
+                   CELERY_TASK_SERIALIZER="json",
+                   CELERY_DEFAULT_QUEUE="bench.worker",
+                   CELERY_BACKEND=None,
+                   )#CELERY_MESSAGE_COMPRESSION="zlib")
+
+
+def tdiff(then):
+    return time.time() - then
 
 
 @celery.task(cur=0, time_start=None, queue="bench.worker")
 def it(_, n):
     i = it.cur  # use internal counter, as ordering can be skewed
                 # by previous runs, or the broker.
-    if i and not i % 1000:
-        print >> sys.stderr, i
+    if i and not i % 5000:
+        print >> sys.stderr, "(%s so far: %ss)" % (i, tdiff(it.subt))
+        it.subt = time.time()
     if not i:
-        it.time_start = time.time()
+        it.subt = it.time_start = time.time()
     elif i == n - 1:
-        print("consume: %s" % (time.time() - it.time_start, ))
-        shutdown_worker.delay()
+        total = tdiff(it.time_start)
+        print >> sys.stderr, "(%s so far: %ss)" % (i, tdiff(it.subt))
+        print("-- process %s tasks: %ss total, %s tasks/s} " % (
+                n, total, n / (total + .0)))
+        sys.exit()
     it.cur += 1
 
 
-def bench_apply(n=50000):
+def bench_apply(n=DEFAULT_ITS):
     time_start = time.time()
     celery.TaskSet(it.subtask((i, n)) for i in xrange(n)).apply_async()
-    print("apply: %s" % (time.time() - time_start, ))
+    print("-- apply %s tasks: %ss" % (n, time.time() - time_start, ))
 
 
-def bench_consume(n=50000):
-    from celery.worker import WorkController
-    from celery.worker import state
-
-    import logging
-    #celery.log.setup_logging_subsystem(loglevel=logging.DEBUG)
-    worker = celery.WorkController(pool_cls="solo",
+def bench_work(n=DEFAULT_ITS, loglevel=None):
+    loglevel = os.environ.get("BENCH_LOGLEVEL") or loglevel
+    if loglevel:
+        celery.log.setup_logging_subsystem(loglevel=loglevel)
+    worker = celery.WorkController(concurrency=15,
                                    queues=["bench.worker"])
 
     try:
         print("STARTING WORKER")
         worker.start()
     except SystemExit:
-        assert sum(state.total_count.values()) == n + 1
+        assert sum(worker.state.total_count.values()) == n + 1
 
 
-def bench_both(n=50000):
+def bench_both(n=DEFAULT_ITS):
     bench_apply(n)
-    bench_consumer(n)
+    bench_work(n)
 
 
 def main(argv=sys.argv):
+    n = DEFAULT_ITS
     if len(argv) < 2:
-        print("Usage: %s [apply|consume|both]" % (os.path.basename(argv[0]), ))
+        print("Usage: %s [apply|work|both] [n=20k]" % (
+                os.path.basename(argv[0]), ))
         return sys.exit(1)
-    return {"apply": bench_apply,
-            "consume": bench_consume,
-            "both": bench_both}[argv[1]]()
+    try:
+        try:
+            n = int(argv[2])
+        except IndexError:
+            pass
+        return {"apply": bench_apply,
+                "work": bench_work,
+                "both": bench_both}[argv[1]](n=n)
+    except KeyboardInterrupt:
+        pass
 
 
 if __name__ == "__main__":

+ 0 - 1
requirements/docs.txt

@@ -1,4 +1,3 @@
 Sphinx
 sphinxcontrib-issuetracker>=0.9
-Sphinx-PyPI-upload
 SQLAlchemy

+ 2 - 0
requirements/pkgutils.txt

@@ -1,3 +1,5 @@
 paver
 flake8
 tox
+Sphinx-PyPI-upload
+bundle>=1.1.0

+ 64 - 45
setup.py

@@ -5,16 +5,6 @@ import sys
 import codecs
 import platform
 
-extra = {}
-tests_require = ["nose", "nose-cover3", "sqlalchemy", "mock"]
-is_py3k  = sys.version_info >= (3, 0)
-if is_py3k:
-    extra.update(use_2to3=True)
-elif sys.version_info < (2, 7):
-    tests_require.append("unittest2")
-elif sys.version_info <= (2, 5):
-    tests_require.append("simplejson")
-
 if sys.version_info < (2, 5):
     raise Exception("Celery requires Python 2.5 or higher.")
 
@@ -28,7 +18,48 @@ except ImportError:
     from setuptools import setup, find_packages           # noqa
     from setuptools.command.test import test              # noqa
 
-# -- Parse meta
+NAME = "celery"
+entrypoints = {}
+extra = {}
+
+# -*- Classifiers -*-
+
+classes = """
+    Development Status :: 5 - Production/Stable
+    License :: OSI Approved :: BSD License
+    Topic :: System :: Distributed Computing
+    Topic :: Software Development :: Object Brokering
+    Intended Audience :: Developers
+    Intended Audience :: Information Technology
+    Intended Audience :: Science/Research
+    Intended Audience :: Financial and Insurance Industry
+    Intended Audience :: Healthcare Industry
+    Environment :: No Input/Output (Daemon)
+    Environment :: Console
+    Programming Language :: Python
+    Programming Language :: Python :: 2
+    Programming Language :: Python :: 2.5
+    Programming Language :: Python :: 2.6
+    Programming Language :: Python :: 2.7
+    Programming Language :: Python :: 3
+    Programming Language :: Python :: 3.2
+    Programming Language :: Python :: Implementation :: CPython
+    Programming Language :: Python :: Implementation :: PyPy
+    Programming Language :: Python :: Implementation :: Jython
+    Operating System :: OS Independent
+    Operating System :: POSIX
+    Operating System :: Microsoft :: Windows
+    Operating System :: MacOS :: MacOS X
+"""
+classifiers = [s.strip() for s in classes.split('\n') if s]
+
+# -*- Python 3 -*-
+is_py3k  = sys.version_info >= (3, 0)
+if is_py3k:
+    extra.update(use_2to3=True)
+
+# -*- Distribution Meta -*-
+
 import re
 re_meta = re.compile(r'__(\w+?)__\s*=\s*(.*)')
 re_vers = re.compile(r'VERSION\s*=\s*\((.*?)\)')
@@ -64,8 +95,8 @@ try:
                 meta.update(handler(m))
 finally:
     meta_fh.close()
-# --
 
+# -*- Custom Commands -*-
 
 class quicktest(test):
     extra_env = dict(SKIP_RLIMITS=1, QUICKTEST=1)
@@ -75,6 +106,8 @@ class quicktest(test):
             os.environ[env_name] = str(env_value)
         test.run(self, *args, **kwargs)
 
+# -*- Installation Dependencies -*-
+
 install_requires = []
 try:
     import importlib  # noqa
@@ -101,13 +134,24 @@ if is_jython:
     install_requires.append("threadpool")
     install_requires.append("simplejson")
 
+# -*- Tests Requires -*-
+
+tests_require = ["nose", "nose-cover3", "sqlalchemy", "mock"]
+if sys.version_info < (2, 7):
+    tests_require.append("unittest2")
+elif sys.version_info <= (2, 5):
+    tests_require.append("simplejson")
+
+# -*- Long Description -*-
+
 if os.path.exists("README.rst"):
     long_description = codecs.open("README.rst", "r", "utf-8").read()
 else:
     long_description = "See http://pypi.python.org/pypi/celery"
 
+# -*- Entry Points -*- #
 
-console_scripts = [
+console_scripts = entrypoints["console_scripts"] = [
         'celerybeat = celery.bin.celerybeat:main',
         'camqadm = celery.bin.camqadm:main',
         'celeryev = celery.bin.celeryev:main',
@@ -119,6 +163,10 @@ if platform.system() == "Windows":
 else:
     console_scripts.append('celeryd = celery.bin.celeryd:main')
 
+# bundles: Only relevant for Celery developers.
+entrypoints["bundle.bundles"] = ["celery = celery.contrib.bundles:bundles"]
+
+# -*- %%% -*-
 
 setup(
     name="celery",
@@ -133,38 +181,9 @@ setup(
     zip_safe=False,
     install_requires=install_requires,
     tests_require=tests_require,
-    cmdclass={"test": test,
-              "quicktest": quicktest},
     test_suite="nose.collector",
-    classifiers=[
-        "Development Status :: 5 - Production/Stable",
-        "License :: OSI Approved :: BSD License",
-        "Topic :: System :: Distributed Computing",
-        "Topic :: Software Development :: Object Brokering",
-        "Intended Audience :: Developers",
-        "Intended Audience :: Information Technology",
-        "Intended Audience :: Science/Research",
-        "Intended Audience :: Financial and Insurance Industry",
-        "Intended Audience :: Healthcare Industry",
-        "Environment :: No Input/Output (Daemon)",
-        "Environment :: Console",
-        "Programming Language :: Python",
-        "Programming Language :: Python :: 2",
-        "Programming Language :: Python :: 2.5",
-        "Programming Language :: Python :: 2.6",
-        "Programming Language :: Python :: 2.7",
-        "Programming Language :: Python :: 3",
-        "Programming Language :: Python :: 3.2",
-        "Programming Language :: Python :: Implementation :: CPython",
-        "Programming Language :: Python :: Implementation :: PyPy",
-        "Programming Language :: Python :: Implementation :: Jython",
-        "Operating System :: OS Independent",
-        "Operating System :: POSIX",
-        "Operating System :: Microsoft :: Windows",
-        "Operating System :: MacOS :: MacOS X",
-    ],
-    entry_points={
-        'console_scripts': console_scripts,
-    },
+    cmdclass={"quicktest": quicktest},
+    classifiers=classifiers,
+    entry_points=entrypoints,
     long_description=long_description,
     **extra)