|
@@ -28,8 +28,10 @@ from celery.result import GroupResult
|
|
|
from celery.utils import abstract
|
|
|
from celery.utils.functional import (
|
|
|
maybe_list, is_list, _regen, regen, chunks as _chunks,
|
|
|
+ seq_concat_seq, seq_concat_item,
|
|
|
)
|
|
|
-from celery.utils.text import truncate
|
|
|
+from celery.utils.objects import getitem_property
|
|
|
+from celery.utils.text import truncate, remove_repeating_from_task
|
|
|
|
|
|
__all__ = [
|
|
|
'Signature', 'chain', 'xmap', 'xstarmap', 'chunks',
|
|
@@ -42,90 +44,6 @@ PY3 = sys.version_info[0] == 3
|
|
|
JSON_NEEDS_UNICODE_KEYS = PY3 and not try_import('simplejson')
|
|
|
|
|
|
|
|
|
-def _shorten_names(task_name, s):
|
|
|
- # type: (str, str) -> str
|
|
|
- """Remove repeating module names from string.
|
|
|
-
|
|
|
- Arguments:
|
|
|
- task_name (str): Task name (full path including module),
|
|
|
- to use as the basis for removing module names.
|
|
|
- s (str): The string we want to work on.
|
|
|
-
|
|
|
- Example:
|
|
|
-
|
|
|
- >>> _shorten_names(
|
|
|
- ... 'x.tasks.add',
|
|
|
- ... 'x.tasks.add(2, 2) | x.tasks.add(4) | x.tasks.mul(8)',
|
|
|
- ... )
|
|
|
- 'x.tasks.add(2, 2) | add(4) | mul(8)'
|
|
|
- """
|
|
|
- # This is used by repr(), to remove repeating module names.
|
|
|
-
|
|
|
- # extract the module part of the task name
|
|
|
- module = str(task_name).rpartition('.')[0] + '.'
|
|
|
- # find the first occurance of the module name in the string.
|
|
|
- index = s.find(module)
|
|
|
- if index >= 0:
|
|
|
- s = ''.join([
|
|
|
- # leave the first occurance of the module name untouched.
|
|
|
- s[:index + len(module)],
|
|
|
- # strip seen module name from the rest of the string.
|
|
|
- s[index + len(module):].replace(module, ''),
|
|
|
- ])
|
|
|
- return s
|
|
|
-
|
|
|
-
|
|
|
-class _getitem_property(object):
|
|
|
- """Attribute -> dict key descriptor.
|
|
|
-
|
|
|
- The target object must support ``__getitem__``,
|
|
|
- and optionally ``__setitem__``.
|
|
|
-
|
|
|
- Example:
|
|
|
- >>> from collections import defaultdict
|
|
|
-
|
|
|
- >>> class Me(dict):
|
|
|
- ... deep = defaultdict(dict)
|
|
|
- ...
|
|
|
- ... foo = _getitem_property('foo')
|
|
|
- ... deep_thing = _getitem_property('deep.thing')
|
|
|
-
|
|
|
-
|
|
|
- >>> me = Me()
|
|
|
- >>> me.foo
|
|
|
- None
|
|
|
-
|
|
|
- >>> me.foo = 10
|
|
|
- >>> me.foo
|
|
|
- 10
|
|
|
- >>> me['foo']
|
|
|
- 10
|
|
|
-
|
|
|
- >>> me.deep_thing = 42
|
|
|
- >>> me.deep_thing
|
|
|
- 42
|
|
|
- >>> me.deep
|
|
|
- defaultdict(<type 'dict'>, {'thing': 42})
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, keypath, doc=None):
|
|
|
- path, _, self.key = keypath.rpartition('.')
|
|
|
- self.path = path.split('.') if path else None
|
|
|
- self.__doc__ = doc
|
|
|
-
|
|
|
- def _path(self, obj):
|
|
|
- return (reduce(lambda d, k: d[k], [obj] + self.path) if self.path
|
|
|
- else obj)
|
|
|
-
|
|
|
- def __get__(self, obj, type=None):
|
|
|
- if obj is None:
|
|
|
- return type
|
|
|
- return self._path(obj).get(self.key)
|
|
|
-
|
|
|
- def __set__(self, obj, value):
|
|
|
- self._path(obj)[self.key] = value
|
|
|
-
|
|
|
-
|
|
|
def maybe_unroll_group(g):
|
|
|
"""Unroll group with only one member."""
|
|
|
# Issue #1656
|
|
@@ -152,34 +70,6 @@ def _upgrade(fields, sig):
|
|
|
return sig
|
|
|
|
|
|
|
|
|
-def _seq_concat_item(seq, item):
|
|
|
- """Return copy of sequence seq with item added.
|
|
|
-
|
|
|
- Returns:
|
|
|
- Sequence: if seq is a tuple, the result will be a tuple,
|
|
|
- otherwise it depends on the implementation of ``__add__``.
|
|
|
- """
|
|
|
- return seq + (item,) if isinstance(seq, tuple) else seq + [item]
|
|
|
-
|
|
|
-
|
|
|
-def _seq_concat_seq(a, b):
|
|
|
- """Concatenate two sequences: ``a + b``.
|
|
|
-
|
|
|
- Returns:
|
|
|
- Sequence: The return value will depend on the largest sequence
|
|
|
- - if b is larger and is a tuple, the return value will be a tuple.
|
|
|
- - if a is larger and is a list, the return value will be a list,
|
|
|
- """
|
|
|
- # find the type of the largest sequence
|
|
|
- prefer = type(max([a, b], key=len))
|
|
|
- # convert the smallest list to the type of the largest sequence.
|
|
|
- if not isinstance(a, prefer):
|
|
|
- a = prefer(a)
|
|
|
- if not isinstance(b, prefer):
|
|
|
- b = prefer(b)
|
|
|
- return a + b
|
|
|
-
|
|
|
-
|
|
|
@abstract.CallableSignature.register
|
|
|
@python_2_unicode_compatible
|
|
|
class Signature(dict):
|
|
@@ -512,7 +402,7 @@ class Signature(dict):
|
|
|
if not isinstance(self, _chain) and isinstance(other, _chain):
|
|
|
# task | chain -> chain
|
|
|
return _chain(
|
|
|
- _seq_concat_seq((self,), other.tasks), app=self._app)
|
|
|
+ seq_concat_seq((self,), other.tasks), app=self._app)
|
|
|
elif isinstance(other, _chain):
|
|
|
# chain | chain -> chain
|
|
|
sig = self.clone()
|
|
@@ -545,7 +435,7 @@ class Signature(dict):
|
|
|
else:
|
|
|
# chain | task -> chain
|
|
|
return _chain(
|
|
|
- _seq_concat_item(self.tasks, other), app=self._app)
|
|
|
+ seq_concat_item(self.tasks, other), app=self._app)
|
|
|
# task | task -> chain
|
|
|
return _chain(self, other, app=self._app)
|
|
|
return NotImplemented
|
|
@@ -614,24 +504,24 @@ class Signature(dict):
|
|
|
return self.type.apply_async
|
|
|
except KeyError:
|
|
|
return _partial(self.app.send_task, self['task'])
|
|
|
- id = _getitem_property('options.task_id', 'Task UUID')
|
|
|
- parent_id = _getitem_property('options.parent_id', 'Task parent UUID.')
|
|
|
- root_id = _getitem_property('options.root_id', 'Task root UUID.')
|
|
|
- task = _getitem_property('task', 'Name of task.')
|
|
|
- args = _getitem_property('args', 'Positional arguments to task.')
|
|
|
- kwargs = _getitem_property('kwargs', 'Keyword arguments to task.')
|
|
|
- options = _getitem_property('options', 'Task execution options.')
|
|
|
- subtask_type = _getitem_property('subtask_type', 'Type of signature')
|
|
|
- chord_size = _getitem_property(
|
|
|
+ id = getitem_property('options.task_id', 'Task UUID')
|
|
|
+ parent_id = getitem_property('options.parent_id', 'Task parent UUID.')
|
|
|
+ root_id = getitem_property('options.root_id', 'Task root UUID.')
|
|
|
+ task = getitem_property('task', 'Name of task.')
|
|
|
+ args = getitem_property('args', 'Positional arguments to task.')
|
|
|
+ kwargs = getitem_property('kwargs', 'Keyword arguments to task.')
|
|
|
+ options = getitem_property('options', 'Task execution options.')
|
|
|
+ subtask_type = getitem_property('subtask_type', 'Type of signature')
|
|
|
+ chord_size = getitem_property(
|
|
|
'chord_size', 'Size of chord (if applicable)')
|
|
|
- immutable = _getitem_property(
|
|
|
+ immutable = getitem_property(
|
|
|
'immutable', 'Flag set if no longer accepts new arguments')
|
|
|
|
|
|
|
|
|
@Signature.register_type(name='chain')
|
|
|
@python_2_unicode_compatible
|
|
|
class _chain(Signature):
|
|
|
- tasks = _getitem_property('kwargs.tasks', 'Tasks in chain.')
|
|
|
+ tasks = getitem_property('kwargs.tasks', 'Tasks in chain.')
|
|
|
|
|
|
@classmethod
|
|
|
def from_dict(cls, d, app=None):
|
|
@@ -834,7 +724,7 @@ class _chain(Signature):
|
|
|
if not self.tasks:
|
|
|
return '<{0}@{1:#x}: empty>'.format(
|
|
|
type(self).__name__, id(self))
|
|
|
- return _shorten_names(
|
|
|
+ return remove_repeating_from_task(
|
|
|
self.tasks[0]['task'],
|
|
|
' | '.join(repr(t) for t in self.tasks))
|
|
|
|
|
@@ -1041,7 +931,7 @@ class group(Signature):
|
|
|
that can be used to inspect the state of the group).
|
|
|
"""
|
|
|
|
|
|
- tasks = _getitem_property('kwargs.tasks', 'Tasks in group.')
|
|
|
+ tasks = getitem_property('kwargs.tasks', 'Tasks in group.')
|
|
|
|
|
|
@classmethod
|
|
|
def from_dict(cls, d, app=None):
|
|
@@ -1231,7 +1121,7 @@ class group(Signature):
|
|
|
|
|
|
def __repr__(self):
|
|
|
if self.tasks:
|
|
|
- return _shorten_names(
|
|
|
+ return remove_repeating_from_task(
|
|
|
self.tasks[0]['task'],
|
|
|
'group({0.tasks!r})'.format(self))
|
|
|
return 'group(<empty>)'
|
|
@@ -1419,14 +1309,14 @@ class chord(Signature):
|
|
|
def __repr__(self):
|
|
|
if self.body:
|
|
|
if isinstance(self.body, chain):
|
|
|
- return _shorten_names(
|
|
|
+ return remove_repeating_from_task(
|
|
|
self.body.tasks[0]['task'],
|
|
|
'%({0} | {1!r})'.format(
|
|
|
self.body.tasks[0].reprcall(self.tasks),
|
|
|
chain(self.body.tasks[1:], app=self._app),
|
|
|
),
|
|
|
)
|
|
|
- return '%' + _shorten_names(
|
|
|
+ return '%' + remove_repeating_from_task(
|
|
|
self.body['task'], self.body.reprcall(self.tasks))
|
|
|
return '<chord without body: {0.tasks!r}>'.format(self)
|
|
|
|
|
@@ -1446,8 +1336,8 @@ class chord(Signature):
|
|
|
app = body._app
|
|
|
return app if app is not None else current_app
|
|
|
|
|
|
- tasks = _getitem_property('kwargs.header', 'Tasks in chord header.')
|
|
|
- body = _getitem_property('kwargs.body', 'Body task of chord.')
|
|
|
+ tasks = getitem_property('kwargs.header', 'Tasks in chord header.')
|
|
|
+ body = getitem_property('kwargs.body', 'Body task of chord.')
|
|
|
|
|
|
|
|
|
def signature(varies, *args, **kwargs):
|