Browse Source

Fixes for RetryTaskError exception

Ask Solem 12 years ago
parent
commit
c06772304d
4 changed files with 36 additions and 23 deletions
  1. 7 8
      celery/app/task.py
  2. 22 3
      celery/exceptions.py
  3. 6 10
      celery/task/trace.py
  4. 1 2
      celery/worker/job.py

+ 7 - 8
celery/app/task.py

@@ -488,12 +488,12 @@ class Task(object):
                 evd = app.events.Dispatcher(channel=P.channel,
                                             buffer_while_offline=False)
 
-            task_id = P.delay_task(self.name, args, kwargs,
-                                   task_id=task_id,
-                                   event_dispatcher=evd,
-                                   callbacks=maybe_list(link),
-                                   errbacks=maybe_list(link_error),
-                                   **options)
+            task_id = P.publish_task(self.name, args, kwargs,
+                                     task_id=task_id,
+                                     event_dispatcher=evd,
+                                     callbacks=maybe_list(link),
+                                     errbacks=maybe_list(link_error),
+                                     **options)
         result = self.AsyncResult(task_id)
         if add_to_parent:
             parent = get_current_worker_task()
@@ -583,8 +583,7 @@ class Task(object):
             self.apply(args=args, kwargs=kwargs, **options).get()
         else:
             self.apply_async(args=args, kwargs=kwargs, **options)
-        ret = RetryTaskError(eta and 'Retry at %s' % eta
-                                  or 'Retry in %s secs.' % countdown, exc)
+        ret = RetryTaskError(exc, eta or countdown)
         if throw:
             raise ret
         return ret

+ 22 - 3
celery/exceptions.py

@@ -12,6 +12,8 @@ from billiard.exceptions import (  # noqa
     SoftTimeLimitExceeded, TimeLimitExceeded, WorkerLostError,
 )
 
+from .utils.encoding import safe_repr
+
 UNREGISTERED_FMT = """\
 Task of kind %s is not registered, please make sure it's imported.\
 """
@@ -59,9 +61,26 @@ class MaxRetriesExceededError(Exception):
 class RetryTaskError(Exception):
     """The task is to be retried later."""
 
-    def __init__(self, message, exc, *args, **kwargs):
-        self.exc = exc
-        Exception.__init__(self, message, exc, *args, **kwargs)
+    def __init__(self, exc=None, when=None, **kwargs):
+        if isinstance(exc, basestring):
+            self.exc, self.excs = None, exc
+        else:
+            self.exc, self.excs = exc, safe_repr(exc) if exc else None
+        self.when = when
+        Exception.__init__(self, exc, when, **kwargs)
+
+    def humanize(self):
+        if isinstance(self.when, int):
+            return 'in %ss' % self.when
+        return 'at %s' % (self.when, )
+
+    def __str__(self):
+        if self.excs:
+            return 'Retry %s: %r' % (self.humanize(), self.excs)
+        return 'Retry %s' % self.humanize()
+
+    def __reduce__(self):
+        return self.__class__, (self.excs, self.when)
 
 
 class TaskRevokedError(Exception):

+ 6 - 10
celery/task/trace.py

@@ -93,20 +93,16 @@ class TraceInfo(object):
 
     def handle_retry(self, task, store_errors=True):
         """Handle retry exception."""
-        # Create a simpler version of the RetryTaskError that stringifies
-        # the original exception instead of including the exception instance.
-        # This is for reporting the retry in logs, email etc, while
-        # guaranteeing pickleability.
+        # the exception raised is the RetryTaskError semi-predicate,
+        # and it's exc' attribute is the original exception raised (if any).
         req = task.request
         type_, _, tb = sys.exc_info()
         try:
-            exc = self.retval
-            message, orig_exc = exc.args
-            expanded_msg = '%s: %s' % (message, str(orig_exc))
-            einfo = ExceptionInfo((type_, type_(expanded_msg, None), tb))
+            retrypred = self.retval
+            einfo = ExceptionInfo((type_, retrypred, tb))
             if store_errors:
-                task.backend.mark_as_retry(req.id, orig_exc, einfo.traceback)
-            task.on_retry(exc, req.id, req.args, req.kwargs, einfo)
+                task.backend.mark_as_retry(req.id, retrypred.exc, einfo.traceback)
+            task.on_retry(retrypred.exc, req.id, req.args, req.kwargs, einfo)
             return einfo
         finally:
             del(tb)

+ 1 - 2
celery/worker/job.py

@@ -324,8 +324,7 @@ class Request(object):
         if _does_info:
             info(self.retry_msg.strip(), {
                 'id': self.id, 'name': self.name,
-                'exc': safe_repr(exc_info.exception.exc)},
-                exc_info=exc_info.exc_info)
+                'exc': exc_info.exception})
 
     def on_failure(self, exc_info):
         """Handler called if the task raised an exception."""