|
@@ -197,10 +197,9 @@ class Signature(dict):
|
|
|
type=None, subtask_type=None, immutable=False,
|
|
|
app=None, **ex):
|
|
|
self._app = app
|
|
|
- init = dict.__init__
|
|
|
|
|
|
if isinstance(task, dict):
|
|
|
- init(self, task) # works like dict(d)
|
|
|
+ super(Signature, self).__init__(task) # works like dict(d)
|
|
|
else:
|
|
|
# Also supports using task class/instance instead of string name.
|
|
|
try:
|
|
@@ -210,8 +209,7 @@ class Signature(dict):
|
|
|
else:
|
|
|
self._type = task
|
|
|
|
|
|
- init(
|
|
|
- self,
|
|
|
+ super(Signature, self).__init__(
|
|
|
task=task_name, args=tuple(args or ()),
|
|
|
kwargs=kwargs or {},
|
|
|
options=dict(options or {}, **ex),
|
|
@@ -264,6 +262,8 @@ class Signature(dict):
|
|
|
args, kwargs, options = self._merge(args, kwargs, options)
|
|
|
else:
|
|
|
args, kwargs, options = self.args, self.kwargs, self.options
|
|
|
+ # pylint: disable=too-many-function-args
|
|
|
+ # Borks on this, as it's a property
|
|
|
return _apply(args, kwargs, **options)
|
|
|
|
|
|
def _merge(self, args=(), kwargs={}, options={}, force=False):
|
|
@@ -308,6 +308,8 @@ class Signature(dict):
|
|
|
Returns:
|
|
|
~@AsyncResult: promise of future evaluation.
|
|
|
"""
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
opts = self.options
|
|
|
try:
|
|
|
tid = opts['task_id']
|
|
@@ -323,6 +325,8 @@ class Signature(dict):
|
|
|
opts['group_id'] = group_id
|
|
|
if chord:
|
|
|
opts['chord'] = chord
|
|
|
+ # pylint: disable=too-many-function-args
|
|
|
+ # Borks on this, as it's a property.
|
|
|
return self.AsyncResult(tid)
|
|
|
_freeze = freeze
|
|
|
|
|
@@ -595,6 +599,8 @@ class chain(Signature):
|
|
|
def run(self, args=(), kwargs={}, group_id=None, chord=None,
|
|
|
task_id=None, link=None, link_error=None, publisher=None,
|
|
|
producer=None, root_id=None, parent_id=None, app=None, **options):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
app = app or self.app
|
|
|
use_link = self._use_link
|
|
|
if use_link is None and app.conf.task_protocol == 1:
|
|
@@ -620,6 +626,8 @@ class chain(Signature):
|
|
|
|
|
|
def freeze(self, _id=None, group_id=None, chord=None,
|
|
|
root_id=None, parent_id=None):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
_, results = self._frozen = self.prepare_steps(
|
|
|
self.args, self.tasks, root_id, parent_id, None,
|
|
|
self.app, _id, group_id, chord, clone=False,
|
|
@@ -947,6 +955,8 @@ class group(Signature):
|
|
|
|
|
|
def _apply_tasks(self, tasks, producer=None, app=None, p=None,
|
|
|
add_to_parent=None, chord=None, **options):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
app = app or self.app
|
|
|
with app.producer_or_acquire(producer) as producer:
|
|
|
for sig, res in tasks:
|
|
@@ -1033,6 +1043,8 @@ class group(Signature):
|
|
|
return self.apply_async(partial_args, **options)
|
|
|
|
|
|
def _freeze_unroll(self, new_tasks, group_id, chord, root_id, parent_id):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
stack = deque(self.tasks)
|
|
|
while stack:
|
|
|
task = maybe_signature(stack.popleft(), app=self._app).clone()
|
|
@@ -1046,6 +1058,8 @@ class group(Signature):
|
|
|
|
|
|
def freeze(self, _id=None, group_id=None, chord=None,
|
|
|
root_id=None, parent_id=None):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
opts = self.options
|
|
|
try:
|
|
|
gid = opts['task_id']
|
|
@@ -1133,6 +1147,8 @@ class chord(Signature):
|
|
|
|
|
|
def freeze(self, _id=None, group_id=None, chord=None,
|
|
|
root_id=None, parent_id=None):
|
|
|
+ # pylint: disable=redefined-outer-name
|
|
|
+ # XXX chord is also a class in outer scope.
|
|
|
if not isinstance(self.tasks, group):
|
|
|
self.tasks = group(self.tasks, app=self.app)
|
|
|
bodyres = self.body.freeze(_id, parent_id=self.id, root_id=root_id)
|