Documentation improvements

Ask Solem, 14 years ago
commit 823f45cf0a

+ 4 - 4
INSTALL

@@ -1,16 +1,16 @@
-Installing celery
+Installing Celery
 =================
 
-You can install `celery` either via the Python Package Index (PyPI)
+You can install Celery either via the Python Package Index (PyPI)
 or from source.
 
 To install using `pip`::
 
-    $ pip install celery
+    $ pip install Celery
 
 To install using `easy_install`::
 
-    $ easy_install celery
+    $ easy_install Celery
 
 If you have downloaded a source tarball you can install it
 by doing the following::

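The tarball steps are cut off by the diff context above.  For reference, a
typical distutils-based source install, which is an assumption about the
omitted lines rather than a quote of them, looks like::

    $ tar xvfz celery-<version>.tar.gz
    $ cd celery-<version>
    $ python setup.py build
    # python setup.py install   # as root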
+ 47 - 54
celery/app/__init__.py

@@ -1,3 +1,13 @@
+"""
+celery.app
+==========
+
+Celery Application.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
 import os
 
 from inspect import getargspec
@@ -6,44 +16,24 @@ from celery import registry
 from celery.app import base
 from celery.utils.functional import wraps
 
+# Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
+# set this, so it will always contain the last instantiated app,
+# and it is the default app returned by :func:`app_or_default`.
 _current_app = None
 
 
 class App(base.BaseApp):
     """Celery Application.
 
-    Inherits from :class:`celery.app.base.BaseApp`.
-
     :keyword loader: The loader class, or the name of the loader class to use.
-        Default is :class:`celery.loaders.app.AppLoader`.
+                     Default is :class:`celery.loaders.app.AppLoader`.
     :keyword backend: The result store backend class, or the name of the
-        backend class to use. Default is the value of the
-        :setting:`CELERY_RESULT_BACKEND` setting.
-
-    .. attribute:: amqp
-
-        Sending/receiving messages.
-        See :class:`celery.app.amqp.AMQP`.
-
-    .. attribute:: backend
-
-        Storing/retreiving task state.
-        See :class:`celery.backend.base.BaseBackend`.
+                      backend class to use. Default is the value of the
+                      :setting:`CELERY_RESULT_BACKEND` setting.
 
-    .. attribute:: conf
+    .. seealso::
 
-        Current configuration. Supports both the dict interface and
-        attribute access.
-
-    .. attribute:: control
-
-        Controlling worker nodes.
-        See :class:`celery.task.control.Control`.
-
-    .. attribute:: log
-
-        Logging.
-        See :class:`celery.log.Logging`.
+        The app base class; :class:`~celery.app.base.BaseApp`.
 
     """
 
@@ -59,53 +49,55 @@ class App(base.BaseApp):
         return create_task_cls(app=self)
 
     def Worker(self, **kwargs):
-        """Create new :class:`celery.apps.worker.Worker` instance."""
+        """Create new :class:`~celery.apps.worker.Worker` instance."""
         from celery.apps.worker import Worker
         return Worker(app=self, **kwargs)
 
     def Beat(self, **kwargs):
-        """Create new :class:`celery.apps.beat.Beat` instance."""
+        """Create new :class:`~celery.apps.beat.Beat` instance."""
         from celery.apps.beat import Beat
         return Beat(app=self, **kwargs)
 
     def TaskSet(self, *args, **kwargs):
-        """Create new :class:`celery.task.sets.TaskSet`."""
+        """Create new :class:`~celery.task.sets.TaskSet`."""
         from celery.task.sets import TaskSet
         kwargs["app"] = self
         return TaskSet(*args, **kwargs)
 
     def worker_main(self, argv=None):
+        """Run :program:`celeryd` using `argv`.  Uses :data:`sys.argv`
+        if `argv` is not specified."""
         from celery.bin.celeryd import WorkerCommand
         return WorkerCommand(app=self).execute_from_commandline(argv)
 
     def task(self, *args, **options):
         """Decorator to create a task class out of any callable.
 
-        Examples:
+        .. admonition:: Examples
 
-        .. code-block:: python
+            .. code-block:: python
 
-            @task()
-            def refresh_feed(url):
-                return Feed.objects.get(url=url).refresh()
+                @task()
+                def refresh_feed(url):
+                    return Feed.objects.get(url=url).refresh()
 
-        With setting extra options and using retry.
+            Setting extra options and using retry:
 
-        .. code-block:: python
+            .. code-block:: python
 
-            @task(exchange="feeds")
-            def refresh_feed(url, **kwargs):
-                try:
-                    return Feed.objects.get(url=url).refresh()
-                except socket.error, exc:
-                    refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
+                @task(exchange="feeds")
+                def refresh_feed(url, **kwargs):
+                    try:
+                        return Feed.objects.get(url=url).refresh()
+                    except socket.error, exc:
+                        refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
 
-        Calling the resulting task:
+            Calling the resulting task:
 
-            >>> refresh_feed("http://example.com/rss") # Regular
-            <Feed: http://example.com/rss>
-            >>> refresh_feed.delay("http://example.com/rss") # Async
-            <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
+                >>> refresh_feed("http://example.com/rss") # Regular
+                <Feed: http://example.com/rss>
+                >>> refresh_feed.delay("http://example.com/rss") # Async
+                <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
 
         """
 
@@ -135,8 +127,10 @@ class App(base.BaseApp):
             return inner_create_task_cls()(*args)
         return inner_create_task_cls(**options)
 
-# The "default" loader is the default loader used by old applications.
+#: The "default" loader is the default loader used by old applications.
 default_loader = os.environ.get("CELERY_LOADER") or "default"
+
+#: Global fallback app instance.
 default_app = App(loader=default_loader, set_as_current=False)
 
 if os.environ.get("CELERY_TRACE_APP"):
@@ -160,10 +154,9 @@ else:
     def app_or_default(app=None):
         """Returns the app provided or the default app if none.
 
-        If the environment variable :envvar:`CELERY_TRACE_APP` is set,
-        any time there is no active app and exception is raised. This
-        is used to trace app leaks (when someone forgets to pass
-        along the app instance).
+        The environment variable :envvar:`CELERY_TRACE_APP` is used to
+        trace app leaks.  When enabled, an exception is raised if there
+        is no active app.
 
         """
         global _current_app

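To make the decorator and `app_or_default` behaviour above concrete, here is
a minimal sketch using only the keywords visible in this diff (the task body
itself is made up)::

    from celery.app import App, app_or_default

    app = App(loader="default", set_as_current=True)

    @app.task()
    def add(x, y):
        return x + y

    # With set_as_current=True the instance becomes _current_app, so it
    # is what app_or_default() falls back to when no app is passed.
    assert app_or_default() is app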
+ 9 - 0
celery/app/amqp.py

@@ -1,4 +1,13 @@
+"""
+celery.app.amqp
+===============
+
+AMQP related functionality.
 
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
 from datetime import datetime, timedelta
 from UserDict import UserDict
 

+ 59 - 90
celery/app/base.py

@@ -1,78 +1,25 @@
+"""
+celery.app.base
+===============
+
+Application Base Class.
+
+:copyright: (c) 2009 - 2010 by Ask Solem.
+:license: BSD, see LICENSE for more details.
+
+"""
 import sys
 import platform as _platform
 
 from datetime import timedelta
-from itertools import chain
 
 from celery import routes
 from celery.app.defaults import DEFAULTS
-from celery.datastructures import AttributeDictMixin
+from celery.datastructures import MultiDictView
 from celery.utils import noop, isatty
 from celery.utils.functional import wraps
 
 
-class MultiDictView(AttributeDictMixin):
-    """View for one more more dicts.
-
-    * When getting a key, the dicts are searched in order.
-    * When setting a key, the key is added to the first dict.
-
-    >>> d1 = {"x": 3"}
-    >>> d2 = {"x": 1, "y": 2, "z": 3}
-    >>> x = MultiDictView([d1, d2])
-
-    >>> x["x"]
-    3
-
-    >>>  x["y"]
-    2
-
-    """
-    dicts = None
-
-    def __init__(self, *dicts):
-        self.__dict__["dicts"] = dicts
-
-    def __getitem__(self, key):
-        for d in self.__dict__["dicts"]:
-            try:
-                return d[key]
-            except KeyError:
-                pass
-        raise KeyError(key)
-
-    def __setitem__(self, key, value):
-        self.__dict__["dicts"][0][key] = value
-
-    def get(self, key, default=None):
-        try:
-            return self[key]
-        except KeyError:
-            return default
-
-    def setdefault(self, key, default):
-        try:
-            return self[key]
-        except KeyError:
-            self[key] = default
-            return default
-
-    def update(self, *args, **kwargs):
-        return self.__dict__["dicts"][0].update(*args, **kwargs)
-
-    def __contains__(self, key):
-        for d in self.__dict__["dicts"]:
-            if key in d:
-                return True
-        return False
-
-    def __repr__(self):
-        return repr(dict(iter(self)))
-
-    def __iter__(self):
-        return chain(*[d.iteritems() for d in self.__dict__["dicts"]])
-
-
 class BaseApp(object):
     """Base class for apps."""
     SYSTEM = _platform.system()
@@ -133,23 +80,6 @@ class BaseApp(object):
         for key, value in config.items():
             self.conf[key] = value
 
-    def either(self, default_key, *values):
-        """Fallback to the value of a configuration key if none of the
-        `*values` are true."""
-        for value in values:
-            if value is not None:
-                return value
-        return self.conf.get(default_key)
-
-    def merge(self, a, b):
-        """Like `dict(a, **b)` except it will keep values from `a`
-        if the value in `b` is :const:`None`."""
-        b = dict(b)
-        for key, value in a.items():
-            if b.get(key) is None:
-                b[key] = value
-        return b
-
     def send_task(self, name, args=None, kwargs=None, countdown=None,
             eta=None, task_id=None, publisher=None, connection=None,
             connect_timeout=None, result_cls=None, expires=None,
@@ -185,6 +115,17 @@ class BaseApp(object):
         return self.with_default_connection(_do_publish)(
                 connection=connection, connect_timeout=connect_timeout)
 
+    def AsyncResult(self, task_id, backend=None):
+        """Create :class:`celery.result.BaseAsyncResult` instance."""
+        from celery.result import BaseAsyncResult
+        return BaseAsyncResult(task_id, app=self,
+                               backend=backend or self.backend)
+
+    def TaskSetResult(self, taskset_id, results, **kwargs):
+        """Create :class:`celery.result.TaskSetResult` instance."""
+        from celery.result import TaskSetResult
+        return TaskSetResult(taskset_id, results, app=self)
+
     def broker_connection(self, hostname=None, userid=None,
             password=None, virtual_host=None, port=None, ssl=None,
             insist=None, connect_timeout=None, backend_cls=None):
@@ -285,16 +226,22 @@ class BaseApp(object):
                                 user=self.conf.EMAIL_USER,
                                 password=self.conf.EMAIL_PASSWORD)
 
-    def AsyncResult(self, task_id, backend=None):
-        """Create :class:`celery.result.BaseAsyncResult` instance."""
-        from celery.result import BaseAsyncResult
-        return BaseAsyncResult(task_id, app=self,
-                               backend=backend or self.backend)
+    def either(self, default_key, *values):
+        """Fallback to the value of a configuration key if none of the
+        `*values` are set (i.e. they are all :const:`None`)."""
+        for value in values:
+            if value is not None:
+                return value
+        return self.conf.get(default_key)
 
-    def TaskSetResult(self, taskset_id, results, **kwargs):
-        """Create :class:`celery.result.TaskSetResult` instance."""
-        from celery.result import TaskSetResult
-        return TaskSetResult(taskset_id, results, app=self)
+    def merge(self, a, b):
+        """Like `dict(a, **b)` except it will keep values from `a`
+        if the value in `b` is :const:`None`."""
+        b = dict(b)
+        for key, value in a.items():
+            if b.get(key) is None:
+                b[key] = value
+        return b
 
     def _get_backend(self):
         from celery.backends import get_backend_cls
@@ -308,6 +255,11 @@ class BaseApp(object):
 
     @property
     def amqp(self):
+        """Sending/receiving messages.
+
+        See :class:`~celery.app.amqp.AMQP`.
+
+        """
         if self._amqp is None:
             from celery.app.amqp import AMQP
             self._amqp = AMQP(self)
@@ -315,12 +267,18 @@ class BaseApp(object):
 
     @property
     def backend(self):
+        """Storing/retreiving task state.
+
+        See :class:`~celery.backend.base.BaseBackend`.
+
+        """
         if self._backend is None:
             self._backend = self._get_backend()
         return self._backend
 
     @property
     def loader(self):
+        """Current loader."""
         if self._loader is None:
             from celery.loaders import get_loader_cls
             self._loader = get_loader_cls(self.loader_cls)(app=self)
@@ -328,12 +286,18 @@ class BaseApp(object):
 
     @property
     def conf(self):
+        """Current configuration (dict and attribute access)."""
         if self._conf is None:
             self._conf = self._get_config()
         return self._conf
 
     @property
     def control(self):
+        """Controlling worker nodes.
+
+        See :class:`~celery.task.control.Control`.
+
+        """
         if self._control is None:
             from celery.task.control import Control
             self._control = Control(app=self)
@@ -341,6 +305,11 @@ class BaseApp(object):
 
     @property
     def log(self):
+        """Logging utilities.
+
+        See :class:`~celery.log.Logging`.
+
+        """
         if self._log is None:
             from celery.log import Logging
             self._log = Logging(app=self)

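The relocated `either`/`merge` helpers are small but load-bearing; a sketch
of their semantics, where `app` is a :class:`BaseApp` instance and the
setting and option names are illustrative::

    # merge(): like dict(a, **b), except None in `b` means "no opinion",
    # so the value from `a` survives.
    defaults  = {"exchange": "celery", "routing_key": "celery"}
    overrides = {"exchange": None, "routing_key": "feeds"}
    app.merge(defaults, overrides)
    # -> {"exchange": "celery", "routing_key": "feeds"}

    # either(): the first value that is not None wins, otherwise the
    # configuration key is consulted.
    expires = app.either("CELERY_TASK_RESULT_EXPIRES", None, my_expires)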
+ 96 - 28
celery/datastructures.py

@@ -3,6 +3,7 @@ from __future__ import generators
 import time
 import traceback
 
+from itertools import chain
 from UserList import UserList
 from Queue import Queue, Empty as QueueEmpty
 
@@ -28,6 +29,7 @@ class AttributeDict(dict, AttributeDictMixin):
 
 
 class DictAttribute(object):
+    """Dict interface using attributes."""
 
     def __init__(self, obj):
         self.obj = obj
@@ -61,20 +63,80 @@ class DictAttribute(object):
         return vars(self.obj).iteritems()
 
 
+class MultiDictView(AttributeDictMixin):
+    """View for one more more dicts.
+
+    * When getting a key, the dicts are searched in order.
+    * When setting a key, the key is added to the first dict.
+
+    >>> d1 = {"x": 3"}
+    >>> d2 = {"x": 1, "y": 2, "z": 3}
+    >>> x = MultiDictView([d1, d2])
+
+    >>> x["x"]
+    3
+
+    >>>  x["y"]
+    2
+
+    """
+    dicts = None
+
+    def __init__(self, *dicts):
+        self.__dict__["dicts"] = dicts
+
+    def __getitem__(self, key):
+        for d in self.__dict__["dicts"]:
+            try:
+                return d[key]
+            except KeyError:
+                pass
+        raise KeyError(key)
+
+    def __setitem__(self, key, value):
+        self.__dict__["dicts"][0][key] = value
+
+    def get(self, key, default=None):
+        try:
+            return self[key]
+        except KeyError:
+            return default
+
+    def setdefault(self, key, default):
+        try:
+            return self[key]
+        except KeyError:
+            self[key] = default
+            return default
+
+    def update(self, *args, **kwargs):
+        return self.__dict__["dicts"][0].update(*args, **kwargs)
+
+    def __contains__(self, key):
+        for d in self.__dict__["dicts"]:
+            if key in d:
+                return True
+        return False
+
+    def __repr__(self):
+        return repr(dict(iter(self)))
+
+    def __iter__(self):
+        return chain(*[d.iteritems() for d in self.__dict__["dicts"]])
+
+
 class PositionQueue(UserList):
     """A positional queue of a specific length, with slots that are either
     filled or unfilled. When all of the positions are filled, the queue
     is considered :meth:`full`.
 
-    :param length: see :attr:`length`.
-
-
-    .. attribute:: length
-
-        The number of items required for the queue to be considered full.
+    :param length: Number of slots in the queue.
 
     """
 
+    #: The number of items required for the queue to be considered full.
+    length = None
+
     class UnfilledPosition(object):
         """Describes an unfilled slot."""
 
@@ -97,7 +159,7 @@ class PositionQueue(UserList):
 
     @property
     def filled(self):
-        """Returns the filled slots as a list."""
+        """All filled slots as a list."""
         return [slot for slot in self.data
                     if not isinstance(slot, self.UnfilledPosition)]
 
@@ -108,15 +170,13 @@ class ExceptionInfo(object):
     :param exc_info: The exception tuple info as returned by
         :func:`traceback.format_exception`.
 
-    .. attribute:: exception
-
-        The original exception.
-
-    .. attribute:: traceback
+    """
 
-        A traceback from the point when :attr:`exception` was raised.
+    #: The original exception.
+    exception = None
 
-    """
+    #: A traceback from the point when :attr:`exception` was raised.
+    traceback = None
 
     def __init__(self, exc_info):
         type_, exception, tb = exc_info
@@ -163,7 +223,7 @@ class SharedCounter(object):
     that you should not update the value by using a previous value, the only
     reliable operations are increment and decrement.
 
-    Example
+    Example::
 
         >>> max_clients = SharedCounter(initial_value=10)
 
@@ -177,7 +237,6 @@ class SharedCounter(object):
         >>> if client >= int(max_clients): # Max clients now at 8
         ...    wait()
 
-
         >>> max_client = max_clients + 10 # NOT OK (unsafe)
 
     """
@@ -226,7 +285,7 @@ class LimitedSet(object):
     consume too much resources.
 
     :keyword maxlen: Maximum number of members before we start
-        deleting expired members.
+                     deleting expired members.
     :keyword expires: Time in seconds, before a membership expires.
 
     """
@@ -316,22 +375,23 @@ class TokenBucket(object):
     Most of this code was stolen from an entry in the ASPN Python Cookbook:
     http://code.activestate.com/recipes/511490/
 
-    :param fill_rate: see :attr:`fill_rate`.
-    :keyword capacity: see :attr:`capacity`.
+    .. admonition:: Thread safety
 
-    .. attribute:: fill_rate
+        This implementation is not thread safe.
 
-        The rate in tokens/second that the bucket will be refilled.
+    :param fill_rate: Refill rate in tokens/second.
+    :keyword capacity: Max number of tokens.  Default is 1.
 
-    .. attribute:: capacity
-
-        Maximum number of tokens in the bucket. Default is `1`.
+    """
 
-    .. attribute:: timestamp
+    #: The rate in tokens/second at which the bucket will be refilled.
+    fill_rate = None
 
-        Timestamp of the last time a token was taken out of the bucket.
+    #: Maximum number of tokens in the bucket.
+    capacity = 1
 
-    """
+    #: Timestamp of the last time a token was taken out of the bucket.
+    timestamp = None
 
     def __init__(self, fill_rate, capacity=1):
         self.capacity = float(capacity)
@@ -340,6 +400,8 @@ class TokenBucket(object):
         self.timestamp = time.time()
 
     def can_consume(self, tokens=1):
+        """Returns :const:`True` if `tokens` number of tokens can be consumed
+        from the bucket."""
         if tokens <= self._get_tokens():
             self._tokens -= tokens
             return True
@@ -347,7 +409,13 @@ class TokenBucket(object):
 
     def expected_time(self, tokens=1):
         """Returns the expected time in seconds when a new token should be
-        available. *Note: consumes a token from the bucket*"""
+        available.
+
+        .. admonition:: Warning
+
+            This consumes a token from the bucket.
+
+        """
         _tokens = self._get_tokens()
         tokens = max(tokens, _tokens)
         return (tokens - _tokens) / self.fill_rate

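`can_consume` and `expected_time` are the whole consumer-side protocol of
the bucket; a minimal sketch of the rate-limiting pattern they support (the
unit of work is hypothetical)::

    from time import sleep

    from celery.datastructures import TokenBucket

    bucket = TokenBucket(fill_rate=2, capacity=2)   # ~2 tokens/second

    def throttled(work):
        if not bucket.can_consume(1):
            # expected_time() estimates when the next token arrives,
            # but per the warning above it consumes a token itself.
            sleep(bucket.expected_time(1))
        work()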
+ 1 - 1
celery/log.py

@@ -35,7 +35,7 @@ class ColorFormatter(logging.Formatter):
     def formatException(self, ei):
         r = logging.Formatter.formatException(self, ei)
         if type(r) in [types.StringType]:
-            r = r.decode('utf-8', 'replace') # Convert to unicode
+            r = r.decode("utf-8", "replace") # Convert to unicode
         return r
 
     def format(self, record):

+ 1 - 1
celery/messaging.py

@@ -31,7 +31,7 @@ def get_consumer_set(connection, queues=None, **options):
     """Get the :class:`carrot.messaging.ConsumerSet` for a queue
     configuration.
 
-    Defaults to the queues in :const:`CELERY_QUEUES`.
+    Defaults to the queues in :setting:`CELERY_QUEUES`.
 
     """
     # FIXME: Deprecate!

+ 1 - 1
celery/registry.py

@@ -1,12 +1,12 @@
 """celery.registry"""
 import inspect
+
 from UserDict import UserDict
 
 from celery.exceptions import NotRegistered
 
 
 class TaskRegistry(UserDict):
-    """Site registry for tasks."""
 
     NotRegistered = NotRegistered
 

+ 43 - 60
celery/result.py

@@ -18,18 +18,17 @@ class BaseAsyncResult(object):
     :param task_id: see :attr:`task_id`.
     :param backend: see :attr:`backend`.
 
-    .. attribute:: task_id
-
-        The unique identifier for this task.
-
-    .. attribute:: backend
-
-        The task result backend used.
-
     """
 
+    #: Error raised for timeouts.
     TimeoutError = TimeoutError
 
+    #: The task uuid.
+    task_id = None
+
+    #: The task result backend to use.
+    backend = None
+
     def __init__(self, task_id, backend, app=None):
         self.task_id = task_id
         self.backend = backend
@@ -42,24 +41,25 @@ class BaseAsyncResult(object):
     def revoke(self, connection=None, connect_timeout=None):
         """Send revoke signal to all workers.
 
-        The workers will ignore the task if received.
+        Any worker receiving the task, or having reserved the
+        task, *must* ignore it.
 
         """
         self.app.control.revoke(self.task_id, connection=connection,
                                 connect_timeout=connect_timeout)
 
     def wait(self, timeout=None):
-        """Wait for task, and return the result when it arrives.
+        """Wait for task, and return the result.
 
         :keyword timeout: How long to wait, in seconds, before the
-            operation times out.
+                          operation times out.
 
         :raises celery.exceptions.TimeoutError: if `timeout` is not
             :const:`None` and the result does not arrive within `timeout`
             seconds.
 
-        If the remote call raised an exception then that
-        exception will be re-raised.
+        If the remote call raised an exception then that exception will
+        be re-raised.
 
         """
         return self.backend.wait_for(self.task_id, timeout=timeout)
@@ -69,8 +69,7 @@ class BaseAsyncResult(object):
         return self.wait(timeout=timeout)
 
     def ready(self):
-        """Returns :const:`True` if the task executed successfully, or raised
-        an exception.
+        """Returns :const:`True` if the task has been executed.
 
         If the task is still running, pending, or is waiting
         for retry then :const:`False` is returned.
@@ -108,19 +107,12 @@ class BaseAsyncResult(object):
     @property
     def result(self):
         """When the task has been executed, this contains the return value.
-
-        If the task raised an exception, this will be the exception instance.
-
-        """
+        If the task raised an exception, this will be the exception instance."""
         return self.backend.get_result(self.task_id)
 
     @property
     def info(self):
-        """Get state metadata.
-
-        Alias to :meth:`result`.
-
-        """
+        """Get state metadata.  Alias to :meth:`result`."""
         return self.result
 
     @property
@@ -135,9 +127,9 @@ class BaseAsyncResult(object):
 
     @property
     def state(self):
-        """The current status of the task.
+        """The tasks current state.
 
-        Can be one of the following:
+        Possible values includes:
 
             *PENDING*
 
@@ -169,18 +161,15 @@ class BaseAsyncResult(object):
 class AsyncResult(BaseAsyncResult):
     """Pending task result using the default backend.
 
-    :param task_id: see :attr:`task_id`.
+    :param task_id: The task's uuid.
 
+    """
 
-    .. attribute:: task_id
-
-        The unique identifier for this task.
-
-    .. attribute:: backend
-
-        Instance of :class:`celery.backends.DefaultBackend`.
+    #: The task's uuid.
+    uuid = None
 
-    """
+    #: Task result store backend to use.
+    backend = None
 
     def __init__(self, task_id, backend=None, app=None):
         app = app_or_default(app)
@@ -189,24 +178,22 @@ class AsyncResult(BaseAsyncResult):
 
 
 class TaskSetResult(object):
-    """Working with :class:`~celery.task.TaskSet` results.
+    """Working with :class:`~celery.task.sets.TaskSet` results.
 
     An instance of this class is returned by
-    `TaskSet`'s :meth:`~celery.task.TaskSet.apply_async()`. It enables
-    inspection of the subtasks status and return values as a single entity.
-
-    :option taskset_id: see :attr:`taskset_id`.
-    :option subtasks: see :attr:`subtasks`.
+    `TaskSet`'s :meth:`~celery.task.TaskSet.apply_async()`.  It enables
+    inspection of the subtasks' state and return values as a single entity.
 
-    .. attribute:: taskset_id
+    :param taskset_id: The id of the taskset.
+    :param subtasks: List of result instances.
 
-        The UUID of the taskset itself.
-
-    .. attribute:: subtasks
+    """
 
-        A list of :class:`AsyncResult` instances for all of the subtasks.
+    #: The UUID of the taskset.
+    taskset_id = None
 
-    """
+    #: A list of :class:`AsyncResult` instances for all of the subtasks.
+    subtasks = None
 
     def __init__(self, taskset_id, subtasks, app=None):
         self.taskset_id = taskset_id
@@ -278,6 +265,7 @@ class TaskSetResult(object):
             subtask.forget()
 
     def revoke(self, connection=None, connect_timeout=None):
+        """Revoke all subtasks."""
 
         def _do_revoke(connection=None, connect_timeout=None):
             for subtask in self.subtasks:
@@ -291,6 +279,7 @@ class TaskSetResult(object):
         return self.iterate()
 
     def __getitem__(self, index):
+        """`res[i] -> res.subtasks[i]`"""
         return self.subtasks[index]
 
     def iterate(self):
@@ -319,25 +308,19 @@ class TaskSetResult(object):
         """Gather the results of all tasks in the taskset,
         and returns a list ordered by the order of the set.
 
-        :keyword timeout: The number of seconds to wait for results
-            before the operation times out.
+        :keyword timeout: The number of seconds to wait for results before
+                          the operation times out.
 
         :keyword propagate: If any of the subtasks raises an exception, the
-            exception will be reraised.
+                            exception will be reraised.
 
         :raises celery.exceptions.TimeoutError: if `timeout` is not
             :const:`None` and the operation takes longer than `timeout`
             seconds.
 
-        :returns: list of return values for all subtasks in order.
-
         """
 
         time_start = time.time()
-
-        def on_timeout():
-            raise TimeoutError("The operation timed out.")
-
         results = PositionQueue(length=self.total)
 
         while True:
@@ -354,12 +337,12 @@ class TaskSetResult(object):
             else:
                 if (timeout is not None and
                         time.time() >= time_start + timeout):
-                    on_timeout()
+                    raise TimeoutError("join operation timed out.")
 
     def save(self, backend=None):
         """Save taskset result for later retrieval using :meth:`restore`.
 
-        Example:
+        Example::
 
             >>> result.save()
             >>> result = TaskSetResult.restore(task_id)
@@ -378,12 +361,12 @@ class TaskSetResult(object):
 
     @property
     def total(self):
-        """The total number of tasks in the :class:`~celery.task.TaskSet`."""
+        """Total number of subtasks in the set."""
         return len(self.subtasks)
 
 
 class EagerResult(BaseAsyncResult):
-    """Result that we know has already been executed.  """
+    """Result that we know has already been executed."""
     TimeoutError = TimeoutError
 
     def __init__(self, task_id, ret_value, status, traceback=None):

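The `join` semantics above (ordered results, timeout, propagation) in use;
a sketch assuming `ts` is an already-built
:class:`~celery.task.sets.TaskSet`::

    from celery.exceptions import TimeoutError

    result = ts.apply_async()               # -> TaskSetResult
    try:
        # Blocks until every subtask completes; the return values come
        # back in the same order as the subtasks in the set.
        values = result.join(timeout=30, propagate=True)
    except TimeoutError:
        # Not all subtasks finished within 30 seconds.
        values = None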
+ 4 - 4
celery/routes.py

@@ -5,13 +5,13 @@ _first_route = firstmethod("route_for_task")
 
 
 def merge(a, b):
-    """Like `dict(a, **b)` except it will keep values from `a`,
-    if the value in `b` is :const:`None`."""
+    """Like `dict(a, **b)` except it will keep values from `a`, if the value
+    in `b` is :const:`None`."""
     return dict(a, **dict((k, v) for k, v in b.iteritems() if v is not None))
 
 
 class MapRoute(object):
-    """Makes a router out of a :class:`dict`."""
+    """Creates a router out of a :class:`dict`."""
 
     def __init__(self, map):
         self.map = map
@@ -75,7 +75,7 @@ class Router(object):
 
 
 def prepare(routes):
-    """Expand ROUTES setting."""
+    """Expands the :setting:`CELERY_ROUTES` setting."""
 
     def expand_route(route):
         if isinstance(route, dict):

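A short sketch of the router protocol implied by
`firstmethod("route_for_task")` above, assuming :class:`MapRoute` implements
`route_for_task` as a plain dict lookup (task and queue names are made up)::

    from celery.routes import MapRoute

    route = MapRoute({"feed.tasks.refresh": {"queue": "feeds"}})

    route.route_for_task("feed.tasks.refresh")   # -> {"queue": "feeds"}
    route.route_for_task("some.other.task")      # -> None (no opinion)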
+ 15 - 17
celery/serialization.py

@@ -32,6 +32,7 @@ try:
 except NameError:
     _error_bases = (SystemExit, KeyboardInterrupt)
 
+#: List of base classes we probably don't want to reduce to.
 unwanted_base_classes = (StandardError, Exception) + _error_bases + (object, )
 
 
@@ -47,15 +48,15 @@ else:
 
 def find_nearest_pickleable_exception(exc):
     """With an exception instance, iterate over its super classes (by mro)
-    and find the first super exception that is pickleable. It does
+    and find the first super exception that is pickleable.  It does
     not go below :exc:`Exception` (i.e. it skips :exc:`Exception`,
-    :class:`BaseException` and :class:`object`). If that happens
+    :class:`BaseException` and :class:`object`).  If that happens
     you should use :exc:`UnpickleableException` instead.
 
     :param exc: An exception instance.
 
     :returns: the nearest exception if it's not :exc:`Exception` or below,
-        if it is it returns :const:`None`.
+              if it is, :const:`None` is returned instead.
 
     :rtype :exc:`Exception`:
 
@@ -98,24 +99,12 @@ class UnpickleableExceptionWrapper(Exception):
     """Wraps unpickleable exceptions.
 
     :param exc_module: see :attr:`exc_module`.
-
     :param exc_cls_name: see :attr:`exc_cls_name`.
-
     :param exc_args: see :attr:`exc_args`
 
-    .. attribute:: exc_module
-
-        The module of the original exception.
-
-    .. attribute:: exc_cls_name
-
-        The name of the original exception class.
+    **Example**
 
-    .. attribute:: exc_args
-
-        The arguments for the original exception.
-
-    Example
+    .. code-block:: python
 
         >>> try:
         ...     something_raising_unpickleable_exc()
@@ -127,6 +116,15 @@ class UnpickleableExceptionWrapper(Exception):
 
     """
 
+    #: The module of the original exception.
+    exc_module = None
+
+    #: The name of the original exception class.
+    exc_cls_name = None
+
+    #: The arguments for the original exception.
+    exc_args = None
+
     def __init__(self, exc_module, exc_cls_name, exc_args):
         self.exc_module = exc_module
         self.exc_cls_name = exc_cls_name

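The docstring example above is truncated by the diff context; a completed
sketch of the same pattern in the codebase's Python 2 syntax (the failing
callable is hypothetical)::

    import pickle

    from celery.serialization import UnpickleableExceptionWrapper

    try:
        something_raising_unpickleable_exc()
    except Exception, exc:
        wrapped = UnpickleableExceptionWrapper(
                    exc.__class__.__module__,   # exc_module
                    exc.__class__.__name__,     # exc_cls_name
                    exc.args)                   # exc_args
        pickle.dumps(wrapped)                   # picklable again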
+ 122 - 114
celery/task/base.py

@@ -117,56 +117,56 @@ class BaseTask(object):
 
     MaxRetriesExceededError = MaxRetriesExceededError
 
-    #: the application instance associated with this task class.
+    #: The application instance associated with this task class.
     app = None
 
-    #: name of the task.
+    #: Name of the task.
     name = None
 
-    #: if :const:`True` the task is an abstract base class.
+    #: If :const:`True` the task is an abstract base class.
     abstract = True
 
-    #: if disabled the worker will not forward magic keyword arguments.
+    #: If disabled the worker will not forward magic keyword arguments.
     accept_magic_kwargs = True
 
-    #: current request context when task is being executed.
+    #: Current request context (when task is executed).
     request = Context()
 
-    #: select a destination queue for this task.  The queue needs to exist
+    #: Destination queue.  The queue needs to exist
     #: in :setting:`CELERY_QUEUES`.  The `routing_key`, `exchange` and
     #: `exchange_type` attributes will be ignored if this is set.
     queue = None
 
-    #: override the apps default `routing_key` for this task.
+    #: Overrides the app's default `routing_key` for this task.
     routing_key = None
 
-    #: override the apps default `exchange` for this task.
+    #: Overrides the app's default `exchange` for this task.
     exchange = None
 
-    #: override the apps default exchange type for this task.
+    #: Overrides the app's default exchange type for this task.
     exchange_type = None
 
-    #: override the apps default delivery mode for this task. Default is
+    #: Overrides the app's default delivery mode for this task.  Default is
     #: `"persistent"`, but you can change this to `"transient"`, which means
     #: messages will be lost if the broker is restarted.  Consult your broker
     #: manual for any additional delivery modes.
     delivery_mode = None
 
-    #: mandatory message routing.
+    #: Mandatory message routing.
     mandatory = False
 
-    #: request immediate delivery.
+    #: Request immediate delivery.
     immediate = False
 
-    #: default message priority.  A number between 0 to 9, where 0 is the
+    #: Default message priority.  A number between 0 and 9, where 0 is the
     #: highest.  Note that RabbitMQ does not support priorities.
     priority = None
 
-    #: maximum number of retries before giving up.  If set to :const:`None`,
+    #: Maximum number of retries before giving up.  If set to :const:`None`,
     #: it will **never** stop retrying.
     max_retries = 3
 
-    #: default time in seconds before a retry of the task should be
+    #: Default time in seconds before a retry of the task should be
     #: executed.  3 minutes by default.
     default_retry_delay = 3 * 60
 
@@ -175,12 +175,12 @@ class BaseTask(object):
     #: a minute),`"100/h"` (hundred tasks an hour)
     rate_limit = None
 
-    #: if enabled the worker will not store task state and return values
+    #: If enabled the worker will not store task state and return values
     #: for this task.  Defaults to the :setting:`CELERY_IGNORE_RESULT`
     #: setting.
     ignore_result = False
 
-    #: when enabled errors will be stored even if the task is otherwise
+    #: When enabled errors will be stored even if the task is otherwise
     #: configured to ignore results.
     store_errors_even_if_ignored = False
 
@@ -190,21 +190,21 @@ class BaseTask(object):
 
     disable_error_emails = False                            # FIXME
 
-    #: list of exception types to send error e-mails for.
+    #: List of exception types to send error e-mails for.
     error_whitelist = ()
 
-    #: the name of a serializer that has been registered with
+    #: The name of a serializer that has been registered with
     #: :mod:`carrot.serialization.registry`.  Example: `"json"`.
     serializer = "pickle"
 
-    #: the result store backend used for this task.
+    #: The result store backend used for this task.
     backend = None
 
-    #: if disabled the task will not be automatically registered
+    #: If disabled the task will not be automatically registered
     #: in the task registry.
     autoregister = True
 
-    #: if enabled the task will report its status as "started" when the task
+    #: If enabled the task will report its status as "started" when the task
     #: is executed by a worker.  Disabled by default as the normal behaviour
     #: is to not report that level of granularity.  Tasks are either pending,
     #: finished, or waiting to be retried.
@@ -217,7 +217,7 @@ class BaseTask(object):
     #: :setting:`CELERY_TRACK_STARTED` setting.
     track_started = False
 
-    #: when enabled  messages for this task will be acknowledged **after**
+    #: When enabled, messages for this task will be acknowledged **after**
     #: the task has been executed, and not *just before* which is the
     #: default behavior.
     #:
@@ -229,10 +229,10 @@ class BaseTask(object):
     #: :setting:`CELERY_ACKS_LATE` setting.
     acks_late = False
 
-    #: default task expiry time.
+    #: Default task expiry time.
     expires = None
 
-    #: the type of task *(no longer used)*.
+    #: The type of task *(no longer used)*.
     type = "regular"
 
     def __call__(self, *args, **kwargs):
@@ -248,19 +248,22 @@ class BaseTask(object):
         automatically passed by the worker if the function/method
         supports them:
 
-            * task_id
-            * task_name
-            * task_retries
-            * task_is_eager
-            * logfile
-            * loglevel
-            * delivery_info
+            * `task_id`
+            * `task_name`
+            * `task_retries`
+            * `task_is_eager`
+            * `logfile`
+            * `loglevel`
+            * `delivery_info`
 
-        Additional standard keyword arguments may be added in the future.
         To take these default arguments, the task can either list the ones
         it wants explicitly or just take an arbitrary list of keyword
         arguments (\*\*kwargs).
 
+        Magic keyword arguments can be disabled using the
+        :attr:`accept_magic_kwargs` flag.  The information can then
+        be found in the :attr:`request` attribute.
+
         """
         raise NotImplementedError("Tasks must define the run method.")
 
@@ -291,11 +294,11 @@ class BaseTask(object):
 
         :rtype :class:`~celery.app.amqp.TaskPublisher`:
 
-        Please be sure to close the AMQP connection when you're done
-        with this object, i.e.:
+        Please be sure to close the AMQP connection after you're done
+        with this object.  Example::
 
             >>> publisher = self.get_publisher()
-            >>> # do something with publisher
+            >>> # ... do something with publisher
             >>> publisher.connection.close()
 
         """
@@ -311,12 +314,12 @@ class BaseTask(object):
 
     @classmethod
     def get_consumer(self, connection=None, connect_timeout=None):
-        """Get a celery task message consumer.
+        """Get message consumer.
 
         :rtype :class:`~celery.app.amqp.TaskConsumer`:
 
         Please be sure to close the AMQP connection when you're done
-        with this object. i.e.:
+        with this object.  Example::
 
             >>> consumer = self.get_consumer()
             >>> # do something with consumer
@@ -330,8 +333,8 @@ class BaseTask(object):
 
     @classmethod
     def delay(self, *args, **kwargs):
-        """Shortcut to :meth:`apply_async`, with star arguments,
-        but doesn't support the extra options.
+        """Shortcut to :meth:`apply_async` giving star arguments, but without
+        options.
 
         :param \*args: positional arguments passed on to the task.
         :param \*\*kwargs: keyword arguments passed on to the task.
@@ -349,68 +352,71 @@ class BaseTask(object):
         """Run a task asynchronously by the celery daemon(s).
 
         :keyword args: The positional arguments to pass on to the
-            task (a :class:`list` or :class:`tuple`).
+                       task (a :class:`list` or :class:`tuple`).
 
         :keyword kwargs: The keyword arguments to pass on to the
-            task (a :class:`dict`)
+                         task (a :class:`dict`)
 
         :keyword countdown: Number of seconds into the future that the
-            task should execute. Defaults to immediate delivery (Do not
-            confuse that with the `immediate` setting, they are
-            unrelated).
+                            task should execute. Defaults to immediate
+                            delivery (do not confuse with the
+                            `immediate` flag, as they are unrelated).
 
-        :keyword eta: A :class:`~datetime.datetime` object that describes
-            the absolute time and date of when the task should execute.
-            May not be specified if `countdown` is also supplied. (Do
-            not confuse this with the `immediate` setting, they are
-            unrelated).
+        :keyword eta: A :class:`~datetime.datetime` object describing
+                      the absolute time and date of when the task should
+                      be executed.  May not be specified if `countdown`
+                      is also supplied.  (Do not confuse this with the
+                      `immediate` flag, as they are unrelated).
 
         :keyword expires: Either a :class:`int`, describing the number of
-            seconds, or a :class:`~datetime.datetime` object that
-            describes the absolute time and date of when the task should
-            expire. The task will not be executed after the
-            expiration time.
+                          seconds, or a :class:`~datetime.datetime` object
+                          that describes the absolute time and date of when
+                          the task should expire.  The task will not be
+                          executed after the expiration time.
 
         :keyword connection: Re-use existing broker connection instead
-            of establishing a new one. The `connect_timeout` argument
-            is not respected if this is set.
+                             of establishing a new one.  The `connect_timeout`
+                             argument is not respected if this is set.
 
-        :keyword connect_timeout: The timeout in seconds, before we give
-            up on establishing a connection to the AMQP server.
+        :keyword connect_timeout: The timeout in seconds, before we give up
+                                  on establishing a connection to the AMQP
+                                  server.
 
         :keyword routing_key: The routing key used to route the task to a
-            worker server. Defaults to the tasks
-            :attr:`routing_key` attribute.
+                              worker server.  Defaults to the
+                              :attr:`routing_key` attribute.
 
         :keyword exchange: The named exchange to send the task to.
-            Defaults to the tasks :attr:`exchange` attribute.
+                           Defaults to the :attr:`exchange` attribute.
 
-        :keyword exchange_type: The exchange type to initalize the
-            exchange if not already declared. Defaults to the tasks
-            :attr:`exchange_type` attribute.
+        :keyword exchange_type: The exchange type to initialize the exchange
+                                if not already declared.  Defaults to the
+                                :attr:`exchange_type` attribute.
 
-        :keyword immediate: Request immediate delivery. Will raise an
-            exception if the task cannot be routed to a worker
-            immediately.  (Do not confuse this parameter with
-            the `countdown` and `eta` settings, as they are
-            unrelated). Defaults to the tasks :attr:`immediate` attribute.
+        :keyword immediate: Request immediate delivery.  Will raise an
+                            exception if the task cannot be routed to a worker
+                            immediately.  (Do not confuse this parameter with
+                            the `countdown` and `eta` settings, as they are
+                            unrelated).  Defaults to the :attr:`immediate`
+                            attribute.
 
         :keyword mandatory: Mandatory routing. Raises an exception if
-            there's no running workers able to take on this task.
-            Defaults to the tasks :attr:`mandatory` attribute.
+                            there's no running workers able to take on this
+                            task.  Defaults to the :attr:`mandatory`
+                            attribute.
 
         :keyword priority: The task priority, a number between 0 and 9.
-            Defaults to the tasks :attr:`priority` attribute.
+                           Defaults to the :attr:`priority` attribute.
 
-        :keyword serializer: A string identifying the default
-            serialization method to use. Defaults to the
-            :setting:`CELERY_TASK_SERIALIZER` setting. Can be `pickle`,
-            `json`, `yaml`, or any custom serialization method
-            that has been registered with
-            :mod:`carrot.serialization.registry`. Defaults to the tasks
-            :attr:`serializer` attribute.
+        :keyword serializer: A string identifying the default serialization
+                             method to use. Can be `pickle`, `json`, `yaml`,
+                             `msgpack` or any custom serialization method
+                             that has been registered with
+                             :mod:`carrot.serialization.registry`.
+                             Defaults to the :attr:`serializer` attribute.
 
-        **Note**: If the :setting:`CELERY_ALWAYS_EAGER` setting is set, it will
+        .. note::
+            If the :setting:`CELERY_ALWAYS_EAGER` setting is set, it will
             be replaced by a local :func:`apply` call instead.
 
         """
@@ -450,38 +456,41 @@ class BaseTask(object):
         :param args: Positional arguments to retry with.
         :param kwargs: Keyword arguments to retry with.
         :keyword exc: Optional exception to raise instead of
-            :exc:`~celery.exceptions.MaxRetriesExceededError` when the max
-            restart limit has been exceeded.
+                      :exc:`~celery.exceptions.MaxRetriesExceededError`
+                      when the max restart limit has been exceeded.
         :keyword countdown: Time in seconds to delay the retry for.
         :keyword eta: Explicit time and date to run the retry at
-        (must be a :class:`~datetime.datetime` instance).
+                      (must be a :class:`~datetime.datetime` instance).
         :keyword \*\*options: Any extra options to pass on to
-            meth:`apply_async`. See :func:`celery.execute.apply_async`.
+                              :meth:`apply_async`.
         :keyword throw: If this is :const:`False`, do not raise the
-            :exc:`~celery.exceptions.RetryTaskError` exception,
-            that tells the worker to mark the task as being retried.
-            Note that this means the task will be marked as failed
-            if the task raises an exception, or successful if it
-            returns.
+                        :exc:`~celery.exceptions.RetryTaskError` exception,
+                        that tells the worker to mark the task as being
+                        retried.  Note that this means the task will be
+                        marked as failed if the task raises an exception,
+                        or successful if it returns.
 
         :raises celery.exceptions.RetryTaskError: To tell the worker that
             the task has been re-sent for retry. This always happens,
             unless the `throw` keyword argument has been explicitly set
             to :const:`False`, and is considered normal operation.
 
-        Example
+        **Example**
+
+        .. code-block:: python
 
-            >>> class TwitterPostStatusTask(Task):
-            ...
-            ...     def run(self, username, password, message, **kwargs):
-            ...         twitter = Twitter(username, password)
-            ...         try:
-            ...             twitter.post_status(message)
-            ...         except twitter.FailWhale, exc:
-            ...             # Retry in 5 minutes.
-            ...             self.retry([username, password, message],
-            ...                        kwargs,
-            ...                        countdown=60 * 5, exc=exc)
+            >>> @task
+            ... def tweet(auth, message):
+            ...     twitter = Twitter(oauth=auth)
+            ...     try:
+            ...         twitter.post_status_update(message)
+            ...     except twitter.FailWhale, exc:
+            ...         # Retry in 5 minutes.
+            ...         return tweet.retry(countdown=60 * 5, exc=exc)
+
+        Although the task will never return normally above, since `retry`
+        raises an exception to notify the worker, we use `return` in front
+        of the retry to convey that the rest of the block is not executed.
 
         """
         request = self.request
@@ -498,7 +507,7 @@ class BaseTask(object):
         options["retries"] = request.retries + 1
         options["task_id"] = request.id
         options["countdown"] = options.get("countdown",
-                                        self.default_retry_delay)
+                                           self.default_retry_delay)
         max_exc = exc or self.MaxRetriesExceededError(
                 "Can't retry %s[%s] args:%s kwargs:%s" % (
                     self.name, options["task_id"], args, kwargs))
@@ -527,13 +536,12 @@ class BaseTask(object):
 
         :param args: positional arguments passed on to the task.
         :param kwargs: keyword arguments passed on to the task.
-        :keyword throw: Re-raise task exceptions. Defaults to
-            the :setting:`CELERY_EAGER_PROPAGATES_EXCEPTIONS` setting.
+        :keyword throw: Re-raise task exceptions.  Defaults to
+                        the :setting:`CELERY_EAGER_PROPAGATES_EXCEPTIONS`
+                        setting.
 
         :rtype :class:`celery.result.EagerResult`:
 
-        See :func:`celery.execute.apply`.
-
         """
         args = args or []
         kwargs = kwargs or {}
@@ -607,7 +615,7 @@ class BaseTask(object):
         :param kwargs: Original keyword arguments for the retried task.
 
         :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
-        instance, containing the traceback.
+                        instance, containing the traceback.
 
         The return value of this handler is ignored.
 
@@ -623,10 +631,10 @@ class BaseTask(object):
         :param task_id: Unique id of the task.
         :param args: Original arguments for the task that failed.
         :param kwargs: Original keyword arguments for the task
-            that failed.
+                       that failed.
 
         :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
-        instance, containing the traceback (if any).
+                        instance, containing the traceback (if any).
 
         The return value of this handler is ignored.
 
@@ -642,10 +650,10 @@ class BaseTask(object):
         :param task_id: Unique id of the failed task.
         :param args: Original arguments for the task that failed.
         :param kwargs: Original keyword arguments for the task
-            that failed.
+                       that failed.
 
         :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
-            instance, containing the traceback.
+                        instance, containing the traceback.
 
         The return value of this handler is ignored.
 
@@ -679,7 +687,7 @@ class BaseTask(object):
         wrapper.execute_using_pool(pool, loglevel, logfile)
 
     def __repr__(self):
-        """repr(task)"""
+        """`repr(task)`"""
         try:
             kind = self.__class__.mro()[1].__name__
         except (AttributeError, IndexError):            # pragma: no cover
@@ -688,8 +696,8 @@ class BaseTask(object):
 
     @classmethod
     def subtask(cls, *args, **kwargs):
-        """Returns a :class:`~celery.task.sets.subtask` object for
-        this task that wraps arguments and execution options
+        """Returns :class:`~celery.task.sets.subtask` object for
+        this task, wrapping arguments and execution options
         for a single task invocation."""
         return subtask(cls, *args, **kwargs)
 

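Putting the routing keywords and `subtask` together; a sketch reusing the
`refresh_feed` task from earlier in this commit (queue, exchange and
routing key names are made up)::

    # Override routing and delivery options for a single invocation:
    refresh_feed.apply_async(args=["http://example.com/rss"],
                             countdown=10,
                             exchange="feeds",
                             routing_key="feed.refresh",
                             serializer="json")

    # Wrap arguments and execution options for later invocation,
    # e.g. as part of a TaskSet:
    s = refresh_feed.subtask(args=["http://example.com/rss"])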
+ 3 - 3
celery/tests/test_buckets.py

@@ -45,7 +45,7 @@ class test_TokenBucketQueue(unittest.TestCase):
     @skip_if_disabled
     def empty_queue_yields_QueueEmpty(self):
         x = buckets.TokenBucketQueue(fill_rate=10)
-        self.assertRaises(buckets.QueueEmpty, x.get)
+        self.assertRaises(buckets.Empty, x.get)
 
     @skip_if_disabled
     def test_bucket__put_get(self):
@@ -135,7 +135,7 @@ class test_TaskBucket(unittest.TestCase):
     @skip_if_disabled
     def test_get_nowait(self):
         x = buckets.TaskBucket(task_registry=self.registry)
-        self.assertRaises(buckets.QueueEmpty, x.get_nowait)
+        self.assertRaises(buckets.Empty, x.get_nowait)
 
     @skip_if_disabled
     def test_refresh(self):
@@ -197,7 +197,7 @@ class test_TaskBucket(unittest.TestCase):
     @skip_if_disabled
     def test_on_empty_buckets__get_raises_empty(self):
         b = buckets.TaskBucket(task_registry=self.registry)
-        self.assertRaises(buckets.QueueEmpty, b.get, block=False)
+        self.assertRaises(buckets.Empty, b.get, block=False)
         self.assertEqual(b.qsize(), 0)
 
     @skip_if_disabled

+ 2 - 1
celery/tests/test_worker_job.py

@@ -322,7 +322,8 @@ class test_TaskRequest(unittest.TestCase):
         tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
         x = tw.success_msg % {"name": tw.task_name,
                               "id": tw.task_id,
-                              "return_value": 10}
+                              "return_value": 10,
+                              "runtime": 0.1376}
         self.assertTrue(x)
         x = tw.error_msg % {"name": tw.task_name,
                            "id": tw.task_id,

+ 31 - 53
celery/worker/__init__.py

@@ -1,8 +1,3 @@
-"""
-
-The Multiprocessing Worker Server
-
-"""
 import socket
 import logging
 import traceback
@@ -23,10 +18,13 @@ RUN = 0x1
 CLOSE = 0x2
 TERMINATE = 0x3
 
+#: List of signals to reset when a child process starts.
 WORKER_SIGRESET = frozenset(["SIGTERM",
                              "SIGHUP",
                              "SIGTTIN",
                              "SIGTTOU"])
+
+#: List of signals to ignore when a child process starts.
 WORKER_SIGIGNORE = frozenset(["SIGINT"])
 
 
@@ -50,65 +48,46 @@ def process_initializer(app, hostname):
 
 
 class WorkController(object):
-    """Executes tasks waiting in the task queue.
-
-    :param concurrency: see :attr:`concurrency`.
-    :param logfile: see :attr:`logfile`.
-    :param loglevel: see :attr:`loglevel`.
-    :param embed_clockservice: see :attr:`embed_clockservice`.
-    :param send_events: see :attr:`send_events`.
-
-    .. attribute:: concurrency
-
-        The number of simultaneous processes doing work (default:
-        :setting:`CELERYD_CONCURRENCY`)
-
-    .. attribute:: loglevel
-
-        The loglevel used (default: :const:`logging.INFO`)
-
-    .. attribute:: logfile
+    """Unmanaged worker instance."""
 
-        The logfile used, if no logfile is specified it uses `stderr`
-        (default: :setting:`CELERYD_LOG_FILE`).
+    #: The number of simultaneous processes doing work (default:
+    #: :setting:`CELERYD_CONCURRENCY`)
+    concurrency = None
 
-    .. attribute:: embed_clockservice
-
-        If :const:`True`, celerybeat is embedded, running in the main worker
-        process as a thread.
-
-    .. attribute:: send_events
-
-        Enable the sending of monitoring events, these events can be captured
-        by monitors (celerymon).
-
-    .. attribute:: logger
-
-        The :class:`logging.Logger` instance used for logging.
-
-    .. attribute:: pool
+    #: The loglevel used (default: :const:`logging.ERROR`).
+    loglevel = logging.ERROR
 
-        The :class:`multiprocessing.Pool` instance used.
+    #: The logfile used, if no logfile is specified it uses `stderr`
+    #: (default: :setting:`CELERYD_LOG_FILE`).
+    logfile = None
 
-    .. attribute:: ready_queue
+    #: If :const:`True`, celerybeat is embedded, running in the main worker
+    #: process as a thread.
+    embed_clockservice = None
 
-        The :class:`Queue.Queue` that holds tasks ready for immediate
-        processing.
+    #: Enable the sending of monitoring events; these events can be captured
+    #: by monitors (celerymon).
+    send_events = False
 
-    .. attribute:: schedule_controller
+    #: The :class:`logging.Logger` instance used for logging.
+    logger = None
 
-        Instance of :class:`celery.worker.controllers.ScheduleController`.
+    #: The pool instance used.
+    pool = None
 
-    .. attribute:: mediator
+    #: The internal queue object that holds tasks ready for immediate
+    #: processing.
+    ready_queue = None
 
-        Instance of :class:`celery.worker.controllers.Mediator`.
+    #: Instance of :class:`celery.worker.controllers.ScheduleController`.
+    schedule_controller = None
 
-    .. attribute:: listener
+    #: Instance of :class:`celery.worker.controllers.Mediator`.
+    mediator = None
 
-        Instance of :class:`CarrotListener`.
+    #: Consumer instance.
+    listener = None
 
-    """
-    loglevel = logging.ERROR
     _state = None
     _running = 0
 
@@ -275,7 +254,6 @@ class WorkController(object):
         self._shutdown(warm=False)
 
     def _shutdown(self, warm=True):
-        """Gracefully shutdown the worker server."""
         what = (warm and "stopping" or "terminating").capitalize()
 
         if self._state != RUN or self._running != len(self.components):
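
The hunk above trades `.. attribute::` docstring blocks for `#:`
comments, which Sphinx autodoc picks up as attribute documentation.
A minimal sketch of the convention (class and attribute names are
illustrative only)::

    class Example(object):
        """Example class."""

        #: Sphinx autodoc attaches this comment as the documentation
        #: for ``concurrency``, so the default value and its description
        #: live side by side.
        concurrency = None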

+ 31 - 37
celery/worker/buckets.py

@@ -2,7 +2,7 @@ import threading
 import time
 
 from collections import deque
-from Queue import Queue, Empty as QueueEmpty
+from Queue import Queue, Empty
 
 from celery.datastructures import TokenBucket
 from celery.utils import timeutils
@@ -15,8 +15,8 @@ class RateLimitExceeded(Exception):
 
 class TaskBucket(object):
     """This is a collection of token buckets, each task type having
-    its own token bucket. If the task type doesn't have a rate limit,
-    it will have a plain :class:`Queue` object instead of a
+    its own token bucket.  If the task type doesn't have a rate limit,
+    it will have a plain :class:`~Queue.Queue` object instead of a
     :class:`TokenBucketQueue`.
 
     The :meth:`put` operation forwards the task to its appropriate bucket,
@@ -32,12 +32,11 @@ class TaskBucket(object):
          "video.compress": TokenBucketQueue(fill_rate=2)}
 
     The get operation will iterate over these until one of the buckets
-    is able to return an item. The underlying datastructure is a `dict`,
+    is able to return an item.  The underlying data structure is a `dict`,
     so the order is ignored here.
 
     :param task_registry: The task registry used to get the task
-        type class for a given task name.
-
+                          type class for a given task name.
 
     """
 
@@ -65,8 +64,8 @@ class TaskBucket(object):
     def _get_immediate(self):
         try:
             return self.immediate.popleft()
-        except IndexError:                      # Empty
-            raise QueueEmpty()
+        except IndexError:
+            raise Empty()
 
     def _get(self):
         # If the first bucket is always returning items, we would never
@@ -75,8 +74,8 @@ class TaskBucket(object):
         # "immediate". This queue is always checked for cached items first.
         try:
             return 0, self._get_immediate()
-        except QueueEmpty:
-                pass
+        except Empty:
+            pass
 
         remaining_times = []
         for bucket in self.buckets.values():
@@ -85,7 +84,7 @@ class TaskBucket(object):
                 try:
                     # Just put any ready items into the immediate queue.
                     self.immediate.append(bucket.get_nowait())
-                except QueueEmpty:
+                except Empty:
                     pass
                 except RateLimitExceeded:
                     remaining_times.append(bucket.expected_time())
@@ -95,7 +94,7 @@ class TaskBucket(object):
         # Try the immediate queue again.
         try:
             return 0, self._get_immediate()
-        except QueueEmpty:
+        except Empty:
             if not remaining_times:
                 # No items in any of the buckets.
                 raise
@@ -119,14 +118,14 @@ class TaskBucket(object):
             while True:
                 try:
                     remaining_time, item = self._get()
-                except QueueEmpty:
+                except Empty:
                     if not block or did_timeout():
                         raise
                     self.not_empty.wait(timeout)
                     continue
                 if remaining_time:
                     if not block or did_timeout():
-                        raise QueueEmpty
+                        raise Empty()
                     time.sleep(min(remaining_time, timeout or 1))
                 else:
                     return item
@@ -178,8 +177,8 @@ class TaskBucket(object):
         """Add a bucket for a task type.
 
         Will read the tasks rate limit and create a :class:`TokenBucketQueue`
-        if it has one. If the task doesn't have a rate limit a regular Queue
-        will be used.
+        if it has one.  If the task doesn't have a rate limit,
+        :class:`FastQueue` will be used instead.
 
         """
         if task_name not in self.buckets:
@@ -190,14 +189,17 @@ class TaskBucket(object):
         return sum(bucket.qsize() for bucket in self.buckets.values())
 
     def empty(self):
+        """Returns :const:`True` if all of the buckets are empty."""
         return all(bucket.empty() for bucket in self.buckets.values())
 
     def clear(self):
+        """Delete the data in all of the buckets."""
         for bucket in self.buckets.values():
             bucket.clear()
 
     @property
     def items(self):
+        """Flattens the data in all of the buckets into a single list."""
         # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
         # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
         return filter(None, chain_from_iterable(izip_longest(*[bucket.items
@@ -229,8 +231,9 @@ class TokenBucketQueue(object):
     operations.
 
     :param fill_rate: The rate in tokens/second that the bucket will
-      be refilled.
-    :keyword capacity: Maximum number of tokens in the bucket. Default is 1.
+                      be refilled.
+    :keyword capacity: Maximum number of tokens in the bucket.
+                       Default is 1.
 
     """
     RateLimitExceeded = RateLimitExceeded
@@ -242,11 +245,7 @@ class TokenBucketQueue(object):
             self.queue = Queue()
 
     def put(self, item, block=True):
-        """Put an item into the queue.
-
-        Also see :meth:`Queue.Queue.put`.
-
-        """
+        """Put an item onto the queue."""
         self.queue.put(item, block=block)
 
     def put_nowait(self, item):
@@ -254,8 +253,6 @@ class TokenBucketQueue(object):
 
         :raises Queue.Full: If a free slot is not immediately available.
 
-        Also see :meth:`Queue.Queue.put_nowait`
-
         """
         return self.put(item, block=False)
 
@@ -263,11 +260,10 @@ class TokenBucketQueue(object):
         """Remove and return an item from the queue.
 
         :raises RateLimitExceeded: If a token could not be consumed from the
-            token bucket (consuming from the queue too fast).
+                                   token bucket (consuming from the queue
+                                   too fast).
         :raises Queue.Empty: If an item is not immediately available.
 
-        Also see :meth:`Queue.Queue.get`.
-
         """
         get = block and self.queue.get or self.queue.get_nowait
 
@@ -280,26 +276,23 @@ class TokenBucketQueue(object):
         """Remove and return an item from the queue without blocking.
 
         :raises RateLimitExceeded: If a token could not be consumed from the
-            token bucket (consuming from the queue too fast).
+                                   token bucket (consuming from the queue
+                                   too fast).
         :raises Queue.Empty: If an item is not immediately available.
 
-        Also see :meth:`Queue.Queue.get_nowait`.
-
         """
         return self.get(block=False)
 
     def qsize(self):
-        """Returns the size of the queue.
-
-        See :meth:`Queue.Queue.qsize`.
-
-        """
+        """Returns the size of the queue."""
         return self.queue.qsize()
 
     def empty(self):
+        """Returns :const:`True` if the queue is empty."""
         return self.queue.empty()
 
     def clear(self):
+        """Delete all data in the queue."""
         return self.items.clear()
 
     def wait(self, block=False):
@@ -312,10 +305,11 @@ class TokenBucketQueue(object):
             time.sleep(remaining)
 
     def expected_time(self, tokens=1):
-        """Returns the expected time in seconds when a new token should be
+        """Returns the expected time in seconds of when a new token should be
         available."""
         return self._bucket.expected_time(tokens)
 
     @property
     def items(self):
+        """Underlying data.  Do not modify."""
         return self.queue.queue
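
For context on `TokenBucketQueue`: a token bucket gains tokens at
`fill_rate` tokens/second up to `capacity`, and a consumer may only
proceed when a token is available; `expected_time` estimates the wait
otherwise.  A self-contained sketch of the idea, independent of
Celery's :class:`celery.datastructures.TokenBucket` (all names here
are illustrative)::

    import time

    class SimpleTokenBucket(object):
        """Minimal token bucket; not Celery's implementation."""

        def __init__(self, fill_rate, capacity=1):
            self.fill_rate = float(fill_rate)  # tokens added per second
            self.capacity = float(capacity)    # bucket never holds more
            self._tokens = self.capacity
            self._last = time.time()

        def can_consume(self, tokens=1):
            """Take `tokens` if available, else leave bucket untouched."""
            self._refill()
            if self._tokens >= tokens:
                self._tokens -= tokens
                return True
            return False

        def expected_time(self, tokens=1):
            """Seconds until `tokens` should be available."""
            self._refill()
            return max(tokens - self._tokens, 0.0) / self.fill_rate

        def _refill(self):
            now = time.time()
            self._tokens = min(
                self.capacity,
                self._tokens + (now - self._last) * self.fill_rate)
            self._last = now

    bucket = SimpleTokenBucket(fill_rate=2)  # ~2 operations/second
    print(bucket.can_consume())              # True; bucket starts full
    print(bucket.expected_time())            # ~0.5s until the next token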

+ 13 - 19
celery/worker/controllers.py

@@ -9,7 +9,7 @@ import threading
 import traceback
 
 from time import sleep, time
-from Queue import Empty as QueueEmpty
+from Queue import Empty
 
 from celery.app import app_or_default
 from celery.utils.compat import log_with_extra
@@ -63,11 +63,11 @@ class Autoscaler(threading.Thread):
     def run(self):
         while not self._shutdown.isSet():
             self.scale()
-        self._stopped.set()                 # indicate that we are stopped
+        self._stopped.set()
 
     def stop(self):
         self._shutdown.set()
-        self._stopped.wait()                # block until this thread is done
+        self._stopped.wait()
         self.join(1e100)
 
     @property
@@ -80,21 +80,15 @@ class Autoscaler(threading.Thread):
 
 
 class Mediator(threading.Thread):
-    """Thread continuously sending tasks in the queue to the pool.
+    """Thread continuously moving tasks in the ready queue to the pool."""
 
-    .. attribute:: ready_queue
+    #: The task queue, a :class:`~Queue.Queue` instance.
+    ready_queue = None
 
-        The task queue, a :class:`Queue.Queue` instance.
+    #: Callback called when a task is obtained.
+    callback = None
 
-    .. attribute:: callback
-
-        The callback used to process tasks retrieved from the
-        :attr:`ready_queue`.
-
-    """
-
-    def __init__(self, ready_queue, callback, logger=None,
-            app=None):
+    def __init__(self, ready_queue, callback, logger=None, app=None):
         threading.Thread.__init__(self)
         self.app = app_or_default(app)
         self.logger = logger or self.app.log.get_default_logger()
@@ -107,9 +101,8 @@ class Mediator(threading.Thread):
 
     def move(self):
         try:
-            # This blocks until there's a message in the queue.
             task = self.ready_queue.get(timeout=1.0)
-        except QueueEmpty:
+        except Empty:
             return
 
         if task.revoked():
@@ -131,12 +124,13 @@ class Mediator(threading.Thread):
                                            "name": task.task_name}})
 
     def run(self):
+        """Move tasks forver or until :meth:`stop` is called."""
         while not self._shutdown.isSet():
             self.move()
-        self._stopped.set()                 # indicate that we are stopped
+        self._stopped.set()
 
     def stop(self):
         """Gracefully shutdown the thread."""
         self._shutdown.set()
-        self._stopped.wait()                # block until this thread is done
+        self._stopped.wait()
         self.join(1e100)
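
`Mediator` and `Autoscaler` above share the same shutdown handshake:
`run()` loops until a shutdown event is set, then sets a "stopped"
event that `stop()` waits for before joining.  A stripped-down sketch
of the pattern (`Mover` is a made-up name; Python 2 `Queue`, matching
the module)::

    import threading
    from Queue import Empty, Queue

    class Mover(threading.Thread):

        def __init__(self, ready_queue, callback):
            threading.Thread.__init__(self)
            self.ready_queue = ready_queue
            self.callback = callback
            self._shutdown = threading.Event()
            self._stopped = threading.Event()

        def run(self):
            while not self._shutdown.isSet():
                try:
                    # Wake at least once a second to notice shutdown.
                    item = self.ready_queue.get(timeout=1.0)
                except Empty:
                    continue
                self.callback(item)
            self._stopped.set()            # tell stop() the loop exited

        def stop(self):
            self._shutdown.set()
            self._stopped.wait()           # block until run() finishes
            self.join(1e100)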

+ 6 - 7
celery/worker/heartbeat.py

@@ -1,19 +1,18 @@
 import threading
+
 from time import time, sleep
 
 
 class Heart(threading.Thread):
-    """Thread sending heartbeats at an interval.
+    """Thread sending heartbeats at regular intervals.
 
     :param eventer: Event dispatcher used to send the event.
     :keyword interval: Time in seconds between heartbeats.
-        Default is 2 minutes.
-
-    .. attribute:: bpm
-
-        Beats per minute.
+                       Default is 2 minutes.
 
     """
+
+    #: Beats per minute.
     bpm = 0.5
 
     def __init__(self, eventer, interval=None):
@@ -64,6 +63,6 @@ class Heart(threading.Thread):
             return
         self._state = "CLOSE"
         self._shutdown.set()
-        self._stopped.wait()            # block until this thread is done
+        self._stopped.wait()            # blocks until this thread is done
         if self.isAlive():
             self.join(1e100)
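
The `bpm` attribute above is beats per minute, so the default of 0.5
matches the documented two-minute interval.  Assuming the interval is
derived as `60 / bpm`, the arithmetic is::

    bpm = 0.5                  # Heart.bpm default
    interval = 60.0 / bpm      # seconds between heartbeats
    assert interval == 120.0   # i.e. two minutes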

+ 123 - 122
celery/worker/job.py

@@ -41,7 +41,7 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
-#: keys to keep from the message delivery info.  The values
+#: Keys to keep from the message delivery info.  The values
 #: of these keys must be pickleable.
 WANTED_DELIVERY_INFO = ("exchange", "routing_key", "consumer_tag", )
 
@@ -82,10 +82,10 @@ class WorkerTaskTrace(TaskTrace):
 
     """
 
-    #: current loader.
+    #: Current loader.
     loader = None
 
-    #: hostname to report as.
+    #: Hostname to report as.
     hostname = None
 
     def __init__(self, *args, **kwargs):
@@ -166,10 +166,10 @@ def execute_and_trace(task_name, *args, **kwargs):
 class TaskRequest(object):
     """A request for task execution."""
 
-    #: kind of task.  Must be a name registered in the task registry.
+    #: Kind of task.  Must be a name registered in the task registry.
     name = None
 
-    #: the task class (set by constructor using :attr:`task_name`).
+    #: The task class (set by constructor using :attr:`task_name`).
     task = None
 
     #: UUID of the task.
@@ -181,51 +181,53 @@ class TaskRequest(object):
     #: Mapping of keyword arguments to apply to the task.
     kwargs = None
 
-    #: number of times the task has been retried.
+    #: Number of times the task has been retried.
     retries = 0
 
-    #: the tasks eta (for information only).
+    #: The task's eta (for information only).
     eta = None
 
-    #: when the task expires.
+    #: When the task expires.
     expires = None
 
-    #: callback called when the task should be acknowledged.
+    #: Callback called when the task should be acknowledged.
     on_ack = None
 
-    #: the message object.  Used to acknowledge the message.
+    #: The message object.  Used to acknowledge the message.
     message = None
 
-    #: flag set when the task has been executed.
+    #: Flag set when the task has been executed.
     executed = False
 
-    #: additional delivery info, e.g. contains the path from
-    #: producer to consumer.
+    #: Additional delivery info; e.g. it contains the path
+    #: from producer to consumer.
     delivery_info = None
 
-    #: flag set when the task has been acknowledged.
+    #: Flag set when the task has been acknowledged.
     acknowledged = False
 
-    #: format string used to log task success.
-    success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
+    #: Format string used to log task success.
+    success_msg = """\
+        Task %(name)s[%(id)s] succeeded in %(runtime)ss: %(return_value)s
+    """
 
-    #: format string used to log task failure.
+    #: Format string used to log task failure.
     error_msg = """\
         Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
     """
 
-    #: format string used to log task retry.
+    #: Format string used to log task retry.
     retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
 
-    #: format string used to generate error e-mail subjects.
+    #: Format string used to generate error e-mail subjects.
     email_subject = """\
         [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
     """
 
-    #: format string used to generate error e-mail content.
+    #: Format string used to generate error e-mail content.
     email_body = TASK_ERROR_EMAIL_BODY
 
-    #: timestamp set when the task is started.
+    #: Timestamp set when the task is started.
     time_start = None
 
     _already_revoked = False
@@ -255,58 +257,32 @@ class TaskRequest(object):
         if self.task.ignore_result:
             self._store_errors = self.task.store_errors_even_if_ignored
 
-    def maybe_expire(self):
-        if self.expires and datetime.now() > self.expires:
-            state.revoked.add(self.task_id)
-            if self._store_errors:
-                self.task.backend.mark_as_revoked(self.task_id)
-
-    def revoked(self):
-        if self._already_revoked:
-            return True
-        if self.expires:
-            self.maybe_expire()
-        if self.task_id in state.revoked:
-            self.logger.warn("Skipping revoked task: %s[%s]" % (
-                self.task_name, self.task_id))
-            self.send_event("task-revoked", uuid=self.task_id)
-            self.acknowledge()
-            self._already_revoked = True
-            return True
-        return False
-
     @classmethod
-    def from_message(cls, message, message_data, logger=None, eventer=None,
-            hostname=None, app=None):
-        """Create a :class:`TaskRequest` from a task message sent by
-        :class:`celery.app.amqp.TaskPublisher`.
+    def from_message(cls, message, message_data, **kw):
+        """Create request from a task message.
 
         :raises UnknownTaskError: if the message does not describe a task,
             the message is also rejected.
 
-        :returns :class:`TaskRequest`:
-
         """
-        task_name = message_data["task"]
-        task_id = message_data["id"]
-        args = message_data["args"]
-        kwargs = message_data["kwargs"]
-        retries = message_data.get("retries", 0)
-        eta = maybe_iso8601(message_data.get("eta"))
-        expires = maybe_iso8601(message_data.get("expires"))
-
         _delivery_info = getattr(message, "delivery_info", {})
         delivery_info = dict((key, _delivery_info.get(key))
                                 for key in WANTED_DELIVERY_INFO)
 
+        kwargs = message_data["kwargs"]
         if not hasattr(kwargs, "items"):
-            raise InvalidTaskError("Task kwargs must be a dictionary.")
-
-        return cls(task_name, task_id, args, kwdict(kwargs),
-                   retries=retries, on_ack=message.ack,
-                   delivery_info=delivery_info, logger=logger,
-                   eventer=eventer, hostname=hostname,
-                   eta=eta, expires=expires, app=app)
+            raise InvalidTaskError("Task keyword arguments is not a mapping.")
+
+        return cls(task_name=message_data["task"],
+                   task_id=message_data["id"],
+                   args=message_data["args"],
+                   kwargs=kwdict(kwargs),
+                   retries=message_data.get("retries", 0),
+                   eta=maybe_iso8601(message_data.get("eta")),
+                   expires=maybe_iso8601(message_data.get("expires")),
+                   on_ack=message.ack,
+                   delivery_info=delivery_info,
+                   **kw)
 
     def get_instance_attrs(self, loglevel, logfile):
         return {"logfile": logfile,
@@ -342,18 +318,33 @@ class TaskRequest(object):
         kwargs.update(extend_with)
         return kwargs
 
-    def _get_tracer_args(self, loglevel=None, logfile=None):
-        """Get the :class:`WorkerTaskTrace` tracer for this task."""
-        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
-        return self.task_name, self.task_id, self.args, task_func_kwargs
+    def execute_using_pool(self, pool, loglevel=None, logfile=None):
+        """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
 
-    def _set_executed_bit(self):
-        """Set task as executed to make sure it's not executed again."""
-        if self.executed:
-            raise AlreadyExecutedError(
-                   "Task %s[%s] has already been executed" % (
-                       self.task_name, self.task_id))
-        self.executed = True
+        :param pool: A :class:`multiprocessing.Pool` instance.
+
+        :keyword loglevel: The loglevel used by the task.
+
+        :keyword logfile: The logfile used by the task.
+
+        """
+        if self.revoked():
+            return
+        # Make sure task has not already been executed.
+        self._set_executed_bit()
+
+        args = self._get_tracer_args(loglevel, logfile)
+        instance_attrs = self.get_instance_attrs(loglevel, logfile)
+        self.time_start = time.time()
+        result = pool.apply_async(execute_and_trace,
+                                  args=args,
+                                  kwargs={"hostname": self.hostname,
+                                          "request": instance_attrs},
+                                  accept_callback=self.on_accepted,
+                                  timeout_callback=self.on_timeout,
+                                  callbacks=[self.on_success],
+                                  errbacks=[self.on_failure])
+        return result
 
     def execute(self, loglevel=None, logfile=None):
         """Execute the task in a :class:`WorkerTaskTrace`.
@@ -365,6 +356,7 @@ class TaskRequest(object):
         """
         if self.revoked():
             return
+
         # Make sure task has not already been executed.
         self._set_executed_bit()
 
@@ -381,39 +373,34 @@ class TaskRequest(object):
         self.acknowledge()
         return retval
 
+    def maybe_expire(self):
+        """If expired, mark the task as revoked."""
+        if self.expires and datetime.now() > self.expires:
+            state.revoked.add(self.task_id)
+            if self._store_errors:
+                self.task.backend.mark_as_revoked(self.task_id)
+
+    def revoked(self):
+        """If revoked, skip task and mark state."""
+        if self._already_revoked:
+            return True
+        if self.expires:
+            self.maybe_expire()
+        if self.task_id in state.revoked:
+            self.logger.warn("Skipping revoked task: %s[%s]" % (
+                self.task_name, self.task_id))
+            self.send_event("task-revoked", uuid=self.task_id)
+            self.acknowledge()
+            self._already_revoked = True
+            return True
+        return False
+
     def send_event(self, type, **fields):
         if self.eventer:
             self.eventer.send(type, **fields)
 
-    def execute_using_pool(self, pool, loglevel=None, logfile=None):
-        """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
-
-        :param pool: A :class:`multiprocessing.Pool` instance.
-
-        :keyword loglevel: The loglevel used by the task.
-
-        :keyword logfile: The logfile used by the task.
-
-        """
-        if self.revoked():
-            return
-        # Make sure task has not already been executed.
-        self._set_executed_bit()
-
-        args = self._get_tracer_args(loglevel, logfile)
-        instance_attrs = self.get_instance_attrs(loglevel, logfile)
-        self.time_start = time.time()
-        result = pool.apply_async(execute_and_trace,
-                                  args=args,
-                                  kwargs={"hostname": self.hostname,
-                                          "request": instance_attrs},
-                                  accept_callback=self.on_accepted,
-                                  timeout_callback=self.on_timeout,
-                                  callbacks=[self.on_success],
-                                  errbacks=[self.on_failure])
-        return result
-
     def on_accepted(self):
+        """Handler called when task is accepted by worker pool."""
         state.task_accepted(self)
         if not self.task.acks_late:
             self.acknowledge()
@@ -422,6 +409,7 @@ class TaskRequest(object):
             self.task_name, self.task_id))
 
     def on_timeout(self, soft):
+        """Handler called if the task times out."""
         state.task_ready(self)
         if soft:
             self.logger.warning("Soft time limit exceeded for %s[%s]" % (
@@ -435,14 +423,8 @@ class TaskRequest(object):
         if self._store_errors:
             self.task.backend.mark_as_failure(self.task_id, exc)
 
-    def acknowledge(self):
-        if not self.acknowledged:
-            self.on_ack()
-            self.acknowledged = True
-
     def on_success(self, ret_value):
-        """The handler used if the task was successfully processed (
-        without raising an exception)."""
+        """Handler called if the task was successfully processed."""
         state.task_ready(self)
 
         if self.task.acks_late:
@@ -452,29 +434,25 @@ class TaskRequest(object):
         self.send_event("task-succeeded", uuid=self.task_id,
                         result=repr(ret_value), runtime=runtime)
 
-        msg = self.success_msg.strip() % {
+        self.logger.info(self.success_msg.strip() % {
                 "id": self.task_id,
                 "name": self.task_name,
-                "return_value": self.repr_result(ret_value)}
-        self.logger.info(msg)
-
-    def repr_result(self, result, maxlen=46):
-        # 46 is the length needed to fit
-        #     "the quick brown fox jumps over the lazy dog" :)
-        return truncate_text(repr(result), maxlen)
+                "return_value": self.repr_result(ret_value),
+                "runtime": runtime})
 
     def on_retry(self, exc_info):
+        """Handler called if the task should be retried."""
         self.send_event("task-retried", uuid=self.task_id,
                                         exception=repr(exc_info.exception.exc),
                                         traceback=repr(exc_info.traceback))
-        msg = self.retry_msg.strip() % {
+
+        self.logger.info(self.retry_msg.strip() % {
                 "id": self.task_id,
                 "name": self.task_name,
-                "exc": repr(exc_info.exception.exc)}
-        self.logger.info(msg)
+                "exc": repr(exc_info.exception.exc)})
 
     def on_failure(self, exc_info):
-        """The handler used if the task raised an exception."""
+        """Handler called if the task raised an exception."""
         state.task_ready(self)
 
         if self.task.acks_late:
@@ -514,6 +492,12 @@ class TaskRequest(object):
                               enabled=task_obj.send_error_emails,
                               whitelist=task_obj.error_whitelist)
 
+    def acknowledge(self):
+        """Acknowledge task."""
+        if not self.acknowledged:
+            self.on_ack()
+            self.acknowledged = True
+
     def send_error_email(self, task, context, exc,
             whitelist=None, enabled=False, fail_silently=True):
         if enabled and not task.disable_error_emails:
@@ -524,6 +508,11 @@ class TaskRequest(object):
             body = self.email_body.strip() % context
             self.app.mail_admins(subject, body, fail_silently=fail_silently)
 
+    def repr_result(self, result, maxlen=46):
+        # 46 is the length needed to fit
+        #     "the quick brown fox jumps over the lazy dog" :)
+        return truncate_text(repr(result), maxlen)
+
     def info(self, safe=False):
         args = self.args
         kwargs = self.kwargs
@@ -550,6 +539,18 @@ class TaskRequest(object):
     def __repr__(self):
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                 self.__class__.__name__,
-                self.task_name, self.task_id,
-                self.args, self.kwargs)
+                self.task_name, self.task_id, self.args, self.kwargs)
+
+    def _get_tracer_args(self, loglevel=None, logfile=None):
+        """Get the :class:`WorkerTaskTrace` tracer for this task."""
+        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
+        return self.task_name, self.task_id, self.args, task_func_kwargs
+
+    def _set_executed_bit(self):
+        """Set task as executed to make sure it's not executed again."""
+        if self.executed:
+            raise AlreadyExecutedError(
+                   "Task %s[%s] has already been executed" % (
+                       self.task_name, self.task_id))
+        self.executed = True
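
The `hasattr(kwargs, "items")` test in `from_message` above is duck
typing: any mapping-like object passes, while e.g. a list of pairs is
rejected before a request is built.  A standalone sketch of the check
(the helper name is made up)::

    def ensure_mapping(kwargs):
        # Mirrors the worker's duck-typed check: mappings expose .items().
        if not hasattr(kwargs, "items"):
            raise TypeError("Task keyword arguments must be a mapping.")
        return kwargs

    ensure_mapping({"x": 1})        # ok: dicts expose .items()
    # ensure_mapping([("x", 1)])    # would raise TypeError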
 

+ 6 - 6
docs/homepage/index.html

@@ -98,7 +98,7 @@ pageTracker._trackPageview();
     <h3>Getting Started</h3>
 
     <ol>
-        <li>Install celery by download or <code>pip install -U celery</code></li>
+        <li>Install celery by download or <code>pip install -U Celery</code></li>
         <li>Set up <a href="http://celeryq.org/docs/getting-started/broker-installation.html">RabbitMQ</a>
         or one of the <a href="http://celeryq.org/docs/tutorials/otherqueues.html">ghetto queue</a>
         solutions.
@@ -178,7 +178,7 @@ pageTracker._trackPageview();
       instructions please read the full
       <a href="http://celeryproject.org/docs/changelog.html">changelog</a>.
       Download from <a href="http://pypi.python.org/pypi/celery/1.0.6">PyPI</a>,
-      or simply install the upgrade using <code>pip install -U celery==1.0.6</code>.
+      or simply install the upgrade using <code>pip install -U Celery==1.0.6</code>.
       <hr>
       </span>
 
@@ -189,7 +189,7 @@ pageTracker._trackPageview();
       broker connection loss, as well as some other minor fixes. Also
       AbortableTask has been added to contrib. Please read the full <a href="http://celeryproject.org/docs/changelog.html">changelog</a>
       before you upgrade. Download from <a href="http://pypi.python.org/pypi/celery/1.0.5">PyPI</a>,
-      or simply install the upgrade using <code>pip install -U celery</code>.
+      or simply install the upgrade using <code>pip install -U Celery</code>.
       <hr>
       </span>
 
@@ -199,7 +199,7 @@ pageTracker._trackPageview();
       <p>This release contains a drastic improvement in reliability and
       performance. Please read the full <a href="http://celeryproject.org/docs/changelog.html">changelog</a>
       before you upgrade. Download from <a href="http://pypi.python.org/pypi/celery/1.0.3">PyPI</a>,
-      or simply install the upgrade using <code>pip install -U celery</code>.
+      or simply install the upgrade using <code>pip install -U Celery</code>.
       <hr>
       </span>
 
@@ -211,7 +211,7 @@ pageTracker._trackPageview();
       2.4. Read the full <a href="http://celeryproject.org/docs/changelog.html">Changelog</a>
       for more information. Download from <a
           href="http://pypi.python.org/pypi/celery/1.0.1">PyPI</a>,
-      or simply install the upgrade using <code>pip install -U celery</code>.
+      or simply install the upgrade using <code>pip install -U Celery</code>.
       <hr>
       </span>
 
@@ -221,7 +221,7 @@ pageTracker._trackPageview();
       <p>Celery 1.0 has finally been released! It is available on <a
           href="http://pypi.python.org/pypi/celery/1.0.0">PyPI</a> for
       downloading. You can also install it via <code>pip install
-          celery</code>. You can read the announcement <a href="http://celeryproject.org/celery_1.0_released.html">here</a>.
+          Celery</code>. You can read the announcement <a href="http://celeryproject.org/celery_1.0_released.html">here</a>.
       <hr>
       </span>
 

+ 3 - 3
docs/includes/installation.txt

@@ -1,13 +1,13 @@
-You can install `celery` either via the Python Package Index (PyPI)
+You can install Celery either via the Python Package Index (PyPI)
 or from source.
 
 To install using `pip`,::
 
-    $ pip install celery
+    $ pip install Celery
 
 To install using `easy_install`,::
 
-    $ easy_install celery
+    $ easy_install Celery
 
 Downloading and installing from source
 --------------------------------------

+ 4 - 4
docs/includes/introduction.txt

@@ -189,23 +189,23 @@ is hosted at Github.
 Installation
 ============
 
-You can install `celery` either via the Python Package Index (PyPI)
+You can install Celery either via the Python Package Index (PyPI)
 or from source.
 
 To install using `pip`,::
 
-    $ pip install celery
+    $ pip install Celery
 
 To install using `easy_install`,::
 
-    $ easy_install celery
+    $ easy_install Celery
 
 .. _celery-installing-from-source:
 
 Downloading and installing from source
 --------------------------------------
 
-Download the latest version of `celery` from
+Download the latest version of Celery from
 http://pypi.python.org/pypi/celery/
 
 You can install it by doing the following,::

+ 57 - 0
docs/reference/celery.app.rst

@@ -0,0 +1,57 @@
+.. currentmodule:: celery.app
+
+.. automodule:: celery.app
+
+    .. contents::
+        :local:
+
+    Functions
+    ---------
+
+    .. autofunction:: app_or_default
+
+    Application
+    -----------
+
+    .. autoclass:: App
+
+        .. attribute:: main
+
+            Name of the `__main__` module.  Required for standalone scripts.
+
+            If set this will be used instead of `__main__` when automatically
+            generating task names.
+
+        .. autoattribute:: amqp
+        .. autoattribute:: backend
+        .. autoattribute:: loader
+        .. autoattribute:: conf
+        .. autoattribute:: control
+        .. autoattribute:: log
+
+        .. automethod:: config_from_object
+        .. automethod:: config_from_envvar
+        .. automethod:: config_from_cmdline
+
+        .. automethod:: task
+        .. automethod:: create_task_cls
+        .. automethod:: TaskSet
+        .. automethod:: send_task
+        .. automethod:: AsyncResult
+        .. automethod:: TaskSetResult
+
+        .. automethod:: worker_main
+        .. automethod:: Worker
+        .. automethod:: Beat
+
+        .. automethod:: broker_connection
+        .. automethod:: with_default_connection
+
+        .. automethod:: mail_admins
+
+        .. automethod:: pre_config_merge
+        .. automethod:: post_config_merge
+
+        .. automethod:: either
+        .. automethod:: merge

+ 1 - 0
docs/reference/index.rst

@@ -8,6 +8,7 @@
 .. toctree::
     :maxdepth: 2
 
+    celery.app
     celery.decorators
     celery.task.base
     celery.task.sets

+ 7 - 0
setup.cfg

@@ -1,3 +1,10 @@
+[egg_info]
+tag_build = dev
+tag_date = true
+
+[aliases]
+release = egg_info -RDb ''
+
 [nosetests]
 where = celery/tests
 cover3-branch = 1
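
With `tag_build` and `tag_date` set, every build is versioned as a
development snapshot; the `release` alias expands to
`egg_info -RDb ''`, which clears those tags again.  Assuming standard
setuptools behaviour, usage would look like::

    $ python setup.py sdist            # version carries a dev/date suffix
    $ python setup.py release sdist    # the alias strips the suffix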