|
@@ -26,10 +26,13 @@ from kombu.serialization import (
|
|
|
from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
|
|
|
|
|
|
from celery import states
|
|
|
+from celery import current_app, maybe_signature
|
|
|
from celery.app import current_task
|
|
|
from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
|
|
|
from celery.five import items
|
|
|
-from celery.result import result_from_tuple, GroupResult
|
|
|
+from celery.result import (
|
|
|
+ AsyncResult, GroupResult, allow_join_result, result_from_tuple,
|
|
|
+)
|
|
|
from celery.utils import timeutils
|
|
|
from celery.utils.functional import LRUCache
|
|
|
from celery.utils.serialization import (
|
|
@@ -46,7 +49,6 @@ PY3 = sys.version_info >= (3, 0)
|
|
|
|
|
|
def unpickle_backend(cls, args, kwargs):
|
|
|
"""Return an unpickled backend."""
|
|
|
- from celery import current_app
|
|
|
return cls(*args, app=current_app._get_current_object(), **kwargs)
|
|
|
|
|
|
|
|
@@ -145,7 +147,7 @@ class BaseBackend(object):
|
|
|
|
|
|
def prepare_value(self, result):
|
|
|
"""Prepare value for storage."""
|
|
|
- if isinstance(result, GroupResult):
|
|
|
+ if self.serializer != 'pickle' and isinstance(result, AsyncResult):
|
|
|
return result.as_tuple()
|
|
|
return result
|
|
|
|
|
@@ -480,8 +482,6 @@ class KeyValueStoreBackend(BaseBackend):
|
|
|
def on_chord_part_return(self, task, propagate=None):
|
|
|
if not self.implements_incr:
|
|
|
return
|
|
|
- from celery import maybe_signature
|
|
|
- from celery.result import GroupResult, allow_join_result
|
|
|
app = self.app
|
|
|
if propagate is None:
|
|
|
propagate = self.app.conf.CELERY_CHORD_PROPAGATES
|