Ver Fonte

Task Retries (not tested yet) Example:

    >>> class PostTwitterUpdateTask(Task):
    ...
    ...     def run(self, username, password, message, **kwargs):
    ...         twitter = Twitter(username, password)
    ...         try:
    ...             twitter.post_update(message)
    ...         except twitter.FailWhale, exc:
    ...             # Retry in 5 minutes.
    ...             self.retry([username, password, message], kwargs,
    ...                        countdown=60 * 5, exc=exc)
Ask Solem há 15 anos atrás
pai
commit
794582ebb2
4 ficheiros alterados com 52 adições e 8 exclusões
  1. 3 1
      celery/execute.py
  2. 0 5
      celery/messaging.py
  3. 46 0
      celery/task/base.py
  4. 3 2
      celery/worker/job.py

+ 3 - 1
celery/execute.py

@@ -10,7 +10,7 @@ import inspect
 
 
 def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
-        routing_key=None, exchange=None,
+        routing_key=None, exchange=None, task_id=None,
         immediate=None, mandatory=None, priority=None, connection=None,
         connect_timeout=AMQP_CONNECTION_TIMEOUT, **opts):
     """Run a task asynchronously by the celery daemon(s).
@@ -63,6 +63,7 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     priority = priority or getattr(task, "priority", None)
     taskset_id = opts.get("taskset_id")
     publisher = opts.get("publisher")
+    retries = opts.get("retries")
     if countdown:
         eta = datetime.now() + timedelta(seconds=countdown)
 
@@ -83,6 +84,7 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
         delay_task = curry(publisher.delay_task_in_set, taskset_id)
 
     task_id = delay_task(task.name, args, kwargs,
+                         task_id=task_id, retries=retries,
                          routing_key=routing_key, exchange=exchange,
                          mandatory=mandatory, immediate=immediate,
                          priority=priority, eta=eta)

+ 0 - 5
celery/messaging.py

@@ -38,11 +38,6 @@ class TaskPublisher(Publisher):
                                 task_args=task_args, task_kwargs=task_kwargs,
                                 **kwargs)
 
-    def retry_task(self, task_name, task_id, delivery_info, **kwargs):
-        kwargs["routing_key"] = delivery_info.get("routing_key")
-        kwargs["retries"] = kwargs.get("retries", 0) + 1
-        self._delay_task(task_name, task_id, **kwargs)
-
     def _delay_task(self, task_name, task_id=None, part_of_set=None,
             task_args=None, task_kwargs=None, **kwargs):
         """INTERNAL"""

+ 46 - 0
celery/task/base.py

@@ -10,6 +10,10 @@ from celery.registry import tasks
 from celery.serialization import pickle
 
 
+class MaxRetriesExceededError(Exception):
+    """The tasks max restart limit has been exceeded."""
+
+
 class Task(object):
     """A task that can be delayed for execution by the ``celery`` daemon.
 
@@ -57,6 +61,16 @@ class Task(object):
 
         The message priority. A number from ``0`` to ``9``.
 
+    .. attribute:: max_retries
+
+        Maximum number of retries before giving up (i.e. raising the last
+        resulting exception). Default is ``3``.
+
+    .. attribute:: default_retry_delay
+
+        Defeault time in seconds before a retry of the task should be
+        executed. Default is a 1 minute delay.
+
     .. attribute:: ignore_result
 
         Don't store the status and return value. This means you can't
@@ -114,6 +128,10 @@ class Task(object):
     priority = None
     ignore_result = False
     disable_error_emails = False
+    max_retries = 3
+    default_retry_delay = 60
+
+    MaxRetriesExceededError = MaxRetriesExceededError
 
     def __init__(self):
         if not self.__class__.name:
@@ -207,6 +225,34 @@ class Task(object):
         """
         return apply_async(cls, args, kwargs, **options)
 
+    def retry(self, args, kwargs, **options):
+        """Retry the task.
+
+        Example
+
+            >>> class TwitterPostStatusTask(Task):
+            ... 
+            ...     def run(self, username, password, message, **kwargs):
+            ...         twitter = Twitter(username, password)
+            ...         try:
+            ...             twitter.post_status(message)
+            ...         except twitter.FailWhale, exc:
+            ...             # Retry in 5 minutes.
+            ...             self.retry([username, password, message], kwargs,
+            ...                        countdown=60 * 5, exc=exc)
+
+        """
+        options["retries"] = kwargs.pop("task_retries", 0) + 1
+        options["task_id"] = kwargs.pop("task_id", None)
+        options["countdown"] = options.get("countdown",
+                                           self.default_retry_delay)
+        exc = options.pop("exc", MaxRetriesExceededError(
+            "Can't retry %s[%s] args:%s kwargs:%s" % (
+                self.name, options["task_id"], args, kwargs)))
+        if options["retries"] > self.max_retries:
+            raise exc
+        return self.apply_async(args=args, kwargs=kwargs, **options)
+        
     @classmethod
     def apply(cls, args=None, kwargs=None, **options):
         """Execute this task at once, by blocking until the task

+ 3 - 2
celery/worker/job.py

@@ -198,12 +198,13 @@ class TaskWrapper(object):
         These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.
 
         """
+        kwargs = dict(self.kwargs)
         task_func_kwargs = {"logfile": logfile,
                             "loglevel": loglevel,
                             "task_id": self.task_id,
                             "task_name": self.task_name}
-        task_func_kwargs.update(self.kwargs)
-        return task_func_kwargs
+        kwargs.update(task_func_kwargs)
+        return kwargs
 
     def execute(self, loglevel=None, logfile=None):
         """Execute the task in a :func:`jail` and store return value