Browse Source

Clone must keep chord_size for all signature types. Closes #2339

Ask Solem 10 years ago
parent
commit
a46d337c2c
2 changed files with 27 additions and 9 deletions
  1. 4 1
      celery/backends/redis.py
  2. 23 8
      celery/canvas.py

+ 4 - 1
celery/backends/redis.py

@@ -180,8 +180,11 @@ class RedisBackend(KeyValueStoreBackend):
         self.client.incr(self.get_key_for_group(group_id, '.t'), 1)
 
     def _unpack_chord_result(self, tup, decode,
+                             EXCEPTION_STATES=states.EXCEPTION_STATES,
                              PROPAGATE_STATES=states.PROPAGATE_STATES):
         _, tid, state, retval = decode(tup)
+        if state in EXCEPTION_STATES:
+            retval = self.exception_to_python(retval)
         if state in PROPAGATE_STATES:
             raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
         return retval
@@ -220,7 +223,7 @@ class RedisBackend(KeyValueStoreBackend):
             callback = maybe_signature(request.chord, app=app)
             total = callback['chord_size'] + totaldiff
             if readycount == total:
-                decode, unpack = self.decode_result, self._unpack_chord_result
+                decode, unpack = self.decode, self._unpack_chord_result
                 resl, _, _ = client.pipeline()  \
                     .lrange(jkey, 0, total)     \
                     .delete(jkey)               \

+ 23 - 8
celery/canvas.py

@@ -103,6 +103,12 @@ def task_name_from(task):
     return getattr(task, 'name', task)
 
 
+def _upgrade(fields, sig):
+    """Used by custom signatures in .from_dict, to keep common fields."""
+    sig.update(chord_size=fields.get('chord_size'))
+    return sig
+
+
 class Signature(dict):
     """Class that wraps the arguments and execution options
     for a single task invocation.
@@ -162,7 +168,8 @@ class Signature(dict):
              kwargs=kwargs or {},
              options=dict(options or {}, **ex),
              subtask_type=subtask_type,
-             immutable=immutable)
+             immutable=immutable,
+             chord_size=None)
 
     def __call__(self, *partial_args, **partial_kwargs):
         args, kwargs, _ = self._merge(partial_args, partial_kwargs, None)
@@ -194,6 +201,7 @@ class Signature(dict):
         s = Signature.from_dict({'task': self.task, 'args': tuple(args),
                                  'kwargs': kwargs, 'options': deepcopy(opts),
                                  'subtask_type': self.subtask_type,
+                                 'chord_size': self.chord_size,
                                  'immutable': self.immutable}, app=self._app)
         s._type = self._type
         return s
@@ -345,6 +353,7 @@ class Signature(dict):
     kwargs = _getitem_property('kwargs')
     options = _getitem_property('options')
     subtask_type = _getitem_property('subtask_type')
+    chord_size = _getitem_property('chord_size')
     immutable = _getitem_property('immutable')
 
 
@@ -475,7 +484,7 @@ class chain(Signature):
                 tasks = d['kwargs']['tasks'] = list(tasks)
             # First task must be signature object to get app
             tasks[0] = maybe_signature(tasks[0], app=app)
-        return chain(*tasks, app=app, **d['options'])
+        return _upgrade(d, chain(*tasks, app=app, **d['options']))
 
     @property
     def app(self):
@@ -511,7 +520,9 @@ class _basemap(Signature):
 
     @classmethod
     def from_dict(cls, d, app=None):
-        return cls(*cls._unpack_args(d['kwargs']), app=app, **d['options'])
+        return _upgrade(
+            d, cls(*cls._unpack_args(d['kwargs']), app=app, **d['options']),
+        )
 
 
 @Signature.register_type
@@ -547,7 +558,10 @@ class chunks(Signature):
 
     @classmethod
     def from_dict(self, d, app=None):
-        return chunks(*self._unpack_args(d['kwargs']), app=app, **d['options'])
+        return _upgrade(
+            d, chunks(*self._unpack_args(
+                d['kwargs']), app=app, **d['options']),
+        )
 
     def apply_async(self, args=(), kwargs={}, **opts):
         return self.group().apply_async(
@@ -594,7 +608,9 @@ class group(Signature):
 
     @classmethod
     def from_dict(self, d, app=None):
-        return group(d['kwargs']['tasks'], app=app, **d['options'])
+        return _upgrade(
+            d, group(d['kwargs']['tasks'], app=app, **d['options']),
+        )
 
     def _prepared(self, tasks, partial_args, group_id, root_id, app, dict=dict,
                   Signature=Signature, from_dict=Signature.from_dict):
@@ -756,7 +772,7 @@ class chord(Signature):
     @classmethod
     def from_dict(self, d, app=None):
         args, d['kwargs'] = self._unpack_args(**d['kwargs'])
-        return self(*args, app=app, **d)
+        return _upgrade(d, self(*args, app=app, **d))
 
     @staticmethod
     def _unpack_args(header=None, body=None, **kwargs):
@@ -820,8 +836,7 @@ class chord(Signature):
                      if propagate is None else propagate)
         group_id = uuid()
         root_id = body.options.get('root_id')
-        if 'chord_size' not in body:
-            body['chord_size'] = self.__length_hint__()
+        body.chord_size = self.__length_hint__()
         options = dict(self.options, **options) if options else self.options
         if options:
             body.options.update(options)