|
@@ -271,14 +271,22 @@ class Signature(dict):
|
|
|
args, kwargs, options = self.args, self.kwargs, self.options
|
|
|
return _apply(args, kwargs, **options)
|
|
|
|
|
|
- def append_to_list_option(self, key, value):
|
|
|
+ def _with_list_option(self, key):
|
|
|
items = self.options.setdefault(key, [])
|
|
|
if not isinstance(items, MutableSequence):
|
|
|
items = self.options[key] = [items]
|
|
|
+ return items
|
|
|
+
|
|
|
+ def append_to_list_option(self, key, value):
|
|
|
+ items = self._with_list_option(key)
|
|
|
if value not in items:
|
|
|
items.append(value)
|
|
|
return value
|
|
|
|
|
|
+ def extend_list_option(self, key, value):
|
|
|
+ items = self._with_list_option(key)
|
|
|
+ items.extend(maybe_list(value))
|
|
|
+
|
|
|
def link(self, callback):
|
|
|
return self.append_to_list_option('link', callback)
|
|
|
|
|
@@ -418,6 +426,8 @@ class chain(Signature):
|
|
|
producer=None, root_id=None, parent_id=None, app=None, **options):
|
|
|
app = app or self.app
|
|
|
use_link = self._use_link
|
|
|
+ if use_link is None and app.conf.task_protocol == 1:
|
|
|
+ use_link = True
|
|
|
args = (tuple(args) + tuple(self.args)
|
|
|
if args and not self.immutable else self.args)
|
|
|
|
|
@@ -431,7 +441,7 @@ class chain(Signature):
|
|
|
|
|
|
if results:
|
|
|
if link:
|
|
|
- tasks[0].set(link=link)
|
|
|
+ tasks[0].extend_list_option('link', link)
|
|
|
first_task = tasks.pop()
|
|
|
first_task.apply_async(
|
|
|
chain=tasks if not use_link else None, **options)
|
|
@@ -456,8 +466,8 @@ class chain(Signature):
|
|
|
# (why is pickle using recursion? or better yet why cannot python
|
|
|
# do tail call optimization making recursion actually useful?)
|
|
|
use_link = self._use_link
|
|
|
- if use_link is None and app.conf.task_protocol > 1:
|
|
|
- use_link = False
|
|
|
+ if use_link is None and app.conf.task_protocol == 1:
|
|
|
+ use_link = True
|
|
|
steps = deque(tasks)
|
|
|
|
|
|
steps_pop = steps.pop
|