|
@@ -1,3 +1,8 @@
|
|
|
+"""
|
|
|
+
|
|
|
+Working with tasks and task sets.
|
|
|
+
|
|
|
+"""
|
|
|
from carrot.connection import DjangoAMQPConnection
|
|
|
from celery.log import setup_logger
|
|
|
from celery.registry import tasks
|
|
@@ -16,17 +21,25 @@ import pickle
|
|
|
def delay_task(task_name, *args, **kwargs):
|
|
|
"""Delay a task for execution by the ``celery`` daemon.
|
|
|
|
|
|
+ :param task_name: the name of a task registered in the task registry.
|
|
|
+
|
|
|
+ :param \*args: positional arguments to pass on to the task.
|
|
|
+
|
|
|
+ :param \*\*kwargs: keyword arguments to pass on to the task.
|
|
|
+
|
|
|
+ :raises celery.registry.NotRegistered: exception if no such task
|
|
|
+ has been registered in the task registry.
|
|
|
+
|
|
|
+ :rtype: :class:`celery.result.AsyncResult`.
|
|
|
+
|
|
|
+ Example
|
|
|
+
|
|
|
>>> r = delay_task("update_record", name="George Constanza", age=32)
|
|
|
>>> r.ready()
|
|
|
True
|
|
|
>>> r.result
|
|
|
"Record was updated"
|
|
|
|
|
|
- Raises :class:`celery.registry.NotRegistered` exception if no such task
|
|
|
- has been registered in the task registry.
|
|
|
-
|
|
|
- :rtype: :class:`celery.result.AsyncResult`.
|
|
|
-
|
|
|
"""
|
|
|
if task_name not in tasks:
|
|
|
raise tasks.NotRegistered(
|
|
@@ -45,7 +58,7 @@ def discard_all():
|
|
|
This will ignore all tasks waiting for execution, and they will
|
|
|
be deleted from the messaging server.
|
|
|
|
|
|
- Returns the number of tasks discarded.
|
|
|
+ :returns: the number of tasks discarded.
|
|
|
|
|
|
:rtype: int
|
|
|
|
|
@@ -58,13 +71,25 @@ def discard_all():
|
|
|
|
|
|
|
|
|
def mark_as_done(task_id, result):
|
|
|
- """Mark task as done (executed)."""
|
|
|
- return default_backend.mark_as_done(task_id, result)
|
|
|
+ """Mark task as done (executed successfully).
|
|
|
+
|
|
|
+ :param task_id: id of the task.
|
|
|
+
|
|
|
+ :param result: the return value of the task.
|
|
|
+
|
|
|
+ """
|
|
|
+ default_backend.mark_as_done(task_id, result)
|
|
|
|
|
|
|
|
|
def mark_as_failure(task_id, exc):
|
|
|
- """Mark task as done (executed)."""
|
|
|
- return default_backend.mark_as_failure(task_id, exc)
|
|
|
+ """Mark task as failed (execution raised an exception).
|
|
|
+
|
|
|
+ :param task_id: id of the task.
|
|
|
+
|
|
|
+ :param exc: the exception instance raised by the task.
|
|
|
+
|
|
|
+ """
|
|
|
+ default_backend.mark_as_failure(task_id, exc)
|
|
|
|
|
|
|
|
|
def is_done(task_id):
|
|
@@ -79,10 +104,29 @@ def is_done(task_id):
|
|
|
class Task(object):
|
|
|
"""A task that can be delayed for execution by the ``celery`` daemon.
|
|
|
|
|
|
- All subclasses of ``Task`` has to define the ``name`` attribute, which is
|
|
|
- the name of the task that can be passed to ``celery.task.delay_task``,
|
|
|
- it also has to define the ``run`` method, which is the actual method the
|
|
|
- ``celery`` daemon executes.
|
|
|
+ All subclasses of :class:`Task` must define the :meth:`run` method,
|
|
|
+ which is the actual method the ``celery`` daemon executes.
|
|
|
+
|
|
|
+ The :meth:`run` method supports both positional and keyword arguments.
|
|
|
+
|
|
|
+ .. attribute:: name
|
|
|
+
|
|
|
+ *REQUIRED* All subclasses of :class:`Task` have to define the
|
|
|
+ :attr:`name` attribute. This is the name of the task, registered
|
|
|
+ in the task registry, and passed to :func:`delay_task`.
|
|
|
+
|
|
|
+ .. attribute:: type
|
|
|
+
|
|
|
+ The type of task, currently this can be ``regular``, or ``periodic``,
|
|
|
+ however if you want a periodic task, you should subclass
|
|
|
+ :class:`PeriodicTask` instead.
|
|
|
+
|
|
|
+ :raises NotImplementedError: if the :attr:`name` attribute is not set.
|
|
|
+
|
|
|
+ The resulting class is callable, which if called will apply the
|
|
|
+ :meth:`run` method.
|
|
|
+
|
|
|
+ Examples
|
|
|
|
|
|
This is a simple task just logging a message,
|
|
|
|
|
@@ -97,7 +141,7 @@ class Task(object):
|
|
|
... return 42
|
|
|
... tasks.register(MyTask)
|
|
|
|
|
|
- You can delay the task using the classmethod ``delay``...
|
|
|
+ You can delay the task using the classmethod :meth:`delay`...
|
|
|
|
|
|
>>> result = MyTask.delay(some_arg="foo")
|
|
|
>>> result.status # after some time
|
|
@@ -105,11 +149,12 @@ class Task(object):
|
|
|
>>> result.result
|
|
|
42
|
|
|
|
|
|
- ...or using the ``celery.task.delay_task`` function, by passing the
|
|
|
- name of the task.
|
|
|
+ ...or using the :func:`delay_task` function, by passing the name of
|
|
|
+ the task.
|
|
|
|
|
|
>>> from celery.task import delay_task
|
|
|
- >>> delay_task(MyTask.name, some_arg="foo")
|
|
|
+ >>> result = delay_task(MyTask.name, some_arg="foo")
|
|
|
+
|
|
|
|
|
|
"""
|
|
|
name = None
|
|
@@ -123,26 +168,55 @@ class Task(object):
|
|
|
raise NotImplementedError("Tasks must define a name attribute.")
|
|
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
|
- """The ``__call__`` is called when you do ``Task().run()`` and calls
|
|
|
- the ``run`` method. It also catches any exceptions and logs them."""
|
|
|
return self.run(*args, **kwargs)
|
|
|
|
|
|
def run(self, *args, **kwargs):
|
|
|
- """The actual task. All subclasses of :class:`Task` must define
|
|
|
- the run method, if not a ``NotImplementedError`` exception is raised.
|
|
|
+ """*REQUIRED* The actual task.
|
|
|
+
|
|
|
+ All subclasses of :class:`Task` must define the run method.
|
|
|
+
|
|
|
+ :raises NotImplementedError: by default, so you have to override
|
|
|
+ this method in your subclass.
|
|
|
+
|
|
|
"""
|
|
|
raise NotImplementedError("Tasks must define a run method.")
|
|
|
|
|
|
def get_logger(self, **kwargs):
|
|
|
- """Get a process-aware logger object."""
|
|
|
+ """Get process-aware logger object.
|
|
|
+
|
|
|
+ See :func:`celery.log.setup_logger`.
|
|
|
+
|
|
|
+ """
|
|
|
return setup_logger(**kwargs)
|
|
|
|
|
|
def get_publisher(self):
|
|
|
- """Get a celery task message publisher."""
|
|
|
+ """Get a celery task message publisher.
|
|
|
+
|
|
|
+ :rtype: :class:`celery.messaging.TaskPublisher`.
|
|
|
+
|
|
|
+ Please be sure to close the AMQP connection when you're done
|
|
|
+ with this object, i.e.:
|
|
|
+
|
|
|
+ >>> publisher = self.get_publisher()
|
|
|
+ >>> # do something with publisher
|
|
|
+ >>> publisher.connection.close()
|
|
|
+
|
|
|
+ """
|
|
|
return TaskPublisher(connection=DjangoAMQPConnection())
|
|
|
|
|
|
def get_consumer(self):
|
|
|
- """Get a celery task message consumer."""
|
|
|
+ """Get a celery task message consumer.
|
|
|
+
|
|
|
+ :rtype: :class:`celery.messaging.TaskConsumer`.
|
|
|
+
|
|
|
+ Please be sure to close the AMQP connection when you're done
|
|
|
+ with this object, i.e.:
|
|
|
+
|
|
|
+ >>> consumer = self.get_consumer()
|
|
|
+ >>> # do something with consumer
|
|
|
+ >>> consumer.connection.close()
|
|
|
+
|
|
|
+ """
|
|
|
return TaskConsumer(connection=DjangoAMQPConnection())
|
|
|
|
|
|
def requeue(self, task_id, args, kwargs):
|
|
@@ -156,33 +230,57 @@ class Task(object):
|
|
|
@classmethod
|
|
|
def delay(cls, *args, **kwargs):
|
|
|
"""Delay this task for execution by the ``celery`` daemon(s).
|
|
|
+
|
|
|
+ :param \*args: positional arguments passed on to the task.
|
|
|
+
|
|
|
+ :param \*\*kwargs: keyword arguments passed on to the task.
|
|
|
|
|
|
:rtype: :class:`celery.result.AsyncResult`
|
|
|
|
|
|
+ See :func:`delay_task`.
|
|
|
+
|
|
|
"""
|
|
|
return delay_task(cls.name, *args, **kwargs)
|
|
|
|
|
|
|
|
|
class TaskSet(object):
|
|
|
"""A task containing several subtasks, making it possible
|
|
|
- to track how many, or when all of the tasks are completed.
|
|
|
-
|
|
|
+ to track how many, or when all of the tasks have been completed.
|
|
|
+
|
|
|
+ :param task: The task class or name.
|
|
|
+ Can either be a fully qualified task name, or a task class.
|
|
|
+
|
|
|
+ :param args: A list of args, kwargs pairs.
|
|
|
+ e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``
|
|
|
+
|
|
|
+
|
|
|
+ .. attribute:: task_name
|
|
|
+
|
|
|
+ The name of the task.
|
|
|
+
|
|
|
+ .. attribute:: arguments
|
|
|
+
|
|
|
+ The arguments, as passed to the task set constructor.
|
|
|
+
|
|
|
+ .. attribute:: total
|
|
|
+
|
|
|
+ Total number of tasks in this task set.
|
|
|
+
|
|
|
+ Example
|
|
|
+
|
|
|
>>> from djangofeeds.tasks import RefreshFeedTask
|
|
|
>>> taskset = TaskSet(RefreshFeedTask, args=[
|
|
|
- ... {"feed_url": "http://cnn.com/rss"},
|
|
|
- ... {"feed_url": "http://bbc.com/rss"},
|
|
|
- ... {"feed_url": "http://xkcd.com/rss"}])
|
|
|
+ ... [[], {"feed_url": "http://cnn.com/rss"}],
|
|
|
+ ... [[], {"feed_url": "http://bbc.com/rss"}],
|
|
|
+ ... [[], {"feed_url": "http://xkcd.com/rss"}]])
|
|
|
|
|
|
>>> taskset_id, subtask_ids = taskset.run()
|
|
|
+ >>> list_of_return_values = taskset.join()
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
def __init__(self, task, args):
|
|
|
- """``task`` can be either a fully qualified task name, or a task
|
|
|
- class, args is a list of arguments for the subtasks.
|
|
|
- """
|
|
|
-
|
|
|
try:
|
|
|
task_name = task.name
|
|
|
except AttributeError:
|
|
@@ -195,7 +293,12 @@ class TaskSet(object):
|
|
|
def run(self):
|
|
|
"""Run all tasks in the taskset.
|
|
|
|
|
|
- Returns a tuple with the taskset id, and a list of subtask id's.
|
|
|
+ :returns: A tuple containing the taskset id, and a list
|
|
|
+ of subtask ids.
|
|
|
+
|
|
|
+ :rtype: tuple
|
|
|
+
|
|
|
+ Example
|
|
|
|
|
|
>>> ts = RefreshFeeds([
|
|
|
... ["http://foo.com/rss", {}],
|
|
@@ -225,10 +328,11 @@ class TaskSet(object):
|
|
|
return taskset_id, subtask_ids
|
|
|
|
|
|
def iterate(self):
|
|
|
- """Iterate over the results returned after calling ``run()``.
|
|
|
+ """Iterate over the results returned after calling :meth:`run`.
|
|
|
|
|
|
If any of the tasks raises an exception, the exception will
|
|
|
- be reraised by ``iterate``.
|
|
|
+ be re-raised.
|
|
|
+
|
|
|
"""
|
|
|
taskset_id, subtask_ids = self.run()
|
|
|
results = dict([(task_id, AsyncResult(task_id))
|
|
@@ -246,12 +350,16 @@ class TaskSet(object):
|
|
|
and return a list with them ordered by the order of which they
|
|
|
were called.
|
|
|
|
|
|
+ :keyword timeout: The time in seconds to wait for results
|
|
|
+ before the operation times out.
|
|
|
+
|
|
|
+ :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
|
|
|
+ and the operation takes longer than ``timeout`` seconds.
|
|
|
+
|
|
|
If any of the tasks raises an exception, the exception
|
|
|
- will be reraised by ``join``.
|
|
|
+ will be reraised by :meth:`join`.
|
|
|
|
|
|
- If ``timeout`` is not ``None`` and the operation takes
|
|
|
- longer than ``timeout`` seconds, it will raise
|
|
|
- the :class:`celery.timer.TimeoutError` exception.
|
|
|
+ :returns: list of return values for all tasks in the taskset.
|
|
|
|
|
|
"""
|
|
|
timeout_timer = TimeoutTimer(timeout) # Timeout timer starts here.
|
|
@@ -292,7 +400,7 @@ class TaskSet(object):
|
|
|
"""Distribute processing of the arguments and collect the results
|
|
|
asynchronously.
|
|
|
|
|
|
- Returns :class:`celery.result.AsyncResult` instance.
|
|
|
+ :returns: :class:`celery.result.AsyncResult` instance.
|
|
|
|
|
|
"""
|
|
|
serfunc = pickle.dumps(func)
|
|
@@ -302,6 +410,8 @@ class TaskSet(object):
|
|
|
def dmap(func, args, timeout=None):
|
|
|
"""Distribute processing of the arguments and collect the results.
|
|
|
|
|
|
+ Example
|
|
|
+
|
|
|
>>> from celery.task import map
|
|
|
>>> import operator
|
|
|
>>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
|
|
@@ -312,7 +422,8 @@ def dmap(func, args, timeout=None):
|
|
|
|
|
|
|
|
|
class AsynchronousMapTask(Task):
|
|
|
- """Task used internally by ``dmap_async``."""
|
|
|
+ """Task used internally by :func:`dmap_async` and
|
|
|
+ :meth:`TaskSet.map_async`. """
|
|
|
name = "celery.map_async"
|
|
|
|
|
|
def run(self, serfunc, args, **kwargs):
|
|
@@ -325,7 +436,9 @@ def dmap_async(func, args, timeout=None):
|
|
|
"""Distribute processing of the arguments and collect the results
|
|
|
asynchronously.
|
|
|
|
|
|
- Returns a :class:`celery.result.AsyncResult` object.
|
|
|
+ :returns: :class:`celery.result.AsyncResult` object.
|
|
|
+
|
|
|
+ Example
|
|
|
|
|
|
>>> from celery.task import dmap_async
|
|
|
>>> import operator
|
|
@@ -342,14 +455,21 @@ def dmap_async(func, args, timeout=None):
|
|
|
|
|
|
|
|
|
class PeriodicTask(Task):
|
|
|
- """A periodic task is a task that behaves like a cron job.
|
|
|
+ """A periodic task is a task that behaves like a :manpage:`cron` job.
|
|
|
|
|
|
- The ``run_every`` attribute defines how often the task is run (its
|
|
|
- interval), it can be either a ``datetime.timedelta`` object or a integer
|
|
|
- specifying the time in seconds.
|
|
|
+ .. attribute:: run_every
|
|
|
+
|
|
|
+ *REQUIRED* Defines how often the task is run (its interval),
|
|
|
+ it can be either a :class:`datetime.timedelta` object or an
|
|
|
+ integer specifying the time in seconds.
|
|
|
+
|
|
|
+ :raises NotImplementedError: if the :attr:`run_every` attribute is
|
|
|
+ not defined.
|
|
|
|
|
|
You have to register the periodic task in the task registry.
|
|
|
|
|
|
+ Example
|
|
|
+
|
|
|
>>> from celery.task import tasks, PeriodicTask
|
|
|
>>> from datetime import timedelta
|
|
|
>>> class MyPeriodicTask(PeriodicTask):
|
|
@@ -378,17 +498,26 @@ class PeriodicTask(Task):
|
|
|
|
|
|
|
|
|
class ExecuteRemoteTask(Task):
|
|
|
- """Execute arbitrary function/object.
|
|
|
+ """Execute an arbitrary function or object.
|
|
|
+
|
|
|
+ *Note* You probably want :func:`execute_remote` instead; this task
|
|
|
+ is used internally by it.
|
|
|
|
|
|
The object must be pickleable, so you can't use lambdas or functions
|
|
|
- defined in the REPL.
|
|
|
+ defined in the REPL (that is the python shell, or ``ipython``).
|
|
|
|
|
|
"""
|
|
|
name = "celery.execute_remote"
|
|
|
|
|
|
def run(self, ser_callable, fargs, fkwargs, **kwargs):
|
|
|
- """Execute the pickled ``ser_callable``, with ``fargs`` as positional
|
|
|
- arguments and ``fkwargs`` as keyword arguments."""
|
|
|
+ """
|
|
|
+ :param ser_callable: A pickled function or callable object.
|
|
|
+
|
|
|
+ :param fargs: Positional arguments to apply to the function.
|
|
|
+
|
|
|
+ :param fkwargs: Keyword arguments to apply to the function.
|
|
|
+
|
|
|
+ """
|
|
|
callable_ = pickle.loads(ser_callable)
|
|
|
return callable_(*fargs, **fkwargs)
|
|
|
tasks.register(ExecuteRemoteTask)
|
|
@@ -396,11 +525,17 @@ tasks.register(ExecuteRemoteTask)
|
|
|
|
|
|
def execute_remote(func, *args, **kwargs):
|
|
|
"""Execute arbitrary function/object remotely.
|
|
|
+
|
|
|
+ :param func: A callable function or object.
|
|
|
+
|
|
|
+ :param \*args: Positional arguments to apply to the function.
|
|
|
+
|
|
|
+ :param \*\*kwargs: Keyword arguments to apply to the function.
|
|
|
|
|
|
The object must be picklable, so you can't use lambdas or functions
|
|
|
defined in the REPL (the objects must have an associated module).
|
|
|
|
|
|
- :rtype: :class:`celery.result.AsyncResult`
|
|
|
+ :returns: :class:`celery.result.AsyncResult`.
|
|
|
|
|
|
"""
|
|
|
return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)
|
|
@@ -409,7 +544,9 @@ def execute_remote(func, *args, **kwargs):
|
|
|
class DeleteExpiredTaskMetaTask(PeriodicTask):
|
|
|
"""A periodic task that deletes expired task metadata every day.
|
|
|
|
|
|
- This runs the current backend's cleanup() method.
|
|
|
+ This runs the current backend's
|
|
|
+ :meth:`celery.backends.base.BaseBackend.cleanup` method.
|
|
|
+
|
|
|
"""
|
|
|
name = "celery.delete_expired_task_meta"
|
|
|
run_every = timedelta(days=1)
|