Defining Tasks - celery.task.base

class celery.task.base.AsynchronousMapTask

Task used internally by dmap_async() and TaskSet.map_async().

run(serfunc, args, **kwargs)
The method run by celeryd.
class celery.task.base.ExecuteRemoteTask

Execute an arbitrary function or object.

Note You probably want execute_remote() instead, which this is an internal component of.

The object must be pickleable, so you can’t use lambdas or functions defined in the REPL (that is the python shell, or ipython).

run(ser_callable, fargs, fkwargs, **kwargs)
Parameters:
  • ser_callable – A pickled function or callable object.
  • fargs – Positional arguments to apply to the function.
  • fkwargs – Keyword arguments to apply to the function.
class celery.task.base.PeriodicTask

A periodic task is a task that behaves like a cron job.

run_every
REQUIRED Defines how often the task is run (its interval), it can be either a datetime.timedelta object or an integer specifying the time in seconds.
Raises NotImplementedError:
 if the run_every attribute is not defined.

You have to register the periodic task in the task registry.

Example

>>> from celery.task import tasks, PeriodicTask
>>> from datetime import timedelta
>>> class MyPeriodicTask(PeriodicTask):
...     name = "my_periodic_task"
...     run_every = timedelta(seconds=30)
...
...     def run(self, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Running MyPeriodicTask")
>>> tasks.register(MyPeriodicTask)
class celery.task.base.Task

A task that can be delayed for execution by the celery daemon.

All subclasses of Task must define the run() method, which is the actual method the celery daemon executes.

The run() method supports both positional, and keyword arguments.

name
REQUIRED All subclasses of Task has to define the name attribute. This is the name of the task, registered in the task registry, and passed to delay_task().
type
The type of task, currently this can be regular, or periodic, however if you want a periodic task, you should subclass PeriodicTask instead.
routing_key
Override the global default routing_key for this task.
exchange
Override the global default exchange for this task.
mandatory
If set, the message has mandatory routing. By default the message is silently dropped by the broker if it can’t be routed to a queue. However - If the message is mandatory, an exception will be raised instead.
immediate:
Request immediate delivery. If the message cannot be routed to a task worker immediately, an exception will be raised. This is instead of the default behaviour, where the broker will accept and queue the message, but with no guarantee that the message will ever be consumed.
priority:
The message priority. A number from 0 to 9.
ignore_result
Don’t store the status and return value. This means you can’t use the celery.result.AsyncResult to check if the task is done, or get its return value. Only use if you need the performance and is able live without these features. Any exceptions raised will store the return value/status as usual.
disable_error_emails
Disable all error e-mails for this task (only applicable if settings.SEND_CELERY_ERROR_EMAILS is on.)
Raises NotImplementedError:
 if the name attribute is not set.

The resulting class is callable, which if called will apply the run() method.

Examples

This is a simple task just logging a message,

>>> from celery.task import tasks, Task
>>> class MyTask(Task):
...
...     def run(self, some_arg=None, **kwargs):
...         logger = self.get_logger(**kwargs)
...         logger.info("Running MyTask with arg some_arg=%s" %
...                     some_arg))
...         return 42
... tasks.register(MyTask)

You can delay the task using the classmethod delay()...

>>> result = MyTask.delay(some_arg="foo")
>>> result.status # after some time
'DONE'
>>> result.result
42

...or using the delay_task() function, by passing the name of the task.

>>> from celery.task import delay_task
>>> result = delay_task(MyTask.name, some_arg="foo")
classmethod apply(args=None, kwargs=None, **options)

Execute this task at once, by blocking until the task has finished executing.

Parameters:
  • args – positional arguments passed on to the task.
  • kwargs – keyword arguments passed on to the task.
Return type:

celery.result.EagerResult

See celery.execute.apply().

classmethod apply_async(args=None, kwargs=None, **options)

Delay this task for execution by the celery daemon(s).

Parameters:
  • args – positional arguments passed on to the task.
  • kwargs – keyword arguments passed on to the task.
Return type:

celery.result.AsyncResult

See celery.execute.apply_async().

classmethod delay(*args, **kwargs)

Delay this task for execution by the celery daemon(s).

Parameters:
  • *args – positional arguments passed on to the task.
  • **kwargs – keyword arguments passed on to the task.
Return type:

celery.result.AsyncResult

See celery.execute.delay_task().

get_consumer(connect_timeout=4)

Get a celery task message consumer.

Return type:celery.messaging.TaskConsumer.

Please be sure to close the AMQP connection when you’re done with this object. i.e.:

>>> consumer = self.get_consumer()
>>> # do something with consumer
>>> consumer.connection.close()
get_logger(**kwargs)

Get process-aware logger object.

See celery.log.setup_logger().

get_publisher(connect_timeout=4)

Get a celery task message publisher.

Return type:celery.messaging.TaskPublisher.

Please be sure to close the AMQP connection when you’re done with this object, i.e.:

>>> publisher = self.get_publisher()
>>> # do something with publisher
>>> publisher.connection.close()
run(*args, **kwargs)

REQUIRED The actual task.

All subclasses of Task must define the run method.

Raises NotImplementedError:
 by default, so you have to override this method in your subclass.
class celery.task.base.TaskSet(task, args)

A task containing several subtasks, making it possible to track how many, or when all of the tasks has been completed.

Parameters:
  • task – The task class or name. Can either be a fully qualified task name, or a task class.
  • args – A list of args, kwargs pairs. e.g. [[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]
task_name
The name of the task.
arguments
The arguments, as passed to the task set constructor.
total
Total number of tasks in this task set.

Example

>>> from djangofeeds.tasks import RefreshFeedTask
>>> taskset = TaskSet(RefreshFeedTask, args=[
...                 [], {"feed_url": "http://cnn.com/rss"},
...                 [], {"feed_url": "http://bbc.com/rss"},
...                 [], {"feed_url": "http://xkcd.com/rss"}])
>>> taskset_result = taskset.run()
>>> list_of_return_values = taskset.join()
join(timeout=None)

Gather the results for all of the tasks in the taskset, and return a list with them ordered by the order of which they were called.

Parameter:timeout – The time in seconds, how long it will wait for results, before the operation times out.
Raises TimeoutError:
 if timeout is not None and the operation takes longer than timeout seconds.

If any of the tasks raises an exception, the exception will be reraised by join().

Returns:list of return values for all tasks in the taskset.
classmethod map(func, args, timeout=None)
Distribute processing of the arguments and collect the results.
classmethod map_async(func, args, timeout=None)

Distribute processing of the arguments and collect the results asynchronously.

Returns:celery.result.AsyncResult instance.
classmethod remote_execute(func, args)
Apply args to function by distributing the args to the celery server(s).
run(connect_timeout=4)

Run all tasks in the taskset.

Returns:A celery.result.TaskSetResult instance.

Example

>>> ts = TaskSet(RefreshFeedTask, [
...         ["http://foo.com/rss", {}],
...         ["http://bar.com/rss", {}],
... )
>>> result = ts.run()
>>> result.taskset_id
"d2c9b261-8eff-4bfb-8459-1e1b72063514"
>>> result.subtask_ids
["b4996460-d959-49c8-aeb9-39c530dcde25",
"598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
>>> result.waiting()
True
>>> time.sleep(10)
>>> result.ready()
True
>>> result.successful()
True
>>> result.failed()
False
>>> result.join()
[True, True]

Previous topic

Module API Reference

Next topic

Executing Tasks - celery.execute

This Page