Browse Source

Don't perform late ack for internal errors. Closes #537

Ask Solem 13 years ago
parent
commit
e810420c5b
3 changed files with 46 additions and 22 deletions
  1. 5 1
      celery/datastructures.py
  2. 5 4
      celery/execute/trace.py
  3. 36 17
      celery/worker/job.py

+ 5 - 1
celery/datastructures.py

@@ -412,10 +412,14 @@ class ExceptionInfo(object):
     #: String representation of the traceback.
     traceback = None
 
-    def __init__(self, exc_info):
+    #: Set to true if this is an internal error.
+    internal = False
+
+    def __init__(self, exc_info, internal=False):
         self.type, self.exception, tb = exc_info
         self.tb = Traceback(tb)
         self.traceback = ''.join(traceback.format_exception(*exc_info))
+        self.internal = internal
 
     def __str__(self):
         return self.traceback

+ 5 - 4
celery/execute/trace.py

@@ -149,9 +149,6 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 try:
                     R = retval = task(*args, **kwargs)
                     state, einfo = SUCCESS, None
-                    task_on_success(retval, uuid, args, kwargs)
-                    if publish_result:
-                        store_result(uuid, retval, SUCCESS)
                 except RetryTaskError, exc:
                     I = Info(RETRY, exc, sys.exc_info())
                     state, retval, einfo = I.state, I.retval, I.exc_info
@@ -172,6 +169,10 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     I = Info(FAILURE, None, sys.exc_info())
                     state, retval, einfo = I.state, I.retval, I.exc_info
                     R = I.handle_error_state(task, eager=eager)
+                else:
+                    task_on_success(retval, uuid, args, kwargs)
+                    if publish_result:
+                        store_result(uuid, retval, SUCCESS)
 
                 # -* POST *-
                 if task_request.chord:
@@ -219,7 +220,7 @@ def report_internal_error(task, exc):
     _type, _value, _tb = sys.exc_info()
     try:
         _value = task.backend.prepare_exception(exc)
-        exc_info = ExceptionInfo((_type, _value, _tb))
+        exc_info = ExceptionInfo((_type, _value, _tb), internal=True)
         warn(RuntimeWarning(
             "Exception raised outside body: %r:\n%s" % (
                 exc, exc_info.traceback)))

+ 36 - 17
celery/worker/job.py

@@ -79,7 +79,12 @@ class Request(object):
 
     #: Format string used to log task failure.
     error_msg = """\
-        Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
+        Task %(name)s[%(id)s] raised exception: %(exc)s
+    """
+
+    #: Format string used to log internal error.
+    internal_error_msg = """\
+        Task %(name)s[%(id)s] INTERNAL ERROR: %(exc)s
     """
 
     #: Format string used to log task retry.
@@ -343,35 +348,49 @@ class Request(object):
         """Handler called if the task raised an exception."""
         state.task_ready(self)
 
-        if self.task.acks_late:
-            self.acknowledge()
+        if not exc_info.internal:
 
-        if isinstance(exc_info.exception, exceptions.RetryTaskError):
-            return self.on_retry(exc_info)
+            if isinstance(exc_info.exception, exceptions.RetryTaskError):
+                return self.on_retry(exc_info)
 
-        # This is a special case as the process would not have had
-        # time to write the result.
-        if isinstance(exc_info.exception, exceptions.WorkerLostError) and \
-                self.store_errors:
-            self.task.backend.mark_as_failure(self.id, exc_info.exception)
+            # This is a special case as the process would not have had
+            # time to write the result.
+            if isinstance(exc_info.exception, exceptions.WorkerLostError) and \
+                    self.store_errors:
+                self.task.backend.mark_as_failure(self.id, exc_info.exception)
+            # (acks_late) acknowledge after result stored.
+            if self.task.acks_late:
+                self.acknowledge()
 
+        self._log_error(exc_info)
+
+    def _log_error(self, exc_info):
+        format = self.error_msg
+        description = "raised exception"
+        severity = logging.ERROR
         self.send_event("task-failed", uuid=self.id,
                          exception=safe_repr(exc_info.exception),
                          traceback=safe_str(exc_info.traceback))
 
+        if exc_info.internal:
+            format = self.internal_error_msg
+            description = "INTERNAL ERROR"
+            severity = logging.CRITICAL
+
         context = {"hostname": self.hostname,
                    "id": self.id,
                    "name": self.name,
                    "exc": safe_repr(exc_info.exception),
                    "traceback": safe_str(exc_info.traceback),
                    "args": safe_repr(self.args),
-                   "kwargs": safe_repr(self.kwargs)}
-
-        self.logger.error(self.error_msg.strip(), context,
-                          exc_info=exc_info.exc_info,
-                          extra={"data": {"id": self.id,
-                                          "name": self.name,
-                                          "hostname": self.hostname}})
+                   "kwargs": safe_repr(self.kwargs),
+                   "description": description}
+
+        self.logger.log(severity, format.strip(), context,
+                        exc_info=exc_info.exc_info,
+                        extra={"data": {"id": self.id,
+                                        "name": self.name,
+                                        "hostname": self.hostname}})
 
         task_obj = tasks.get(self.name, object)
         task_obj.send_error_email(context, exc_info.exception)