|
@@ -144,11 +144,15 @@ class TaskSet(UserList):
|
|
|
self.total = len(self.tasks)
|
|
|
self.Publisher = Publisher or self.app.amqp.TaskPublisher
|
|
|
|
|
|
- def apply_async(self, connection=None, connect_timeout=None):
|
|
|
+ def apply_async(self, connection=None, connect_timeout=None,
|
|
|
+ publisher=None):
|
|
|
return self.app.with_default_connection(self._apply_async)(
|
|
|
- connection=connection, connect_timeout=connect_timeout)
|
|
|
+ connection=connection,
|
|
|
+ connect_timeout=connect_timeout,
|
|
|
+ publisher=publisher)
|
|
|
|
|
|
- def _apply_async(self, connection=None, connect_timeout=None):
|
|
|
+ def _apply_async(self, connection=None, connect_timeout=None,
|
|
|
+ publisher=None):
|
|
|
"""Run all tasks in the taskset.
|
|
|
|
|
|
Returns a :class:`celery.result.TaskSetResult` instance.
|
|
@@ -182,13 +186,14 @@ class TaskSet(UserList):
|
|
|
return self.apply()
|
|
|
|
|
|
taskset_id = gen_unique_id()
|
|
|
- publisher = self.Publisher(connection=connection)
|
|
|
+ pub = publisher or self.Publisher(connection=connection)
|
|
|
try:
|
|
|
results = [task.apply_async(taskset_id=taskset_id,
|
|
|
- publisher=publisher)
|
|
|
+ publisher=pub)
|
|
|
for task in self.tasks]
|
|
|
finally:
|
|
|
- publisher.close()
|
|
|
+ if not publisher: # created by us.
|
|
|
+ pub.close()
|
|
|
|
|
|
return self.app.TaskSetResult(taskset_id, results)
|
|
|
|