浏览代码

Support passing execute options to a TaskSets list of args.

Ask Solem 15 年之前
父节点
当前提交
c4151cd11e
共有 3 个文件被更改,包括 10 次插入5 次删除
  1. 7 3
      celery/task/base.py
  2. 1 0
      celery/worker/__init__.py
  3. 2 2
      celery/worker/listener.py

+ 7 - 3
celery/task/base.py

@@ -532,14 +532,18 @@ class TaskSet(object):
         conn = self.task.establish_connection(connect_timeout=connect_timeout)
         publisher = self.task.get_publisher(connection=conn)
         try:
-            subtasks = [apply_async(self.task, args, kwargs,
-                                    taskset_id=taskset_id, publisher=publisher)
-                            for args, kwargs in self.arguments]
+            subtasks = [self.apply_part(self.arguments, taskset_id, publisher)
+                            for arglist in self.arguments]
         finally:
             publisher.close()
             conn.close()
         return TaskSetResult(taskset_id, subtasks)
 
+    def apply_part(self, arglist, taskset_id, publisher):
+        args, kwargs, opts = mexpand(arglist, 4)
+        return apply_async(self.task, args, kwargs,
+                           taskset_id=taskset_id, publisher=publisher, **opts)
+
     def apply(self):
         taskset_id = gen_unique_id()
         subtasks = [apply(self.task, args, kwargs)

+ 1 - 0
celery/worker/__init__.py

@@ -13,6 +13,7 @@ from celery import platform
 from celery import signals
 from celery.log import setup_logger
 from celery.beat import ClockServiceThread
+
 from celery.worker.pool import TaskPool
 from celery.worker.buckets import TaskBucket
 from celery.worker.listener import CarrotListener

+ 2 - 2
celery/worker/listener.py

@@ -63,7 +63,7 @@ class CarrotListener(object):
         """Start the consumer.
 
         If the connection is lost, it tries to re-establish the connection
-        over time and restart consuming messages.
+        and restarts consuming messages.
 
         """
 
@@ -87,7 +87,7 @@ class CarrotListener(object):
 
         prev_pcount = None
         while 1:
-            pcount = int(self.prefetch_count) # Convert SharedCounter to int
+            pcount = int(self.prefetch_count) # SharedCounter() -> int()
             if not prev_pcount or pcount != prev_pcount:
                 task_consumer.qos(prefetch_count=pcount)
                 prev_pcount = pcount