|
@@ -29,7 +29,7 @@ from celery import states
|
|
from celery.app import current_task
|
|
from celery.app import current_task
|
|
from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
|
|
from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
|
|
from celery.five import items
|
|
from celery.five import items
|
|
-from celery.result import from_serializable, GroupResult
|
|
|
|
|
|
+from celery.result import result_from_tuple, GroupResult
|
|
from celery.utils import timeutils
|
|
from celery.utils import timeutils
|
|
from celery.utils.functional import LRUCache
|
|
from celery.utils.functional import LRUCache
|
|
from celery.utils.serialization import (
|
|
from celery.utils.serialization import (
|
|
@@ -139,7 +139,7 @@ class BaseBackend(object):
|
|
def prepare_value(self, result):
|
|
def prepare_value(self, result):
|
|
"""Prepare value for storage."""
|
|
"""Prepare value for storage."""
|
|
if isinstance(result, GroupResult):
|
|
if isinstance(result, GroupResult):
|
|
- return result.serializable()
|
|
|
|
|
|
+ return result.as_tuple()
|
|
return result
|
|
return result
|
|
|
|
|
|
def encode(self, data):
|
|
def encode(self, data):
|
|
@@ -307,7 +307,7 @@ class BaseBackend(object):
|
|
|
|
|
|
def fallback_chord_unlock(self, group_id, body, result=None,
|
|
def fallback_chord_unlock(self, group_id, body, result=None,
|
|
countdown=1, **kwargs):
|
|
countdown=1, **kwargs):
|
|
- kwargs['result'] = [r.serializable() for r in result]
|
|
|
|
|
|
+ kwargs['result'] = [r.as_tuple() for r in result]
|
|
self.app.tasks['celery.chord_unlock'].apply_async(
|
|
self.app.tasks['celery.chord_unlock'].apply_async(
|
|
(group_id, body, ), kwargs, countdown=countdown,
|
|
(group_id, body, ), kwargs, countdown=countdown,
|
|
)
|
|
)
|
|
@@ -316,7 +316,7 @@ class BaseBackend(object):
|
|
def current_task_children(self, request=None):
|
|
def current_task_children(self, request=None):
|
|
request = request or getattr(current_task(), 'request', None)
|
|
request = request or getattr(current_task(), 'request', None)
|
|
if request:
|
|
if request:
|
|
- return [r.serializable() for r in getattr(request, 'children', [])]
|
|
|
|
|
|
+ return [r.as_tuple() for r in getattr(request, 'children', [])]
|
|
|
|
|
|
def __reduce__(self, args=(), kwargs={}):
|
|
def __reduce__(self, args=(), kwargs={}):
|
|
return (unpickle_backend, (self.__class__, args, kwargs))
|
|
return (unpickle_backend, (self.__class__, args, kwargs))
|
|
@@ -422,7 +422,7 @@ class KeyValueStoreBackend(BaseBackend):
|
|
|
|
|
|
def _save_group(self, group_id, result):
|
|
def _save_group(self, group_id, result):
|
|
self.set(self.get_key_for_group(group_id),
|
|
self.set(self.get_key_for_group(group_id),
|
|
- self.encode({'result': result.serializable()}))
|
|
|
|
|
|
+ self.encode({'result': result.as_tuple()}))
|
|
return result
|
|
return result
|
|
|
|
|
|
def _delete_group(self, group_id):
|
|
def _delete_group(self, group_id):
|
|
@@ -444,7 +444,7 @@ class KeyValueStoreBackend(BaseBackend):
|
|
if meta:
|
|
if meta:
|
|
meta = self.decode(meta)
|
|
meta = self.decode(meta)
|
|
result = meta['result']
|
|
result = meta['result']
|
|
- meta['result'] = from_serializable(result, self.app)
|
|
|
|
|
|
+ meta['result'] = result_from_tuple(result, self.app)
|
|
return meta
|
|
return meta
|
|
|
|
|
|
def on_chord_apply(self, group_id, body, result=None, **kwargs):
|
|
def on_chord_apply(self, group_id, body, result=None, **kwargs):
|