Browse Source

Now handles unpickleable exceptions

Ask Solem 16 years ago
parent
commit
8268831e99
4 changed files with 84 additions and 3 deletions
  1. 72 0
      celery/backends/base.py
  2. 4 1
      celery/backends/cache.py
  3. 4 1
      celery/backends/database.py
  4. 4 1
      celery/backends/tyrant.py

+ 72 - 0
celery/backends/base.py

@@ -1,5 +1,64 @@
 """celery.backends.base"""
 from celery.timer import TimeoutTimer
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+import sys
+
+
+def find_nearest_pickleable_exception(exc):
+    """With an exception instance, iterate over its super classes (by mro)
+    and find the first super exception that is pickleable.
+  
+    :param exc: An exception instance.
+    :rtype: :exc:`Exception`
+
+    """
+    for supercls in exc.__class__.mro():
+        try:
+            superexc = supercls(exc.args)
+            pickle.dumps(superexc)
+        except:
+            pass
+        else:
+            return superexc
+    return exc
+
+
+class UnpickleableExceptionWrapper(Exception):
+    """Wraps unpickleable exceptions.
+   
+    :param exc_module: see :attr:`exc_module`.
+
+    :param exc_cls_name: see :attr:`exc_cls_name`.
+    
+    :param exc_args: The arguments for the original exception.
+
+    .. attribute:: exc_module
+
+        The module of the original exception.
+
+    .. attribute:: exc_cls_name
+
+        The name of the original exception class.
+
+    Example
+
+        >>> try:
+        ...     something_raising_unpickleable_exc()
+        >>> except Exception, e:
+        ...     exc = UnpickleableException(e.__class__.__module__,
+        ...                                 e.__class__.__name__,
+        ...                                 e.args)
+        ...     pickle.dumps(exc) # Works fine.
+
+    """
+
+    def __init__(self, exc_module, exc_cls_name, exc_args):
+        self.exc_module = exc_module
+        self.exc_cls = exc_cls_name
+        super(Exception, self).__init__(exc_module, exc_cls_name, exc_args)
 
 
 class BaseBackend(object):
@@ -18,6 +77,19 @@ class BaseBackend(object):
         """Mark task as executed with failure. Stores the execption."""
         return self.store_result(task_id, exc, status="FAILURE")
 
+    def prepare_exception(self, exc):
+        exc = find_nearest_pickleable_exception(exc)
+        try:
+            pickle.dumps(exc)
+        except pickle.PickleError:
+            excwrapper = UnpickleableExceptionWrapper(
+                            exc.__class__.__module__,
+                            exc.__class__.__name__,
+                            exc.args)
+            return excwrapper
+        else:
+            return exc
+
     def mark_as_retry(self, task_id, exc):
         """Mark task for retry."""
         return self.store_result(task_id, exc, status="RETRY")

+ 4 - 1
celery/backends/cache.py

@@ -19,7 +19,10 @@ class Backend(BaseBackend):
 
     def store_result(self, task_id, result, status):
         """Store task result and status."""
-        result = self.prepare_result(result)
+        if status == "DONE":
+            result = self.prepare_result(result)
+        elif status == "FAILURE":
+            result = self.prepare_exception(result)
         meta = {"status": status, "result": pickle.dumps(result)}
         cache.set(self._cache_key(task_id), meta)
 

+ 4 - 1
celery/backends/database.py

@@ -12,7 +12,10 @@ class Backend(BaseBackend):
 
     def store_result(self, task_id, result, status):
         """Mark task as done (executed)."""
-        result = self.prepare_result(result)
+        if status == "DONE":
+            result = self.prepare_result(result)
+        elif status == "FAILURE":
+            result = self.prepare_exception(result)
         return TaskMeta.objects.store_result(task_id, result, status)
 
     def is_done(self, task_id):

+ 4 - 1
celery/backends/tyrant.py

@@ -59,7 +59,10 @@ class Backend(BaseBackend):
 
     def store_result(self, task_id, result, status):
         """Store task result and status."""
-        result = self.prepare_result(result)
+        if status == "DONE":
+            result = self.prepare_result(result)
+        elif status == "FAILURE":
+            result = self.prepare_exception(result)
         meta = {"status": status, "result": pickle.dumps(result)}
         self.get_server()[self._cache_key(task_id)] = serialize(meta)