|
@@ -8,6 +8,7 @@ from celery.datastructures import ExceptionInfo
|
|
|
from celery.backends import default_backend
|
|
|
from django.core.mail import mail_admins
|
|
|
from celery.monitoring import TaskTimerStats
|
|
|
+from celery.task.base import RetryTaskError
|
|
|
import multiprocessing
|
|
|
import traceback
|
|
|
import socket
|
|
@@ -84,11 +85,34 @@ def jail(task_id, task_name, func, args, kwargs):
|
|
|
result = func(*args, **kwargs)
|
|
|
except (SystemExit, KeyboardInterrupt):
|
|
|
raise
|
|
|
+ except RetryTaskError, exc:
|
|
|
+ ### Task is to be retried.
|
|
|
+
|
|
|
+ # RetryTaskError stores both a small message describing the retry
|
|
|
+ # and the original exception.
|
|
|
+ message, orig_exc = exc.args
|
|
|
+ default_backend.mark_as_retry(task_id, orig_exc)
|
|
|
+
|
|
|
+ # Create a simpler version of the RetryTaskError that stringifies
|
|
|
+ # the original exception instead of including the exception instance.
|
|
|
+ # This is for reporting the retry in logs, e-mail etc, while
|
|
|
+ # guaranteeing pickleability.
|
|
|
+ expanded_msg = "%s: %s" % (message, str(orig_exc))
|
|
|
+ type_, _, tb = sys.exc_info()
|
|
|
+ retval = ExceptionInfo((type_,
|
|
|
+ type_(expanded_msg, None),
|
|
|
+ tb))
|
|
|
except Exception, exc:
|
|
|
+ ### Task ended in failure.
|
|
|
+ # mark_as_failure returns an exception that is guaranteed to
|
|
|
+ # be pickleable.
|
|
|
stored_exc = default_backend.mark_as_failure(task_id, exc)
|
|
|
+
|
|
|
+ # wrap exception info + traceback and return it to caller.
|
|
|
type_, _, tb = sys.exc_info()
|
|
|
retval = ExceptionInfo((type_, stored_exc, tb))
|
|
|
else:
|
|
|
+ ### Task executed successfully.
|
|
|
if not ignore_result:
|
|
|
default_backend.mark_as_done(task_id, result)
|
|
|
retval = result
|