|
@@ -12,6 +12,7 @@
|
|
|
"""
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
+from collections import deque
|
|
|
from copy import deepcopy
|
|
|
from functools import partial as _partial, reduce
|
|
|
from operator import itemgetter
|
|
@@ -19,7 +20,7 @@ from itertools import chain as _chain
|
|
|
|
|
|
from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
|
|
|
|
|
|
-from celery._state import current_app
|
|
|
+from celery._state import current_app, get_current_worker_task
|
|
|
from celery.utils.functional import (
|
|
|
maybe_list, is_list, regen,
|
|
|
chunks as _chunks,
|
|
@@ -194,12 +195,13 @@ class Signature(dict):
|
|
|
return s
|
|
|
partial = clone
|
|
|
|
|
|
- def freeze(self, _id=None, group_id=None, chord=None):
|
|
|
+ def freeze(self, _id=None, group_id=None, chord=None, root_id=None):
|
|
|
opts = self.options
|
|
|
try:
|
|
|
tid = opts['task_id']
|
|
|
except KeyError:
|
|
|
tid = opts['task_id'] = _id or uuid()
|
|
|
+ root_id = opts.setdefault('root_id', root_id)
|
|
|
if 'reply_to' not in opts:
|
|
|
opts['reply_to'] = self.app.oid
|
|
|
if group_id:
|
|
@@ -348,6 +350,99 @@ class chain(Signature):
|
|
|
if self.tasks:
|
|
|
return self.apply_async(args, kwargs)
|
|
|
|
|
|
+ def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
|
|
|
+ task_id=None, link=None, link_error=None,
|
|
|
+ publisher=None, root_id=None, **options):
|
|
|
+ app = self.app
|
|
|
+ if app.conf.CELERY_ALWAYS_EAGER:
|
|
|
+ return self.apply(args, kwargs, **options)
|
|
|
+ tasks, results = self.prepare_steps(
|
|
|
+ args, self.tasks, root_id, link_error,
|
|
|
+ )
|
|
|
+ if not results:
|
|
|
+ return
|
|
|
+ result = results[-1]
|
|
|
+ last_task = tasks[-1]
|
|
|
+ if group_id:
|
|
|
+ last_task.set(group_id=group_id)
|
|
|
+ if chord:
|
|
|
+ last_task.set(chord=chord)
|
|
|
+ if task_id:
|
|
|
+ last_task.set(task_id=task_id)
|
|
|
+ result = last_task.type.AsyncResult(task_id)
|
|
|
+ # make sure we can do a link() and link_error() on a chain object.
|
|
|
+ if link:
|
|
|
+ tasks[-1].set(link=link)
|
|
|
+ tasks[0].apply_async(**options)
|
|
|
+ return result
|
|
|
+
|
|
|
+ def prepare_steps(self, args, tasks,
|
|
|
+ root_id=None, link_error=None, app=None):
|
|
|
+ app = app or self.app
|
|
|
+ steps = deque(tasks)
|
|
|
+ next_step = prev_task = prev_res = None
|
|
|
+ tasks, results = [], []
|
|
|
+ i = 0
|
|
|
+ while steps:
|
|
|
+ task = steps.popleft()
|
|
|
+ if not i: # first task
|
|
|
+ # first task gets partial args from chain
|
|
|
+ task = task.clone(args)
|
|
|
+ res = task.freeze(root_id=root_id)
|
|
|
+ root_id = res.id if root_id is None else root_id
|
|
|
+ else:
|
|
|
+ task = task.clone()
|
|
|
+ res = task.freeze(root_id=root_id)
|
|
|
+ i += 1
|
|
|
+
|
|
|
+ if isinstance(task, group):
|
|
|
+ task = maybe_unroll_group(task)
|
|
|
+
|
|
|
+ if isinstance(task, chain):
|
|
|
+ # splice the chain
|
|
|
+ steps.extendleft(reversed(task.tasks))
|
|
|
+ continue
|
|
|
+ elif isinstance(task, group) and steps and \
|
|
|
+ not isinstance(steps[0], group):
|
|
|
+ # automatically upgrade group(...) | s to chord(group, s)
|
|
|
+ try:
|
|
|
+ next_step = steps.popleft()
|
|
|
+ # for chords we freeze by pretending it's a normal
|
|
|
+ # signature instead of a group.
|
|
|
+ res = Signature.freeze(next_step)
|
|
|
+ task = chord(
|
|
|
+ task, body=next_step,
|
|
|
+ task_id=res.task_id, root_id=root_id,
|
|
|
+ )
|
|
|
+ except IndexError:
|
|
|
+ pass # no callback, so keep as group.
|
|
|
+
|
|
|
+ if prev_task:
|
|
|
+ # link previous task to this task.
|
|
|
+ prev_task.link(task)
|
|
|
+ # set AsyncResult.parent
|
|
|
+ if not res.parent:
|
|
|
+ res.parent = prev_res
|
|
|
+
|
|
|
+ if link_error:
|
|
|
+ task.set(link_error=link_error)
|
|
|
+
|
|
|
+ if not isinstance(prev_task, chord):
|
|
|
+ results.append(res)
|
|
|
+ tasks.append(task)
|
|
|
+ prev_task, prev_res = task, res
|
|
|
+
|
|
|
+ return tasks, results
|
|
|
+
|
|
|
+ def apply(self, args=(), kwargs={}, **options):
|
|
|
+ last, fargs = None, args
|
|
|
+ for task in self.tasks:
|
|
|
+ res = task.clone(fargs).apply(
|
|
|
+ last and (last.get(), ), **options
|
|
|
+ )
|
|
|
+ res.parent, last, fargs = last, res, None
|
|
|
+ return last
|
|
|
+
|
|
|
@classmethod
|
|
|
def from_dict(self, d, app=None):
|
|
|
tasks = d['kwargs']['tasks']
|
|
@@ -357,11 +452,14 @@ class chain(Signature):
|
|
|
return chain(*d['kwargs']['tasks'], app=app, **kwdict(d['options']))
|
|
|
|
|
|
@property
|
|
|
- def type(self):
|
|
|
- try:
|
|
|
- return self._type or self.tasks[0].type.app.tasks['celery.chain']
|
|
|
- except KeyError:
|
|
|
- return self.app.tasks['celery.chain']
|
|
|
+ def app(self):
|
|
|
+ app = self._app
|
|
|
+ if app is None:
|
|
|
+ try:
|
|
|
+ app = self.tasks[0]._app
|
|
|
+ except (KeyError, IndexError):
|
|
|
+ pass
|
|
|
+ return app or current_app
|
|
|
|
|
|
def __repr__(self):
|
|
|
return ' | '.join(repr(t) for t in self.tasks)
|
|
@@ -452,11 +550,6 @@ def _maybe_group(tasks):
|
|
|
return tasks
|
|
|
|
|
|
|
|
|
-def _maybe_clone(tasks, app):
|
|
|
- return [s.clone() if isinstance(s, Signature) else signature(s, app=app)
|
|
|
- for s in tasks]
|
|
|
-
|
|
|
-
|
|
|
@Signature.register_type
|
|
|
class group(Signature):
|
|
|
|
|
@@ -477,14 +570,58 @@ class group(Signature):
|
|
|
task['args'] = task._merge(d['args'])[0]
|
|
|
return group(tasks, app=app, **kwdict(d['options']))
|
|
|
|
|
|
- def apply_async(self, args=(), kwargs=None, add_to_parent=True, **options):
|
|
|
- tasks = _maybe_clone(self.tasks, app=self._app)
|
|
|
- if not tasks:
|
|
|
+ def _prepared(self, tasks, partial_args, group_id, root_id):
|
|
|
+ for task in tasks:
|
|
|
+ task = task.clone(partial_args)
|
|
|
+ yield task, task.freeze(group_id=group_id, root_id=root_id)
|
|
|
+
|
|
|
+ def _apply_tasks(self, tasks, producer=None, app=None, **options):
|
|
|
+ app = app or self.app
|
|
|
+ with app.producer_or_acquire(producer) as producer:
|
|
|
+ for sig, res in tasks:
|
|
|
+ sig.apply_async(producer=producer, add_to_parent=False,
|
|
|
+ **options)
|
|
|
+ yield res
|
|
|
+
|
|
|
+ def _freeze_gid(self, options):
|
|
|
+ # remove task_id and use that as the group_id,
|
|
|
+ # if we don't remove it then every task will have the same id...
|
|
|
+ options = dict(self.options, **options)
|
|
|
+ options['group_id'] = group_id = (
|
|
|
+ options.pop('task_id', uuid()))
|
|
|
+ return options, group_id, options.get('root_id')
|
|
|
+
|
|
|
+ def apply_async(self, args=(), kwargs=None, add_to_parent=True,
|
|
|
+ producer=None, **options):
|
|
|
+ app = self.app
|
|
|
+ if app.conf.CELERY_ALWAYS_EAGER:
|
|
|
+ return self.apply(args, kwargs, **options)
|
|
|
+ if not self.tasks:
|
|
|
return self.freeze()
|
|
|
- type = self.type
|
|
|
+
|
|
|
+ options, group_id, root_id = self._freeze_gid(options)
|
|
|
+ tasks = self._prepared(self.tasks, args, group_id, root_id)
|
|
|
+ result = self.app.GroupResult(
|
|
|
+ group_id, list(self._apply_tasks(tasks, producer, app, **options)),
|
|
|
+ )
|
|
|
+ parent_task = get_current_worker_task()
|
|
|
+ if add_to_parent and parent_task:
|
|
|
+ parent_task.add_trail(result)
|
|
|
+ return result
|
|
|
+
|
|
|
return type(*type.prepare(dict(self.options, **options), tasks, args),
|
|
|
add_to_parent=add_to_parent)
|
|
|
|
|
|
+ def apply(self, args=(), kwargs={}, **options):
|
|
|
+ app = self.app
|
|
|
+ if not self.tasks:
|
|
|
+ return self.freeze() # empty group returns GroupResult
|
|
|
+ options, group_id, root_id = self._freeze_gid(options)
|
|
|
+ tasks = self._prepared(self.tasks, args, group_id, root_id)
|
|
|
+ return app.GroupResult(group_id, [
|
|
|
+ sig.apply(**options) for sig, _ in tasks
|
|
|
+ ])
|
|
|
+
|
|
|
def set_immutable(self, immutable):
|
|
|
for task in self.tasks:
|
|
|
task.set_immutable(immutable)
|
|
@@ -498,15 +635,10 @@ class group(Signature):
|
|
|
sig = sig.clone().set(immutable=True)
|
|
|
return self.tasks[0].link_error(sig)
|
|
|
|
|
|
- def apply(self, *args, **kwargs):
|
|
|
- if not self.tasks:
|
|
|
- return self.freeze() # empty group returns GroupResult
|
|
|
- return Signature.apply(self, *args, **kwargs)
|
|
|
-
|
|
|
def __call__(self, *partial_args, **options):
|
|
|
return self.apply_async(partial_args, **options)
|
|
|
|
|
|
- def freeze(self, _id=None, group_id=None, chord=None):
|
|
|
+ def freeze(self, _id=None, group_id=None, chord=None, root_id=None):
|
|
|
opts = self.options
|
|
|
try:
|
|
|
gid = opts['task_id']
|
|
@@ -516,10 +648,13 @@ class group(Signature):
|
|
|
opts['group_id'] = group_id
|
|
|
if chord:
|
|
|
opts['chord'] = group_id
|
|
|
+ root_id = opts.setdefault('root_id', root_id)
|
|
|
new_tasks, results = [], []
|
|
|
for task in self.tasks:
|
|
|
task = maybe_signature(task, app=self._app).clone()
|
|
|
- results.append(task.freeze(group_id=group_id, chord=chord))
|
|
|
+ results.append(task.freeze(
|
|
|
+ group_id=group_id, chord=chord, root_id=root_id,
|
|
|
+ ))
|
|
|
new_tasks.append(task)
|
|
|
self.tasks = self.kwargs['tasks'] = new_tasks
|
|
|
return self.app.GroupResult(gid, results)
|
|
@@ -538,14 +673,14 @@ class group(Signature):
|
|
|
return repr(self.tasks)
|
|
|
|
|
|
@property
|
|
|
- def type(self):
|
|
|
- if self._type:
|
|
|
- return self._type
|
|
|
- # taking the app from the first task in the list, there may be a
|
|
|
- # better solution for this, e.g. to consolidate tasks with the same
|
|
|
- # app and apply them in batches.
|
|
|
- app = self._app if self._app else self.tasks[0].type.app
|
|
|
- return app.tasks[self['task']]
|
|
|
+ def app(self):
|
|
|
+ app = self._app
|
|
|
+ if app is None:
|
|
|
+ try:
|
|
|
+ app = self.tasks[0]._app
|
|
|
+ except (KeyError, IndexError):
|
|
|
+ pass
|
|
|
+ return app if app is not None else current_app
|
|
|
|
|
|
|
|
|
@Signature.register_type
|
|
@@ -560,8 +695,8 @@ class chord(Signature):
|
|
|
)
|
|
|
self.subtask_type = 'chord'
|
|
|
|
|
|
- def freeze(self, _id=None, group_id=None, chord=None):
|
|
|
- return self.body.freeze(_id, group_id=group_id, chord=chord)
|
|
|
+ def freeze(self, *args, **kwargs):
|
|
|
+ return self.body.freeze(*args, **kwargs)
|
|
|
|
|
|
@classmethod
|
|
|
def from_dict(self, d, app=None):
|
|
@@ -574,20 +709,14 @@ class chord(Signature):
|
|
|
# than manually popping things off.
|
|
|
return (header, body), kwargs
|
|
|
|
|
|
- @property
|
|
|
- def type(self):
|
|
|
- if self._type:
|
|
|
- return self._type
|
|
|
- # we will be able to fix this mess in 3.2 when we no longer
|
|
|
- # require an actual task implementation for chord/group
|
|
|
- if self._app:
|
|
|
- app = self._app
|
|
|
- else:
|
|
|
- try:
|
|
|
- app = self.tasks[0].type.app
|
|
|
- except IndexError:
|
|
|
- app = self.body.type.app
|
|
|
- return app.tasks['celery.chord']
|
|
|
+ @cached_property
|
|
|
+ def app(self):
|
|
|
+ app = self._app
|
|
|
+ if app is None:
|
|
|
+ app = self.tasks[0]._app
|
|
|
+ if app is None:
|
|
|
+ app = self.body._app
|
|
|
+ return app if app is not None else current_app
|
|
|
|
|
|
def apply_async(self, args=(), kwargs={}, task_id=None,
|
|
|
producer=None, publisher=None, connection=None,
|
|
@@ -595,14 +724,41 @@ class chord(Signature):
|
|
|
body = kwargs.get('body') or self.kwargs['body']
|
|
|
kwargs = dict(self.kwargs, **kwargs)
|
|
|
body = body.clone(**options)
|
|
|
+ app = self.app
|
|
|
+ tasks = (self.tasks.clone() if isinstance(self.tasks, group)
|
|
|
+ else group(self.tasks))
|
|
|
+ if app.conf.CELERY_ALWAYS_EAGER:
|
|
|
+ return self.apply((), kwargs,
|
|
|
+ body=body, task_id=task_id, **options)
|
|
|
+ return self.run(tasks, body, args, task_id=task_id, **options)
|
|
|
+
|
|
|
+ def apply(self, args=(), kwargs={}, propagate=True, body=None, **options):
|
|
|
+ body = self.body if body is None else body
|
|
|
+ tasks = (self.tasks.clone() if isinstance(self.tasks, group)
|
|
|
+ else group(self.tasks))
|
|
|
+ return body.apply(
|
|
|
+ args=(tasks.apply().get(propagate=propagate), ),
|
|
|
+ )
|
|
|
|
|
|
- _chord = self.type
|
|
|
- if _chord.app.conf.CELERY_ALWAYS_EAGER:
|
|
|
- return self.apply((), kwargs, task_id=task_id, **options)
|
|
|
- res = body.freeze(task_id)
|
|
|
- parent = _chord(self.tasks, body, args, **options)
|
|
|
- res.parent = parent
|
|
|
- return res
|
|
|
+ def run(self, header, body, partial_args, app=None, interval=None,
|
|
|
+ countdown=1, max_retries=None, propagate=None, eager=False,
|
|
|
+ task_id=None, **options):
|
|
|
+ app = app or self.app
|
|
|
+ propagate = (app.conf.CELERY_CHORD_PROPAGATES
|
|
|
+ if propagate is None else propagate)
|
|
|
+ group_id = uuid()
|
|
|
+ root_id = body.options.get('root_id')
|
|
|
+ body.setdefault('chord_size', len(header.tasks))
|
|
|
+ results = header.freeze(
|
|
|
+ group_id=group_id, chord=body, root_id=root_id).results
|
|
|
+ bodyres = body.freeze(task_id, root_id=root_id)
|
|
|
+
|
|
|
+ parent = app.backend.apply_chord(
|
|
|
+ header, partial_args, group_id, body,
|
|
|
+ interval=interval, countdown=countdown,
|
|
|
+ max_retries=max_retries, propagate=propagate, result=results)
|
|
|
+ bodyres.parent = parent
|
|
|
+ return bodyres
|
|
|
|
|
|
def __call__(self, body=None, **options):
|
|
|
return self.apply_async((), {'body': body} if body else {}, **options)
|