123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 |
- """
- Working with tasks and task sets.
- """
- from carrot.connection import DjangoAMQPConnection
- from celery.conf import AMQP_CONNECTION_TIMEOUT
- from celery.messaging import TaskPublisher, TaskConsumer
- from celery.log import setup_logger
- from celery.registry import tasks
- from datetime import timedelta
- from celery.backends import default_backend
- from celery.result import AsyncResult, TaskSetResult
- import uuid
- import pickle
def apply_async(task, args=None, kwargs=None, routing_key=None,
        immediate=None, mandatory=None, connection=None,
        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None):
    """Run a task asynchronously by the celery daemon(s).

    :param task: The task to run (a callable object, or a :class:`Task`
        instance). Its ``name`` attribute is sent in the message.
    :param args: The positional arguments to pass on to the task (a ``list``).
    :param kwargs: The keyword arguments to pass on to the task (a ``dict``).

    :keyword routing_key: The routing key used to route the task to a worker
        server.
    :keyword immediate: Request immediate delivery. Will raise an exception
        if the task cannot be routed to a worker immediately.
    :keyword mandatory: Mandatory routing. Raises an exception if there's
        no running workers able to take on this task.
    :keyword connection: Re-use existing AMQP connection.
        The ``connect_timeout`` argument is not respected if this is set,
        and the connection is left open for the caller to close.
    :keyword connect_timeout: The timeout in seconds, before we give up
        on establishing a connection to the AMQP server.
    :keyword priority: The task priority, a number between ``0`` and ``9``.

    Message options (``routing_key``, ``immediate``, ``mandatory`` and
    ``priority``) passed explicitly take precedence. Any option left as
    ``None`` falls back to the attribute of the same name on ``task``,
    or ``None`` if the task has no such attribute.

    :returns: :class:`celery.result.AsyncResult` for the published task.
    """
    if not args:
        args = []
    if not kwargs:
        # Must be a mapping: it is forwarded as the task's keyword arguments.
        kwargs = {}

    explicit_opts = {"routing_key": routing_key,
                     "immediate": immediate,
                     "mandatory": mandatory,
                     "priority": priority}
    message_opts = {}
    for option_name, option_value in explicit_opts.items():
        # Only consult the task's defaults when the caller did not
        # specify the option; otherwise the explicit value would be
        # silently overridden by class attributes such as
        # ``Task.routing_key = None``.
        if option_value is None:
            option_value = getattr(task, option_name, None)
        message_opts[option_name] = option_value

    need_to_close_connection = False
    if not connection:
        connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
        need_to_close_connection = True
    try:
        publisher = TaskPublisher(connection=connection)
        try:
            task_id = publisher.delay_task(task.name, args, kwargs,
                                           **message_opts)
        finally:
            publisher.close()
    finally:
        # Only close connections we opened ourselves; a caller-supplied
        # connection stays open for re-use.
        if need_to_close_connection:
            connection.close()
    return AsyncResult(task_id)
def delay_task(task_name, *args, **kwargs):
    r"""Send the task registered as ``task_name`` to the ``celery`` daemon.

    :param task_name: the name under which the task was registered in the
        task registry.
    :param \*args: positional arguments forwarded to the task.
    :param \*\*kwargs: keyword arguments forwarded to the task.

    :raises celery.registry.NotRegistered: if no task with that name
        is present in the task registry.

    :rtype: :class:`celery.result.AsyncResult`.

    Example

        >>> r = delay_task("update_record", name="George Costanza", age=32)
        >>> r.ready()
        True
        >>> r.result
        "Record was updated"

    """
    is_registered = task_name in tasks
    if not is_registered:
        error_message = (
            "Task with name %s not registered in the task registry." % (
                task_name))
        raise tasks.NotRegistered(error_message)

    registered_task = tasks[task_name]
    return apply_async(registered_task, args, kwargs)
def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
    """Discard all waiting tasks.

    This will ignore all tasks waiting for execution, and they will
    be deleted from the messaging server.

    :keyword connect_timeout: The timeout in seconds, before we give up
        on establishing a connection to the AMQP server.

    :returns: the number of tasks discarded.

    :rtype: int

    """
    amqp_connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
    try:
        consumer = TaskConsumer(connection=amqp_connection)
        discarded_count = consumer.discard_all()
    finally:
        # Always release the connection, even if discarding fails.
        amqp_connection.close()
    return discarded_count
def is_done(task_id):
    """Tell whether the task identified by ``task_id`` has been executed.

    The answer comes from ``default_backend.is_done``.

    :rtype: bool

    """
    backend = default_backend
    done = backend.is_done(task_id)
    return done
class Task(object):
    r"""A task that can be delayed for execution by the ``celery`` daemon.

    All subclasses of :class:`Task` must define the :meth:`run` method,
    which is the actual method the ``celery`` daemon executes.

    The :meth:`run` method supports both positional, and keyword arguments.

    .. attribute:: name

        *REQUIRED* All subclasses of :class:`Task` have to define the
        :attr:`name` attribute. This is the name of the task, registered
        in the task registry, and passed to :func:`delay_task`.

    .. attribute:: type

        The type of task, currently this can be ``regular``, or ``periodic``,
        however if you want a periodic task, you should subclass
        :class:`PeriodicTask` instead.

    :raises NotImplementedError: if the :attr:`name` attribute is not set.

    Instances are callable: calling one invokes :meth:`run` directly,
    with the same arguments.

    Examples

    This is a simple task just logging a message,

        >>> from celery.task import tasks, Task
        >>> class MyTask(Task):
        ...     name = "mytask"
        ...
        ...     def run(self, some_arg=None, **kwargs):
        ...         logger = self.get_logger(**kwargs)
        ...         logger.info("Running MyTask with arg some_arg=%s" %
        ...                     some_arg)
        ...         return 42
        >>> tasks.register(MyTask)

    You can delay the task using the classmethod :meth:`delay`...

        >>> result = MyTask.delay(some_arg="foo")
        >>> result.status # after some time
        'DONE'
        >>> result.result
        42

    ...or using the :func:`delay_task` function, by passing the name of
    the task.

        >>> from celery.task import delay_task
        >>> result = delay_task(MyTask.name, some_arg="foo")

    """
    name = None
    type = "regular"
    max_retries = 0 # unlimited
    retry_interval = timedelta(seconds=2)
    auto_retry = False
    # Default message options; :func:`apply_async` falls back to these
    # attributes for any option the caller does not pass explicitly.
    routing_key = None
    immediate = False
    mandatory = False

    def __init__(self):
        if not self.name:
            raise NotImplementedError("Tasks must define a name attribute.")

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        """*REQUIRED* The actual task.

        All subclasses of :class:`Task` must define the run method.

        :raises NotImplementedError: by default, so you have to override
            this method in your subclass.

        """
        raise NotImplementedError("Tasks must define a run method.")

    def get_logger(self, **kwargs):
        """Get process-aware logger object.

        See :func:`celery.log.setup_logger`.

        """
        return setup_logger(**kwargs)

    def get_publisher(self, connect_timeout=None):
        """Get a celery task message publisher.

        :keyword connect_timeout: The timeout in seconds for establishing
            the AMQP connection. ``None`` (the default) means
            ``AMQP_CONNECTION_TIMEOUT``.

        :rtype: :class:`celery.messaging.TaskPublisher`.

        Please be sure to close the AMQP connection when you're done
        with this object, i.e.:

            >>> publisher = self.get_publisher()
            >>> # do something with publisher
            >>> publisher.connection.close()

        """
        if connect_timeout is None:
            connect_timeout = AMQP_CONNECTION_TIMEOUT
        return TaskPublisher(connection=DjangoAMQPConnection(
                                connect_timeout=connect_timeout))

    def get_consumer(self, connect_timeout=None):
        """Get a celery task message consumer.

        :keyword connect_timeout: The timeout in seconds for establishing
            the AMQP connection. ``None`` (the default) means
            ``AMQP_CONNECTION_TIMEOUT``.

        :rtype: :class:`celery.messaging.TaskConsumer`.

        Please be sure to close the AMQP connection when you're done
        with this object, i.e.:

            >>> consumer = self.get_consumer()
            >>> # do something with consumer
            >>> consumer.connection.close()

        """
        if connect_timeout is None:
            connect_timeout = AMQP_CONNECTION_TIMEOUT
        return TaskConsumer(connection=DjangoAMQPConnection(
                                connect_timeout=connect_timeout))

    @classmethod
    def delay(cls, *args, **kwargs):
        r"""Delay this task for execution by the ``celery`` daemon(s).

        :param \*args: positional arguments passed on to the task.

        :param \*\*kwargs: keyword arguments passed on to the task.

        :rtype: :class:`celery.result.AsyncResult`

        See :func:`delay_task`.

        """
        return apply_async(cls, args, kwargs)

    @classmethod
    def apply_async(cls, args=None, kwargs=None, **options):
        """Delay this task for execution by the ``celery`` daemon(s).

        :param args: positional arguments passed on to the task.

        :param kwargs: keyword arguments passed on to the task.

        :param options: extra message/connection options forwarded to the
            module-level :func:`apply_async` (e.g. ``routing_key``,
            ``priority``, ``connection``).

        :rtype: :class:`celery.result.AsyncResult`

        See :func:`apply_async`.

        """
        # Resolves to the module-level function: class attributes are not
        # in scope inside method bodies.
        return apply_async(cls, args, kwargs, **options)
class TaskSet(object):
    """A task containing several subtasks, making it possible
    to track how many, or when all of the tasks have been completed.

    :param task: The task class or name.
        Can either be a fully qualified task name, or a task class.

    :param args: A list of args, kwargs pairs.
        e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``


    .. attribute:: task_name

        The name of the task.

    .. attribute:: arguments

        The arguments, as passed to the task set constructor.

    .. attribute:: total

        Total number of tasks in this task set.

    Example

        >>> from djangofeeds.tasks import RefreshFeedTask
        >>> taskset = TaskSet(RefreshFeedTask, args=[
        ...                 [[], {"feed_url": "http://cnn.com/rss"}],
        ...                 [[], {"feed_url": "http://bbc.com/rss"}],
        ...                 [[], {"feed_url": "http://xkcd.com/rss"}]])

        >>> taskset_result = taskset.run()
        >>> list_of_return_values = taskset_result.join()

    """

    def __init__(self, task, args):
        # Accept either a task (anything with a ``name``) or a plain name.
        self.task_name = getattr(task, "name", task)
        self.arguments = args
        self.total = len(args)

    def run(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
        """Run all tasks in the taskset.

        :keyword connect_timeout: The timeout in seconds, before we give up
            on establishing a connection to the AMQP server.

        :returns: A :class:`celery.result.TaskSetResult` instance.

        Example

            >>> ts = TaskSet(RefreshFeedTask, [
            ...         [["http://foo.com/rss"], {}],
            ...         [["http://bar.com/rss"], {}],
            ... ])
            >>> result = ts.run()
            >>> result.taskset_id
            "d2c9b261-8eff-4bfb-8459-1e1b72063514"
            >>> result.subtask_ids
            ["b4996460-d959-49c8-aeb9-39c530dcde25",
            "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
            >>> result.waiting()
            True
            >>> time.sleep(10)
            >>> result.ready()
            True
            >>> result.successful()
            True
            >>> result.failed()
            False
            >>> result.join()
            [True, True]

        """
        taskset_id = str(uuid.uuid4())
        conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
        try:
            publisher = TaskPublisher(connection=conn)
            try:
                subtask_ids = [publisher.delay_task_in_set(
                                    task_name=self.task_name,
                                    taskset_id=taskset_id,
                                    task_args=arg,
                                    task_kwargs=kwarg)
                               for arg, kwarg in self.arguments]
            finally:
                publisher.close()
        finally:
            # Release the connection even if publishing a subtask fails.
            conn.close()
        return TaskSetResult(taskset_id, subtask_ids)

    def iterate(self):
        """Iterate over the results returned after calling :meth:`run`.

        Note that this calls :meth:`run` itself, so every call dispatches
        the whole task set again.

        If any of the tasks raises an exception, the exception will
        be re-raised.

        """
        return iter(self.run())

    def join(self, timeout=None):
        """Gather the results for all of the tasks in the taskset,
        and return a list with them ordered by the order of which they
        were called.

        Note that this calls :meth:`run` itself, so every call dispatches
        the whole task set again. To wait on a task set you already ran,
        call ``join`` on the :class:`celery.result.TaskSetResult`
        returned by :meth:`run` instead.

        :keyword timeout: The time in seconds, how long
            it will wait for results, before the operation times out.

        :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
            and the operation takes longer than ``timeout`` seconds.

        If any of the tasks raises an exception, the exception
        will be reraised by :meth:`join`.

        :returns: list of return values for all tasks in the taskset.

        """
        return self.run().join(timeout=timeout)

    @classmethod
    def remote_execute(cls, func, args):
        """Apply ``args`` to function by distributing the args to the
        celery server(s).

        :returns: a (not yet run) :class:`TaskSet` of
            :class:`ExecuteRemoteTask` subtasks, one per item in ``args``.

        """
        pickled = pickle.dumps(func)
        # Each subtask receives positional args ``[pickled, arg, {}]``,
        # i.e. (ser_callable, fargs, fkwargs) for ExecuteRemoteTask.run,
        # and no keyword arguments.
        arguments = [[[pickled, arg, {}], {}] for arg in args]
        return cls(ExecuteRemoteTask, arguments)

    @classmethod
    def map(cls, func, args, timeout=None):
        """Distribute processing of the arguments and collect the results.

        Blocks until all results are available (or ``timeout`` expires).

        """
        remote_task = cls.remote_execute(func, args)
        return remote_task.join(timeout=timeout)

    @classmethod
    def map_async(cls, func, args, timeout=None):
        """Distribute processing of the arguments and collect the results
        asynchronously.

        :returns: :class:`celery.result.AsyncResult` instance.

        """
        serfunc = pickle.dumps(func)
        return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
def dmap(func, args, timeout=None):
    """Distribute processing of the arguments and collect the results.

    Each item of ``args`` is a list of positional arguments for ``func``.
    The call blocks until all results are collected (or ``timeout``
    seconds pass). See :meth:`TaskSet.map`.

    Example

        >>> from celery.task import dmap
        >>> import operator
        >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
        [4, 8, 16]

    """
    return TaskSet.map(func, args, timeout=timeout)
class AsynchronousMapTask(Task):
    """Task used internally by :func:`dmap_async` and
    :meth:`TaskSet.map_async`."""
    name = "celery.map_async"

    def run(self, serfunc, args, **kwargs):
        """The method run by ``celeryd``.

        :param serfunc: the pickled callable to map over ``args``.
        :param args: the argument lists handed to :meth:`TaskSet.map`.
        :keyword timeout: optional timeout forwarded to :meth:`TaskSet.map`.

        """
        # NOTE(review): pickle.loads on a message payload can execute
        # arbitrary code; this is only safe when the broker is trusted.
        func = pickle.loads(serfunc)
        return TaskSet.map(func, args, timeout=kwargs.get("timeout"))


tasks.register(AsynchronousMapTask)
def dmap_async(func, args, timeout=None):
    """Distribute processing of the arguments and collect the results
    without blocking.

    Thin wrapper around :meth:`TaskSet.map_async`.

    :returns: :class:`celery.result.AsyncResult` object.

    Example

        >>> from celery.task import dmap_async
        >>> import operator
        >>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
        >>> presult
        <AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
        >>> presult.status
        'DONE'
        >>> presult.result
        [4, 8, 16]

    """
    mapper = TaskSet.map_async
    return mapper(func, args, timeout=timeout)
class PeriodicTask(Task):
    """A periodic task is a task that behaves like a :manpage:`cron` job.

    .. attribute:: run_every

        *REQUIRED* Defines how often the task is run (its interval),
        it can be either a :class:`datetime.timedelta` object or a
        number (``int``, ``long`` or ``float``) specifying the time in
        seconds. A number is converted to a :class:`datetime.timedelta`
        when the task is instantiated.

    :raises NotImplementedError: if the :attr:`run_every` attribute is
        not defined (or is zero/empty).

    You have to register the periodic task in the task registry.

    Example

        >>> from celery.task import tasks, PeriodicTask
        >>> from datetime import timedelta
        >>> class MyPeriodicTask(PeriodicTask):
        ...     name = "my_periodic_task"
        ...     run_every = timedelta(seconds=30)
        ...
        ...     def run(self, **kwargs):
        ...         logger = self.get_logger(**kwargs)
        ...         logger.info("Running MyPeriodicTask")
        >>> tasks.register(MyPeriodicTask)

    """
    run_every = timedelta(days=1)
    type = "periodic"

    def __init__(self):
        import numbers

        if not self.run_every:
            raise NotImplementedError(
                    "Periodic tasks must have a run_every attribute")

        # A plain number is a count of seconds. Previously only ``int`` was
        # converted, leaving ``long``/``float`` values unconverted; accept
        # any real number so run_every is always a timedelta afterwards.
        # float() is exact for every second count timedelta can represent.
        if isinstance(self.run_every, numbers.Real):
            self.run_every = timedelta(seconds=float(self.run_every))

        super(PeriodicTask, self).__init__()
class ExecuteRemoteTask(Task):
    """Execute an arbitrary function or object.

    *Note* This is the internal building block behind
    :func:`execute_remote`, which is usually what you want to call instead.

    The object must be picklable, so lambdas and functions defined in the
    REPL (the python shell, or ``ipython``) cannot be used.

    """
    name = "celery.execute_remote"

    def run(self, ser_callable, fargs, fkwargs, **kwargs):
        """Unpickle ``ser_callable`` and call it.

        :param ser_callable: A pickled function or callable object.
        :param fargs: Positional arguments to apply to the function.
        :param fkwargs: Keyword arguments to apply to the function.

        :returns: whatever the unpickled callable returns.

        """
        # NOTE(review): pickle.loads on a message payload can execute
        # arbitrary code; this is only safe when the broker is trusted.
        target = pickle.loads(ser_callable)
        result = target(*fargs, **fkwargs)
        return result


tasks.register(ExecuteRemoteTask)
def execute_remote(func, *args, **kwargs):
    r"""Execute an arbitrary function/object on a celery worker.

    :param func: A callable function or object.
    :param \*args: Positional arguments to apply to the function.
    :param \*\*kwargs: Keyword arguments to apply to the function.

    ``func`` is pickled before sending, so it must be picklable: lambdas
    and functions defined in the REPL (objects without an associated
    module) cannot be used.

    :returns: :class:`celery.result.AsyncResult`.

    """
    serialized = pickle.dumps(func)
    return ExecuteRemoteTask.delay(serialized, args, kwargs)
class DeleteExpiredTaskMetaTask(PeriodicTask):
    """Periodic task that deletes expired task metadata once a day.

    It delegates to ``default_backend.cleanup()``
    (see :meth:`celery.backends.base.BaseBackend.cleanup`).

    """
    name = "celery.delete_expired_task_meta"
    run_every = timedelta(days=1)

    def run(self, **kwargs):
        """The method run by ``celeryd``."""
        self.get_logger(**kwargs).info("Deleting expired task meta objects...")
        default_backend.cleanup()


tasks.register(DeleteExpiredTaskMetaTask)
|