Pārlūkot izejas kodu

Consistent decoding of exception result

Conflicts:
	celery/backends/base.py
	celery/backends/redis.py
	celery/canvas.py
Ask Solem 10 gadi atpakaļ
vecāks
revīzija
c3ff4a2dad

+ 2 - 4
celery/backends/amqp.py

@@ -256,14 +256,12 @@ class AMQPBackend(BaseBackend):
             results = deque()
             push_result = results.append
             push_cache = self._cache.__setitem__
-            to_exception = self.exception_to_python
+            decode_result = self.decode_result
 
             def on_message(message):
-                body = message.decode()
+                body = decode_result(message.body)
                 state, uid = getfields(body)
                 if state in READY_STATES:
-                    if state in PROPAGATE_STATES:
-                        body['result'] = to_exception(body['result'])
                     push_result(body) \
                         if uid in task_ids else push_cache(uid, body)
 

+ 10 - 8
celery/backends/base.py

@@ -182,6 +182,12 @@ class BaseBackend(object):
         _, _, payload = dumps(data, serializer=self.serializer)
         return payload
 
+    def decode_result(self, payload):
+        meta = self.decode(payload)
+        if meta['status'] in self.EXCEPTION_STATES:
+            meta['result'] = self.exception_to_python(meta['result'])
+        return meta
+
     def decode(self, payload):
         payload = PY3 and payload or str(payload)
         return loads(payload,
@@ -265,11 +271,7 @@ class BaseBackend(object):
 
     def get_result(self, task_id):
         """Get the result of a task."""
-        meta = self.get_task_meta(task_id)
-        if meta['status'] in self.EXCEPTION_STATES:
-            return self.exception_to_python(meta['result'])
-        else:
-            return meta['result']
+        return self.get_task_meta(task_id).get('result')
 
     def get_children(self, task_id):
         """Get the list of subtasks sent by a task."""
@@ -431,12 +433,12 @@ class KeyValueStoreBackend(BaseBackend):
     def _mget_to_results(self, values, keys):
         if hasattr(values, 'items'):
             # client returns dict so mapping preserved.
-            return dict((self._strip_prefix(k), self.decode(v))
+            return dict((self._strip_prefix(k), self.decode_result(v))
                         for k, v in items(values)
                         if v is not None)
         else:
             # client returns list so need to recreate mapping.
-            return dict((bytes_to_str(keys[i]), self.decode(value))
+            return dict((bytes_to_str(keys[i]), self.decode_result(value))
                         for i, value in enumerate(values)
                         if value is not None)
 
@@ -494,7 +496,7 @@ class KeyValueStoreBackend(BaseBackend):
         meta = self.get(self.get_key_for_task(task_id))
         if not meta:
             return {'status': states.PENDING, 'result': None}
-        return self.decode(meta)
+        return self.decode_result(meta)
 
     def _restore_group(self, group_id):
         """Get task metadata for a task by id."""

+ 1 - 1
celery/backends/redis.py

@@ -212,7 +212,7 @@ class RedisBackend(KeyValueStoreBackend):
             callback = maybe_signature(request.chord, app=app)
             total = callback['chord_size']
             if readycount == total:
-                decode, unpack = self.decode, self._unpack_chord_result
+                decode, unpack = self.decode_result, self._unpack_chord_result
                 resl, _ = client.pipeline()     \
                     .lrange(jkey, 0, total)     \
                     .delete(jkey)               \

+ 1 - 5
celery/result.py

@@ -172,9 +172,7 @@ class AsyncResult(ResultBase):
             self._maybe_set_cache(meta)
             status = meta['status']
             if status in PROPAGATE_STATES and propagate:
-                raise self.backend.exception_to_python(meta['result'])
-            if status in EXCEPTION_STATES:
-                return self.backend.exception_to_python(meta['result'])
+                raise meta['result']
             return meta['result']
     wait = get  # deprecated alias to :meth:`get`.
 
@@ -343,8 +341,6 @@ class AsyncResult(ResultBase):
 
     def _set_cache(self, d):
         state, children = d['status'], d.get('children')
-        if state in states.EXCEPTION_STATES:
-            d['result'] = self.backend.exception_to_python(d['result'])
         if children:
             d['children'] = [
                 result_from_tuple(child, self.app) for child in children

+ 1 - 1
docs/getting-started/next-steps.rst

@@ -317,7 +317,7 @@ exception, in fact ``result.get()`` will propagate any errors by default::
     File "/opt/devel/celery/celery/result.py", line 113, in get
         interval=interval)
     File "/opt/devel/celery/celery/backends/amqp.py", line 138, in wait_for
-        raise self.exception_to_python(meta['result'])
+        raise meta['result']
     TypeError: add() takes exactly 2 arguments (1 given)
 
 If you don't wish for the errors to propagate then you can disable that

+ 1 - 1
docs/userguide/canvas.rst

@@ -742,7 +742,7 @@ to the :exc:`~@ChordError` exception:
       File "*/celery/result.py", line 120, in get
         interval=interval)
       File "*/celery/backends/amqp.py", line 150, in wait_for
-        raise self.exception_to_python(meta['result'])
+        raise meta['result']
     celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
         raised ValueError('something something',)