Browse Source

Added TaskSet.Publisher: Can be used to override the publisher class used to send subtasks

Ask Solem 14 years ago
parent
commit
0f40a2b753
1 changed files with 3 additions and 2 deletions
  1. 3 2
      celery/task/sets.py

+ 3 - 2
celery/task/sets.py

@@ -125,7 +125,7 @@ class TaskSet(UserList):
     _task = None                # compat
     _task_name = None           # compat
 
-    def __init__(self, task=None, tasks=None, app=None):
+    def __init__(self, task=None, tasks=None, app=None, Publisher=None):
         if task is not None:
             if hasattr(task, "__iter__"):
                 tasks = task
@@ -143,6 +143,7 @@ class TaskSet(UserList):
         self.app = app_or_default(app)
         self.data = list(tasks)
         self.total = len(self.tasks)
+        self.Publisher = Publisher or self.app.amqp.TaskPublisher
 
     def apply_async(self, connection=None, connect_timeout=None):
         return self.app.with_default_connection(self._apply_async)(
@@ -182,7 +183,7 @@ class TaskSet(UserList):
             return self.apply()
 
         taskset_id = gen_unique_id()
-        publisher = self.app.amqp.TaskPublisher(connection=connection)
+        publisher = self.Publisher(connection=connection)
         try:
             results = [task.apply_async(taskset_id=taskset_id,
                                         publisher=publisher)