Преглед изворни кода

Merge pull request #2598 from ByteInternet/fix-exception-marshalling-with-json-serializer

Fix Exception marshalling with JSON serializer
PMickael пре 10 година
родитељ
комит
b339b57e5c
3 измењених фајлова са 16 додато и 5 уклоњено
  1. 1 1
      celery/backends/amqp.py
  2. 13 2
      celery/tests/backends/test_amqp.py
  3. 2 2
      tox.ini

+ 1 - 1
celery/backends/amqp.py

@@ -195,7 +195,7 @@ class AMQPBackend(BaseBackend):
 
 
         def callback(meta, message):
         def callback(meta, message):
             if meta['status'] in states.READY_STATES:
             if meta['status'] in states.READY_STATES:
-                results[meta['task_id']] = meta
+                results[meta['task_id']] = self.meta_from_decoded(meta)
 
 
         consumer.callbacks[:] = [callback]
         consumer.callbacks[:] = [callback]
         time_start = now()
         time_start = now()

+ 13 - 2
celery/tests/backends/test_amqp.py

@@ -13,6 +13,7 @@ from celery import states
 from celery.backends.amqp import AMQPBackend
 from celery.backends.amqp import AMQPBackend
 from celery.exceptions import TimeoutError
 from celery.exceptions import TimeoutError
 from celery.five import Empty, Queue, range
 from celery.five import Empty, Queue, range
+from celery.result import AsyncResult
 from celery.utils import uuid
 from celery.utils import uuid
 
 
 from celery.tests.case import (
 from celery.tests.case import (
@@ -246,10 +247,20 @@ class test_AMQPBackend(AppCase):
         with self.assertRaises(TimeoutError):
         with self.assertRaises(TimeoutError):
             b.wait_for(tid, timeout=0.01, cache=False)
             b.wait_for(tid, timeout=0.01, cache=False)
 
 
-    def test_drain_events_remaining_timeouts(self):
+    def test_drain_events_decodes_exceptions_in_meta(self):
+        tid = uuid()
+        b = self.create_backend(serializer="json")
+        b.store_result(tid, RuntimeError("aap"), states.FAILURE)
+        result = AsyncResult(tid, backend=b)
 
 
-        class Connection(object):
+        with self.assertRaises(Exception) as cm:
+            result.get()
 
 
+        self.assertEqual(cm.exception.__class__.__name__, "RuntimeError")
+        self.assertEqual(str(cm.exception), "aap")
+
+    def test_drain_events_remaining_timeouts(self):
+        class Connection(object):
             def drain_events(self, timeout=None):
             def drain_events(self, timeout=None):
                 pass
                 pass
 
 

+ 2 - 2
tox.ini

@@ -48,7 +48,7 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/dev.txt
        -r{toxinidir}/requirements/dev.txt
 setenv = C_DEBUG_TEST = 1
 setenv = C_DEBUG_TEST = 1
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
-           pip install -U -r{toxinidir}/requirements/dev.txt
+           pip install -q -U -r{toxinidir}/requirements/dev.txt
            nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
            nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
 
 
 [testenv:pypy3]
 [testenv:pypy3]
@@ -59,7 +59,7 @@ deps = -r{toxinidir}/requirements/default.txt
        -r{toxinidir}/requirements/dev.txt
        -r{toxinidir}/requirements/dev.txt
 setenv = C_DEBUG_TEST = 1
 setenv = C_DEBUG_TEST = 1
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
-           pip install -U -r{toxinidir}/requirements/dev.txt
+           pip install -q -U -r{toxinidir}/requirements/dev.txt
            nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
            nosetests -xsv --with-coverage --cover-inclusive --cover-erase []
 
 
 [testenv:docs]
 [testenv:docs]