Prechádzať zdrojové kódy

Clone must keep chord_size for all signature types. Closes #2339

Conflicts:
	celery/backends/redis.py
	celery/canvas.py
Ask Solem 10 rokov pred
rodič
commit
dfc4db901c
3 zmenil súbory, kde vykonal 25 pridanie a 8 odobranie
  1. 1 1
      celery/app/builtins.py
  2. 4 1
      celery/backends/redis.py
  3. 20 6
      celery/canvas.py

+ 1 - 1
celery/app/builtins.py

@@ -332,7 +332,7 @@ def add_chord_task(app):
             if eager:
                 return header.apply(args=partial_args, task_id=group_id)
 
-            body.setdefault('chord_size', len(header.tasks))
+            body.chord_size = len(header.tasks)
             results = header.freeze(group_id=group_id, chord=body).results
 
             return self.backend.apply_chord(

+ 4 - 1
celery/backends/redis.py

@@ -178,8 +178,11 @@ class RedisBackend(KeyValueStoreBackend):
         return self.client.expire(key, value)
 
     def _unpack_chord_result(self, tup, decode,
+                             EXCEPTION_STATES=states.EXCEPTION_STATES,
                              PROPAGATE_STATES=states.PROPAGATE_STATES):
         _, tid, state, retval = decode(tup)
+        if state in EXCEPTION_STATES:
+            retval = self.exception_to_python(retval)
         if state in PROPAGATE_STATES:
             raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
         return retval
@@ -212,7 +215,7 @@ class RedisBackend(KeyValueStoreBackend):
             callback = maybe_signature(request.chord, app=app)
             total = callback['chord_size']
             if readycount == total:
-                decode, unpack = self.decode_result, self._unpack_chord_result
+                decode, unpack = self.decode, self._unpack_chord_result
                 resl, _ = client.pipeline()     \
                     .lrange(jkey, 0, total)     \
                     .delete(jkey)               \

+ 20 - 6
celery/canvas.py

@@ -99,6 +99,12 @@ def maybe_unroll_group(g):
         return g.tasks[0] if size == 1 else g
 
 
+def _upgrade(fields, sig):
+    """Used by custom signatures in .from_dict, to keep common fields."""
+    sig.update(chord_size=fields.get('chord_size'))
+    return sig
+
+
 class Signature(dict):
     """Class that wraps the arguments and execution options
     for a single task invocation.
@@ -158,7 +164,8 @@ class Signature(dict):
              kwargs=kwargs or {},
              options=dict(options or {}, **ex),
              subtask_type=subtask_type,
-             immutable=immutable)
+             immutable=immutable,
+             chord_size=None)
 
     def __call__(self, *partial_args, **partial_kwargs):
         args, kwargs, _ = self._merge(partial_args, partial_kwargs, None)
@@ -190,6 +197,7 @@ class Signature(dict):
         s = Signature.from_dict({'task': self.task, 'args': tuple(args),
                                  'kwargs': kwargs, 'options': deepcopy(opts),
                                  'subtask_type': self.subtask_type,
+                                 'chord_size': self.chord_size,
                                  'immutable': self.immutable},
                                 app=app or self._app)
         s._type = self._type
@@ -333,6 +341,7 @@ class Signature(dict):
     kwargs = _getitem_property('kwargs')
     options = _getitem_property('options')
     subtask_type = _getitem_property('subtask_type')
+    chord_size = _getitem_property('chord_size')
     immutable = _getitem_property('immutable')
 
 
@@ -358,7 +367,7 @@ class chain(Signature):
         if d['args'] and tasks:
             # partial args passed on to first task in chain (Issue #1057).
             tasks[0]['args'] = tasks[0]._merge(d['args'])[0]
-        return chain(*tasks, app=app, **kwdict(d['options']))
+        return _upgrade(d, chain(*tasks, app=app, **d['options']))
 
     @property
     def type(self):
@@ -390,7 +399,9 @@ class _basemap(Signature):
 
     @classmethod
     def from_dict(cls, d, app=None):
-        return cls(*cls._unpack_args(d['kwargs']), app=app, **d['options'])
+        return _upgrade(
+            d, cls(*cls._unpack_args(d['kwargs']), app=app, **d['options']),
+        )
 
 
 @Signature.register_type
@@ -426,7 +437,10 @@ class chunks(Signature):
 
     @classmethod
     def from_dict(self, d, app=None):
-        return chunks(*self._unpack_args(d['kwargs']), app=app, **d['options'])
+        return _upgrade(
+            d, chunks(*self._unpack_args(
+                d['kwargs']), app=app, **d['options']),
+        )
 
     def apply_async(self, args=(), kwargs={}, **opts):
         return self.group().apply_async(args, kwargs, **opts)
@@ -479,7 +493,7 @@ class group(Signature):
             # partial args passed on to all tasks in the group (Issue #1057).
             for task in tasks:
                 task['args'] = task._merge(d['args'])[0]
-        return group(tasks, app=app, **kwdict(d['options']))
+        return _upgrade(d, group(tasks, app=app, **kwdict(d['options'])))
 
     def apply_async(self, args=(), kwargs=None, add_to_parent=True, **options):
         tasks = _maybe_clone(self.tasks, app=self._app)
@@ -573,7 +587,7 @@ class chord(Signature):
     @classmethod
     def from_dict(self, d, app=None):
         args, d['kwargs'] = self._unpack_args(**kwdict(d['kwargs']))
-        return self(*args, app=app, **kwdict(d))
+        return _upgrade(d, self(*args, app=app, **kwdict(d)))
 
     @staticmethod
     def _unpack_args(header=None, body=None, **kwargs):