|
@@ -107,9 +107,39 @@ def _upgrade(fields, sig):
|
|
|
return sig
|
|
|
|
|
|
|
|
|
+def _seq_concat_item(seq, item):
|
|
|
+ """Return copy of sequence seq with item added.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Sequence: if seq is a tuple, the result will be a tuple,
|
|
|
+ otherwise it depends on the implementation of ``__add__``.
|
|
|
+ """
|
|
|
+ return seq + (item,) if isinstance(seq, tuple) else seq + [item]
|
|
|
+
|
|
|
+
|
|
|
+def _seq_concat_seq(a, b):
|
|
|
+ """Concatenate two sequences: ``a + b``.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Sequence: The return value will depend on the largest sequence
|
|
|
+ - if b is larger and is a tuple, the return value will be a tuple.
|
|
|
+ - if a is larger and is a list, the return value will be a list,
|
|
|
+ """
|
|
|
+ # find the type of the largest sequence
|
|
|
+ prefer = type(max([a, b], key=len))
|
|
|
+ # convert the smallest list to the type of the largest sequence.
|
|
|
+ if not isinstance(a, prefer):
|
|
|
+ a = prefer(a)
|
|
|
+ if not isinstance(b, prefer):
|
|
|
+ b = prefer(b)
|
|
|
+ return a + b
|
|
|
+
|
|
|
+
|
|
|
@abstract.CallableSignature.register
|
|
|
class Signature(dict):
|
|
|
- """Class that wraps the arguments and execution options
|
|
|
+ """Task Signature.
|
|
|
+
|
|
|
+ Class that wraps the arguments and execution options
|
|
|
for a single task invocation.
|
|
|
|
|
|
Used as the parts in a :class:`group` and other constructs,
|
|
@@ -118,7 +148,7 @@ class Signature(dict):
|
|
|
|
|
|
Signatures can also be created from tasks:
|
|
|
|
|
|
- - Using the ``.signature()`` method which has the same signature
|
|
|
+ - Using the ``.signature()`` method that has the same signature
|
|
|
as ``Task.apply_async``:
|
|
|
|
|
|
.. code-block:: pycon
|
|
@@ -183,26 +213,26 @@ class Signature(dict):
|
|
|
type=None, subtask_type=None, immutable=False,
|
|
|
app=None, **ex):
|
|
|
self._app = app
|
|
|
- init = dict.__init__
|
|
|
|
|
|
if isinstance(task, dict):
|
|
|
- return init(self, task) # works like dict(d)
|
|
|
-
|
|
|
- # Also supports using task class/instance instead of string name.
|
|
|
- try:
|
|
|
- task_name = task.name
|
|
|
- except AttributeError:
|
|
|
- task_name = task
|
|
|
+ super(Signature, self).__init__(task) # works like dict(d)
|
|
|
else:
|
|
|
- self._type = task
|
|
|
-
|
|
|
- init(self,
|
|
|
- task=task_name, args=tuple(args or ()),
|
|
|
- kwargs=kwargs or {},
|
|
|
- options=dict(options or {}, **ex),
|
|
|
- subtask_type=subtask_type,
|
|
|
- immutable=immutable,
|
|
|
- chord_size=None)
|
|
|
+ # Also supports using task class/instance instead of string name.
|
|
|
+ try:
|
|
|
+ task_name = task.name
|
|
|
+ except AttributeError:
|
|
|
+ task_name = task
|
|
|
+ else:
|
|
|
+ self._type = task
|
|
|
+
|
|
|
+ super(Signature, self).__init__(
|
|
|
+ task=task_name, args=tuple(args or ()),
|
|
|
+ kwargs=kwargs or {},
|
|
|
+ options=dict(options or {}, **ex),
|
|
|
+ subtask_type=subtask_type,
|
|
|
+ immutable=immutable,
|
|
|
+ chord_size=None,
|
|
|
+ )
|
|
|
|
|
|
def __call__(self, *partial_args, **partial_kwargs):
|
|
|
"""Call the task directly (in the current process)."""
|
|
@@ -214,8 +244,11 @@ class Signature(dict):
|
|
|
return self.apply_async(partial_args, partial_kwargs)
|
|
|
|
|
|
def apply(self, args=(), kwargs={}, **options):
|
|
|
- """Same as :meth:`apply_async` but executed the task inline instead
|
|
|
- of sending a task message."""
|
|
|
+ """Call task locally.
|
|
|
+
|
|
|
+ Same as :meth:`apply_async` but executed the task inline instead
|
|
|
+ of sending a task message.
|
|
|
+ """
|
|
|
# For callbacks: extra args are prepended to the stored args.
|
|
|
args, kwargs, options = self._merge(args, kwargs, options)
|
|
|
return self.type.apply(args, kwargs, **options)
|
|
@@ -245,6 +278,8 @@ class Signature(dict):
|
|
|
args, kwargs, options = self._merge(args, kwargs, options)
|
|
|
else:
|
|
|
args, kwargs, options = self.args, self.kwargs, self.options
|
|
|
+ # pylint: disable=too-many-function-args
|
|
|
+ # Borks on this, as it's a property
|
|
|
return _apply(args, kwargs, **options)
|
|
|
|
|
|
def _merge(self, args=(), kwargs={}, options={}, force=False):
|
|
@@ -282,13 +317,15 @@ class Signature(dict):
|
|
|
root_id=None, parent_id=None):
|
|
|
"""Finalize the signature by adding a concrete task id.
|
|
|
|
|
|
- The task will not be called and you should not call the signature
|
|
|
- twice after freezing it as that will result in two task messages
|
|
|
+ The task won't be called and you shouldn't call the signature
|
|
|
+ twice after freezing it as that'll result in two task messages
|
|
|
using the same task id.
|
|
|
|
|
|
Returns:
|
|
|
~@AsyncResult: promise of future evaluation.
|
|
|
"""
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
opts = self.options
|
|
|
try:
|
|
|
tid = opts['task_id']
|
|
@@ -304,13 +341,17 @@ class Signature(dict):
|
|
|
opts['group_id'] = group_id
|
|
|
if chord:
|
|
|
opts['chord'] = chord
|
|
|
+ # pylint: disable=too-many-function-args
|
|
|
+ # Borks on this, as it's a property.
|
|
|
return self.AsyncResult(tid)
|
|
|
_freeze = freeze
|
|
|
|
|
|
def replace(self, args=None, kwargs=None, options=None):
|
|
|
"""Replace the args, kwargs or options set for this signature.
|
|
|
+
|
|
|
These are only replaced if the argument for the section is
|
|
|
- not :const:`None`."""
|
|
|
+ not :const:`None`.
|
|
|
+ """
|
|
|
s = self.clone()
|
|
|
if args is not None:
|
|
|
s.args = args
|
|
@@ -325,7 +366,7 @@ class Signature(dict):
|
|
|
|
|
|
Returns:
|
|
|
Signature: This is a chaining method call
|
|
|
- (i.e. it will return ``self``).
|
|
|
+ (i.e., it will return ``self``).
|
|
|
"""
|
|
|
if immutable is not None:
|
|
|
self.set_immutable(immutable)
|
|
@@ -355,8 +396,7 @@ class Signature(dict):
|
|
|
items.extend(maybe_list(value))
|
|
|
|
|
|
def link(self, callback):
|
|
|
- """Add a callback task to be applied if this task
|
|
|
- executes successfully.
|
|
|
+ """Add callback task to be applied if this task succeeds.
|
|
|
|
|
|
Returns:
|
|
|
Signature: the argument passed, for chaining
|
|
@@ -365,8 +405,7 @@ class Signature(dict):
|
|
|
return self.append_to_list_option('link', callback)
|
|
|
|
|
|
def link_error(self, errback):
|
|
|
- """Add a callback task to be applied if an error occurs
|
|
|
- while executing this task.
|
|
|
+ """Add callback task to be applied on error in task execution.
|
|
|
|
|
|
Returns:
|
|
|
Signature: the argument passed, for chaining
|
|
@@ -388,8 +427,10 @@ class Signature(dict):
|
|
|
return self
|
|
|
|
|
|
def flatten_links(self):
|
|
|
- """Return a recursive list of dependencies (unchain if you will,
|
|
|
- but with links intact)."""
|
|
|
+ """Return a recursive list of dependencies.
|
|
|
+
|
|
|
+ "unchain" if you will, but with links intact.
|
|
|
+ """
|
|
|
return list(_chain.from_iterable(_chain(
|
|
|
[[self]],
|
|
|
(link.flatten_links()
|
|
@@ -399,21 +440,50 @@ class Signature(dict):
|
|
|
def __or__(self, other):
|
|
|
if isinstance(self, group):
|
|
|
if isinstance(other, group):
|
|
|
+ # group() | group() -> single group
|
|
|
return group(_chain(self.tasks, other.tasks), app=self.app)
|
|
|
+ # group() | task -> chord
|
|
|
return chord(self, body=other, app=self._app)
|
|
|
elif isinstance(other, group):
|
|
|
+ # task | group() -> unroll group with one member
|
|
|
other = maybe_unroll_group(other)
|
|
|
-
|
|
|
+ return chain(self, other, app=self._app)
|
|
|
if not isinstance(self, chain) and isinstance(other, chain):
|
|
|
- return chain((self,) + other.tasks, app=self._app)
|
|
|
+ # task | chain -> chain
|
|
|
+ return chain(
|
|
|
+ _seq_concat_seq((self,), other.tasks), app=self._app)
|
|
|
elif isinstance(other, chain):
|
|
|
- return chain(*self.tasks + other.tasks, app=self._app)
|
|
|
+ # chain | chain -> chain
|
|
|
+ return chain(
|
|
|
+ _seq_concat_seq(self.tasks, other.tasks), app=self._app)
|
|
|
+ elif isinstance(self, chord):
|
|
|
+ sig = self.clone()
|
|
|
+ sig.body = sig.body | other
|
|
|
+ return sig
|
|
|
elif isinstance(other, Signature):
|
|
|
if isinstance(self, chain):
|
|
|
- return chain(*self.tasks + (other,), app=self._app)
|
|
|
+ # chain | task -> chain
|
|
|
+ return chain(
|
|
|
+ _seq_concat_item(self.tasks, other), app=self._app)
|
|
|
+ # task | task -> chain
|
|
|
return chain(self, other, app=self._app)
|
|
|
return NotImplemented
|
|
|
|
|
|
+ def election(self):
|
|
|
+ type = self.type
|
|
|
+ app = type.app
|
|
|
+ tid = self.options.get('task_id') or uuid()
|
|
|
+
|
|
|
+ with app.producer_or_acquire(None) as P:
|
|
|
+ props = type.backend.on_task_call(P, tid)
|
|
|
+ app.control.election(tid, 'task', self.clone(task_id=tid, **props),
|
|
|
+ connection=P.connection)
|
|
|
+ return type.AsyncResult(tid)
|
|
|
+
|
|
|
+ def reprcall(self, *args, **kwargs):
|
|
|
+ args, kwargs, _ = self._merge(args, kwargs, {}, force=True)
|
|
|
+ return reprcall(self['task'], args, kwargs)
|
|
|
+
|
|
|
def __deepcopy__(self, memo):
|
|
|
memo[id(self)] = self
|
|
|
return dict(self)
|
|
@@ -429,21 +499,6 @@ class Signature(dict):
|
|
|
def __json__(self):
|
|
|
return dict(self)
|
|
|
|
|
|
- def reprcall(self, *args, **kwargs):
|
|
|
- args, kwargs, _ = self._merge(args, kwargs, {}, force=True)
|
|
|
- return reprcall(self['task'], args, kwargs)
|
|
|
-
|
|
|
- def election(self):
|
|
|
- type = self.type
|
|
|
- app = type.app
|
|
|
- tid = self.options.get('task_id') or uuid()
|
|
|
-
|
|
|
- with app.producer_or_acquire(None) as P:
|
|
|
- props = type.backend.on_task_call(P, tid)
|
|
|
- app.control.election(tid, 'task', self.clone(task_id=tid, **props),
|
|
|
- connection=P.connection)
|
|
|
- return type.AsyncResult(tid)
|
|
|
-
|
|
|
def __repr__(self):
|
|
|
return self.reprcall()
|
|
|
|
|
@@ -489,13 +544,15 @@ class Signature(dict):
|
|
|
|
|
|
@Signature.register_type
|
|
|
class chain(Signature):
|
|
|
- """Chains tasks together, so that each tasks follows each other
|
|
|
+ """Chain tasks together.
|
|
|
+
|
|
|
+ Each tasks follows one another,
|
|
|
by being applied as a callback of the previous task.
|
|
|
|
|
|
Note:
|
|
|
If called with only one argument, then that argument must
|
|
|
- be an iterable of tasks to chain, which means you can
|
|
|
- use this with a generator expression.
|
|
|
+ be an iterable of tasks to chain: this allows us
|
|
|
+ to use generator expressions.
|
|
|
|
|
|
Example:
|
|
|
This is effectively :math:`((2 + 2) + 4)`:
|
|
@@ -524,7 +581,7 @@ class chain(Signature):
|
|
|
Arguments:
|
|
|
*tasks (Signature): List of task signatures to chain.
|
|
|
If only one argument is passed and that argument is
|
|
|
- an iterable, then that will be used as the list of signatures
|
|
|
+ an iterable, then that'll be used as the list of signatures
|
|
|
to chain instead. This means that you can use a generator
|
|
|
expression.
|
|
|
|
|
@@ -533,8 +590,19 @@ class chain(Signature):
|
|
|
task in the chain. When that task succeeed the next task in the
|
|
|
chain is applied, and so on.
|
|
|
"""
|
|
|
+
|
|
|
tasks = _getitem_property('kwargs.tasks', 'Tasks in chain.')
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ tasks = d['kwargs']['tasks']
|
|
|
+ if tasks:
|
|
|
+ if isinstance(tasks, tuple): # aaaargh
|
|
|
+ tasks = d['kwargs']['tasks'] = list(tasks)
|
|
|
+ # First task must be signature object to get app
|
|
|
+ tasks[0] = maybe_signature(tasks[0], app=app)
|
|
|
+ return _upgrade(d, chain(tasks, app=app, **d['options']))
|
|
|
+
|
|
|
def __init__(self, *tasks, **options):
|
|
|
tasks = (regen(tasks[0]) if len(tasks) == 1 and is_list(tasks[0])
|
|
|
else tasks)
|
|
@@ -550,8 +618,12 @@ class chain(Signature):
|
|
|
return self.apply_async(args, kwargs)
|
|
|
|
|
|
def clone(self, *args, **kwargs):
|
|
|
+ to_signature = maybe_signature
|
|
|
s = Signature.clone(self, *args, **kwargs)
|
|
|
- s.kwargs['tasks'] = [sig.clone() for sig in s.kwargs['tasks']]
|
|
|
+ s.kwargs['tasks'] = [
|
|
|
+ to_signature(sig, app=self._app, clone=True)
|
|
|
+ for sig in s.kwargs['tasks']
|
|
|
+ ]
|
|
|
return s
|
|
|
|
|
|
def apply_async(self, args=(), kwargs={}, **options):
|
|
@@ -565,6 +637,8 @@ class chain(Signature):
|
|
|
def run(self, args=(), kwargs={}, group_id=None, chord=None,
|
|
|
task_id=None, link=None, link_error=None,
|
|
|
producer=None, root_id=None, parent_id=None, app=None, **options):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
app = app or self.app
|
|
|
use_link = self._use_link
|
|
|
if use_link is None and app.conf.task_protocol == 1:
|
|
@@ -584,12 +658,17 @@ class chain(Signature):
|
|
|
if link:
|
|
|
tasks[0].extend_list_option('link', link)
|
|
|
first_task = tasks.pop()
|
|
|
- first_task.apply_async(
|
|
|
- chain=tasks if not use_link else None, **options)
|
|
|
+ # chain option may already be set, resulting in
|
|
|
+ # "multiple values for keyword argument 'chain'" error.
|
|
|
+ # Issue #3379.
|
|
|
+ options['chain'] = tasks if not use_link else None
|
|
|
+ first_task.apply_async(**options)
|
|
|
return results[0]
|
|
|
|
|
|
def freeze(self, _id=None, group_id=None, chord=None,
|
|
|
root_id=None, parent_id=None):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
_, results = self._frozen = self.prepare_steps(
|
|
|
self.args, self.tasks, root_id, parent_id, None,
|
|
|
self.app, _id, group_id, chord, clone=False,
|
|
@@ -649,6 +728,7 @@ class chain(Signature):
|
|
|
task_id=prev_res.id, root_id=root_id, app=app,
|
|
|
)
|
|
|
prev_res = prev_prev_res
|
|
|
+
|
|
|
if is_last_task:
|
|
|
# chain(task_id=id) means task id is set for the last task
|
|
|
# in the chain. If the chord is part of a chord/group
|
|
@@ -672,7 +752,17 @@ class chain(Signature):
|
|
|
task.link(prev_task)
|
|
|
|
|
|
if prev_res:
|
|
|
- prev_res.parent = res
|
|
|
+ if isinstance(prev_task, chord):
|
|
|
+ # If previous task was a chord,
|
|
|
+ # the freeze above would have set a parent for
|
|
|
+ # us, but we'd be overwriting it here.
|
|
|
+
|
|
|
+ # so fix this relationship so it's:
|
|
|
+ # chord body -> group -> THIS RES
|
|
|
+ assert isinstance(prev_res.parent, GroupResult)
|
|
|
+ prev_res.parent.parent = res
|
|
|
+ else:
|
|
|
+ prev_res.parent = res
|
|
|
|
|
|
if is_first_task and parent_id is not None:
|
|
|
task.set_parent_id(parent_id)
|
|
@@ -702,16 +792,6 @@ class chain(Signature):
|
|
|
res.parent, last, fargs = last, res, None
|
|
|
return last
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(self, d, app=None):
|
|
|
- tasks = d['kwargs']['tasks']
|
|
|
- if tasks:
|
|
|
- if isinstance(tasks, tuple): # aaaargh
|
|
|
- tasks = d['kwargs']['tasks'] = list(tasks)
|
|
|
- # First task must be signature object to get app
|
|
|
- tasks[0] = maybe_signature(tasks[0], app=app)
|
|
|
- return _upgrade(d, chain(*tasks, app=app, **d['options']))
|
|
|
-
|
|
|
@property
|
|
|
def app(self):
|
|
|
app = self._app
|
|
@@ -723,6 +803,9 @@ class chain(Signature):
|
|
|
return app or current_app
|
|
|
|
|
|
def __repr__(self):
|
|
|
+ if not self.tasks:
|
|
|
+ return '<{0}@{1:#x}: empty>'.format(
|
|
|
+ type(self).__name__, id(self))
|
|
|
return ' | '.join(repr(t) for t in self.tasks)
|
|
|
|
|
|
|
|
@@ -730,6 +813,12 @@ class _basemap(Signature):
|
|
|
_task_name = None
|
|
|
_unpack_args = itemgetter('task', 'it')
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ return _upgrade(
|
|
|
+ d, cls(*cls._unpack_args(d['kwargs']), app=app, **d['options']),
|
|
|
+ )
|
|
|
+
|
|
|
def __init__(self, task, it, **options):
|
|
|
Signature.__init__(
|
|
|
self, self._task_name, (),
|
|
@@ -744,37 +833,49 @@ class _basemap(Signature):
|
|
|
route_name=task_name_from(self.kwargs.get('task')), **opts
|
|
|
)
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(cls, d, app=None):
|
|
|
- return _upgrade(
|
|
|
- d, cls(*cls._unpack_args(d['kwargs']), app=app, **d['options']),
|
|
|
- )
|
|
|
-
|
|
|
|
|
|
@Signature.register_type
|
|
|
class xmap(_basemap):
|
|
|
+ """Map operation for tasks.
|
|
|
+
|
|
|
+ Note:
|
|
|
+ Tasks executed sequentially in process, this is not a
|
|
|
+ parallel operation like :class:`group`.
|
|
|
+ """
|
|
|
+
|
|
|
_task_name = 'celery.map'
|
|
|
|
|
|
def __repr__(self):
|
|
|
task, it = self._unpack_args(self.kwargs)
|
|
|
- return '[{0}(x) for x in {1}]'.format(task.task,
|
|
|
- truncate(repr(it), 100))
|
|
|
+ return '[{0}(x) for x in {1}]'.format(
|
|
|
+ task.task, truncate(repr(it), 100))
|
|
|
|
|
|
|
|
|
@Signature.register_type
|
|
|
class xstarmap(_basemap):
|
|
|
+ """Map operation for tasks, using star arguments."""
|
|
|
+
|
|
|
_task_name = 'celery.starmap'
|
|
|
|
|
|
def __repr__(self):
|
|
|
task, it = self._unpack_args(self.kwargs)
|
|
|
- return '[{0}(*x) for x in {1}]'.format(task.task,
|
|
|
- truncate(repr(it), 100))
|
|
|
+ return '[{0}(*x) for x in {1}]'.format(
|
|
|
+ task.task, truncate(repr(it), 100))
|
|
|
|
|
|
|
|
|
@Signature.register_type
|
|
|
class chunks(Signature):
|
|
|
+ """Partition of tasks in n chunks."""
|
|
|
+
|
|
|
_unpack_args = itemgetter('task', 'it', 'n')
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ return _upgrade(
|
|
|
+ d, chunks(*cls._unpack_args(
|
|
|
+ d['kwargs']), app=app, **d['options']),
|
|
|
+ )
|
|
|
+
|
|
|
def __init__(self, task, it, n, **options):
|
|
|
Signature.__init__(
|
|
|
self, 'celery.chunks', (),
|
|
@@ -782,12 +883,8 @@ class chunks(Signature):
|
|
|
immutable=True, **options
|
|
|
)
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(self, d, app=None):
|
|
|
- return _upgrade(
|
|
|
- d, chunks(*self._unpack_args(
|
|
|
- d['kwargs']), app=app, **d['options']),
|
|
|
- )
|
|
|
+ def __call__(self, **options):
|
|
|
+ return self.apply_async(**options)
|
|
|
|
|
|
def apply_async(self, args=(), kwargs={}, **opts):
|
|
|
return self.group().apply_async(
|
|
@@ -795,9 +892,6 @@ class chunks(Signature):
|
|
|
route_name=task_name_from(self.kwargs.get('task')), **opts
|
|
|
)
|
|
|
|
|
|
- def __call__(self, **options):
|
|
|
- return self.apply_async(**options)
|
|
|
-
|
|
|
def group(self):
|
|
|
# need to evaluate generators
|
|
|
task, it, n = self._unpack_args(self.kwargs)
|
|
@@ -814,7 +908,7 @@ def _maybe_group(tasks, app):
|
|
|
if isinstance(tasks, dict):
|
|
|
tasks = signature(tasks, app=app)
|
|
|
|
|
|
- if isinstance(tasks, group):
|
|
|
+ if isinstance(tasks, (group, chain)):
|
|
|
tasks = tasks.tasks
|
|
|
elif isinstance(tasks, abstract.CallableSignature):
|
|
|
tasks = [tasks]
|
|
@@ -832,8 +926,8 @@ class group(Signature):
|
|
|
|
|
|
Note:
|
|
|
If only one argument is passed, and that argument is an iterable
|
|
|
- then that will be used as the list of tasks instead, which
|
|
|
- means you can use ``group`` with generator expressions.
|
|
|
+ then that'll be used as the list of tasks instead: this
|
|
|
+ allows us to use ``group`` with generator expressions.
|
|
|
|
|
|
Example:
|
|
|
>>> lazy_group = group([add.s(2, 2), add.s(4, 4)])
|
|
@@ -843,8 +937,8 @@ class group(Signature):
|
|
|
|
|
|
Arguments:
|
|
|
*tasks (Signature): A list of signatures that this group will call.
|
|
|
- If there is only one argument, and that argument is an iterable,
|
|
|
- then that will define the list of signatures instead.
|
|
|
+ If there's only one argument, and that argument is an iterable,
|
|
|
+ then that'll define the list of signatures instead.
|
|
|
**options (Any): Execution options applied to all tasks
|
|
|
in the group.
|
|
|
|
|
@@ -853,8 +947,15 @@ class group(Signature):
|
|
|
tasks in the group (and return a :class:`GroupResult` instance
|
|
|
that can be used to inspect the state of the group).
|
|
|
"""
|
|
|
+
|
|
|
tasks = _getitem_property('kwargs.tasks', 'Tasks in group.')
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ return _upgrade(
|
|
|
+ d, group(d['kwargs']['tasks'], app=app, **d['options']),
|
|
|
+ )
|
|
|
+
|
|
|
def __init__(self, *tasks, **options):
|
|
|
if len(tasks) == 1:
|
|
|
tasks = tasks[0]
|
|
@@ -867,14 +968,72 @@ class group(Signature):
|
|
|
)
|
|
|
self.subtask_type = 'group'
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(self, d, app=None):
|
|
|
- return _upgrade(
|
|
|
- d, group(d['kwargs']['tasks'], app=app, **d['options']),
|
|
|
- )
|
|
|
+ def __call__(self, *partial_args, **options):
|
|
|
+ return self.apply_async(partial_args, **options)
|
|
|
|
|
|
- def __len__(self):
|
|
|
- return len(self.tasks)
|
|
|
+ def skew(self, start=1.0, stop=None, step=1.0):
|
|
|
+ it = fxrange(start, stop, step, repeatlast=True)
|
|
|
+ for task in self.tasks:
|
|
|
+ task.set(countdown=next(it))
|
|
|
+ return self
|
|
|
+
|
|
|
+ def apply_async(self, args=(), kwargs=None, add_to_parent=True,
|
|
|
+ producer=None, link=None, link_error=None, **options):
|
|
|
+ if link is not None:
|
|
|
+ raise TypeError('Cannot add link to group: use a chord')
|
|
|
+ if link_error is not None:
|
|
|
+ raise TypeError(
|
|
|
+ 'Cannot add link to group: do that on individual tasks')
|
|
|
+ app = self.app
|
|
|
+ if app.conf.task_always_eager:
|
|
|
+ return self.apply(args, kwargs, **options)
|
|
|
+ if not self.tasks:
|
|
|
+ return self.freeze()
|
|
|
+
|
|
|
+ options, group_id, root_id = self._freeze_gid(options)
|
|
|
+ tasks = self._prepared(self.tasks, [], group_id, root_id, app)
|
|
|
+ p = barrier()
|
|
|
+ results = list(self._apply_tasks(tasks, producer, app, p,
|
|
|
+ args=args, kwargs=kwargs, **options))
|
|
|
+ result = self.app.GroupResult(group_id, results, ready_barrier=p)
|
|
|
+ p.finalize()
|
|
|
+
|
|
|
+ # - Special case of group(A.s() | group(B.s(), C.s()))
|
|
|
+ # That is, group with single item that's a chain but the
|
|
|
+ # last task in that chain is a group.
|
|
|
+ #
|
|
|
+ # We cannot actually support arbitrary GroupResults in chains,
|
|
|
+ # but this special case we can.
|
|
|
+ if len(result) == 1 and isinstance(result[0], GroupResult):
|
|
|
+ result = result[0]
|
|
|
+
|
|
|
+ parent_task = app.current_worker_task
|
|
|
+ if add_to_parent and parent_task:
|
|
|
+ parent_task.add_trail(result)
|
|
|
+ return result
|
|
|
+
|
|
|
+ def apply(self, args=(), kwargs={}, **options):
|
|
|
+ app = self.app
|
|
|
+ if not self.tasks:
|
|
|
+ return self.freeze() # empty group returns GroupResult
|
|
|
+ options, group_id, root_id = self._freeze_gid(options)
|
|
|
+ tasks = self._prepared(self.tasks, [], group_id, root_id, app)
|
|
|
+ return app.GroupResult(group_id, [
|
|
|
+ sig.apply(args=args, kwargs=kwargs, **options) for sig, _ in tasks
|
|
|
+ ])
|
|
|
+
|
|
|
+ def set_immutable(self, immutable):
|
|
|
+ for task in self.tasks:
|
|
|
+ task.set_immutable(immutable)
|
|
|
+
|
|
|
+ def link(self, sig):
|
|
|
+ # Simply link to first task
|
|
|
+ sig = sig.clone().set(immutable=True)
|
|
|
+ return self.tasks[0].link(sig)
|
|
|
+
|
|
|
+ def link_error(self, sig):
|
|
|
+ sig = sig.clone().set(immutable=True)
|
|
|
+ return self.tasks[0].link_error(sig)
|
|
|
|
|
|
def _prepared(self, tasks, partial_args, group_id, root_id, app,
|
|
|
CallableSignature=abstract.CallableSignature,
|
|
@@ -883,7 +1042,7 @@ class group(Signature):
|
|
|
for task in tasks:
|
|
|
if isinstance(task, CallableSignature):
|
|
|
# local sigs are always of type Signature, and we
|
|
|
- # clone them to make sure we do not modify the originals.
|
|
|
+ # clone them to make sure we don't modify the originals.
|
|
|
task = task.clone()
|
|
|
else:
|
|
|
# serialized sigs must be converted to Signature.
|
|
@@ -901,12 +1060,16 @@ class group(Signature):
|
|
|
yield task, task.freeze(group_id=group_id, root_id=root_id)
|
|
|
|
|
|
def _apply_tasks(self, tasks, producer=None, app=None, p=None,
|
|
|
- add_to_parent=None, chord=None, **options):
|
|
|
+ add_to_parent=None, chord=None,
|
|
|
+ args=None, kwargs=None, **options):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
app = app or self.app
|
|
|
with app.producer_or_acquire(producer) as producer:
|
|
|
for sig, res in tasks:
|
|
|
sig.apply_async(producer=producer, add_to_parent=False,
|
|
|
chord=sig.options.get('chord') or chord,
|
|
|
+ args=args, kwargs=kwargs,
|
|
|
**options)
|
|
|
|
|
|
# adding callback to result, such that it will gradually
|
|
@@ -932,75 +1095,10 @@ class group(Signature):
|
|
|
for task in self.tasks:
|
|
|
task.set_parent_id(parent_id)
|
|
|
|
|
|
- def apply_async(self, args=(), kwargs=None, add_to_parent=True,
|
|
|
- producer=None, **options):
|
|
|
- app = self.app
|
|
|
- if app.conf.task_always_eager:
|
|
|
- return self.apply(args, kwargs, **options)
|
|
|
- if not self.tasks:
|
|
|
- return self.freeze()
|
|
|
-
|
|
|
- options, group_id, root_id = self._freeze_gid(options)
|
|
|
- tasks = self._prepared(self.tasks, args, group_id, root_id, app)
|
|
|
- p = barrier()
|
|
|
- results = list(self._apply_tasks(tasks, producer, app, p, **options))
|
|
|
- result = self.app.GroupResult(group_id, results, ready_barrier=p)
|
|
|
- p.finalize()
|
|
|
-
|
|
|
- # - Special case of group(A.s() | group(B.s(), C.s()))
|
|
|
- # That is, group with single item that is a chain but the
|
|
|
- # last task in that chain is a group.
|
|
|
- #
|
|
|
- # We cannot actually support arbitrary GroupResults in chains,
|
|
|
- # but this special case we can.
|
|
|
- if len(result) == 1 and isinstance(result[0], GroupResult):
|
|
|
- result = result[0]
|
|
|
-
|
|
|
- parent_task = app.current_worker_task
|
|
|
- if add_to_parent and parent_task:
|
|
|
- parent_task.add_trail(result)
|
|
|
- return result
|
|
|
-
|
|
|
- def apply(self, args=(), kwargs={}, **options):
|
|
|
- app = self.app
|
|
|
- if not self.tasks:
|
|
|
- return self.freeze() # empty group returns GroupResult
|
|
|
- options, group_id, root_id = self._freeze_gid(options)
|
|
|
- tasks = self._prepared(self.tasks, args, group_id, root_id, app)
|
|
|
- return app.GroupResult(group_id, [
|
|
|
- sig.apply(**options) for sig, _ in tasks
|
|
|
- ])
|
|
|
-
|
|
|
- def set_immutable(self, immutable):
|
|
|
- for task in self.tasks:
|
|
|
- task.set_immutable(immutable)
|
|
|
-
|
|
|
- def link(self, sig):
|
|
|
- # Simply link to first task
|
|
|
- sig = sig.clone().set(immutable=True)
|
|
|
- return self.tasks[0].link(sig)
|
|
|
-
|
|
|
- def link_error(self, sig):
|
|
|
- sig = sig.clone().set(immutable=True)
|
|
|
- return self.tasks[0].link_error(sig)
|
|
|
-
|
|
|
- def __call__(self, *partial_args, **options):
|
|
|
- return self.apply_async(partial_args, **options)
|
|
|
-
|
|
|
- def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
|
|
|
- stack = deque(self.tasks)
|
|
|
- while stack:
|
|
|
- task = maybe_signature(stack.popleft(), app=self._app).clone()
|
|
|
- if isinstance(task, group):
|
|
|
- stack.extendleft(task.tasks)
|
|
|
- else:
|
|
|
- new_tasks.append(task)
|
|
|
- yield task.freeze(group_id=group_id,
|
|
|
- chord=chord, root_id=root_id,
|
|
|
- parent_id=parent_id)
|
|
|
-
|
|
|
def freeze(self, _id=None, group_id=None, chord=None,
|
|
|
root_id=None, parent_id=None):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
opts = self.options
|
|
|
try:
|
|
|
gid = opts['task_id']
|
|
@@ -1025,11 +1123,19 @@ class group(Signature):
|
|
|
return self.app.GroupResult(gid, results)
|
|
|
_freeze = freeze
|
|
|
|
|
|
- def skew(self, start=1.0, stop=None, step=1.0):
|
|
|
- it = fxrange(start, stop, step, repeatlast=True)
|
|
|
- for task in self.tasks:
|
|
|
- task.set(countdown=next(it))
|
|
|
- return self
|
|
|
+ def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
+ stack = deque(self.tasks)
|
|
|
+ while stack:
|
|
|
+ task = maybe_signature(stack.popleft(), app=self._app).clone()
|
|
|
+ if isinstance(task, group):
|
|
|
+ stack.extendleft(task.tasks)
|
|
|
+ else:
|
|
|
+ new_tasks.append(task)
|
|
|
+ yield task.freeze(group_id=group_id,
|
|
|
+ chord=chord, root_id=root_id,
|
|
|
+ parent_id=parent_id)
|
|
|
|
|
|
def __iter__(self):
|
|
|
return iter(self.tasks)
|
|
@@ -1037,6 +1143,9 @@ class group(Signature):
|
|
|
def __repr__(self):
|
|
|
return 'group({0.tasks!r})'.format(self)
|
|
|
|
|
|
+ def __len__(self):
|
|
|
+ return len(self.tasks)
|
|
|
+
|
|
|
@property
|
|
|
def app(self):
|
|
|
app = self._app
|
|
@@ -1050,7 +1159,7 @@ class group(Signature):
|
|
|
|
|
|
@Signature.register_type
|
|
|
class chord(Signature):
|
|
|
- """Barrier synchronization primitive.
|
|
|
+ r"""Barrier synchronization primitive.
|
|
|
|
|
|
A chord consists of a header and a body.
|
|
|
|
|
@@ -1075,22 +1184,41 @@ class chord(Signature):
|
|
|
>>> res.get()
|
|
|
12
|
|
|
"""
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def from_dict(cls, d, app=None):
|
|
|
+ args, d['kwargs'] = cls._unpack_args(**d['kwargs'])
|
|
|
+ return _upgrade(d, cls(*args, app=app, **d))
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _unpack_args(header=None, body=None, **kwargs):
|
|
|
+ # Python signatures are better at extracting keys from dicts
|
|
|
+ # than manually popping things off.
|
|
|
+ return (header, body), kwargs
|
|
|
+
|
|
|
def __init__(self, header, body=None, task='celery.chord',
|
|
|
args=(), kwargs={}, app=None, **options):
|
|
|
Signature.__init__(
|
|
|
self, task, args,
|
|
|
- dict(kwargs, header=_maybe_group(header, app),
|
|
|
+ dict(kwargs=kwargs, header=_maybe_group(header, app),
|
|
|
body=maybe_signature(body, app=app)), app=app, **options
|
|
|
)
|
|
|
self.subtask_type = 'chord'
|
|
|
|
|
|
+ def __call__(self, body=None, **options):
|
|
|
+ return self.apply_async((), {'body': body} if body else {}, **options)
|
|
|
+
|
|
|
def freeze(self, _id=None, group_id=None, chord=None,
|
|
|
root_id=None, parent_id=None):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
if not isinstance(self.tasks, group):
|
|
|
self.tasks = group(self.tasks, app=self.app)
|
|
|
- bodyres = self.body.freeze(_id, parent_id=self.id, root_id=root_id)
|
|
|
- self.tasks.freeze(
|
|
|
+ header_result = self.tasks.freeze(
|
|
|
parent_id=parent_id, root_id=root_id, chord=self.body)
|
|
|
+ bodyres = self.body.freeze(
|
|
|
+ _id, parent_id=header_result.id, root_id=root_id)
|
|
|
+ bodyres.parent = header_result
|
|
|
self.id = self.tasks.id
|
|
|
self.body.set_parent_id(self.id)
|
|
|
return bodyres
|
|
@@ -1103,40 +1231,14 @@ class chord(Signature):
|
|
|
task.set_parent_id(parent_id)
|
|
|
self.parent_id = parent_id
|
|
|
|
|
|
- @classmethod
|
|
|
- def from_dict(self, d, app=None):
|
|
|
- args, d['kwargs'] = self._unpack_args(**d['kwargs'])
|
|
|
- return _upgrade(d, self(*args, app=app, **d))
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def _unpack_args(header=None, body=None, **kwargs):
|
|
|
- # Python signatures are better at extracting keys from dicts
|
|
|
- # than manually popping things off.
|
|
|
- return (header, body), kwargs
|
|
|
-
|
|
|
- @cached_property
|
|
|
- def app(self):
|
|
|
- return self._get_app(self.body)
|
|
|
-
|
|
|
- def _get_app(self, body=None):
|
|
|
- app = self._app
|
|
|
- if app is None:
|
|
|
- try:
|
|
|
- tasks = self.tasks.tasks # is a group
|
|
|
- except AttributeError:
|
|
|
- tasks = self.tasks
|
|
|
- app = tasks[0]._app
|
|
|
- if app is None and body is not None:
|
|
|
- app = body._app
|
|
|
- return app if app is not None else current_app
|
|
|
-
|
|
|
def apply_async(self, args=(), kwargs={}, task_id=None,
|
|
|
producer=None, connection=None,
|
|
|
router=None, result_cls=None, **options):
|
|
|
+ kwargs = kwargs or {}
|
|
|
args = (tuple(args) + tuple(self.args)
|
|
|
if args and not self.immutable else self.args)
|
|
|
- body = kwargs.get('body') or self.kwargs['body']
|
|
|
- kwargs = dict(self.kwargs, **kwargs)
|
|
|
+ body = kwargs.pop('body', None) or self.kwargs['body']
|
|
|
+ kwargs = dict(self.kwargs['kwargs'], **kwargs)
|
|
|
body = body.clone(**options)
|
|
|
app = self._get_app(body)
|
|
|
tasks = (self.tasks.clone() if isinstance(self.tasks, group)
|
|
@@ -1170,7 +1272,7 @@ class chord(Signature):
|
|
|
countdown=1, max_retries=None, eager=False,
|
|
|
task_id=None, **options):
|
|
|
app = app or self._get_app(body)
|
|
|
- group_id = uuid()
|
|
|
+ group_id = header.options.get('task_id') or uuid()
|
|
|
root_id = body.options.get('root_id')
|
|
|
body.chord_size = self.__length_hint__()
|
|
|
options = dict(self.options, **options) if options else self.options
|
|
@@ -1180,6 +1282,7 @@ class chord(Signature):
|
|
|
|
|
|
results = header.freeze(
|
|
|
group_id=group_id, chord=body, root_id=root_id).results
|
|
|
+ body.set_parent_id(group_id)
|
|
|
bodyres = body.freeze(task_id, root_id=root_id)
|
|
|
|
|
|
parent = app.backend.apply_chord(
|
|
@@ -1190,14 +1293,11 @@ class chord(Signature):
|
|
|
bodyres.parent = parent
|
|
|
return bodyres
|
|
|
|
|
|
- def __call__(self, body=None, **options):
|
|
|
- return self.apply_async((), {'body': body} if body else {}, **options)
|
|
|
-
|
|
|
def clone(self, *args, **kwargs):
|
|
|
s = Signature.clone(self, *args, **kwargs)
|
|
|
# need to make copy of body
|
|
|
try:
|
|
|
- s.kwargs['body'] = s.kwargs['body'].clone()
|
|
|
+ s.kwargs['body'] = maybe_signature(s.kwargs['body'], clone=True)
|
|
|
except (AttributeError, KeyError):
|
|
|
pass
|
|
|
return s
|
|
@@ -1220,12 +1320,28 @@ class chord(Signature):
|
|
|
return self.body.reprcall(self.tasks)
|
|
|
return '<chord without body: {0.tasks!r}>'.format(self)
|
|
|
|
|
|
+ @cached_property
|
|
|
+ def app(self):
|
|
|
+ return self._get_app(self.body)
|
|
|
+
|
|
|
+ def _get_app(self, body=None):
|
|
|
+ app = self._app
|
|
|
+ if app is None:
|
|
|
+ try:
|
|
|
+ tasks = self.tasks.tasks # is a group
|
|
|
+ except AttributeError:
|
|
|
+ tasks = self.tasks
|
|
|
+ app = tasks[0]._app
|
|
|
+ if app is None and body is not None:
|
|
|
+ app = body._app
|
|
|
+ return app if app is not None else current_app
|
|
|
+
|
|
|
tasks = _getitem_property('kwargs.header', 'Tasks in chord header.')
|
|
|
body = _getitem_property('kwargs.body', 'Body task of chord.')
|
|
|
|
|
|
|
|
|
def signature(varies, *args, **kwargs):
|
|
|
- """Create new signature
|
|
|
+ """Create new signature.
|
|
|
|
|
|
- if the first argument is a signature already then it's cloned.
|
|
|
- if the first argument is a dict, then a Signature version is returned.
|
|
@@ -1241,11 +1357,28 @@ def signature(varies, *args, **kwargs):
|
|
|
return Signature(varies, *args, **kwargs)
|
|
|
|
|
|
|
|
|
-def maybe_signature(d, app=None):
|
|
|
+def maybe_signature(d, app=None, clone=False):
|
|
|
+ """Ensure obj is a signature, or None.
|
|
|
+
|
|
|
+ Arguments:
|
|
|
+ d (Optional[Union[abstract.CallableSignature, Mapping]]):
|
|
|
+ Signature or dict-serialized signature.
|
|
|
+ app (celery.Celery):
|
|
|
+ App to bind signature to.
|
|
|
+ clone (bool):
|
|
|
+ If d' is already a signature, the signature
|
|
|
+ will be cloned when this flag is enabled.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Optional[abstract.CallableSignature]
|
|
|
+ """
|
|
|
if d is not None:
|
|
|
- if (isinstance(d, dict) and
|
|
|
- not isinstance(d, abstract.CallableSignature)):
|
|
|
+ if isinstance(d, abstract.CallableSignature):
|
|
|
+ if clone:
|
|
|
+ d = d.clone()
|
|
|
+ elif isinstance(d, dict):
|
|
|
d = signature(d)
|
|
|
+
|
|
|
if app is not None:
|
|
|
d._app = app
|
|
|
- return d
|
|
|
+ return d
|