|
@@ -11,6 +11,13 @@ import traceback
|
|
|
|
|
|
|
|
|
def delay_task(task_name, **kwargs):
|
|
|
+ """Delay a task for execution by the ``celery`` daemon.
|
|
|
+
|
|
|
+ Examples
|
|
|
+ --------
|
|
|
+ >>> delay_task("update_record", name="George Constanza", age=32)
|
|
|
+
|
|
|
+ """
|
|
|
if task_name not in tasks:
|
|
|
raise tasks.NotRegistered(
|
|
|
"Task with name %s not registered in the task registry." % (
|
|
@@ -22,6 +29,14 @@ def delay_task(task_name, **kwargs):
|
|
|
|
|
|
|
|
|
def discard_all():
|
|
|
+ """Discard all waiting tasks.
|
|
|
+
|
|
|
+ This will ignore all tasks waiting for execution, and they will
|
|
|
+ be deleted from the messaging server.
|
|
|
+
|
|
|
+ Returns the number of tasks discarded.
|
|
|
+
|
|
|
+ """
|
|
|
consumer = TaskConsumer(connection=DjangoAMQPConnection)
|
|
|
discarded_count = consumer.discard_all()
|
|
|
consumer.close()
|
|
@@ -29,10 +44,18 @@ def discard_all():
|
|
|
|
|
|
|
|
|
def gen_task_done_cache_key(task_id):
|
|
|
+ """Generate a cache key for marking a task as done."""
|
|
|
return "celery-task-done-marker-%s" % task_id
|
|
|
|
|
|
|
|
|
def mark_as_done(task_id, result):
|
|
|
+ """Mark task as done (executed).
|
|
|
+
|
|
|
+ if ``settings.TASK_META_USE_DB`` is ``True``, this will
|
|
|
+ use the :class:`celery.models.TaskMeta` model, if not memcached
|
|
|
+ is used.
|
|
|
+
|
|
|
+ """
|
|
|
if result is None:
|
|
|
result = True
|
|
|
if TASK_META_USE_DB:
|
|
@@ -43,14 +66,49 @@ def mark_as_done(task_id, result):
|
|
|
|
|
|
|
|
|
def is_done(task_id):
|
|
|
+ """Returns ``True`` if task with ``task_id`` has been executed."""
|
|
|
if TASK_META_USE_DB:
|
|
|
return TaskMeta.objects.is_done(task_id)
|
|
|
else:
|
|
|
cache_key = gen_task_done_cache_key(task_id)
|
|
|
- return cache.get(cache_key)
|
|
|
+ return bool(cache.get(cache_key))
|
|
|
|
|
|
|
|
|
class Task(object):
|
|
|
+ """A task that can be delayed for execution by the ``celery`` daemon.
|
|
|
+
|
|
|
+ All subclasses of ``Task`` has to define the ``name`` attribute, which is
|
|
|
+ the name of the task that can be passed to ``celery.task.delay_task``,
|
|
|
+ it also has to define the ``run`` method, which is the actual method the
|
|
|
+ ``celery`` daemon executes. This method does not support positional
|
|
|
+ arguments, only keyword arguments.
|
|
|
+
|
|
|
+ Examples
|
|
|
+ --------
|
|
|
+
|
|
|
+ This is a simple task just logging a message,
|
|
|
+
|
|
|
+ >>> from celery.task import tasks, Task
|
|
|
+ >>> class MyTask(Task):
|
|
|
+ ... name = "mytask"
|
|
|
+ ...
|
|
|
+ ... def run(self, some_arg=None, **kwargs):
|
|
|
+ ... logger = self.get_logger(**kwargs)
|
|
|
+ ... logger.info("Running MyTask with arg some_arg=%s" %
|
|
|
+ ... some_arg))
|
|
|
+ ... tasks.register(MyTask)
|
|
|
+
|
|
|
+ You can delay the task using the classmethod ``delay``...
|
|
|
+
|
|
|
+ >>> MyTask.delay(some_arg="foo")
|
|
|
+
|
|
|
+ ...or using the ``celery.task.delay_task`` function, by passing the
|
|
|
+ name of the task.
|
|
|
+
|
|
|
+ >>> from celery.task import delay_task
|
|
|
+ >>> delay_task(MyTask.name, some_arg="foo")
|
|
|
+
|
|
|
+ """
|
|
|
name = None
|
|
|
type = "regular"
|
|
|
|
|
@@ -59,6 +117,8 @@ class Task(object):
|
|
|
raise NotImplementedError("Tasks must define a name attribute.")
|
|
|
|
|
|
def __call__(self, **kwargs):
|
|
|
+ """The ``__call__`` is called when you do ``Task().run()`` and calls
|
|
|
+ the ``run`` method. It also catches any exceptions and logs them."""
|
|
|
try:
|
|
|
retval = self.run(**kwargs)
|
|
|
except Exception, e:
|
|
@@ -70,6 +130,9 @@ class Task(object):
|
|
|
return retval
|
|
|
|
|
|
def run(self, **kwargs):
|
|
|
+ """The actual task. All subclasses of :class:`Task` must define
|
|
|
+ the run method, if not a ``NotImplementedError`` exception is raised.
|
|
|
+ """
|
|
|
raise NotImplementedError("Tasks must define a run method.")
|
|
|
|
|
|
def get_logger(self, **kwargs):
|
|
@@ -86,6 +149,7 @@ class Task(object):
|
|
|
|
|
|
@classmethod
|
|
|
def delay(cls, **kwargs):
|
|
|
+ """Delay this task for execution by the ``celery`` daemon(s)."""
|
|
|
return delay_task(cls.name, **kwargs)
|
|
|
|
|
|
|
|
@@ -102,7 +166,7 @@ class TaskSet(object):
|
|
|
... {"feed_url": "http://bbc.com/rss"},
|
|
|
... {"feed_url": "http://xkcd.com/rss"}])
|
|
|
|
|
|
- >>> taskset_id = taskset.delay()
|
|
|
+ >>> taskset_id, subtask_ids = taskset.run()
|
|
|
|
|
|
|
|
|
"""
|
|
@@ -122,6 +186,23 @@ class TaskSet(object):
|
|
|
self.total = len(args)
|
|
|
|
|
|
def run(self):
|
|
|
+ """Run all tasks in the taskset.
|
|
|
+
|
|
|
+ Returns a tuple with the taskset id, and a list of subtask id's.
|
|
|
+
|
|
|
+ Examples
|
|
|
+ --------
|
|
|
+ >>> ts = RefreshFeeds(["http://foo.com/rss", http://bar.com/rss"])
|
|
|
+ >>> taskset_id, subtask_ids = ts.run()
|
|
|
+ >>> taskset_id
|
|
|
+ "d2c9b261-8eff-4bfb-8459-1e1b72063514"
|
|
|
+ >>> subtask_ids
|
|
|
+ ["b4996460-d959-49c8-aeb9-39c530dcde25",
|
|
|
+ "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
|
|
|
+ >>> time.sleep(10)
|
|
|
+ >>> is_done(taskset_id)
|
|
|
+ True
|
|
|
+ """
|
|
|
taskset_id = str(uuid.uuid4())
|
|
|
publisher = TaskPublisher(connection=DjangoAMQPConnection)
|
|
|
subtask_ids = []
|
|
@@ -135,6 +216,29 @@ class TaskSet(object):
|
|
|
|
|
|
|
|
|
class PeriodicTask(Task):
|
|
|
+ """A periodic task is a task that behaves like a cron job.
|
|
|
+
|
|
|
+ The ``run_every`` attribute defines how often the task is run (its
|
|
|
+ interval), it can be either a ``datetime.timedelta`` object or a integer
|
|
|
+ specifying the time in seconds.
|
|
|
+
|
|
|
+ You have to register the periodic task in the task registry.
|
|
|
+
|
|
|
+ Examples
|
|
|
+ --------
|
|
|
+
|
|
|
+ >>> from celery.task import tasks, PeriodicTask
|
|
|
+ >>> from datetime import timedelta
|
|
|
+ >>> class MyPeriodicTask(PeriodicTask):
|
|
|
+ ... name = "my_periodic_task"
|
|
|
+ ... run_every = timedelta(seconds=30)
|
|
|
+ ...
|
|
|
+ ... def run(self, **kwargs):
|
|
|
+ ... logger = self.get_logger(**kwargs)
|
|
|
+ ... logger.info("Running MyPeriodicTask")
|
|
|
+ >>> tasks.register(MyPeriodicTask)
|
|
|
+
|
|
|
+ """
|
|
|
run_every = timedelta(days=1)
|
|
|
type = "periodic"
|
|
|
|
|
@@ -151,6 +255,7 @@ class PeriodicTask(Task):
|
|
|
|
|
|
|
|
|
class TestTask(Task):
|
|
|
+ """A simple test task that just logs something."""
|
|
|
name = "celery.test_task"
|
|
|
|
|
|
def run(self, some_arg, **kwargs):
|
|
@@ -160,6 +265,10 @@ tasks.register(TestTask)
|
|
|
|
|
|
|
|
|
class DeleteExpiredTaskMetaTask(PeriodicTask):
|
|
|
+ """A periodic task that deletes expired task metadata every day.
|
|
|
+
|
|
|
+ It's only registered if ``settings.CELERY_TASK_META_USE_DB`` is set.
|
|
|
+ """
|
|
|
name = "celery.delete_expired_task_meta"
|
|
|
run_every = timedelta(days=1)
|
|
|
|