Procházet zdrojové kódy

Merge branch 'master' of github.com:celery/celery

Ask Solem před 13 roky
rodič
revize
a0cc134b91
2 změnil soubory, kde provedl 7 přidání a 2 odebrání
  1. 6 1
      celery/app/builtins.py
  2. 1 1
      celery/backends/base.py

+ 6 - 1
celery/app/builtins.py

@@ -250,9 +250,14 @@ def add_chord_task(app):
         def apply_async(self, args=(), kwargs={}, task_id=None, **options):
             if self.app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
+            group_id = options.pop('group_id', None)
+            chord = options.pop('chord', None)
             header, body = (list(kwargs['header']),
                             maybe_subtask(kwargs['body']))
-
+            if group_id:
+                body.set(group_id=group_id)
+            if chord:
+                body.set(chord=chord)
             callback_id = body.options.setdefault('task_id', task_id or uuid())
             parent = super(Chord, self).apply_async((header, body), **options)
             body_result = self.AsyncResult(callback_id)

+ 1 - 1
celery/backends/base.py

@@ -469,7 +469,7 @@ class KeyValueStoreBackend(BaseDictBackend):
         key = self.get_key_for_chord(gid)
         deps = GroupResult.restore(gid, backend=task.backend)
         val = self.incr(key)
-        if val >= deps.total:
+        if val >= len(deps):
             subtask(task.request.chord).delay(deps.join(propagate=propagate))
             deps.delete()
             self.client.delete(key)