|
@@ -319,7 +319,7 @@ class Request(object):
|
|
|
if failed:
|
|
|
if isinstance(retval.exception, (SystemExit, KeyboardInterrupt)):
|
|
|
raise retval.exception
|
|
|
- return self.on_failure(retval)
|
|
|
+ return self.on_failure(retval, return_ok=True)
|
|
|
task_ready(self)
|
|
|
|
|
|
if self.task.acks_late:
|
|
@@ -339,7 +339,7 @@ class Request(object):
|
|
|
exception=safe_repr(exc_info.exception.exc),
|
|
|
traceback=safe_str(exc_info.traceback))
|
|
|
|
|
|
- def on_failure(self, exc_info, send_failed_event=True):
|
|
|
+ def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
|
|
|
"""Handler called if the task raised an exception."""
|
|
|
task_ready(self)
|
|
|
|
|
@@ -358,14 +358,14 @@ class Request(object):
|
|
|
# These are special cases where the process would not have had
|
|
|
# time to write the result.
|
|
|
if self.store_errors:
|
|
|
- if isinstance(exc, WorkerLostError):
|
|
|
- self.task.backend.mark_as_failure(
|
|
|
- self.id, exc, request=self,
|
|
|
- )
|
|
|
- elif isinstance(exc, Terminated):
|
|
|
+ if isinstance(exc, Terminated):
|
|
|
self._announce_revoked(
|
|
|
'terminated', True, string(exc), False)
|
|
|
send_failed_event = False # already sent revoked event
|
|
|
+ elif isinstance(exc, WorkerLostError) or not return_ok:
|
|
|
+ self.task.backend.mark_as_failure(
|
|
|
+ self.id, exc, request=self,
|
|
|
+ )
|
|
|
# (acks_late) acknowledge after result stored.
|
|
|
if self.task.acks_late:
|
|
|
self.acknowledge()
|
|
@@ -377,6 +377,10 @@ class Request(object):
|
|
|
traceback=exc_info.traceback,
|
|
|
)
|
|
|
|
|
|
+ if not return_ok:
|
|
|
+ error('Task handler raised error: %r', exc,
|
|
|
+ exc_info=exc_info.exc_info)
|
|
|
+
|
|
|
def acknowledge(self):
|
|
|
"""Acknowledge task."""
|
|
|
if not self.acknowledged:
|