Bläddra i källkod

Processes pool always require mediator thread so that the pool semaphore never blocks the MainThread

Ask Solem 13 år sedan
förälder
incheckning
3836dd840b

+ 12 - 0
celery/concurrency/base.py

@@ -24,8 +24,20 @@ class BasePool(object):
 
     Timer = timer2.Timer
 
+    #: set to true if the pool can be shutdown from within
+    #: a signal handler.
     signal_safe = True
+
+    #: set to true if pool supports rate limits.
+    #: (this is here for gevent, which currently does not implement
+    #:  the necessary timers).
     rlimit_safe = True
+
+    #: set to true if pool requires the use of a mediator
+    #: thread (e.g. if applying new items can block the current thread).
+    requires_mediator = False
+
+    #: set to true if pool uses greenlets.
     is_green = False
 
     _state = None

+ 3 - 15
celery/concurrency/processes/__init__.py

@@ -46,23 +46,11 @@ def process_initializer(app, hostname):
 
 
 class TaskPool(BasePool):
-    """Process Pool for processing tasks in parallel.
-
-    :param processes: see :attr:`processes`.
-    :param logger: see :attr:`logger`.
-
-
-    .. attribute:: limit
-
-        The number of processes that can run simultaneously.
-
-    .. attribute:: logger
-
-        The logger used for debugging.
-
-    """
+    """Multiprocessing Pool implementation."""
     Pool = Pool
 
+    requires_mediator = True
+
     def on_start(self):
         """Run the task pool.
 

+ 3 - 1
celery/worker/__init__.py

@@ -126,7 +126,9 @@ class Queues(abstract.Component):
             w.disable_rate_limits = True
         if w.disable_rate_limits:
             w.ready_queue = FastQueue()
-            w.ready_queue.put = w.process_task
+            if not w.pool_cls.requires_mediator:
+                # just send task directly to pool, skip the mediator.
+                w.ready_queue.put = w.process_task
         else:
             w.ready_queue = TaskBucket(task_registry=registry.tasks)
 

+ 2 - 1
celery/worker/mediator.py

@@ -35,7 +35,8 @@ class WorkerComponent(StartStopComponent):
 
     def __init__(self, w, **kwargs):
         w.mediator = None
-        self.enabled = not w.disable_rate_limits
+        if w.disable_rate_limits and not w.pool_cls.requires_mediator:
+            self.enabled = False
 
     def create(self, w):
         m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,