Browse Source

Chords: Now supports setting the interval and other unlock args

- The interval can now be set as part of the chord subtasks kwargs::

    chord(header)(body, interval=10.0)

- In addition the chord unlock task now honors the Task.default_retry_delay
  option, used when none is specified, which also means that the default
  interval can also be changed using annotations::

    CELERY_ANNOTATIONS = {
        'celery.chord_unlock': {
            'default_retry_delay': 10.0,
        }
    }
Ask Solem 13 years ago
parent
commit
ae27f81245
2 changed files with 19 additions and 12 deletions
  1. 10 5
      celery/app/builtins.py
  2. 9 7
      celery/canvas.py

+ 10 - 5
celery/app/builtins.py

@@ -70,9 +70,12 @@ def add_unlock_chord_task(app):
     from celery.canvas import subtask
     from celery import result as _res
 
-    @app.task(name='celery.chord_unlock', max_retries=None)
-    def unlock_chord(group_id, callback, interval=1, propagate=False,
+    @app.task(name='celery.chord_unlock', max_retries=None,
+              default_retry_delay=1)
+    def unlock_chord(group_id, callback, interval=None, propagate=False,
             max_retries=None, result=None):
+        if interval is None:
+            interval = self.default_retry_delay
         result = _res.GroupResult(group_id, map(_res.AsyncResult, result))
         j = result.join_native if result.supports_native_join else result.join
         if result.ready():
@@ -308,15 +311,17 @@ def add_chord_task(app):
                 return self.apply(args, kwargs, **options)
             group_id = options.pop('group_id', None)
             chord = options.pop('chord', None)
-            header, body = (list(maybe_subtask(kwargs['header'])),
-                            maybe_subtask(kwargs['body']))
+            header = kwargs.pop('header')
+            body = kwargs.pop('body')
+            header, body = (list(maybe_subtask(header)),
+                            maybe_subtask(body))
             if group_id:
                 body.set(group_id=group_id)
             if chord:
                 body.set(chord=chord)
             callback_id = body.options.setdefault('task_id', task_id or uuid())
             parent = super(Chord, self).apply_async((header, body, args),
-                                                    **options)
+                                                     kwargs, **options)
             body_result = self.AsyncResult(callback_id)
             body_result.parent = parent
             return body_result

+ 9 - 7
celery/canvas.py

@@ -331,24 +331,26 @@ class chord(Signature):
     Chord = Chord
 
     def __init__(self, header, body=None, **options):
+        kwargs = options.get('kwargs') or {}
         Signature.__init__(self, 'celery.chord', (),
-                         {'header': _maybe_group(header),
-                          'body': maybe_subtask(body)}, **options)
+                         dict(kwargs, header=_maybe_group(header),
+                                      body=maybe_subtask(body)), **options)
         self.subtask_type = 'chord'
 
     @classmethod
     def from_dict(self, d):
         kwargs = d['kwargs']
-        return chord(kwargs['header'], kwargs.get('body'),
-                     **kwdict(d['options']))
+        header = kwargs.pop('header')
+        body = kwargs.pop('body', None)
+        return chord(header, body, **kwdict(d))
 
-    def __call__(self, body=None, **options):
+    def __call__(self, body=None, **kwargs):
         _chord = self.Chord
         body = self.kwargs['body'] = body or self.kwargs['body']
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
-            return self.apply((), {}, **options)
+            return self.apply((), kwargs)
         callback_id = body.options.setdefault('task_id', uuid())
-        _chord(**self.kwargs)
+        _chord(**dict(self.kwargs, **kwargs))
         return _chord.AsyncResult(callback_id)
 
     def clone(self, *args, **kwargs):