|
@@ -590,17 +590,6 @@ class Task(object):
|
|
|
raise ret
|
|
|
return ret
|
|
|
|
|
|
- def replace(self, sig):
|
|
|
- request = self.request
|
|
|
- sig.set_immutable(True)
|
|
|
- chord_id, request.chord = request.chord, None
|
|
|
- group_id, request.group = request.group, None
|
|
|
- callbacks, request.callbacks = request.callbacks, [sig]
|
|
|
- if group_id or chord_id:
|
|
|
- sig.set(group=group_id, chord=chord_id)
|
|
|
- sig |= callbacks[0]
|
|
|
- return sig
|
|
|
-
|
|
|
def apply(self, args=None, kwargs=None,
|
|
|
link=None, link_error=None, **options):
|
|
|
"""Execute this task locally, by blocking until the task returns.
|
|
@@ -697,7 +686,28 @@ class Task(object):
|
|
|
with self.app.events.default_dispatcher(hostname=req.hostname) as d:
|
|
|
return d.send(type_, uuid=req.id, **fields)
|
|
|
|
|
|
+ def replace(self, sig):
|
|
|
+ request = self.request
|
|
|
+ sig.set_immutable(True)
|
|
|
+ chord_id, request.chord = request.chord, None
|
|
|
+ group_id, request.group = request.group, None
|
|
|
+ callbacks, request.callbacks = request.callbacks, [sig]
|
|
|
+ if group_id or chord_id:
|
|
|
+ sig.set(group=group_id, chord=chord_id)
|
|
|
+ sig |= callbacks[0]
|
|
|
+ return sig
|
|
|
+
|
|
|
def replace_in_chord(self, sig):
|
|
|
+ """Replace the current task (which must be a member of a chord)
|
|
|
+ with a new task.
|
|
|
+
|
|
|
+ Note that this will raise :exc:`~@Ignore`, so the best practice
|
|
|
+ is to always use ``return self.replace_in_chord(...)`` to convey
|
|
|
+ to the reader that the task will not continue after being replaced.
|
|
|
+
|
|
|
+ :param: Signature of new task.
|
|
|
+
|
|
|
+ """
|
|
|
sig.freeze(self.request.id,
|
|
|
group_id=self.request.group,
|
|
|
chord=self.request.chord,
|