|
@@ -481,6 +481,21 @@ class Signature(dict):
|
|
|
return chain(self, other, app=self._app)
|
|
|
return NotImplemented
|
|
|
|
|
|
+ def election(self):
|
|
|
+ type = self.type
|
|
|
+ app = type.app
|
|
|
+ tid = self.options.get('task_id') or uuid()
|
|
|
+
|
|
|
+ with app.producer_or_acquire(None) as P:
|
|
|
+ props = type.backend.on_task_call(P, tid)
|
|
|
+ app.control.election(tid, 'task', self.clone(task_id=tid, **props),
|
|
|
+ connection=P.connection)
|
|
|
+ return type.AsyncResult(tid)
|
|
|
+
|
|
|
+ def reprcall(self, *args, **kwargs):
|
|
|
+ args, kwargs, _ = self._merge(args, kwargs, {}, force=True)
|
|
|
+ return reprcall(self['task'], args, kwargs)
|
|
|
+
|
|
|
def __deepcopy__(self, memo):
|
|
|
memo[id(self)] = self
|
|
|
return dict(self)
|
|
@@ -496,21 +511,6 @@ class Signature(dict):
|
|
|
def __json__(self):
|
|
|
return dict(self)
|
|
|
|
|
|
- def reprcall(self, *args, **kwargs):
|
|
|
- args, kwargs, _ = self._merge(args, kwargs, {}, force=True)
|
|
|
- return reprcall(self['task'], args, kwargs)
|
|
|
-
|
|
|
- def election(self):
|
|
|
- type = self.type
|
|
|
- app = type.app
|
|
|
- tid = self.options.get('task_id') or uuid()
|
|
|
-
|
|
|
- with app.producer_or_acquire(None) as P:
|
|
|
- props = type.backend.on_task_call(P, tid)
|
|
|
- app.control.election(tid, 'task', self.clone(task_id=tid, **props),
|
|
|
- connection=P.connection)
|
|
|
- return type.AsyncResult(tid)
|
|
|
-
|
|
|
def __repr__(self):
|
|
|
return self.reprcall()
|
|
|
|
|
@@ -611,6 +611,16 @@ class chain(Signature):
|
|
|
|
|
|
tasks = _getitem_property('kwargs.tasks', 'Tasks in chain.')
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ tasks = d['kwargs']['tasks']
|
|
|
+ if tasks:
|
|
|
+ if isinstance(tasks, tuple): # aaaargh
|
|
|
+ tasks = d['kwargs']['tasks'] = list(tasks)
|
|
|
+ # First task must be signature object to get app
|
|
|
+ tasks[0] = maybe_signature(tasks[0], app=app)
|
|
|
+ return _upgrade(d, chain(tasks, app=app, **d['options']))
|
|
|
+
|
|
|
def __init__(self, *tasks, **options):
|
|
|
tasks = (regen(tasks[0]) if len(tasks) == 1 and is_list(tasks[0])
|
|
|
else tasks)
|
|
@@ -789,16 +799,6 @@ class chain(Signature):
|
|
|
res.parent, last, fargs = last, res, None
|
|
|
return last
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(cls, d, app=None):
|
|
|
- tasks = d['kwargs']['tasks']
|
|
|
- if tasks:
|
|
|
- if isinstance(tasks, tuple): # aaaargh
|
|
|
- tasks = d['kwargs']['tasks'] = list(tasks)
|
|
|
- # First task must be signature object to get app
|
|
|
- tasks[0] = maybe_signature(tasks[0], app=app)
|
|
|
- return _upgrade(d, chain(tasks, app=app, **d['options']))
|
|
|
-
|
|
|
@property
|
|
|
def app(self):
|
|
|
app = self._app
|
|
@@ -820,6 +820,12 @@ class _basemap(Signature):
|
|
|
_task_name = None
|
|
|
_unpack_args = itemgetter('task', 'it')
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ return _upgrade(
|
|
|
+ d, cls(*cls._unpack_args(d['kwargs']), app=app, **d['options']),
|
|
|
+ )
|
|
|
+
|
|
|
def __init__(self, task, it, **options):
|
|
|
Signature.__init__(
|
|
|
self, self._task_name, (),
|
|
@@ -834,12 +840,6 @@ class _basemap(Signature):
|
|
|
route_name=task_name_from(self.kwargs.get('task')), **opts
|
|
|
)
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(cls, d, app=None):
|
|
|
- return _upgrade(
|
|
|
- d, cls(*cls._unpack_args(d['kwargs']), app=app, **d['options']),
|
|
|
- )
|
|
|
-
|
|
|
|
|
|
@Signature.register_type
|
|
|
@python_2_unicode_compatible
|
|
@@ -855,8 +855,8 @@ class xmap(_basemap):
|
|
|
|
|
|
def __repr__(self):
|
|
|
task, it = self._unpack_args(self.kwargs)
|
|
|
- return '[{0}(x) for x in {1}]'.format(task.task,
|
|
|
- truncate(repr(it), 100))
|
|
|
+ return '[{0}(x) for x in {1}]'.format(
|
|
|
+ task.task, truncate(repr(it), 100))
|
|
|
|
|
|
|
|
|
@Signature.register_type
|
|
@@ -868,8 +868,8 @@ class xstarmap(_basemap):
|
|
|
|
|
|
def __repr__(self):
|
|
|
task, it = self._unpack_args(self.kwargs)
|
|
|
- return '[{0}(*x) for x in {1}]'.format(task.task,
|
|
|
- truncate(repr(it), 100))
|
|
|
+ return '[{0}(*x) for x in {1}]'.format(
|
|
|
+ task.task, truncate(repr(it), 100))
|
|
|
|
|
|
|
|
|
@Signature.register_type
|
|
@@ -878,6 +878,13 @@ class chunks(Signature):
|
|
|
|
|
|
_unpack_args = itemgetter('task', 'it', 'n')
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ return _upgrade(
|
|
|
+ d, chunks(*cls._unpack_args(
|
|
|
+ d['kwargs']), app=app, **d['options']),
|
|
|
+ )
|
|
|
+
|
|
|
def __init__(self, task, it, n, **options):
|
|
|
Signature.__init__(
|
|
|
self, 'celery.chunks', (),
|
|
@@ -885,12 +892,8 @@ class chunks(Signature):
|
|
|
immutable=True, **options
|
|
|
)
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(cls, d, app=None):
|
|
|
- return _upgrade(
|
|
|
- d, chunks(*cls._unpack_args(
|
|
|
- d['kwargs']), app=app, **d['options']),
|
|
|
- )
|
|
|
+ def __call__(self, **options):
|
|
|
+ return self.apply_async(**options)
|
|
|
|
|
|
def apply_async(self, args=(), kwargs={}, **opts):
|
|
|
return self.group().apply_async(
|
|
@@ -898,9 +901,6 @@ class chunks(Signature):
|
|
|
route_name=task_name_from(self.kwargs.get('task')), **opts
|
|
|
)
|
|
|
|
|
|
- def __call__(self, **options):
|
|
|
- return self.apply_async(**options)
|
|
|
-
|
|
|
def group(self):
|
|
|
# need to evaluate generators
|
|
|
task, it, n = self._unpack_args(self.kwargs)
|
|
@@ -960,6 +960,12 @@ class group(Signature):
|
|
|
|
|
|
tasks = _getitem_property('kwargs.tasks', 'Tasks in group.')
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ return _upgrade(
|
|
|
+ d, group(d['kwargs']['tasks'], app=app, **d['options']),
|
|
|
+ )
|
|
|
+
|
|
|
def __init__(self, *tasks, **options):
|
|
|
if len(tasks) == 1:
|
|
|
tasks = tasks[0]
|
|
@@ -972,14 +978,67 @@ class group(Signature):
|
|
|
)
|
|
|
self.subtask_type = 'group'
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(cls, d, app=None):
|
|
|
- return _upgrade(
|
|
|
- d, group(d['kwargs']['tasks'], app=app, **d['options']),
|
|
|
- )
|
|
|
+ def __call__(self, *partial_args, **options):
|
|
|
+ return self.apply_async(partial_args, **options)
|
|
|
|
|
|
- def __len__(self):
|
|
|
- return len(self.tasks)
|
|
|
+ def skew(self, start=1.0, stop=None, step=1.0):
|
|
|
+ it = fxrange(start, stop, step, repeatlast=True)
|
|
|
+ for task in self.tasks:
|
|
|
+ task.set(countdown=next(it))
|
|
|
+ return self
|
|
|
+
|
|
|
+ def apply_async(self, args=(), kwargs=None, add_to_parent=True,
|
|
|
+ producer=None, **options):
|
|
|
+ app = self.app
|
|
|
+ if app.conf.task_always_eager:
|
|
|
+ return self.apply(args, kwargs, **options)
|
|
|
+ if not self.tasks:
|
|
|
+ return self.freeze()
|
|
|
+
|
|
|
+ options, group_id, root_id = self._freeze_gid(options)
|
|
|
+ tasks = self._prepared(self.tasks, [], group_id, root_id, app)
|
|
|
+ p = barrier()
|
|
|
+ results = list(self._apply_tasks(tasks, producer, app, p,
|
|
|
+ args=args, kwargs=kwargs, **options))
|
|
|
+ result = self.app.GroupResult(group_id, results, ready_barrier=p)
|
|
|
+ p.finalize()
|
|
|
+
|
|
|
+ # - Special case of group(A.s() | group(B.s(), C.s()))
|
|
|
+ # That is, group with single item that's a chain but the
|
|
|
+ # last task in that chain is a group.
|
|
|
+ #
|
|
|
+ # We cannot actually support arbitrary GroupResults in chains,
|
|
|
+ # but this special case we can.
|
|
|
+ if len(result) == 1 and isinstance(result[0], GroupResult):
|
|
|
+ result = result[0]
|
|
|
+
|
|
|
+ parent_task = app.current_worker_task
|
|
|
+ if add_to_parent and parent_task:
|
|
|
+ parent_task.add_trail(result)
|
|
|
+ return result
|
|
|
+
|
|
|
+ def apply(self, args=(), kwargs={}, **options):
|
|
|
+ app = self.app
|
|
|
+ if not self.tasks:
|
|
|
+ return self.freeze() # empty group returns GroupResult
|
|
|
+ options, group_id, root_id = self._freeze_gid(options)
|
|
|
+ tasks = self._prepared(self.tasks, [], group_id, root_id, app)
|
|
|
+ return app.GroupResult(group_id, [
|
|
|
+ sig.apply(args=args, kwargs=kwargs, **options) for sig, _ in tasks
|
|
|
+ ])
|
|
|
+
|
|
|
+ def set_immutable(self, immutable):
|
|
|
+ for task in self.tasks:
|
|
|
+ task.set_immutable(immutable)
|
|
|
+
|
|
|
+ def link(self, sig):
|
|
|
+ # Simply link to first task
|
|
|
+ sig = sig.clone().set(immutable=True)
|
|
|
+ return self.tasks[0].link(sig)
|
|
|
+
|
|
|
+ def link_error(self, sig):
|
|
|
+ sig = sig.clone().set(immutable=True)
|
|
|
+ return self.tasks[0].link_error(sig)
|
|
|
|
|
|
def _prepared(self, tasks, partial_args, group_id, root_id, app,
|
|
|
CallableSignature=abstract.CallableSignature,
|
|
@@ -1041,76 +1100,6 @@ class group(Signature):
|
|
|
for task in self.tasks:
|
|
|
task.set_parent_id(parent_id)
|
|
|
|
|
|
- def apply_async(self, args=(), kwargs=None, add_to_parent=True,
|
|
|
- producer=None, **options):
|
|
|
- app = self.app
|
|
|
- if app.conf.task_always_eager:
|
|
|
- return self.apply(args, kwargs, **options)
|
|
|
- if not self.tasks:
|
|
|
- return self.freeze()
|
|
|
-
|
|
|
- options, group_id, root_id = self._freeze_gid(options)
|
|
|
- tasks = self._prepared(self.tasks, [], group_id, root_id, app)
|
|
|
- p = barrier()
|
|
|
- results = list(self._apply_tasks(tasks, producer, app, p,
|
|
|
- args=args, kwargs=kwargs, **options))
|
|
|
- result = self.app.GroupResult(group_id, results, ready_barrier=p)
|
|
|
- p.finalize()
|
|
|
-
|
|
|
- # - Special case of group(A.s() | group(B.s(), C.s()))
|
|
|
- # That is, group with single item that's a chain but the
|
|
|
- # last task in that chain is a group.
|
|
|
- #
|
|
|
- # We cannot actually support arbitrary GroupResults in chains,
|
|
|
- # but this special case we can.
|
|
|
- if len(result) == 1 and isinstance(result[0], GroupResult):
|
|
|
- result = result[0]
|
|
|
-
|
|
|
- parent_task = app.current_worker_task
|
|
|
- if add_to_parent and parent_task:
|
|
|
- parent_task.add_trail(result)
|
|
|
- return result
|
|
|
-
|
|
|
- def apply(self, args=(), kwargs={}, **options):
|
|
|
- app = self.app
|
|
|
- if not self.tasks:
|
|
|
- return self.freeze() # empty group returns GroupResult
|
|
|
- options, group_id, root_id = self._freeze_gid(options)
|
|
|
- tasks = self._prepared(self.tasks, [], group_id, root_id, app)
|
|
|
- return app.GroupResult(group_id, [
|
|
|
- sig.apply(args=args, kwargs=kwargs, **options) for sig, _ in tasks
|
|
|
- ])
|
|
|
-
|
|
|
- def set_immutable(self, immutable):
|
|
|
- for task in self.tasks:
|
|
|
- task.set_immutable(immutable)
|
|
|
-
|
|
|
- def link(self, sig):
|
|
|
- # Simply link to first task
|
|
|
- sig = sig.clone().set(immutable=True)
|
|
|
- return self.tasks[0].link(sig)
|
|
|
-
|
|
|
- def link_error(self, sig):
|
|
|
- sig = sig.clone().set(immutable=True)
|
|
|
- return self.tasks[0].link_error(sig)
|
|
|
-
|
|
|
- def __call__(self, *partial_args, **options):
|
|
|
- return self.apply_async(partial_args, **options)
|
|
|
-
|
|
|
- def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
|
|
|
- # pylint: disable=redefined-outer-name
|
|
|
- # XXX chord is also a class in outer scope.
|
|
|
- stack = deque(self.tasks)
|
|
|
- while stack:
|
|
|
- task = maybe_signature(stack.popleft(), app=self._app).clone()
|
|
|
- if isinstance(task, group):
|
|
|
- stack.extendleft(task.tasks)
|
|
|
- else:
|
|
|
- new_tasks.append(task)
|
|
|
- yield task.freeze(group_id=group_id,
|
|
|
- chord=chord, root_id=root_id,
|
|
|
- parent_id=parent_id)
|
|
|
-
|
|
|
def freeze(self, _id=None, group_id=None, chord=None,
|
|
|
root_id=None, parent_id=None):
|
|
|
# pylint: disable=redefined-outer-name
|
|
@@ -1139,11 +1128,19 @@ class group(Signature):
|
|
|
return self.app.GroupResult(gid, results)
|
|
|
_freeze = freeze
|
|
|
|
|
|
- def skew(self, start=1.0, stop=None, step=1.0):
|
|
|
- it = fxrange(start, stop, step, repeatlast=True)
|
|
|
- for task in self.tasks:
|
|
|
- task.set(countdown=next(it))
|
|
|
- return self
|
|
|
+ def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
+ stack = deque(self.tasks)
|
|
|
+ while stack:
|
|
|
+ task = maybe_signature(stack.popleft(), app=self._app).clone()
|
|
|
+ if isinstance(task, group):
|
|
|
+ stack.extendleft(task.tasks)
|
|
|
+ else:
|
|
|
+ new_tasks.append(task)
|
|
|
+ yield task.freeze(group_id=group_id,
|
|
|
+ chord=chord, root_id=root_id,
|
|
|
+ parent_id=parent_id)
|
|
|
|
|
|
def __iter__(self):
|
|
|
return iter(self.tasks)
|
|
@@ -1151,6 +1148,9 @@ class group(Signature):
|
|
|
def __repr__(self):
|
|
|
return 'group({0.tasks!r})'.format(self)
|
|
|
|
|
|
+ def __len__(self):
|
|
|
+ return len(self.tasks)
|
|
|
+
|
|
|
@property
|
|
|
def app(self):
|
|
|
app = self._app
|
|
@@ -1191,6 +1191,17 @@ class chord(Signature):
|
|
|
12
|
|
|
"""
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ args, d['kwargs'] = cls._unpack_args(**d['kwargs'])
|
|
|
+ return _upgrade(d, cls(*args, app=app, **d))
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _unpack_args(header=None, body=None, **kwargs):
|
|
|
+ # Python signatures are better at extracting keys from dicts
|
|
|
+ # than manually popping things off.
|
|
|
+ return (header, body), kwargs
|
|
|
+
|
|
|
def __init__(self, header, body=None, task='celery.chord',
|
|
|
args=(), kwargs={}, app=None, **options):
|
|
|
Signature.__init__(
|
|
@@ -1200,6 +1211,9 @@ class chord(Signature):
|
|
|
)
|
|
|
self.subtask_type = 'chord'
|
|
|
|
|
|
+ def __call__(self, body=None, **options):
|
|
|
+ return self.apply_async((), {'body': body} if body else {}, **options)
|
|
|
+
|
|
|
def freeze(self, _id=None, group_id=None, chord=None,
|
|
|
root_id=None, parent_id=None):
|
|
|
# pylint: disable=redefined-outer-name
|
|
@@ -1221,33 +1235,6 @@ class chord(Signature):
|
|
|
task.set_parent_id(parent_id)
|
|
|
self.parent_id = parent_id
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(cls, d, app=None):
|
|
|
- args, d['kwargs'] = cls._unpack_args(**d['kwargs'])
|
|
|
- return _upgrade(d, cls(*args, app=app, **d))
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def _unpack_args(header=None, body=None, **kwargs):
|
|
|
- # Python signatures are better at extracting keys from dicts
|
|
|
- # than manually popping things off.
|
|
|
- return (header, body), kwargs
|
|
|
-
|
|
|
- @cached_property
|
|
|
- def app(self):
|
|
|
- return self._get_app(self.body)
|
|
|
-
|
|
|
- def _get_app(self, body=None):
|
|
|
- app = self._app
|
|
|
- if app is None:
|
|
|
- try:
|
|
|
- tasks = self.tasks.tasks # is a group
|
|
|
- except AttributeError:
|
|
|
- tasks = self.tasks
|
|
|
- app = tasks[0]._app
|
|
|
- if app is None and body is not None:
|
|
|
- app = body._app
|
|
|
- return app if app is not None else current_app
|
|
|
-
|
|
|
def apply_async(self, args=(), kwargs={}, task_id=None,
|
|
|
producer=None, publisher=None, connection=None,
|
|
|
router=None, result_cls=None, **options):
|
|
@@ -1309,9 +1296,6 @@ class chord(Signature):
|
|
|
bodyres.parent = parent
|
|
|
return bodyres
|
|
|
|
|
|
- def __call__(self, body=None, **options):
|
|
|
- return self.apply_async((), {'body': body} if body else {}, **options)
|
|
|
-
|
|
|
def clone(self, *args, **kwargs):
|
|
|
s = Signature.clone(self, *args, **kwargs)
|
|
|
# need to make copy of body
|
|
@@ -1339,6 +1323,22 @@ class chord(Signature):
|
|
|
return self.body.reprcall(self.tasks)
|
|
|
return '<chord without body: {0.tasks!r}>'.format(self)
|
|
|
|
|
|
+ @cached_property
|
|
|
+ def app(self):
|
|
|
+ return self._get_app(self.body)
|
|
|
+
|
|
|
+ def _get_app(self, body=None):
|
|
|
+ app = self._app
|
|
|
+ if app is None:
|
|
|
+ try:
|
|
|
+ tasks = self.tasks.tasks # is a group
|
|
|
+ except AttributeError:
|
|
|
+ tasks = self.tasks
|
|
|
+ app = tasks[0]._app
|
|
|
+ if app is None and body is not None:
|
|
|
+ app = body._app
|
|
|
+ return app if app is not None else current_app
|
|
|
+
|
|
|
tasks = _getitem_property('kwargs.header', 'Tasks in chord header.')
|
|
|
body = _getitem_property('kwargs.body', 'Body task of chord.')
|
|
|
|