Ask Solem 11 years ago
parent
commit
fdd020a37b
2 changed files with 21 additions and 9 deletions
  1. 2 2
      celery/backends/base.py
  2. 19 7
      celery/tests/backends/test_base.py

+ 2 - 2
celery/backends/base.py

@@ -31,7 +31,7 @@ from celery.app import current_task
 from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
 from celery.five import items
 from celery.result import (
-    AsyncResult, GroupResult, allow_join_result, result_from_tuple,
+    GroupResult, ResultBase, allow_join_result, result_from_tuple,
 )
 from celery.utils import timeutils
 from celery.utils.functional import LRUCache
@@ -147,7 +147,7 @@ class BaseBackend(object):
 
     def prepare_value(self, result):
         """Prepare value for storage."""
-        if self.serializer != 'pickle' and isinstance(result, AsyncResult):
+        if self.serializer != 'pickle' and isinstance(result, ResultBase):
             return result.as_tuple()
         return result
 

+ 19 - 7
celery/tests/backends/test_base.py

@@ -20,6 +20,7 @@ from celery.backends.base import (
     KeyValueStoreBackend,
     DisabledBackend,
 )
+from celery.result import result_from_tuple
 from celery.utils import uuid
 
 from celery.tests.case import AppCase, Mock, SkipTest, patch
@@ -219,8 +220,18 @@ class test_BaseBackend_dict(AppCase):
             self.assertTrue(args[2])
 
     def test_prepare_value_serializes_group_result(self):
+        self.b.serializer = 'json'
         g = self.app.GroupResult('group_id', [self.app.AsyncResult('foo')])
-        self.assertIsInstance(self.b.prepare_value(g), (list, tuple))
+        v = self.b.prepare_value(g)
+        self.assertIsInstance(v, (list, tuple))
+        self.assertEqual(result_from_tuple(v, app=self.app), g)
+
+        v2 = self.b.prepare_value(g[0])
+        self.assertIsInstance(v2, (list, tuple))
+        self.assertEqual(result_from_tuple(v2, app=self.app), g[0])
+
+        self.b.serializer = 'pickle'
+        self.assertIsInstance(self.b.prepare_value(g), self.app.GroupResult)
 
     def test_is_cached(self):
         self.b._cache['foo'] = 1
@@ -287,8 +298,8 @@ class test_KeyValueStoreBackend(AppCase):
 
         b.implements_incr = True
         b.client = Mock()
-        with patch('celery.result.GroupResult') as GR:
-            deps = GR.restore.return_value = Mock()
+        with patch('celery.backends.base.GroupResult') as GR:
+            deps = GR.restore.return_value = Mock(name='DEPS')
             deps.__len__ = Mock()
             deps.__len__.return_value = 10
             b.incr = Mock()
@@ -331,14 +342,15 @@ class test_KeyValueStoreBackend(AppCase):
             self.assertIn('foo', str(exc))
 
     def test_chord_part_return_join_raises_task(self):
-        with self._chord_part_context(self.b) as (task, deps, callback):
+        b = KVBackend(serializer='pickle', app=self.app)
+        with self._chord_part_context(b) as (task, deps, callback):
             deps._failed_join_report = lambda: iter([
                 self.app.AsyncResult('culprit'),
             ])
             deps.join_native.side_effect = KeyError('foo')
-            self.b.on_chord_part_return(task)
-            self.assertTrue(self.b.fail_from_current_stack.called)
-            args = self.b.fail_from_current_stack.call_args
+            b.on_chord_part_return(task)
+            self.assertTrue(b.fail_from_current_stack.called)
+            args = b.fail_from_current_stack.call_args
             exc = args[1]['exc']
             self.assertIsInstance(exc, ChordError)
             self.assertIn('Dependency culprit raised', str(exc))