Browse Source

Disable mediator thread when using eventloop

Ask Solem 13 years ago
parent
commit
7f2b9dad4c
2 changed files with 7 additions and 5 deletions
  1. 5 2
      celery/worker/__init__.py
  2. 2 3
      celery/worker/hub.py

+ 5 - 2
celery/worker/__init__.py

@@ -102,7 +102,7 @@ class Pool(abstract.StartStopComponent):
                             maxtasksperchild=w.max_tasks_per_child,
                             timeout=w.task_time_limit,
                             soft_timeout=w.task_soft_time_limit,
-                            putlocks=w.pool_putlocks,
+                            putlocks=w.pool_putlocks and threaded,
                             lost_worker_timeout=w.worker_lost_wait,
                             with_task_thread=threaded,
                             with_result_thread=threaded,
@@ -143,7 +143,10 @@ class Queues(abstract.Component):
         if w.disable_rate_limits:
             w.ready_queue = FastQueue()
             if w.use_eventloop:
-                w.ready_queue.put = w.process_task_sem
+                if w.pool_putlocks:
+                    w.ready_queue.put = w.process_task_sem
+                else:
+                    w.ready_queue.put = w.process_task
             elif not w.pool_cls.requires_mediator:
                 # just send task directly to pool, skip the mediator.
                 w.ready_queue.put = w.process_task

+ 2 - 3
celery/worker/hub.py

@@ -12,7 +12,7 @@ class BoundedSemaphore(object):
 
     def __init__(self, value=1):
         self.initial_value = self.value = value
-        self._waiting = set()
+        self._waiting = []
 
     def grow(self):
         self.initial_value += 1
@@ -22,7 +22,7 @@ class BoundedSemaphore(object):
 
     def acquire(self, callback, *partial_args, **partial_kwargs):
         if self.value <= 0:
-            self._waiting.add((callback, partial_args))
+            self._waiting.append((callback, partial_args))
             return False
         else:
             self.value = max(self.value - 1, 0)
@@ -35,7 +35,6 @@ class BoundedSemaphore(object):
             waiter, args = self._waiting.pop()
             waiter(*args)
 
-
     def clear(self):
         pass