Pārlūkot izejas kodu

Rewrite and updatae the execution guide (now called Calling Guide)

Ask Solem 12 gadi atpakaļ
vecāks
revīzija
0648ca3b0f

+ 23 - 27
celery/tests/tasks/test_result.py

@@ -13,6 +13,7 @@ from celery.result import (
     AsyncResult,
     EagerResult,
     GroupResult,
+    TaskSetResult,
     ResultSet,
     from_serializable,
 )
@@ -334,14 +335,13 @@ class SimpleBackend(object):
             return ((id, {"result": i}) for i, id in enumerate(self.ids))
 
 
-class test_GroupResult(AppCase):
+class test_TaskSetResult(AppCase):
 
     def setup(self):
         self.size = 10
-        self.ts = GroupResult(uuid(), make_mock_group(self.size))
+        self.ts = TaskSetResult(uuid(), make_mock_group(self.size))
 
     def test_total(self):
-        self.assertEqual(len(self.ts), self.size)
         self.assertEqual(self.ts.total, self.size)
 
     def test_compat_properties(self):
@@ -349,16 +349,32 @@ class test_GroupResult(AppCase):
         self.ts.taskset_id = "foo"
         self.assertEqual(self.ts.taskset_id, "foo")
 
+    def test_compat_subtasks_kwarg(self):
+        x = TaskSetResult(uuid(), subtasks=[1, 2, 3])
+        self.assertEqual(x.results, [1, 2, 3])
+
+    def test_itersubtasks(self):
+        it = self.ts.itersubtasks()
+
+        for i, t in enumerate(it):
+            self.assertEqual(t.get(), i)
+
+
+class test_GroupResult(AppCase):
+
+    def setup(self):
+        self.size = 10
+        self.ts = GroupResult(uuid(), make_mock_group(self.size))
+
+    def test_len(self):
+        self.assertEqual(len(self.ts), self.size)
+
     def test_eq_other(self):
         self.assertFalse(self.ts == 1)
 
     def test_reduce(self):
         self.assertTrue(loads(dumps(self.ts)))
 
-    def test_compat_subtasks_kwarg(self):
-        x = GroupResult(uuid(), subtasks=[1, 2, 3])
-        self.assertEqual(x.results, [1, 2, 3])
-
     def test_iterate_raises(self):
         ar = MockAsyncResultFailure(uuid())
         ts = GroupResult(uuid(), [ar])
@@ -435,17 +451,8 @@ class test_GroupResult(AppCase):
         with self.assertRaises(TimeoutError):
             ts.join(timeout=0.0000001)
 
-    def test_itersubtasks(self):
-
-        it = self.ts.itersubtasks()
-
-        for i, t in enumerate(it):
-            self.assertEqual(t.get(), i)
-
     def test___iter__(self):
-
         it = iter(self.ts)
-
         results = sorted(list(it))
         self.assertListEqual(results, list(xrange(self.size)))
 
@@ -488,17 +495,6 @@ class test_failed_AsyncResult(test_GroupResult):
         failed_res = AsyncResult(failed["id"])
         self.ts = GroupResult(uuid(), subtasks + [failed_res])
 
-    def test_itersubtasks(self):
-
-        it = self.ts.itersubtasks()
-
-        for i in xrange(self.size - 1):
-            t = it.next()
-            self.assertEqual(t.get(), i)
-        with self.assertRaises(KeyError):
-            t = it.next()   # need to do this in two lines or 2to3 borks.
-            t.get()
-
     def test_completed_count(self):
         self.assertEqual(self.ts.completed_count(), len(self.ts) - 1)
 

+ 3 - 2
docs/_ext/celerydocs.py

@@ -14,11 +14,12 @@ APPATTRS = {
     "tasks": "celery.app.registry.Registry",
 
     "AsyncResult": "celery.result.AsyncResult",
-    "TaskSetResult": "celery.result.TaskSetResult",
+    "GroupResult": "celery.result.GroupResult",
     "Worker": "celery.apps.worker.Worker",
     "WorkController": "celery.worker.WorkController",
     "Beat": "celery.apps.beat.Beat",
-    "Task": "celery.app.task.BaseTask",
+    "Task": "celery.app.task.Task",
+    "send_task": "celery.Celery.send_task",
 }
 
 ABBRS = {

+ 1 - 39
docs/configuration.rst

@@ -888,45 +888,7 @@ CELERY_TASK_PUBLISH_RETRY_POLICY
 Defines the default policy when retrying publishing a task message in
 the case of connection loss or other connection errors.
 
-This is a mapping that must contain the following keys:
-
-    * `max_retries`
-
-        Maximum number of retries before giving up, in this case the
-        exception that caused the retry to fail will be raised.
-
-        A value of 0 or :const:`None` means it will retry forever.
-
-        The default is to retry 3 times.
-
-    * `interval_start`
-
-        Defines the number of seconds (float or integer) to wait between
-        retries.  Default is 0, which means the first retry will be
-        instantaneous.
-
-    * `interval_step`
-
-        On each consecutive retry this number will be added to the retry
-        delay (float or integer).  Default is 0.2.
-
-    * `interval_max`
-
-        Maximum number of seconds (float or integer) to wait between
-        retries.  Default is 0.2.
-
-With the default policy of::
-
-    {"max_retries": 3,
-     "interval_start": 0,
-     "interval_step": 0.2,
-     "interval_max": 0.2}
-
-the maximum time spent retrying will be 0.4 seconds.  It is set relatively
-short by default because a connection failure could lead to a retry pile effect
-if the broker connection is down: e.g. many web server processes waiting
-to retry blocking other incoming requests.
-
+See :ref:`calling-retry` for more information.
 
 .. setting:: CELERY_DEFAULT_RATE_LIMIT
 

+ 1 - 1
docs/reference/celery.app.task.rst

@@ -7,4 +7,4 @@
 .. currentmodule:: celery.app.task
 
 .. automodule:: celery.app.task
-    :members: Context, TaskType, BaseTask
+    :members: Task, Context, TaskType

+ 59 - 61
docs/reference/celery.rst

@@ -2,14 +2,18 @@
  celery
 ========
 
+.. currentmodule:: celery
+
+.. module:: celery
+
+
 .. contents::
     :local:
 
 Application
 -----------
 
-.. class:: Celery(main=None, broker="amqp://guest:guest@localhost:5672//",
-                  loader="app", backend=None)
+.. class:: Celery(main='__main__', broker='amqp://localhost//', ...)
 
     :param main: Name of the main module if running as `__main__`.
     :keyword broker: URL of the default broker used.
@@ -25,63 +29,64 @@ Application
     :keyword set_as_current:  Make this the global current app.
     :keyword tasks: A task registry or the name of a registry class.
 
-    .. attribute:: main
+    .. attribute:: Celery.main
 
         Name of the `__main__` module.  Required for standalone scripts.
 
         If set this will be used instead of `__main__` when automatically
         generating task names.
 
-    .. attribute:: conf
+    .. attribute:: Celery.conf
 
         Current configuration.
 
-    .. attribute:: current_task
+    .. attribute:: Celery.current_task
 
         The instance of the task that is being executed, or :const:`None`.
 
-    .. attribute:: amqp
+    .. attribute:: Celery.amqp
 
         AMQP related functionality: :class:`~@amqp`.
 
-    .. attribute:: backend
+    .. attribute:: Celery.backend
 
         Current backend instance.
 
-    .. attribute:: loader
+    .. attribute:: Celery.loader
 
         Current loader instance.
 
-    .. attribute:: control
+    .. attribute:: Celery.control
 
         Remote control: :class:`~@control`.
 
-    .. attribute:: events
+    .. attribute:: Celery.events
 
         Consuming and sending events: :class:`~@events`.
 
-    .. attribute:: log
+    .. attribute:: Celery.log
 
         Logging: :class:`~@log`.
 
-    .. attribute:: tasks
+    .. attribute:: Celery.tasks
 
         Task registry.
 
-    .. attribute:: pool
+    .. attribute:: Celery.pool
 
         Broker connection pool: :class:`~@pool`.
+        This attribute is not related to the workers concurrency pool.
 
-    .. attribute:: Task
+    .. attribute:: Celery.Task
 
         Base task class for this app.
 
-    .. method:: bugreport
+    .. method:: Celery.bugreport
 
         Returns a string with information useful for the Celery core
         developers when reporting a bug.
 
-    .. method:: config_from_object(obj, silent=False)
+    .. method:: Celery.config_from_object(obj, silent=False)
 
         Reads configuration from object, where object is either
         an object or the name of a module to import.
@@ -95,7 +100,7 @@ Application
             >>> from myapp import celeryconfig
             >>> celery.config_from_object(celeryconfig)
 
-    .. method:: config_from_envvar(variable_name, silent=False)
+    .. method:: Celery.config_from_envvar(variable_name, silent=False)
 
         Read configuration from environment variable.
 
@@ -107,28 +112,17 @@ Application
             >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
             >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
 
-    .. method:: config_from_cmdline(argv, namespace="celery")
-
-        Parses argv for configuration strings.
-
-        Configuration strings must be located after a '--' sequence,
-        e.g.::
-
-            program arg1 arg2 -- celeryd.concurrency=10
-
-        :keyword namespace: Default namespace if omitted.
-
-    .. method:: start(argv=None)
+    .. method:: Celery.start(argv=None)
 
         Run :program:`celery` using `argv`.
 
         Uses :data:`sys.argv` if `argv` is not specified.
 
-    .. method:: task(fun, **options)
+    .. method:: Celery.task(fun, ...)
 
         Decorator to create a task class out of any callable.
 
-        **Examples:**
+        Examples:
 
         .. code-block:: python
 
@@ -155,48 +149,45 @@ Application
             application is fully set up (finalized).
 
 
-    .. method:: send_task(name, args=(), kwargs={}, countdown=None,
-            eta=None, task_id=None, publisher=None, connection=None,
-            result_cls=AsyncResult, expires=None, queues=None, **options)
+    .. method:: Celery.send_task(name[, args[, kwargs[, ...]]])
 
-        Send task by **name**.
+        Send task by name.
 
         :param name: Name of task to call (e.g. `"tasks.add"`).
         :keyword result_cls: Specify custom result class. Default is
             using :meth:`AsyncResult`.
 
-        Otherwise supports the same arguments as :meth:`~@Task.apply_async`.
+        Otherwise supports the same arguments as :meth:`@-Task.apply_async`.
 
-    .. attribute:: AsyncResult
+    .. attribute:: Celery.AsyncResult
 
         Create new result instance. See :class:`~celery.result.AsyncResult`.
 
-    .. attribute:: GroupResult
+    .. attribute:: Celery.GroupResult
 
         Create new taskset result instance.
         See :class:`~celery.result.GroupResult`.
 
-    .. method:: worker_main(argv=None)
+    .. method:: Celery.worker_main(argv=None)
 
         Run :program:`celeryd` using `argv`.
 
         Uses :data:`sys.argv` if `argv` is not specified."""
 
-    .. attribute:: Worker
+    .. attribute:: Celery.Worker
 
         Worker application. See :class:`~@Worker`.
 
-    .. attribute:: WorkController
+    .. attribute:: Celery.WorkController
 
         Embeddable worker. See :class:`~@WorkController`.
 
-    .. attribute:: Beat
+    .. attribute:: Celery.Beat
 
         Celerybeat scheduler application.
         See :class:`~@Beat`.
 
-    .. method:: broker_connection(url="amqp://guest:guest@localhost:5672//",
-            ssl=False, transport_options={})
+    .. method:: Celery.broker_connection(url=default, [ssl, [transport_options={}]])
 
         Establish a connection to the message broker.
 
@@ -215,7 +206,7 @@ Application
 
         :returns :class:`kombu.connection.BrokerConnection`:
 
-    .. method:: default_connection(connection=None)
+    .. method:: Celery.default_connection(connection=None)
 
         For use within a with-statement to get a connection from the pool
         if one is not already provided.
@@ -224,30 +215,30 @@ Application
                              acquired from the connection pool.
 
 
-    .. method:: mail_admins(subject, body, fail_silently=False)
+    .. method:: Celery.mail_admins(subject, body, fail_silently=False)
 
         Sends an email to the admins in the :setting:`ADMINS` setting.
 
-    .. method:: select_queues(queues=[])
+    .. method:: Celery.select_queues(queues=[])
 
         Select a subset of queues, where queues must be a list of queue
         names to keep.
 
-    .. method:: now()
+    .. method:: Celery.now()
 
         Returns the current time and date as a :class:`~datetime.datetime`
         object.
 
-    .. method:: set_current()
+    .. method:: Celery.set_current()
 
         Makes this the current app for this thread.
 
-    .. method:: finalize()
+    .. method:: Celery.finalize()
 
         Finalizes the app by loading built-in tasks,
         and evaluating pending task decorators
 
-    .. attribute:: Pickler
+    .. attribute:: Celery.Pickler
 
         Helper class used to pickle this application.
 
@@ -266,11 +257,14 @@ Grouping Tasks
 
     The ``apply_async`` method returns :class:`~@GroupResult`.
 
-.. class:: chain(*tasks)
+.. class:: chain(task1[, task2[, task3[,... taskN]]])
 
     Chains tasks together, so that each tasks follows each other
     by being applied as a callback of the previous task.
 
+    If called with only one argument, then that argument must
+    be an iterable of tasks to chain.
+
     Example::
 
         >>> res = chain(add.s(2, 2), add.s(4)).apply_async()
@@ -332,11 +326,11 @@ Grouping Tasks
         >>> subtask(s)
         {"task": "tasks.add", args=(2, 2), kwargs={}, options={}}
 
-    .. method:: delay(*args, **kwargs)
+    .. method:: subtask.delay(*args, \*\*kwargs)
 
         Shortcut to :meth:`apply_async`.
 
-    .. method:: apply_async(args=(), kwargs={}, **options)
+    .. method:: subtask.apply_async(args=(), kwargs={}, ...)
 
         Apply this task asynchronously.
 
@@ -347,12 +341,12 @@ Grouping Tasks
 
         See :meth:`~@Task.apply_async`.
 
-    .. method:: apply(args=(), kwargs={}, **options)
+    .. method:: subtask.apply(args=(), kwargs={}, ...)
 
         Same as :meth:`apply_async` but executed the task inline instead
         of sending a task message.
 
-    .. method:: clone(args=(), kwargs={}, **options)
+    .. method:: subtask.clone(args=(), kwargs={}, ...)
 
         Returns a copy of this subtask.
 
@@ -361,28 +355,32 @@ Grouping Tasks
         :keyword options: Partial options to be merged with the existing
                           options.
 
-    .. method:: replace(args=None, kwargs=None, options=None)
+    .. method:: subtask.replace(args=None, kwargs=None, options=None)
 
         Replace the args, kwargs or options set for this subtask.
         These are only replaced if the selected is not :const:`None`.
 
-    .. method:: link(other_subtask)
+    .. method:: subtask.link(other_subtask)
 
         Add a callback task to be applied if this task
         executes successfully.
 
-    .. method:: link_error(other_subtask)
+        :returns: ``other_subtask`` (to work with :func:`~functools.reduce`).
+
+    .. method:: subtask.link_error(other_subtask)
 
         Add a callback task to be applied if an error occurs
         while executing this task.
 
-    .. method:: set(**options)
+        :returns: ``other_subtask`` (to work with :func:`~functools.reduce`)
+
+    .. method:: subtask.set(...)
 
         Set arbitrary options (same as ``.options.update(...)``).
 
         This is a chaining method call (i.e. it returns itself).
 
-    .. method:: flatten_links()
+    .. method:: subtask.flatten_links()
 
         Gives a recursive list of dependencies (unchain if you will,
         but with links intact).

+ 0 - 11
docs/reference/celery.task.chords.rst

@@ -1,11 +0,0 @@
-======================================================
- celery.task.chords
-======================================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.task.chords
-
-.. automodule:: celery.task.chords
-    :members:
-    :undoc-members:

+ 0 - 11
docs/reference/celery.task.control.rst

@@ -1,11 +0,0 @@
-====================================================
- celery.task.control
-====================================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.task.control
-
-.. automodule:: celery.task.control
-    :members:
-    :undoc-members:

+ 281 - 98
docs/userguide/calling.rst

@@ -1,11 +1,12 @@
 .. _guide-calling:
 
-=================
+===============
  Calling Tasks
-=================
+===============
 
 .. contents::
     :local:
+    :depth: 1
 
 
 .. _calling-basics:
@@ -13,31 +14,77 @@
 Basics
 ======
 
-Calling a task is done using :meth:`~@Task.apply_async`,
-or the meth:`~@Task.delay` shortcut.
+This document describes Celery's uniform "Calling API"
+used by task instances and the :ref:`canvas <guide-canvas>`.
+
+The API defines a standard set of execution options, as well as three methods:
+
+    - ``apply_async(args[, kwargs[, ...]])``
+
+        Sends a task message.
+
+    - ``delay(*args, **kwargs)``
+
+        Shortcut to send a task message, but does not support execution
+        options.
+
+    - ``apply()``
+
+        Does not send a message but executes the task inline instead.
+
+.. _calling-cheat:
+
+.. topic:: Quick Cheat Sheet
+
+    - ``T.delay(arg, kwarg=value)``
+        always a shortcut to ``.apply_async``.
 
-:meth:`~@Task.delay` i convenient as it looks like calling a regular
+    - ``T.apply_async((arg, ), {"kwarg": value})``
+
+    - ``T.apply_async(countdown=10)``
+        executes 10 seconds from now.
+
+    - ``T.apply_async(eta=now + timedelta(seconds=10))``
+        executes 10 seconds from now, specifed using ``eta``
+
+    - ``T.apply_async(countdown=60, expires=120)``
+        executes in one minute from now, but expires after 2 minutes.
+
+    - ``T.apply_async(expires=now + timedelta(days=2))``
+        expires in 2 days, set using :class:`~datetime.datetime`.
+
+
+Example
+-------
+
+The :meth:`~@Task.delay` method is convenient as it looks like calling a regular
 function:
 
 .. code-block:: python
 
-    Task.delay(arg1, arg2, kwarg1="x", kwarg2="y")
+    task.delay(arg1, arg2, kwarg1="x", kwarg2="y")
 
-The same using :meth:`~@Task.apply_async` is written like this:
+Using :meth:`~@Task.apply_async` instead we have to write::
 
 .. code-block:: python
 
-    Task.apply_async(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})
+    task.apply_async(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})
 
+.. sidebar:: Tip
 
-While `delay` is convenient, it doesn't give you as much control as using
+    If the task is not registered in the current process
+    you can use :meth:`~@send_task` to call the task by name instead.
+
+
+So `delay` is clearly convenient, but it doesn't give you as much control as using
 `apply_async`.  With `apply_async` you can override the execution options
 available as attributes on the :class:`~@Task` class (see :ref:`task-options`).
 In addition you can set countdown/eta, task expiry, provide a custom broker
 connection and more.
 
-Let's go over these in more detail.  All the examples uses a simple task
-called `add`, returning the sum of two positional arguments:
+The rest of this document will go into the task execution
+options in detail.  All examples use a task
+called `add`, returning the sum of two arguments:
 
 .. code-block:: python
 
@@ -45,18 +92,82 @@ called `add`, returning the sum of two positional arguments:
     def add(x, y):
         return x + y
 
-.. note::
 
-    If the task is not registered in the current process
-    you can call it by name.
+.. topic:: There's another way...
 
-    The :meth:`@send_task` method is used for this purpose:
+    You will learn more about this later while reading about the :ref:`Canvas
+    <guide-canvas>`, but :class:`~celery.subtask`'s are objects used to pass around
+    the signature of a task invocation, (for example to send it over the
+    network), and they also support the Calling API:
 
     .. code-block:: python
 
-        >>> result = celery.send_task("tasks.add", [2, 2])
-        >>> result.get()
-        4
+        task.s(arg1, arg2, kwarg1="x", kwargs2="y").apply_async()
+
+.. _calling-links:
+
+Linking (callbacks/errbacks)
+============================
+
+Celery supports linking tasks together so that one task follows another.
+The callback task will be applied with the result of the parent task
+as a partial argument:
+
+.. code-block:: python
+
+    add.apply_async((2, 2), link=add.s(16))
+
+.. sidebar:: What is ``s``?
+
+    The ``add.s`` call used here is called a subtask, we talk
+    more about subtasks in the :ref:`canvas guide <guide-canvas>`,
+    where you can also learn about :class:`~celery.chain`, which
+    is a simpler way to chain tasks together.
+
+    In practice the ``link`` execution option is considered an internal
+    primitive, and you will probably not use it directly, but
+    rather use chains instead.
+
+Here the result of the first task (4) will be sent to a new
+task that adds 16 to the previous result, forming the expression
+:math:`(2 + 2) + 16 = 20`
+
+
+You can also cause a callback to be applied if task raises an exception
+(*errback*), but this behaves differently from a regular callback
+in that it will be passed the id of the parent task, not the result.
+This is because it may not always be possible to serialize
+the exception raised, and so this way the error callback requires
+a result backend to be enabled, and the task must retrieve the result
+of the task instead.
+
+This is an example error callback:
+
+.. code-block:: python
+
+    @celery.task()
+    def error_handler(uuid):
+        result = AsyncResult(uuid)
+        exc = result.get(propagate=False)
+        print("Task %r raised exception: %r\n%r" % (
+              exc, result.traceback))
+
+it can be added to the task using the ``link_error`` execution
+option:
+
+.. code-block:: python
+
+    add.apply_async((2, 2), link_error=error_handler.s())
+
+
+In addition, both the ``link`` and ``link_error`` options can be expressed
+as a list::
+
+    add.apply_async((2, 2), link=[add.s(16), other_task.s()])
+
+The callbacks/errbacks will then be called in order, and all
+callbacks will be called with the return value of the parent task
+as a partial argument.
 
 .. _calling-eta:
 
@@ -69,7 +180,7 @@ a shortcut to set eta by seconds into the future.
 
 .. code-block:: python
 
-    >>> result = add.apply_async(args=[10, 10], countdown=3)
+    >>> result = add.apply_async((2, 2), countdown=3)
     >>> result.get()    # this takes at least 3 seconds to return
     20
 
@@ -77,7 +188,7 @@ The task is guaranteed to be executed at some time *after* the
 specified date and time, but not necessarily at that exact time.
 Possible reasons for broken deadlines may include many items waiting
 in the queue, or heavy network latency.  To make sure your tasks
-are executed in a timely manner you should monitor queue lengths. Use
+are executed in a timely manner you should monitor the queue for congestion. Use
 Munin, or similar tools, to receive alerts, so appropriate action can be
 taken to ease the workload.  See :ref:`monitoring-munin`.
 
@@ -89,8 +200,8 @@ and timezone information):
 
     >>> from datetime import datetime, timedelta
 
-    >>> tomorrow = datetime.now() + timedelta(days=1)
-    >>> add.apply_async(args=[10, 10], eta=tomorrow)
+    >>> tomorrow = datetime.utcnow() + timedelta(days=1)
+    >>> add.apply_async((2, 2), eta=tomorrow)
 
 .. _calling-expiration:
 
@@ -104,26 +215,111 @@ either as seconds after task publish, or a specific date and time using
 .. code-block:: python
 
     >>> # Task expires after one minute from now.
-    >>> add.apply_async(args=[10, 10], expires=60)
+    >>> add.apply_async((10, 10), expires=60)
 
     >>> # Also supports datetime
     >>> from datetime import datetime, timedelta
-    >>> add.apply_async(args=[10, 10], kwargs,
+    >>> add.apply_async((10, 10), kwargs,
     ...                 expires=datetime.now() + timedelta(days=1)
 
 
 When a worker receives an expired task it will mark
 the task as :state:`REVOKED` (:exc:`~@TaskRevokedError`).
 
+.. _calling-retry:
+
+Message Sending Retry
+=====================
+
+Celery will automatically retry sending messages in the event of connection
+failure, and retry behavior can be configured -- like how often to retry, or a maximum
+number of retries -- or disabled all together.
+
+To disable retry you can set the ``retry`` execution option to :const:`False`:
+
+.. code-block:: python
+
+    add.apply_async((2, 2), retry=False)
+
+.. topic:: Related Settings
+
+    .. hlist::
+        :columns: 2
+
+        - :setting:`CELERY_TASK_PUBLISH_RETRY`
+        - :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`
+
+
+
+Retry Policy
+------------
+
+A retry policy is a mapping that controls how retries behave,
+and can contain the following keys:
+
+- `max_retries`
+
+    Maximum number of retries before giving up, in this case the
+    exception that caused the retry to fail will be raised.
+
+    A value of 0 or :const:`None` means it will retry forever.
+
+    The default is to retry 3 times.
+
+- `interval_start`
+
+    Defines the number of seconds (float or integer) to wait between
+    retries.  Default is 0, which means the first retry will be
+    instantaneous.
+
+- `interval_step`
+
+    On each consecutive retry this number will be added to the retry
+    delay (float or integer).  Default is 0.2.
+
+- `interval_max`
+
+    Maximum number of seconds (float or integer) to wait between
+    retries.  Default is 0.2.
+
+For example, the default policy correlates to:
+
+.. code-block:: python
+
+    add.apply_async((2, 2), retry=True, retry_policy={
+        "max_retries": 3,
+        "interval_start": 0,
+        "interval_step": 0.2,
+        "interval_max": 0.2,
+    })
+
+the maximum time spent retrying will be 0.4 seconds.  It is set relatively
+short by default because a connection failure could lead to a retry pile effect
+if the broker connection is down: e.g. many web server processes waiting
+to retry blocking other incoming requests.
+
 .. _calling-serializers:
 
 Serializers
 ===========
 
-Data transferred between clients and workers needs to be serialized.
+.. sidebar::  Security
+
+    The pickle module allows for execution of arbitrary functions,
+    please see the :ref:`security guide <guide-security>`.
+
+    Celery also comes with a special serializer that uses
+    cryptography to sign your messages.
+
+Data transferred between clients and workers needs to be serialized,
+so every message in Celery has a ``content_type`` header that
+describes the serialization method used to encode it.
+
 The default serializer is :mod:`pickle`, but you can
-change this globally or for each individual task.
-There is built-in support for :mod:`pickle`, `JSON`, `YAML`
+change this using the :setting:`CELERY_TASK_SERIALIZER` setting,
+or for each individual task, or even per message.
+
+There's built-in support for :mod:`pickle`, `JSON`, `YAML`
 and `msgpack`, and you can also add your own custom serializers by registering
 them into the Kombu serializer registry (see `Kombu: Serialization of Data`_).
 
@@ -180,26 +376,46 @@ The encoding used is available as a message header, so the worker knows how to
 deserialize any task.  If you use a custom serializer, this serializer must
 be available for the worker.
 
-The client uses the following order to decide which serializer
+The following order is used to decide which serializer
 to use when sending a task:
 
-    1. The `serializer` argument to :meth:`~@Task.apply_async`
+    1. The `serializer` execution option.
     2. The :attr:`@-Task.serializer` attribute
-    3. The default :setting:`CELERY_TASK_SERIALIZER` setting.
+    3. The :setting:`CELERY_TASK_SERIALIZER` setting.
 
 
-* Using the `serializer` argument to :meth:`~@Task.apply_async`:
+Example setting a custom serializer for a single task invocation:
 
 .. code-block:: python
 
-    >>> add.apply_async(args=[10, 10], serializer="json")
+    >>> add.apply_async((10, 10), serializer="json")
+
+.. _calling-compression:
+
+Compression
+===========
+
+Celery can compress the messages using either *gzip*, or *bzip2*.
+You can also create your own compression schemes and register
+them in the :func:`kombu compression registry <kombu.compression.register>`.
+
+The following order is used to decide which compression scheme
+to use when sending a task:
+
+    1. The `compression` execution option.
+    2. The :attr:`@-Task.compression` attribute.
+    3. The :setting:`CELERY_MESSAGE_COMPRESSION` attribute.
+
+Example specifying the compression used when calling a task::
+
+    >>> add.apply_async((2, 2), compression="zlib")
 
 .. _calling-connections:
 
 Connections
 ===========
 
-.. admonition:: Automatic Pool Support
+.. sidebar:: Automatic Pool Support
 
     Since version 2.3 there is support for automatic connection pools,
     so you don't have to manually handle connections and publishers
@@ -220,7 +436,7 @@ publisher:
         with add.get_publisher(connection) as publisher:
             try:
                 for args in numbers:
-                    res = add.apply_async(args=args, publisher=publisher)
+                    res = add.apply_async((2, 2), publisher=publisher)
                     results.append(res)
     print([res.get() for res in results])
 
@@ -242,89 +458,56 @@ Though this particular example is much better expressed as a group:
 Routing options
 ===============
 
-Celery uses the AMQP routing mechanisms to route tasks to different workers.
-
-Messages (tasks) are sent to exchanges, a queue binds to an exchange with a
-routing key. Let's look at an example:
-
-Let's pretend we have an application with lot of different tasks: some
-process video, others process images, and some gather collective intelligence
-about its users.  Some of these tasks are more important, so we want to make
-sure the high priority tasks get sent to dedicated nodes.
+Celery can route tasks to different queues.
 
-For the sake of this example we have a single exchange called `tasks`.
-There are different types of exchanges, each type interpreting the routing
-key in different ways, implementing different messaging scenarios.
+Simple routing (name <-> name) is accomplished using the ``queue`` option::
 
-The most common types used with Celery are `direct` and `topic`.
+    add.apply_async(queue="priority.high")
 
-* direct
+You can then assign workers to the ``priority.high`` queue by using
+the workers :option:`-Q` argument::
 
-    Matches the routing key exactly.
+    $ celery worker -l info -Q celery,priority.high
 
-* topic
-
-    In the topic exchange the routing key is made up of words separated by
-    dots (`.`).  Words can be matched by the wild cards `*` and `#`,
-    where `*` matches one exact word, and `#` matches one or many words.
-
-    For example, `*.stock.#` matches the routing keys `usd.stock` and
-    `euro.stock.db` but not `stock.nasdaq`.
-
-We create three queues, `video`, `image` and `lowpri` that binds to
-the `tasks` exchange.  For the queues we use the following binding keys::
-
-    video: video.#
-    image: image.#
-    lowpri: misc.#
-
-Now we can send our tasks to different worker machines, by making the workers
-listen to different queues:
-
-.. code-block:: python
-
-    >>> add.apply_async(args=[filename],
-    ...                               routing_key="video.compress")
-
-    >>> add.apply_async(args=[filename, 360],
-    ...                             routing_key="image.rotate")
+.. seealso::
 
-    >>> add.apply_async(args=[filename, selection],
-    ...                           routing_key="image.crop")
-    >>> add.apply_async(routing_key="misc.recommend")
+    Hard-coding queue names in code is not recommended, the best practice
+    is to use configuration routers (:setting:`CELERY_ROUTES`).
 
+    To find out more about routing, please see :ref:`guide-routing`.
 
-Later, if the crop task is consuming a lot of resources,
-we can bind new workers to handle just the `"image.crop"` task,
-by creating a new queue that binds to `"image.crop`".
+Advanced Options
+----------------
 
-.. seealso::
+These options are for advanced users who want to take use of
+AMQP's full routing capabilities. Interested parties may read the
+:ref:`routing guide <guide-routing>`.
 
-    To find out more about routing, please see :ref:`guide-routing`.
+- exchange
 
-.. _calling-amq-opts:
+    Name of exchange (or a :class:`kombu.entity.Exchange`) to
+    send the message to.
 
-AMQP options
-============
+- routing_key
 
-* mandatory
+    Routing key used to determine.
 
-This sets the delivery to be mandatory.  An exception will be raised
-if there are no running workers able to take on the task.
+- mandatory
 
-Not supported by :mod:`amqplib`.
+    This sets the delivery to be mandatory.  An exception will be raised
+    if there are no running workers able to take on the task.
 
-* immediate
+    Not supported by :mod:`amqplib`.
 
-Request immediate delivery. Will raise an exception
-if the task cannot be routed to a worker immediately.
+- immediate
 
-Not supported by :mod:`amqplib`.
+    Request immediate delivery. Will raise an exception
+    if the task cannot be routed to a worker immediately.
 
-* priority
+    Not supported by :mod:`amqplib`.
 
-A number between `0` and `9`, where `0` is the highest priority.
+- priority
 
-.. note::
+    A number between `0` and `9`, where `0` is the highest priority.
 
-    RabbitMQ does not yet support AMQP priorities.
+    Supported by: redis, beanstalk

+ 10 - 0
docs/userguide/tasks.rst

@@ -308,6 +308,16 @@ General
 
     Please see :ref:`calling-serializers` for more information.
 
+.. attribute:: Task.compression
+
+    A string identifying the default compression scheme to use.
+
+    Defaults to the :setting:`CELERY_MESSAGE_COMPRESSION` setting.
+    Can be `gzip`, or `bzip2`, or any custom compression schemes
+    that have been registered with the :mod:`kombu.compression` registry.
+
+    Please see :ref:`calling-compression` for more information.
+
 .. attribute:: Task.backend
 
     The result store backend to use for this task.  Defaults to the

+ 2 - 20
docs/whatsnew-2.6.rst

@@ -114,26 +114,8 @@ Tasks can now have callbacks and errbacks, and dependencies are recorded
     - ``link`` and ``link_error`` keyword arguments has been added
       to ``apply_async``.
 
-        The value passed can be either a subtask or a list of
-        subtasks:
-
-        .. code-block:: python
-
-            add.apply_async((2, 2), link=mul.subtask())
-            add.apply_async((2, 2), link=[mul.subtask(), echo.subtask()])
-
-        Example error callback:
-
-        .. code-block:: python
-
-            @celery.task()
-            def error_handler(uuid):
-                result = AsyncResult(uuid)
-                exc = result.get(propagate=False)
-                print("Task %r raised exception: %r\n%r" % (
-                    exc, result.traceback))
-
-            >>> add.apply_async((2, 2), link_error=error_handler)
+        These add callbacks and errbacks to the task, and
+        you can read more about them at :ref:`calling-links`.
 
     - We now track what subtasks a task sends, and some result backends
       supports retrieving this information.

+ 18 - 3
docs/xreftest.rst

@@ -4,6 +4,8 @@ xreftest
 Must not be in public docs
 --------------------------
 
+hello, how do you do3
+
 
 ``meth @Task.retry``: :meth:`@Task.retry`
 
@@ -19,6 +21,19 @@ Must not be in public docs
 ``class ~@Celery``: :class:`~@Celery`
 
 
+``meth @Celery.config_from_object``: :meth:`@Celery.config_from_object`
+
+``meth @-Celery.config_from_object``: :meth:`@-Celery.config_from_object`
+
+``meth ~@Celery.config_from_object``: :meth:`~@Celery.config_from_object`
+
+``meth celery.Celery.config_from_object``: :meth:`@Celery.send_task`
+
+:class:`celery.Celery`
+
+:class:`celery.subtask.link`
+
+
 ``attr @amqp``:   :attr:`@amqp`
 
 ``attr @-amqp``:   :attr:`@-amqp`
@@ -26,11 +41,11 @@ Must not be in public docs
 ``attr ~@amqp``:   :attr:`~@amqp`
 
 
-``meth @amqp.get_task_consumer``:  :meth:`@amqp.get_task_consumer`
+``meth @amqp.TaskConsumer``:  :meth:`@amqp.TaskConsumer`
 
-``meth @-amqp.get_task_consumer``: :meth:`@-amqp.get_task_consumer`
+``meth @-amqp.TaskConsumer``: :meth:`@-amqp.TaskConsumer`
 
-``meth ~@amqp.get_task_consumer``: :meth:`~@amqp.get_task_consumer`
+``meth ~@amqp.TaskConsumer``: :meth:`~@amqp.TaskConsumer`
 
 
 ``exc @NotRegistered``: :exc:`@NotRegistered`