|
@@ -7,7 +7,7 @@ from billiard.serialization import pickle
|
|
|
|
|
|
from celery import conf
|
|
|
from celery.log import setup_logger
|
|
|
-from celery.utils import gen_unique_id, get_full_cls_name
|
|
|
+from celery.utils import gen_unique_id, get_full_cls_name, mexpand
|
|
|
from celery.result import BaseAsyncResult, TaskSetResult, EagerResult
|
|
|
from celery.execute import apply_async, apply
|
|
|
from celery.registry import tasks
|
|
@@ -532,7 +532,7 @@ class TaskSet(object):
|
|
|
conn = self.task.establish_connection(connect_timeout=connect_timeout)
|
|
|
publisher = self.task.get_publisher(connection=conn)
|
|
|
try:
|
|
|
- subtasks = [self.apply_part(self.arguments, taskset_id, publisher)
|
|
|
+ subtasks = [self.apply_part(arglist, taskset_id, publisher)
|
|
|
for arglist in self.arguments]
|
|
|
finally:
|
|
|
publisher.close()
|
|
@@ -540,7 +540,7 @@ class TaskSet(object):
|
|
|
return TaskSetResult(taskset_id, subtasks)
|
|
|
|
|
|
def apply_part(self, arglist, taskset_id, publisher):
|
|
|
- args, kwargs, opts = mexpand(arglist, 4)
|
|
|
+ args, kwargs, opts = mexpand(arglist, 3, default={})
|
|
|
return apply_async(self.task, args, kwargs,
|
|
|
taskset_id=taskset_id, publisher=publisher, **opts)
|
|
|
|