Selaa lähdekoodia

Implements new task_retry signal. Closes #1169

Ask Solem 12 vuotta sitten
vanhempi
commit
d718d27d25
3 muutettua tiedostoa jossa 18 lisäystä ja 4 poistoa
  1. 9 0
      celery/exceptions.py
  2. 3 0
      celery/signals.py
  3. 6 4
      celery/task/trace.py

+ 9 - 0
celery/exceptions.py

@@ -63,6 +63,15 @@ class MaxRetriesExceededError(Exception):
 class RetryTaskError(Exception):
     """The task is to be retried later."""
 
+    #: Optional message describing context of retry.
+    message = None
+
+    #: Exception (if any) that caused the retry to happen.
+    exc = None
+
+    #: Time of retry (ETA), either int or :class:`~datetime.datetime`.
+    when = None
+
     def __init__(self, message=None, exc=None, when=None, **kwargs):
         from kombu.utils.encoding import safe_repr
         self.message = message

+ 3 - 0
celery/signals.py

@@ -21,6 +21,9 @@ task_prerun = Signal(providing_args=['task_id', 'task', 'args', 'kwargs'])
 task_postrun = Signal(providing_args=[
     'task_id', 'task', 'args', 'kwargs', 'retval'])
 task_success = Signal(providing_args=['result'])
+task_retry = Signal(providing_args=[
+    'request', 'reason', 'einfo',
+])
 task_failure = Signal(providing_args=[
     'task_id', 'exception', 'args', 'kwargs', 'traceback', 'einfo'])
 task_revoked = Signal(providing_args=['terminated', 'signum', 'expired'])

+ 6 - 4
celery/task/trace.py

@@ -110,11 +110,13 @@ class TraceInfo(object):
         req = task.request
         type_, _, tb = sys.exc_info()
         try:
-            pred = self.retval
-            einfo = ExceptionInfo((type_, pred, tb))
+            reason = self.retval
+            einfo = ExceptionInfo((type_, reason, tb))
             if store_errors:
-                task.backend.mark_as_retry(req.id, pred.exc, einfo.traceback)
-            task.on_retry(pred.exc, req.id, req.args, req.kwargs, einfo)
+                task.backend.mark_as_retry(req.id, reason.exc, einfo.traceback)
+            task.on_retry(reason.exc, req.id, req.args, req.kwargs, einfo)
+            signals.task_retry.send(sender=task, request=req,
+                                    reason=reason, einfo=einfo)
             return einfo
         finally:
             del(tb)