|
@@ -164,19 +164,24 @@ class TaskSet(UserList):
|
|
|
setid = taskset_id or gen_unique_id()
|
|
|
pub = publisher or self.Publisher(connection=connection)
|
|
|
try:
|
|
|
- results = [task.apply_async(taskset_id=setid, publisher=pub)
|
|
|
- for task in self.tasks]
|
|
|
+ results = self._async_results(setid, pub)
|
|
|
finally:
|
|
|
if not publisher: # created by us.
|
|
|
pub.close()
|
|
|
|
|
|
return self.app.TaskSetResult(setid, results)
|
|
|
|
|
|
+ def _async_results(self, taskset_id, publisher):
|
|
|
+ return [task.apply_async(taskset_id=taskset_id, publisher=publisher)
|
|
|
+ for task in self.tasks]
|
|
|
+
|
|
|
def apply(self, taskset_id=None):
|
|
|
"""Applies the taskset locally by blocking until all tasks return."""
|
|
|
setid = taskset_id or gen_unique_id()
|
|
|
- return self.app.TaskSetResult(setid, [task.apply(taskset_id=setid)
|
|
|
- for task in self.tasks])
|
|
|
+ return self.app.TaskSetResult(setid, self._sync_results(setid))
|
|
|
+
|
|
|
+ def _sync_results(self, taskset_id):
|
|
|
+ return [task.apply(taskset_id=taskset_id) for task in self.tasks]
|
|
|
|
|
|
@property
|
|
|
def tasks(self):
|