Browse Source

Consistent decoding of exception result

Ask Solem 10 years ago
parent
commit
c998f50cc5

+ 2 - 4
celery/backends/amqp.py

@@ -256,14 +256,12 @@ class AMQPBackend(BaseBackend):
             results = deque()
             results = deque()
             push_result = results.append
             push_result = results.append
             push_cache = self._cache.__setitem__
             push_cache = self._cache.__setitem__
-            to_exception = self.exception_to_python
+            decode_result = self.decode_result
 
 
             def on_message(message):
             def on_message(message):
-                body = message.decode()
+                body = decode_result(message.body)
                 state, uid = getfields(body)
                 state, uid = getfields(body)
                 if state in READY_STATES:
                 if state in READY_STATES:
-                    if state in PROPAGATE_STATES:
-                        body['result'] = to_exception(body['result'])
                     push_result(body) \
                     push_result(body) \
                         if uid in task_ids else push_cache(uid, body)
                         if uid in task_ids else push_cache(uid, body)
 
 

+ 10 - 8
celery/backends/base.py

@@ -181,6 +181,12 @@ class BaseBackend(object):
         _, _, payload = dumps(data, serializer=self.serializer)
         _, _, payload = dumps(data, serializer=self.serializer)
         return payload
         return payload
 
 
+    def decode_result(self, payload):
+        meta = self.decode(payload)
+        if meta['status'] in self.EXCEPTION_STATES:
+            meta['result'] = self.exception_to_python(meta['result'])
+        return meta
+
     def decode(self, payload):
     def decode(self, payload):
         payload = PY3 and payload or str(payload)
         payload = PY3 and payload or str(payload)
         return loads(payload,
         return loads(payload,
@@ -264,11 +270,7 @@ class BaseBackend(object):
 
 
     def get_result(self, task_id):
     def get_result(self, task_id):
         """Get the result of a task."""
         """Get the result of a task."""
-        meta = self.get_task_meta(task_id)
-        if meta['status'] in self.EXCEPTION_STATES:
-            return self.exception_to_python(meta['result'])
-        else:
-            return meta['result']
+        return self.get_task_meta(task_id).get('result')
 
 
     def get_children(self, task_id):
     def get_children(self, task_id):
         """Get the list of subtasks sent by a task."""
         """Get the list of subtasks sent by a task."""
@@ -436,13 +438,13 @@ class KeyValueStoreBackend(BaseBackend):
         if hasattr(values, 'items'):
         if hasattr(values, 'items'):
             # client returns dict so mapping preserved.
             # client returns dict so mapping preserved.
             return {
             return {
-                self._strip_prefix(k): self.decode(v)
+                self._strip_prefix(k): self.decode_result(v)
                 for k, v in items(values) if v is not None
                 for k, v in items(values) if v is not None
             }
             }
         else:
         else:
             # client returns list so need to recreate mapping.
             # client returns list so need to recreate mapping.
             return {
             return {
-                bytes_to_str(keys[i]): self.decode(value)
+                bytes_to_str(keys[i]): self.decode_result(value)
                 for i, value in enumerate(values) if value is not None
                 for i, value in enumerate(values) if value is not None
             }
             }
 
 
@@ -500,7 +502,7 @@ class KeyValueStoreBackend(BaseBackend):
         meta = self.get(self.get_key_for_task(task_id))
         meta = self.get(self.get_key_for_task(task_id))
         if not meta:
         if not meta:
             return {'status': states.PENDING, 'result': None}
             return {'status': states.PENDING, 'result': None}
-        return self.decode(meta)
+        return self.decode_result(meta)
 
 
     def _restore_group(self, group_id):
     def _restore_group(self, group_id):
         """Get task metadata for a task by id."""
         """Get task metadata for a task by id."""

+ 1 - 1
celery/backends/redis.py

@@ -221,7 +221,7 @@ class RedisBackend(KeyValueStoreBackend):
             callback = maybe_signature(request.chord, app=app)
             callback = maybe_signature(request.chord, app=app)
             total = callback['chord_size'] + totaldiff
             total = callback['chord_size'] + totaldiff
             if readycount == total:
             if readycount == total:
-                decode, unpack = self.decode, self._unpack_chord_result
+                decode, unpack = self.decode_result, self._unpack_chord_result
                 resl, _, _ = client.pipeline()  \
                 resl, _, _ = client.pipeline()  \
                     .lrange(jkey, 0, total)     \
                     .lrange(jkey, 0, total)     \
                     .delete(jkey)               \
                     .delete(jkey)               \

+ 2 - 1
celery/canvas.py

@@ -618,7 +618,8 @@ class group(Signature):
                         task.args = tuple(partial_args) + tuple(task.args)
                         task.args = tuple(partial_args) + tuple(task.args)
                     yield task, task.freeze(group_id=group_id, root_id=root_id)
                     yield task, task.freeze(group_id=group_id, root_id=root_id)
 
 
-    def _apply_tasks(self, tasks, producer=None, app=None, **options):
+    def _apply_tasks(self, tasks, producer=None, app=None,
+                     add_to_parent=None, **options):
         app = app or self.app
         app = app or self.app
         with app.producer_or_acquire(producer) as producer:
         with app.producer_or_acquire(producer) as producer:
             for sig, res in tasks:
             for sig, res in tasks:

+ 1 - 5
celery/result.py

@@ -174,9 +174,7 @@ class AsyncResult(ResultBase):
             self._maybe_set_cache(meta)
             self._maybe_set_cache(meta)
             status = meta['status']
             status = meta['status']
             if status in PROPAGATE_STATES and propagate:
             if status in PROPAGATE_STATES and propagate:
-                raise self.backend.exception_to_python(meta['result'])
-            if status in EXCEPTION_STATES:
-                return self.backend.exception_to_python(meta['result'])
+                raise meta['result']
             return meta['result']
             return meta['result']
     wait = get  # deprecated alias to :meth:`get`.
     wait = get  # deprecated alias to :meth:`get`.
 
 
@@ -345,8 +343,6 @@ class AsyncResult(ResultBase):
 
 
     def _set_cache(self, d):
     def _set_cache(self, d):
         state, children = d['status'], d.get('children')
         state, children = d['status'], d.get('children')
-        if state in states.EXCEPTION_STATES:
-            d['result'] = self.backend.exception_to_python(d['result'])
         if children:
         if children:
             d['children'] = [
             d['children'] = [
                 result_from_tuple(child, self.app) for child in children
                 result_from_tuple(child, self.app) for child in children

+ 1 - 1
docs/getting-started/next-steps.rst

@@ -317,7 +317,7 @@ exception, in fact ``result.get()`` will propagate any errors by default::
     File "/opt/devel/celery/celery/result.py", line 113, in get
     File "/opt/devel/celery/celery/result.py", line 113, in get
         interval=interval)
         interval=interval)
     File "/opt/devel/celery/celery/backends/amqp.py", line 138, in wait_for
     File "/opt/devel/celery/celery/backends/amqp.py", line 138, in wait_for
-        raise self.exception_to_python(meta['result'])
+        raise meta['result']
     TypeError: add() takes exactly 2 arguments (1 given)
     TypeError: add() takes exactly 2 arguments (1 given)
 
 
 If you don't wish for the errors to propagate then you can disable that
 If you don't wish for the errors to propagate then you can disable that

+ 1 - 1
docs/userguide/canvas.rst

@@ -735,7 +735,7 @@ to the :exc:`~@ChordError` exception:
       File "*/celery/result.py", line 120, in get
       File "*/celery/result.py", line 120, in get
         interval=interval)
         interval=interval)
       File "*/celery/backends/amqp.py", line 150, in wait_for
       File "*/celery/backends/amqp.py", line 150, in wait_for
-        raise self.exception_to_python(meta['result'])
+        raise meta['result']
     celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
     celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
         raised ValueError('something something',)
         raised ValueError('something something',)