Parcourir la source

Task.retry now always raises RetryTaskError (except if throw=False)

Ask Solem il y a 15 ans
Parent
commit
8a9669c3c4
1 fichiers modifiés avec 18 ajouts et 5 suppressions
  1. 18 5
      celery/task/base.py

+ 18 - 5
celery/task/base.py

@@ -14,6 +14,14 @@ class MaxRetriesExceededError(Exception):
     """The tasks max restart limit has been exceeded."""
 
 
+class RetryTaskError(Exception):
+    """The task is to be retried later."""
+
+    def __init__(self, message, exc, *args, **kwargs):
+        self.exc = exc
+        super(RetryTaskError, self).__init__(message, exc, *args, **kwargs)
+
+
 class Task(object):
     """A task that can be delayed for execution by the ``celery`` daemon.
 
@@ -226,7 +234,7 @@ class Task(object):
         """
         return apply_async(cls, args, kwargs, **options)
 
-    def retry(self, args, kwargs, exc=None, **options):
+    def retry(self, args, kwargs, exc=None, throw=True, **options):
         """Retry the task.
 
         :param args: Positional arguments to retry with.
@@ -258,22 +266,27 @@ class Task(object):
         options["task_id"] = kwargs.pop("task_id", None)
         options["countdown"] = options.get("countdown",
                                            self.default_retry_delay)
-        exc = exc or self.MaxRetriesExceededError(
+        max_exc = exc or self.MaxRetriesExceededError(
                 "Can't retry %s[%s] args:%s kwargs:%s" % (
                     self.name, options["task_id"], args, kwargs))
         if options["retries"] > self.max_retries:
-            raise exc
+            raise max_exc
 
         # If task was executed eagerly using apply(),
         # then the retry must also be executed eagerly.
         if kwargs.get("task_is_eager", False):
             result = self.apply(args=args, kwargs=kwargs, **options)
             if isinstance(result, EagerResult):
+                # get() propogates any exceptions.
                 return result.get()
             return result
 
-        return self.apply_async(args=args, kwargs=kwargs, **options)
-        
+        self.apply_async(args=args, kwargs=kwargs, **options)
+
+        if throw:
+            message = "Retry in %d seconds." % options["countdown"]
+            raise RetryTaskError(message, exc)
+       
     @classmethod
     def apply(cls, args=None, kwargs=None, **options):
         """Execute this task at once, by blocking until the task