@@ -6,6 +6,7 @@

 .. contents::
     :local:
+    :depth: 1

 .. _task-basics:

@@ -33,9 +34,9 @@ Task options can be specified as arguments to the decorator:
     def create_user(username, password):
         User.objects.create(username=username, password=password)

-.. admonition:: How do I import the task decorator?
+.. sidebar:: How do I import the task decorator?

-    The task decorator is available on your Celery instance,
+    The task decorator is available on your :class:`@Celery` instance,
     if you don't know what that is then please read :ref:`first-steps`.

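+    For example, a minimal sketch (the app instance name ``celery``
+    follows :ref:`first-steps`; the task itself is only an illustration):
+
+    .. code-block:: python
+
+        from celery import Celery
+
+        celery = Celery()
+
+        @celery.task()
+        def hello():
+            return "hello world"
+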
    If you're using Django or are still using the "old" module based celery API,
@@ -47,14 +48,30 @@ Task options can be specified as arguments to the decorator:
     def add(x, y):
         return x + y

+.. sidebar:: Multiple decorators
+
+    When using multiple decorators in combination with the task
+    decorator, you must make sure that the `task`
+    decorator is applied last (which in Python oddly means that it must
+    be the first in the list):
+
+    .. code-block:: python
+
+        @celery.task()
+        @decorator2
+        @decorator1
+        def add(x, y):
+            return x + y
+
 .. _task-request-info:

 Context
 =======

-:attr:`@-Task.request` contains information and state related
-the currently executing task, and always contains the following
-attributes:
+:attr:`~@Task.request` contains information and state related to
+the executing task.
+
+The request defines the following attributes:

 :id: The unique id of the executing task.

@@ -82,13 +99,12 @@ attributes:
     to resend the task to the same destination queue.


-Example Usage
--------------
+An example task accessing information in the context is:

 .. code-block:: python

     @celery.task()
-    def add(x, y):
+    def dump_context(x, y):
         print("Executing task id %r, args: %r kwargs: %r" % (
-            add.request.id, add.request.args, add.request.kwargs))
+            dump_context.request.id, dump_context.request.args,
+            dump_context.request.kwargs))

@@ -98,23 +114,32 @@ Logging
 =======

 The worker will automatically set up logging for you, or you can
-configure logging manually. Every task will also have a dedicated
-logger that can be used freely to emit logs from your tasks.
+configure logging manually.
+
+A special logger is available named "celery.task". You can inherit
+from this logger to automatically get the task name and unique id as part
+of the logs.
+
+The best practice is to create a common logger
+for all of your tasks at the top of your module:

 .. code-block:: python

+    from celery.utils.log import get_task_logger
+
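+    # get_task_logger returns a logger that inherits from the "celery.task"
+    # logger, so the task name and id are included in the log records.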
+    logger = get_task_logger(__name__)
+
     @celery.task()
     def add(x, y):
-        logger = add.get_logger()
         logger.info("Adding %s + %s" % (x, y))
         return x + y

-:meth:`@-Task.get_logger` returns a standard Python logger instance,
-for which documentation can be found in the standard library's :mod:`logging`
+Celery uses the standard Python logging library,
+for which documentation can be found in the :mod:`logging`
 module.

 You can also simply use :func:`print`, as anything written to standard
-out/-err will be redirected to a logger by default (see
+out/-err will be redirected to the worker's logs by default (see
 :setting:`CELERY_REDIRECT_STDOUTS`).

 .. _task-retry:
@@ -122,8 +147,15 @@ out/-err will be redirected to a logger by default (see
 Retrying a task if something fails
 ==================================

-:meth:`@-Task.retry` can be used to re-send the task, for example in the event
-of temporary failure.
+:meth:`~@Task.retry` can be used to re-execute the task in the event
+of temporary failure, or for any other reason.
+
+A new message will then be sent using the same task-id, and sent
+to the same queue as the current task. This also means that you
+can track retries using the task's result instance (if a result
+backend is enabled).
+
+An example:

 .. code-block:: python

@@ -136,9 +168,9 @@ of temporary failure.
             raise send_twitter_status.retry(exc=exc)

 Here we used the `exc` argument to pass the current exception to
-:meth:`@-Task.retry`. At each step of the retry this exception
+:meth:`~@Task.retry`. At each step of the retry this exception
 is available as the tombstone (result) of the task. When
-:attr:`@-Task.max_retries` has been exceeded this is the
+:attr:`~@Task.max_retries` has been exceeded this is the
 exception raised. However, if an `exc` argument is not provided the
 :exc:`~@RetryTaskError` exception is raised instead.

@@ -147,7 +179,8 @@ exception raised. However, if an `exc` argument is not provided the
     The :meth:`~@Task.retry` call will raise an exception so any code after the retry
     will not be reached. This is the :exc:`~@RetryTaskError`
     exception, it is not handled as an error but rather as a semi-predicate
-    to signify to the worker that the task is to be retried.
+    to signify to the worker that the task is to be retried,
+    so that it can store the correct state when a result backend is enabled.

     This is normal operation and always happens unless the
     ``throw`` argument to retry is set to :const:`False`.
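+
+    For example, a minimal sketch of suppressing that behavior with
+    ``throw=False`` (reusing the ``send_twitter_status`` task from the
+    example above):
+
+    .. code-block:: python
+
+        @celery.task()
+        def send_twitter_status(oauth, tweet):
+            try:
+                twitter = Twitter(oauth)
+                twitter.update_status(tweet)
+            except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
+                # The retry is still scheduled, but no exception is
+                # raised, so execution continues past this point.
+                send_twitter_status.retry(exc=exc, throw=False)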
@@ -157,10 +190,10 @@ exception raised. However, if an `exc` argument is not provided the
 Using a custom retry delay
 --------------------------

-When a task is to be retried, it will wait for a given amount of time
-before doing so. The default delay is in the
+When a task is to be retried, it can wait for a given amount of time
+before doing so, and the default delay is defined by the
 :attr:`~@Task.default_retry_delay`
-attribute on the task. By default this is set to 3 minutes. Note that the
+attribute. By default this is set to 3 minutes. Note that the
 unit for setting the delay is in seconds (int or float).

 You can also provide the `countdown` argument to :meth:`~@Task.retry` to
@@ -190,7 +223,7 @@ General

     The name the task is registered as.

-    You can set this name manually, or just use the default which is
+    You can set this name manually, or a name will be
     automatically generated using the module and class name. See
     :ref:`task-names`.

@@ -260,10 +293,10 @@ General
     Defaults to the :setting:`CELERY_SEND_TASK_ERROR_EMAILS` setting.
     See :ref:`conf-error-mails` for more information.

-.. attribute:: Task.error_whitelist
+.. attribute:: Task.ErrorMail

     If the sending of error emails is enabled for this task, then
-    this is a white list of exceptions to actually send emails about.
+    this is the class defining the logic to send error mails.

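+    A minimal sketch of swapping in a custom mail class (assuming the
+    ``ErrorMail`` base class in :mod:`celery.utils.mail` and its
+    ``should_send`` hook; the ``import_feed`` task is only an
+    illustration):
+
+    .. code-block:: python
+
+        from celery.utils.mail import ErrorMail
+
+        class OperationalErrorMail(ErrorMail):
+            # Hypothetical policy: only mail about operational errors.
+            def should_send(self, context, exc):
+                return isinstance(exc, (IOError, OSError))
+
+        @celery.task(ErrorMail=OperationalErrorMail, send_error_emails=True)
+        def import_feed(url):
+            ...
+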
 .. attribute:: Task.serializer

@@ -314,56 +347,6 @@ General

     The API reference for :class:`~@Task`.

-.. _task-message-options:
-
-Message and routing options
----------------------------
-
-.. attribute:: Task.queue
-
-    Use the routing settings from a queue defined in :setting:`CELERY_QUEUES`.
-    If defined the :attr:`exchange` and :attr:`routing_key` options will be
-    ignored.
-
-.. attribute:: Task.exchange
-
-    Override the global default `exchange` for this task.
-
-.. attribute:: Task.routing_key
-
-    Override the global default `routing_key` for this task.
-
-.. attribute:: Task.mandatory
-
-    If set, the task message has mandatory routing. By default the task
-    is silently dropped by the broker if it can't be routed to a queue.
-    However -- If the task is mandatory, an exception will be raised
-    instead.
-
-    Not supported by amqplib.
-
-.. attribute:: Task.immediate
-
-    Request immediate delivery. If the task cannot be routed to a
-    task worker immediately, an exception will be raised. This is
-    instead of the default behavior, where the broker will accept and
-    queue the task, but with no guarantee that the task will ever
-    be executed.
-
-    Not supported by amqplib.
-
-.. attribute:: Task.priority
-
-    The message priority. A number from 0 to 9, where 0 is the
-    highest priority.
-
-    Only supported by Beanstalk.
-
-.. seealso::
-
-    :ref:`executing-routing` for more information about message options,
-    and :ref:`guide-routing`.
-
 .. _task-names:

 Task names
@@ -385,7 +368,7 @@ For example:
     >>> add.name
     'sum-of-two-numbers'

-The best practice is to use the module name as a prefix to classify the
+A best practice is to use the module name as a prefix to classify the
 tasks using namespaces. This way the name won't collide with the name from
 another module:

@@ -451,25 +434,6 @@ add the project directory to the Python path::

 This makes more sense from the reusable app perspective anyway.

-.. _tasks-decorating:
-
-Decorating tasks
-================
-
-When using other decorators you must make sure that the `task`
-decorator is applied last:
-
-.. code-block:: python
-
-    @celery.task()
-    @decorator2
-    @decorator1
-    def add(x, y):
-        return x + y
-
-
-Which means the `@celery.task` decorator must be the top statement.
-
 .. _task-states:

 Task States
@@ -647,10 +611,12 @@ which defines its own custom :state:`ABORTED` state.

 Use :meth:`~@Task.update_state` to update a task's state::

+    from celery import current_task
+
     @celery.task()
     def upload_files(filenames):
         for i, file in enumerate(filenames):
-            upload_files.update_state(state="PROGRESS",
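+            # current_task proxies the task instance currently being executed.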
+            current_task.update_state(state="PROGRESS",
                 meta={"current": i, "total": len(filenames)})


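+The client can then read the custom state and its meta data from the
+task's result (a minimal sketch, assuming a result backend is enabled)::
+
+    result = upload_files.delay(filenames)
+    # For a custom state, ``state`` is the state name and ``info`` holds
+    # the meta dictionary passed to update_state().
+    print(result.state, result.info)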
@@ -760,6 +726,8 @@ If you have a task,

 .. code-block:: python

+    from celery import Task
+
     class NaiveAuthenticateServer(Task):

         def __init__(self):
@@ -793,6 +761,8 @@ base class for new task types.

 .. code-block:: python

+    from celery import Task
+
     class DebugTask(Task):
         abstract = True

@@ -808,15 +778,6 @@ base class for new task types.
 Handlers
 --------

-.. method:: execute(self, request, pool, loglevel, logfile, \*\*kw):
-
-    :param request: A :class:`~celery.worker.job.Request`.
-    :param pool: The task pool.
-    :param loglevel: Current loglevel.
-    :param logfile: Name of the currently used logfile.
-
-    :keyword consumer: The :class:`~celery.worker.consumer.Consumer`.
-
 .. method:: after_return(self, status, retval, task_id, args, kwargs, einfo)

     Handler called after the task returns.
@@ -966,6 +927,9 @@ rate limits:

     CELERY_DISABLE_RATE_LIMITS = True

+You can find additional optimization tips in the
+:ref:`Optimizing Guide <guide-optimizing>`.
+
 .. _task-synchronous-subtasks:

 Avoid launching synchronous subtasks
@@ -1003,41 +967,28 @@ Make your design asynchronous instead, for example by using *callbacks*.

 .. code-block:: python

-    @celery.task(ignore_result=True)
     def update_page_info(url):
         # fetch_page -> parse_page -> store_page
-        fetch_page.delay(url, callback=subtask(parse_page,
-                            callback=subtask(store_page_info)))
+        chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
+        chain.apply_async()

     @celery.task(ignore_result=True)
-    def fetch_page(url, callback=None):
-        page = myhttplib.get(url)
-        if callback:
-            # The callback may have been serialized with JSON,
-            # so best practice is to convert the subtask dict back
-            # into a subtask object.
-            subtask(callback).delay(url, page)
+    def fetch_page(url):
+        return myhttplib.get(url)

     @celery.task(ignore_result=True)
-    def parse_page(url, page, callback=None):
-        info = myparser.parse_document(page)
-        if callback:
-            subtask(callback).delay(url, info)
+    def parse_page(page):
+        return myparser.parse_document(page)

     @celery.task(ignore_result=True)
-    def store_page_info(url, info):
+    def store_page_info(info, url):
         PageInfo.objects.create(url, info)


-We use :class:`~celery.task.sets.subtask` here to safely pass
-around the callback task. :class:`~celery.task.sets.subtask` is a
-subclass of dict used to wrap the arguments and execution options
-for a single task invocation.
-
-
-.. seealso::
-
-    :ref:`sets-subtasks` for more information about subtasks.
+Here we instead create a chain of tasks by linking together
+different :func:`~celery.subtask` signatures, so that the return value
+of each task is passed on as the first argument of the next.
+You can read about chains and other powerful constructs
+in :ref:`designing-work-flows`.

 .. _task-performance-and-strategies:

@@ -1296,6 +1247,8 @@ blog/tasks.py

 .. code-block:: python

+    import celery
+
     from akismet import Akismet

     from django.core.exceptions import ImproperlyConfigured