Browse Source

Add traceback to result value on failure. Closes #28. **NOTE** If you use the
database backend you have to re-create the database table celery_taskmeta

Ask Solem 15 years ago
parent
commit
929c842541

+ 7 - 4
celery/backends/amqp.py

@@ -54,7 +54,7 @@ class Backend(BaseBackend):
                         auto_delete=True,
                         routing_key=routing_key)
 
-    def store_result(self, task_id, result, status):
+    def store_result(self, task_id, result, status, traceback=None):
         """Send task return value and status."""
         if status == "DONE":
             result = self.prepare_result(result)
@@ -64,15 +64,14 @@ class Backend(BaseBackend):
 
         meta = {"task_id": task_id,
                 "result": result,
-                "status": status}
+                "status": status,
+                "traceback": traceback}
 
         connection = self.connection
         publisher = self._publisher_for_task_id(task_id, connection)
         publisher.send(meta, serializer="pickle")
         publisher.close()
 
-        print("SENT %s RESULT: %s TO %s" % (
-            status, result, task_id.replace("-", "")))
         return result
 
     def is_done(self, task_id):
@@ -83,6 +82,10 @@ class Backend(BaseBackend):
         """Get the status of a task."""
         return self._get_task_meta_for(task_id)["status"]
 
+    def get_traceback(self, task_id):
+        """Get the traceback for a failed task."""
+        return self._get_task_meta_for(task_id)["traceback"]
+
     def _get_task_meta_for(self, task_id):
 
         if task_id in self._cache:

+ 18 - 6
celery/backends/base.py

@@ -101,14 +101,16 @@ class BaseBackend(object):
         """Mark task as successfully executed."""
         return self.store_result(task_id, result, status="DONE")
 
-    def mark_as_failure(self, task_id, exc):
+    def mark_as_failure(self, task_id, exc, traceback=None):
         """Mark task as executed with failure. Stores the execption."""
-        return self.store_result(task_id, exc, status="FAILURE")
+        return self.store_result(task_id, exc, status="FAILURE",
+                                 traceback=traceback)
 
-    def mark_as_retry(self, task_id, exc):
+    def mark_as_retry(self, task_id, exc, traceback=None):
         """Mark task as being retries. Stores the current
         exception (if any)."""
-        return self.store_result(task_id, exc, status="RETRY")
+        return self.store_result(task_id, exc, status="RETRY",
+                                 traceback=traceback)
 
     def create_exception_cls(self, name, module, parent=None):
         """Dynamically create an exception class."""
@@ -155,6 +157,11 @@ class BaseBackend(object):
         raise NotImplementedError(
                 "get_result is not supported by this backend.")
 
+    def get_traceback(self, task_id):
+        """Get the traceback for a failed task."""
+        raise NotImplementedError(
+                "get_traceback is not supported by this backend.")
+
     def is_done(self, task_id):
         """Returns ``True`` if the task was successfully executed."""
         return self.get_status(task_id) == "DONE"
@@ -218,13 +225,13 @@ class KeyValueStoreBackend(BaseBackend):
     def set(self, key, value):
         raise NotImplementedError("Must implement the set method.")
 
-    def store_result(self, task_id, result, status):
+    def store_result(self, task_id, result, status, traceback=None):
         """Store task result and status."""
         if status == "DONE":
             result = self.prepare_result(result)
         elif status == "FAILURE":
             result = self.prepare_exception(result)
-        meta = {"status": status, "result": result}
+        meta = {"status": status, "result": result, "traceback": traceback}
         self.set(self.get_cache_key_for_task(task_id), pickle.dumps(meta))
         return result
 
@@ -240,6 +247,11 @@ class KeyValueStoreBackend(BaseBackend):
         else:
             return meta["result"]
 
+    def get_traceback(self, task_id):
+        """Get the traceback for a failed task."""
+        meta = self._get_task_meta_for(task_id)
+        return meta["traceback"]
+
     def is_done(self, task_id):
         """Returns ``True`` if the task executed successfully."""
         return self.get_status(task_id) == "DONE"

+ 8 - 3
celery/backends/database.py

@@ -30,13 +30,14 @@ class Backend(BaseBackend):
             task_id_tuples.append((waiting_task, task_id))
         return task_id_tuples
 
-    def store_result(self, task_id, result, status):
-        """Mark task as done (executed)."""
+    def store_result(self, task_id, result, status, traceback=None):
+        """Store return value and status of an executed task."""
         if status == "DONE":
             result = self.prepare_result(result)
         elif status == "FAILURE":
             result = self.prepare_exception(result)
-        TaskMeta.objects.store_result(task_id, result, status)
+        TaskMeta.objects.store_result(task_id, result, status,
+                                      traceback=None)
         return result
 
     def is_done(self, task_id):
@@ -47,6 +48,10 @@ class Backend(BaseBackend):
         """Get the status of a task."""
         return self._get_task_meta_for(task_id).status
 
+    def get_traceback(self, task_id):
+        """Get the traceback of a failed task."""
+        return self._get_task_meta_for(task_id).traceback
+
     def get_result(self, task_id):
         """Get the result for a task."""
         meta = self._get_task_meta_for(task_id)

+ 4 - 1
celery/execute.py

@@ -7,6 +7,7 @@ from celery.utils import gen_unique_id
 from functools import partial as curry
 from datetime import datetime, timedelta
 from multiprocessing import get_logger
+import traceback
 import inspect
 
 
@@ -154,8 +155,10 @@ def apply(task, args, kwargs, **options):
     try:
         ret_value = task(*args, **kwargs)
         status = "DONE"
+        tb = None
     except Exception, exc:
         ret_value = exc
+        tb = traceback.format_stack()
         status = "FAILURE"
 
-    return EagerResult(task_id, ret_value, status)
+    return EagerResult(task_id, ret_value, status, traceback=tb)

+ 7 - 2
celery/managers.py

@@ -80,7 +80,7 @@ class TaskManager(models.Manager):
         """Delete all expired task results."""
         self.get_all_expired().delete()
 
-    def store_result(self, task_id, result, status):
+    def store_result(self, task_id, result, status, traceback=None):
         """Store the result and status of a task.
 
         :param task_id: task id
@@ -92,13 +92,18 @@ class TaskManager(models.Manager):
             :meth:`celery.result.AsyncResult.get_status` for a list of
             possible status values.
 
+        :keyword traceback: The traceback at the point of exception (if the
+            task failed).
+
         """
         task, created = self.get_or_create(task_id=task_id, defaults={
                                             "status": status,
-                                            "result": result})
+                                            "result": result,
+                                            "traceback": traceback})
         if not created:
             task.status = status
             task.result = result
+            task.traceback = traceback
             task.save()
 
 

+ 1 - 0
celery/models.py

@@ -26,6 +26,7 @@ class TaskMeta(models.Model):
             default=TASK_STATUS_PENDING, choices=TASK_STATUSES_CHOICES)
     result = PickledObjectField()
     date_done = models.DateTimeField(_(u"done at"), auto_now=True)
+    traceback = models.TextField(_(u"traceback"), blank=True, null=True)
 
     objects = TaskManager()
 

+ 12 - 1
celery/result.py

@@ -98,6 +98,11 @@ class BaseAsyncResult(object):
             return self.backend.get_result(self.task_id)
         return None
 
+    @property
+    def traceback(self):
+        """Get the traceback of a failed task."""
+        return self.backend.get_traceback(self.task_id)
+
     @property
     def status(self):
         """The current status of the task.
@@ -300,10 +305,11 @@ class EagerResult(BaseAsyncResult):
     """Result that we know has already been executed.  """
     TimeoutError = TimeoutError
 
-    def __init__(self, task_id, ret_value, status):
+    def __init__(self, task_id, ret_value, status, traceback=None):
         self.task_id = task_id
         self._result = ret_value
         self._status = status
+        self._traceback = traceback
 
     def is_done(self):
         """Returns ``True`` if the task executed without failure."""
@@ -330,5 +336,10 @@ class EagerResult(BaseAsyncResult):
         """The tasks status"""
         return self._status
 
+    @property
+    def traceback(self):
+        """The traceback if the task failed."""
+        return self._traceback
+
     def __repr__(self):
         return "<EagerResult: %s>" % self.task_id

+ 1 - 0
celery/tests/test_task.py

@@ -310,6 +310,7 @@ class TestTaskApply(unittest.TestCase):
         f = RaisingTask.apply()
         self.assertTrue(f.is_ready())
         self.assertFalse(f.is_done())
+        self.assertTrue(f.traceback)
         self.assertRaises(KeyError, f.get)
 
 

+ 5 - 4
celery/worker/job.py

@@ -73,29 +73,30 @@ def jail(task_id, task_name, func, args, kwargs):
         raise
     except RetryTaskError, exc:
         ### Task is to be retried.
+        type_, _, tb = sys.exc_info()
 
         # RetryTaskError stores both a small message describing the retry
         # and the original exception.
         message, orig_exc = exc.args
-        default_backend.mark_as_retry(task_id, orig_exc)
+        default_backend.mark_as_retry(task_id, orig_exc, tb)
 
         # Create a simpler version of the RetryTaskError that stringifies
         # the original exception instead of including the exception instance.
         # This is for reporting the retry in logs, e-mail etc, while
         # guaranteeing pickleability.
         expanded_msg = "%s: %s" % (message, str(orig_exc))
-        type_, _, tb = sys.exc_info()
         retval = ExceptionInfo((type_,
                                 type_(expanded_msg, None),
                                 tb))
     except Exception, exc:
         ### Task ended in failure.
+        type_, _, tb = sys.exc_info()
+
         # mark_as_failure returns an exception that is guaranteed to
         # be pickleable.
-        stored_exc = default_backend.mark_as_failure(task_id, exc)
+        stored_exc = default_backend.mark_as_failure(task_id, exc, tb)
 
         # wrap exception info + traceback and return it to caller.
-        type_, _, tb = sys.exc_info()
         retval = ExceptionInfo((type_, stored_exc, tb))
     else:
         ### Task executed successfully.