@@ -1,19 +1,62 @@
-from carrot.connection import DjangoBrokerConnection
+import sys
+import warnings
+from datetime import datetime, timedelta
+from Queue import Queue
+
+from billiard.serialization import pickle
+
 from celery import conf
-from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
-from celery.result import TaskSetResult, EagerResult
-from celery.execute import apply_async, delay_task, apply
-from celery.utils import gen_unique_id, get_full_cls_name
+from celery.utils import gen_unique_id, mexpand, timedelta_seconds
+from celery.result import BaseAsyncResult, TaskSetResult, EagerResult
+from celery.execute import apply_async, apply
 from celery.registry import tasks
-from celery.serialization import pickle
-from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from celery.backends import default_backend
-from datetime import timedelta
+from celery.messaging import TaskPublisher, TaskConsumer
+from celery.messaging import establish_connection as _establish_connection
+from celery.exceptions import MaxRetriesExceededError, RetryTaskError
+
+
+class TaskType(type):
+    """Metaclass for tasks.
+
+    Automatically registers the task in the task registry, except
+    if the ``abstract`` attribute is set.
+
+    If no ``name`` attribute is provided, the name is automatically
+    set to the name of the module it was defined in, and the class name.
+
+    """
+
+    def __new__(cls, name, bases, attrs):
+        super_new = super(TaskType, cls).__new__
+        task_module = attrs["__module__"]
+
+        # Abstract class, remove the abstract attribute so
+        # any class inheriting from this won't be abstract by default.
+        if attrs.pop("abstract", None) or not attrs.get("autoregister", True):
+            return super_new(cls, name, bases, attrs)
+
+        # Automatically generate missing name.
+        if not attrs.get("name"):
+            task_module = sys.modules[task_module]
+            task_name = ".".join([task_module.__name__, name])
+            attrs["name"] = task_name
+
+        # Because of the way import happens (recursively), this may or
+        # may not be the first time the task tries to register with the
+        # framework. There should only be one class for each task name,
+        # so we always return the registered version.
+
+        task_name = attrs["name"]
+        if task_name not in tasks:
+            task_cls = super_new(cls, name, bases, attrs)
+            tasks.register(task_cls)
+        return tasks[task_name].__class__


 class Task(object):
-    """A task that can be delayed for execution by the ``celery`` daemon.
+    """A celery task.

     All subclasses of :class:`Task` must define the :meth:`run` method,
     which is the actual method the ``celery`` daemon executes.
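As an aside, a minimal sketch of what the new metaclass does at
class-creation time; the module path ``myapp.tasks`` and the class name
are invented for illustration:

.. code-block:: python

    from celery.task.base import Task
    from celery.registry import tasks

    # Hypothetically defined in a module "myapp.tasks"; no explicit
    # ``name`` is set, so TaskType derives one from module + class name.
    class RefreshCacheTask(Task):

        def run(self, **kwargs):
            return "refreshed"

    # The metaclass generated the name and registered the task:
    assert RefreshCacheTask.name == "myapp.tasks.RefreshCacheTask"
    assert "myapp.tasks.RefreshCacheTask" in tasks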
@@ -21,13 +64,11 @@ class Task(object):
     The :meth:`run` method can take use of the default keyword arguments,
     as listed in the :meth:`run` documentation.

-    The :meth:`run` method supports both positional, and keyword arguments.
-
     .. attribute:: name
+        Name of the task.

-        *REQUIRED* All subclasses of :class:`Task` has to define the
-        :attr:`name` attribute. This is the name of the task, registered
-        in the task registry, and passed to :func:`delay_task`.
+    .. attribute:: abstract
+        If ``True`` the task is an abstract base class.

     .. attribute:: type

@@ -45,22 +86,17 @@ class Task(object):

     .. attribute:: mandatory

-        If set, the message has mandatory routing. By default the message
-        is silently dropped by the broker if it can't be routed to a queue.
-        However - If the message is mandatory, an exception will be raised
-        instead.
+        Mandatory message routing. An exception will be raised if the task
+        can't be routed to a queue.

     .. attribute:: immediate:

-        Request immediate delivery. If the message cannot be routed to a
-        task worker immediately, an exception will be raised. This is
-        instead of the default behaviour, where the broker will accept and
-        queue the message, but with no guarantee that the message will ever
-        be consumed.
+        Request immediate delivery. An exception will be raised if the task
+        can't be routed to a worker immediately.

     .. attribute:: priority:
-
-        The message priority. A number from ``0`` to ``9``.
+        The message priority. A number from ``0`` to ``9``, where ``0`` is the
+        highest. Note that RabbitMQ doesn't support priorities yet.

     .. attribute:: max_retries

@@ -68,16 +104,24 @@ class Task(object):

     .. attribute:: default_retry_delay

-        Defeault time in seconds before a retry of the task should be
+        Default time in seconds before a retry of the task should be
         executed. Default is a 1 minute delay.

+    .. attribute:: rate_limit
+
+        Set the rate limit for this task type. Examples: ``None`` (no rate
+        limit), ``"100/s"`` (hundred tasks a second), ``"100/m"`` (hundred
+        tasks a minute), ``"100/h"`` (hundred tasks an hour).
+
+    .. attribute:: rate_limit_queue_type
+
+        Type of queue used by the rate limiter for this kind of task.
+        Default is a :class:`Queue.Queue`, but you can change this to
+        a :class:`Queue.LifoQueue` or an invention of your own.
+
     .. attribute:: ignore_result

-        Don't store the status and return value. This means you can't
-        use the :class:`celery.result.AsyncResult` to check if the task is
-        done, or get its return value. Only use if you need the performance
-        and is able live without these features. Any exceptions raised will
-        store the return value/status as usual.
+        Don't store the return value of this task.

     .. attribute:: disable_error_emails
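For the curious, a hedged sketch (not part of celery itself) of how a rate
limit string such as ``"100/m"`` can be interpreted; the helper name is
invented:

.. code-block:: python

    # Hypothetical helper illustrating the "<count>/<unit>" format.
    def parse_rate_limit(rate_limit):
        if not rate_limit:
            return 0.0                       # None or "" means no limit
        count, _, unit = rate_limit.partition("/")
        per_seconds = {"s": 1, "m": 60, "h": 3600}[unit or "s"]
        return float(count) / per_seconds    # tasks per second

    assert parse_rate_limit("100/m") == 100.0 / 60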
@@ -86,66 +130,44 @@ class Task(object):

     .. attribute:: serializer

-        A string identifying the default serialization
-        method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
-        Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
-        methods that have been registered with
-        :mod:`carrot.serialization.registry`.
-
-    :raises NotImplementedError: if the :attr:`name` attribute is not set.
-
-    The resulting class is callable, which if called will apply the
-    :meth:`run` method.
-
-    Examples
-
-    This is a simple task just logging a message,
+        The name of a serializer that has been registered with
+        :mod:`carrot.serialization.registry`. Example: ``"json"``.

-    >>> from celery.task import tasks, Task
-    >>> class MyTask(Task):
-    ...
-    ...     def run(self, some_arg=None, **kwargs):
-    ...         logger = self.get_logger(**kwargs)
-    ...         logger.info("Running MyTask with arg some_arg=%s" %
-    ...                     some_arg))
-    ...         return 42
-    ... tasks.register(MyTask)
+    .. attribute:: backend

-    You can delay the task using the classmethod :meth:`delay`...
+        The result store backend used for this task.

-    >>> result = MyTask.delay(some_arg="foo")
-    >>> result.status # after some time
-    'DONE'
-    >>> result.result
-    42
+    .. attribute:: autoregister
+        If ``True`` the task is automatically registered in the task
+        registry, which is the default behaviour.

-    ...or using the :func:`delay_task` function, by passing the name of
-    the task.
-
-    >>> from celery.task import delay_task
-    >>> result = delay_task(MyTask.name, some_arg="foo")

+    The resulting class is callable, which if called will apply the
+    :meth:`run` method.

     """
+    __metaclass__ = TaskType
+
     name = None
+    abstract = True
+    autoregister = True
     type = "regular"
     exchange = None
     routing_key = None
     immediate = False
     mandatory = False
     priority = None
-    ignore_result = False
+    ignore_result = conf.IGNORE_RESULT
     disable_error_emails = False
     max_retries = 3
     default_retry_delay = 3 * 60
     serializer = conf.TASK_SERIALIZER
+    rate_limit = conf.DEFAULT_RATE_LIMIT
+    rate_limit_queue_type = Queue
+    backend = default_backend

     MaxRetriesExceededError = MaxRetriesExceededError

-    def __init__(self):
-        if not self.__class__.name:
-            self.__class__.name = get_full_cls_name(self.__class__)
-
     def __call__(self, *args, **kwargs):
         return self.run(*args, **kwargs)
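To make the new class-level defaults concrete, a small sketch of a
subclass; the task name is hypothetical and ``delay()`` assumes a running
broker:

.. code-block:: python

    class AddTask(Task):
        name = "examples.add"          # hypothetical task name

        def run(self, x, y, **kwargs):
            return x + y

    # Calling an instance applies run() directly:
    assert AddTask()(2, 3) == 5

    # .delay() instead sends a task message (requires a broker):
    result = AddTask.delay(2, 3)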
@@ -156,67 +178,37 @@ class Task(object):
         by the worker if the function/method supports them:

             * task_id
-
-                Unique id of the currently executing task.
-
             * task_name
-
-                Name of the currently executing task (same as :attr:`name`)
-
             * task_retries
-
-                How many times the current task has been retried
-                (an integer starting at ``0``).
-
             * logfile
-
-                Name of the worker log file.
-
             * loglevel

-                The current loglevel, an integer mapping to one of the
-                following values: ``logging.DEBUG``, ``logging.INFO``,
-                ``logging.ERROR``, ``logging.CRITICAL``, ``logging.WARNING``,
-                ``logging.FATAL``.
-
         Additional standard keyword arguments may be added in the future.
         To take these default arguments, the task can either list the ones
         it wants explicitly or just take an arbitrary list of keyword
         arguments (\*\*kwargs).

-        Example using an explicit list of default arguments to take:
-
-        .. code-block:: python
-
-            def run(self, x, y, logfile=None, loglevel=None):
-                self.get_logger(loglevel=loglevel, logfile=logfile)
-                return x * y
-
-
-        Example taking all default keyword arguments, and any extra arguments
-        passed on by the caller:
-
-        .. code-block:: python
-
-            def run(self, x, y, **kwargs): # CORRECT!
-                logger = self.get_logger(**kwargs)
-                adjust = kwargs.get("adjust", 0)
-                return x * y - adjust
-
         """
-        raise NotImplementedError("Tasks must define a run method.")
+        raise NotImplementedError("Tasks must define the run method.")

-    def get_logger(self, **kwargs):
+    @classmethod
+    def get_logger(self, loglevel=None, logfile=None, **kwargs):
         """Get process-aware logger object.

         See :func:`celery.log.setup_logger`.

         """
-        logfile = kwargs.get("logfile")
-        loglevel = kwargs.get("loglevel")
         return setup_logger(loglevel=loglevel, logfile=logfile)

-    def get_publisher(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
+    @classmethod
+    def establish_connection(self,
+            connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
+        """Establish a connection to the message broker."""
+        return _establish_connection(connect_timeout)
+
+    @classmethod
+    def get_publisher(self, connection=None, exchange=None,
+            connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
         """Get a celery task message publisher.

         :rtype: :class:`celery.messaging.TaskPublisher`.
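The docstring examples removed above survive in spirit; a sketch of a
``run`` method that absorbs the default keyword arguments and uses the now
class-level ``get_logger`` (the task name is invented):

.. code-block:: python

    class MulTask(Task):
        name = "examples.mul"          # hypothetical task name

        def run(self, x, y, **kwargs):
            # **kwargs absorbs task_id, task_name, logfile, loglevel, ...
            logger = self.get_logger(**kwargs)
            logger.info("Multiplying %s * %s" % (x, y))
            return x * y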
@@ -229,13 +221,16 @@ class Task(object):
             >>> publisher.connection.close()

         """
-
-        connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
+        if exchange is None:
+            exchange = self.exchange
+        connection = connection or self.establish_connection(connect_timeout)
         return TaskPublisher(connection=connection,
-                             exchange=self.exchange,
+                             exchange=exchange,
                              routing_key=self.routing_key)

-    def get_consumer(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
+    @classmethod
+    def get_consumer(self, connection=None,
+            connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
         """Get a celery task message consumer.

         :rtype: :class:`celery.messaging.TaskConsumer`.
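A usage sketch for the new ``connection`` argument, reusing a single broker
connection for publishing; ``MyTask`` stands in for any registered task
class:

.. code-block:: python

    conn = MyTask.establish_connection()
    publisher = MyTask.get_publisher(connection=conn)
    try:
        pass  # ... publish task messages here ...
    finally:
        publisher.close()
        conn.close()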
@@ -248,27 +243,25 @@ class Task(object):
             >>> consumer.connection.close()

         """
-        connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
+        connection = connection or self.establish_connection(connect_timeout)
         return TaskConsumer(connection=connection, exchange=self.exchange,
                             routing_key=self.routing_key)

     @classmethod
-    def delay(cls, *args, **kwargs):
-        """Delay this task for execution by the ``celery`` daemon(s).
+    def delay(self, *args, **kwargs):
+        """Shortcut to :meth:`apply_async`, with star arguments,
+        but doesn't support the extra options.

         :param \*args: positional arguments passed on to the task.
-
         :param \*\*kwargs: keyword arguments passed on to the task.

-        :rtype: :class:`celery.result.AsyncResult`
-
-        See :func:`celery.execute.delay_task`.
+        :returns: :class:`celery.result.AsyncResult`

         """
-        return apply_async(cls, args, kwargs)
+        return self.apply_async(args, kwargs)

     @classmethod
-    def apply_async(cls, args=None, kwargs=None, **options):
+    def apply_async(self, args=None, kwargs=None, **options):
         """Delay this task for execution by the ``celery`` daemon(s).

         :param args: positional arguments passed on to the task.
@@ -282,8 +275,9 @@ class Task(object):


         """
-        return apply_async(cls, args, kwargs, **options)
+        return apply_async(self, args, kwargs, **options)

+    @classmethod
     def retry(self, args, kwargs, exc=None, throw=True, **options):
         """Retry the task.

@@ -292,12 +286,12 @@ class Task(object):
         :keyword exc: Optional exception to raise instead of
             :exc:`MaxRestartsExceededError` when the max restart limit has
             been exceeded.
-        :keyword throw: Do not raise the
-            :exc:`celery.exceptions.RetryTaskError` exception,
-            that tells the worker that the task is to be retried.
         :keyword countdown: Time in seconds to delay the retry for.
         :keyword eta: Explicit time and date to run the retry at (must be a
             :class:`datetime.datetime` instance).
+        :keyword throw: If this is ``False``, do not raise the
+            :exc:`celery.exceptions.RetryTaskError` exception that
+            tells the worker that the task is to be retried.
         :keyword \*\*options: Any extra options to pass on to
             meth:`apply_async`. See :func:`celery.execute.apply_async`.

@@ -335,8 +329,7 @@ class Task(object):
         if kwargs.get("task_is_eager", False):
             result = self.apply(args=args, kwargs=kwargs, **options)
             if isinstance(result, EagerResult):
-                # get() propogates any exceptions.
-                return result.get()
+                return result.get()  # propagates exceptions.
             return result

         self.apply_async(args=args, kwargs=kwargs, **options)
@@ -345,6 +338,24 @@ class Task(object):
             message = "Retry in %d seconds." % options["countdown"]
         raise RetryTaskError(message, exc)

+    @classmethod
+    def apply(self, args=None, kwargs=None, **options):
+        """Execute this task at once, by blocking until the task
+        has finished executing.
+
+        :param args: positional arguments passed on to the task.
+        :param kwargs: keyword arguments passed on to the task.
+        :rtype: :class:`celery.result.EagerResult`
+
+        See :func:`celery.execute.apply`.
+
+        """
+        return apply(self, args, kwargs, **options)
+
+    @classmethod
+    def AsyncResult(self, task_id):
+        return BaseAsyncResult(task_id, backend=self.backend)
+
     def on_retry(self, exc, task_id, args, kwargs):
         """Retry handler.
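A hedged sketch of the ``retry`` API from inside a task; ``deliver`` and
``TemporaryError`` are invented stand-ins for application code:

.. code-block:: python

    class SendEmailTask(Task):
        name = "examples.send_email"   # hypothetical task name
        max_retries = 5
        default_retry_delay = 60       # retry after one minute

        def run(self, to, **kwargs):
            try:
                deliver(to)                    # hypothetical helper
            except TemporaryError, exc:        # hypothetical exception
                # Re-sends the task message with the same arguments and
                # raises RetryTaskError for the worker, or
                # MaxRetriesExceededError once max_retries is exceeded.
                self.retry(args=[to], kwargs=kwargs, exc=exc)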
@@ -378,7 +389,7 @@ class Task(object):
     def on_success(self, retval, task_id, args, kwargs):
         """Success handler.

-        This is run by the worker when the task executed successfully.
+        Run by the worker if the task executes successfully.

         :param retval: The return value of the task.
         :param task_id: Unique id of the executed task.
@@ -390,22 +401,6 @@ class Task(object):
         """
         pass

-    @classmethod
-    def apply(cls, args=None, kwargs=None, **options):
-        """Execute this task at once, by blocking until the task
-        has finished executing.
-
-        :param args: positional arguments passed on to the task.
-
-        :param kwargs: keyword arguments passed on to the task.
-
-        :rtype: :class:`celery.result.EagerResult`
-
-        See :func:`celery.execute.apply`.
-
-        """
-        return apply(cls, args, kwargs, **options)
-

 class ExecuteRemoteTask(Task):
     """Execute an arbitrary function or object.
@@ -422,15 +417,11 @@ class ExecuteRemoteTask(Task):
     def run(self, ser_callable, fargs, fkwargs, **kwargs):
         """
         :param ser_callable: A pickled function or callable object.
-
         :param fargs: Positional arguments to apply to the function.
-
         :param fkwargs: Keyword arguments to apply to the function.

         """
-        callable_ = pickle.loads(ser_callable)
-        return callable_(*fargs, **fkwargs)
-tasks.register(ExecuteRemoteTask)
+        return pickle.loads(ser_callable)(*fargs, **fkwargs)


 class AsynchronousMapTask(Task):
@@ -438,11 +429,9 @@ class AsynchronousMapTask(Task):
     :meth:`TaskSet.map_async`. """
     name = "celery.map_async"

-    def run(self, serfunc, args, **kwargs):
-        """The method run by ``celeryd``."""
-        timeout = kwargs.get("timeout")
-        return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
-tasks.register(AsynchronousMapTask)
+    def run(self, ser_callable, args, timeout=None, **kwargs):
+        """See :meth:`TaskSet.map_async`."""
+        return TaskSet.map(pickle.loads(ser_callable), args, timeout=timeout)


 class TaskSet(object):
@@ -472,13 +461,13 @@ class TaskSet(object):

     >>> from djangofeeds.tasks import RefreshFeedTask
     >>> taskset = TaskSet(RefreshFeedTask, args=[
-    ...     [], {"feed_url": "http://cnn.com/rss"},
-    ...     [], {"feed_url": "http://bbc.com/rss"},
-    ...     [], {"feed_url": "http://xkcd.com/rss"}])
-
-    >>> taskset_result = taskset.run()
-    >>> list_of_return_values = taskset.join()
+    ...     ([], {"feed_url": "http://cnn.com/rss"}),
+    ...     ([], {"feed_url": "http://bbc.com/rss"}),
+    ...     ([], {"feed_url": "http://xkcd.com/rss"})
+    ... ])

+    >>> taskset_result = taskset.apply_async()
+    >>> list_of_return_values = taskset_result.join()

     """
@@ -490,22 +479,32 @@ class TaskSet(object):
             task_name = task
             task_obj = tasks[task_name]

+        # Get the task instance from the registry.
+        task_obj = tasks[task_obj.name]
+
         self.task = task_obj
         self.task_name = task_name
         self.arguments = args
         self.total = len(args)

-    def run(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
+    def run(self, *args, **kwargs):
+        """Deprecated alias to :meth:`apply_async`."""
+        warnings.warn(PendingDeprecationWarning(
+            "TaskSet.run will be deprecated in favor of TaskSet.apply_async "
+            "in celery v1.2.0"))
+        return self.apply_async(*args, **kwargs)
+
+    def apply_async(self, connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
         """Run all tasks in the taskset.

         :returns: A :class:`celery.result.TaskSetResult` instance.

         Example

-        >>> ts = TaskSet(RefreshFeedTask, [
-        ...         ["http://foo.com/rss", {}],
-        ...         ["http://bar.com/rss", {}],
-        ... )
+        >>> ts = TaskSet(RefreshFeedTask, args=[
+        ...     (["http://foo.com/rss"], {}),
+        ...     (["http://bar.com/rss"], {}),
+        ... ])
         >>> result = ts.run()
         >>> result.taskset_id
         "d2c9b261-8eff-4bfb-8459-1e1b72063514"
@@ -525,44 +524,33 @@ class TaskSet(object):
         [True, True]

         """
-        taskset_id = gen_unique_id()
+        if conf.ALWAYS_EAGER:
+            return self.apply()

-        from celery.conf import ALWAYS_EAGER
-        if ALWAYS_EAGER:
-            subtasks = [apply(self.task, args, kwargs)
-                            for args, kwargs in self.arguments]
-            return TaskSetResult(taskset_id, subtasks)
-
-        conn = DjangoBrokerConnection(connect_timeout=connect_timeout)
-        publisher = TaskPublisher(connection=conn,
-                                  exchange=self.task.exchange)
-        subtasks = [apply_async(self.task, args, kwargs,
-                                taskset_id=taskset_id, publisher=publisher)
-                        for args, kwargs in self.arguments]
-        publisher.close()
-        conn.close()
+        taskset_id = gen_unique_id()
+        conn = self.task.establish_connection(connect_timeout=connect_timeout)
+        publisher = self.task.get_publisher(connection=conn)
+        try:
+            subtasks = [self.apply_part(arglist, taskset_id, publisher)
+                            for arglist in self.arguments]
+        finally:
+            publisher.close()
+            conn.close()
         result = TaskSetResult(taskset_id, subtasks)
-        default_backend.store_taskset(taskset_id, result)
-        return result
-
-    def join(self, timeout=None):
-        """Gather the results for all of the tasks in the taskset,
-        and return a list with them ordered by the order of which they
-        were called.
-
-        :keyword timeout: The time in seconds, how long
-        it will wait for results, before the operation times out.
-
-        :raises TimeoutError: if ``timeout`` is not ``None``
-        and the operation takes longer than ``timeout`` seconds.
+        self.task.backend.store_taskset(taskset_id, result)

-        If any of the tasks raises an exception, the exception
-        will be reraised by :meth:`join`.
+        return result

-        :returns: list of return values for all tasks in the taskset.
+    def apply_part(self, arglist, taskset_id, publisher):
+        args, kwargs, opts = mexpand(arglist, 3, default={})
+        return apply_async(self.task, args, kwargs,
+                           taskset_id=taskset_id, publisher=publisher, **opts)

-        """
-        return self.run().join(timeout=timeout)
+    def apply(self):
+        taskset_id = gen_unique_id()
+        subtasks = [apply(self.task, args, kwargs)
+                        for args, kwargs in self.arguments]
+        return TaskSetResult(taskset_id, subtasks)

     @classmethod
     def remote_execute(cls, func, args):
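Since ``apply_part`` pads each argument list with ``mexpand``, an element
of ``args`` may optionally carry a third item of per-task options; a sketch
reusing the ``RefreshFeedTask`` example from the docstring (the
``priority`` option here is for illustration):

.. code-block:: python

    >>> ts = TaskSet(RefreshFeedTask, args=[
    ...     (["http://foo.com/rss"], {}),
    ...     (["http://bar.com/rss"], {}, {"priority": 9}),
    ... ])
    >>> result = ts.apply_async()
    >>> result.join()
    [True, True]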
@@ -576,7 +564,7 @@ class TaskSet(object):
     def map(cls, func, args, timeout=None):
         """Distribute processing of the arguments and collect the results."""
         remote_task = cls.remote_execute(func, args)
-        return remote_task.join(timeout=timeout)
+        return remote_task.run().join(timeout=timeout)

     @classmethod
     def map_async(cls, func, args, timeout=None):
@@ -593,36 +581,42 @@ class TaskSet(object):
 class PeriodicTask(Task):
     """A periodic task is a task that behaves like a :manpage:`cron` job.

+    Results of periodic tasks are not stored by default.
+
     .. attribute:: run_every

         *REQUIRED* Defines how often the task is run (its interval),
         it can be either a :class:`datetime.timedelta` object or an
         integer specifying the time in seconds.

+    .. attribute:: relative
+
+        If set to ``True``, run times are relative to the time when the
+        server was started. This was the previous behaviour; periodic
+        tasks are now scheduled by the clock.
+
     :raises NotImplementedError: if the :attr:`run_every` attribute is
         not defined.

-    You have to register the periodic task in the task registry.
-
     Example

         >>> from celery.task import tasks, PeriodicTask
         >>> from datetime import timedelta
         >>> class MyPeriodicTask(PeriodicTask):
-        ...     name = "my_periodic_task"
         ...     run_every = timedelta(seconds=30)
         ...
         ...     def run(self, **kwargs):
         ...         logger = self.get_logger(**kwargs)
         ...         logger.info("Running MyPeriodicTask")
-        >>> tasks.register(MyPeriodicTask)

     """
-    run_every = timedelta(days=1)
+    abstract = True
+    ignore_result = True
     type = "periodic"
+    relative = False

     def __init__(self):
-        if not self.run_every:
+        if not hasattr(self, "run_every"):
             raise NotImplementedError(
                 "Periodic tasks must have a run_every attribute")
@@ -633,3 +627,58 @@ class PeriodicTask(Task):
             self.__class__.run_every = timedelta(seconds=self.run_every)

         super(PeriodicTask, self).__init__()
+
+    def remaining_estimate(self, last_run_at):
+        """Returns when the periodic task should run next as a timedelta."""
+        next_run_at = last_run_at + self.run_every
+        if not self.relative:
+            next_run_at = self.delta_resolution(next_run_at, self.run_every)
+        return next_run_at - datetime.now()
+
+    def timedelta_seconds(self, delta):
+        """Convert :class:`datetime.timedelta` to seconds.
+
+        Doesn't account for negative timedeltas.
+
+        """
+        return timedelta_seconds(delta)
+
+    def is_due(self, last_run_at):
+        """Returns a tuple of two items ``(is_due, next_time_to_run)``,
+        where next time to run is in seconds.
+
+        e.g.
+
+        * ``(True, 20)`` means the task should be run now, and the next
+          time to run is in 20 seconds.
+
+        * ``(False, 12)`` means the task should be run in 12 seconds.
+
+        You can override this to decide the interval at runtime,
+        but keep in mind the value of ``CELERYBEAT_MAX_LOOP_INTERVAL``, which
+        decides the maximum number of seconds celerybeat can sleep between
+        re-checking the periodic task intervals. So if you dynamically change
+        the next run time, and the max interval is set to 5 minutes, it
+        will take up to 5 minutes for the change to take effect, so you may
+        consider lowering the value of ``CELERYBEAT_MAX_LOOP_INTERVAL`` if
+        responsiveness is of importance to you.
+
+        """
+        rem_delta = self.remaining_estimate(last_run_at)
+        rem = self.timedelta_seconds(rem_delta)
+        if rem == 0:
+            return True, self.timedelta_seconds(self.run_every)
+        return False, rem
+
+    def delta_resolution(self, dt, delta):
+        # Round ``dt`` down to the resolution of ``delta``: day, hour
+        # or minute boundaries, so clock-driven runs land on even times.
+        resolution = {3: lambda x: x / 86400,
+                      4: lambda x: x / 3600,
+                      5: lambda x: x / 60}
+        args = dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second
+        r = None
+        for res, calc in sorted(resolution.items()):
+            if calc(self.timedelta_seconds(delta)):
+                r = res
+        return datetime(*args[:r])
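Finally, a sketch of the new scheduling hooks in use; the class name is
invented, and with ``abstract``/``autoregister`` in play no manual
registration is needed:

.. code-block:: python

    from datetime import datetime, timedelta

    class HeartbeatTask(PeriodicTask):
        run_every = timedelta(seconds=30)

        def run(self, **kwargs):
            logger = self.get_logger(**kwargs)
            logger.info("ping")

    # celerybeat decides when to send the task by polling is_due():
    task = HeartbeatTask()
    due, next_in = task.is_due(datetime.now() - timedelta(seconds=31))
    # due should be True here, since the last run is older than run_every.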