
Evented mode now uses a custom semaphore that does not use threads

Ask Solem 13 years ago
parent
commit
f8a54a689f
4 changed files with 61 additions and 12 deletions
  1. celery/worker/__init__.py (+20, -10)
  2. celery/worker/consumer.py (+3, -1)
  3. celery/worker/hub.py (+35, -0)
  4. celery/worker/mediator.py (+3, -1)

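The heart of the change is a callback-based semaphore: instead of blocking a thread, acquire() either invokes the callback at once or parks it until a later release(). A minimal sketch of that pattern follows; the class body mirrors the BoundedSemaphore added to celery/worker/hub.py below, while the handle callback and the driver lines at the end are illustrative only.

    class BoundedSemaphore(object):
        """Callback-based semaphore (mirrors celery/worker/hub.py)."""

        def __init__(self, value=1):
            self.initial_value = self.value = value
            self._waiting = set()

        def acquire(self, callback, *partial_args):
            # Run the callback now if a slot is free, otherwise park it.
            if self.value <= 0:
                self._waiting.add((callback, partial_args))
                return False
            self.value = max(self.value - 1, 0)
            callback(*partial_args)
            return True

        def release(self):
            # Free a slot and resume one parked callback, if any.
            self.value = min(self.value + 1, self.initial_value)
            if self._waiting:
                waiter, args = self._waiting.pop()
                waiter(*args)

    def handle(req):
        print("processing %s" % (req,))

    sem = BoundedSemaphore(1)
    sem.acquire(handle, "task-1")   # a slot is free: runs immediately
    sem.acquire(handle, "task-2")   # no slots left: parked, returns False
    sem.release()                   # frees the slot and runs task-2

Because everything runs on the event loop's single thread, no locking is needed; note that _waiting is a set, so parked callbacks are not guaranteed to resume in FIFO order.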
celery/worker/__init__.py (+20, -10)

@@ -38,6 +38,7 @@ from celery.utils.timer2 import Schedule
 from . import abstract
 from . import state
 from .buckets import TaskBucket, FastQueue
+from .hub import BoundedSemaphore
 
 RUN = 0x1
 CLOSE = 0x2
@@ -92,16 +93,20 @@ class Pool(abstract.StartStopComponent):
         w.use_eventloop = (detect_environment() == "default" and
                            w.app.broker_connection().is_evented)
 
-    def create(self, w):
+    def create(self, w, semaphore=None):
         forking_enable(w.no_execv or not w.force_execv)
-        pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
-                            initargs=(w.app, w.hostname),
-                            maxtasksperchild=w.max_tasks_per_child,
-                            timeout=w.task_time_limit,
-                            soft_timeout=w.task_soft_time_limit,
-                            putlocks=w.pool_putlocks,
-                            lost_worker_timeout=w.worker_lost_wait,
-                            start_result_thread=not w.use_eventloop)
+        procs = w.min_concurrency
+        if w.use_eventloop:
+            semaphore = w.semaphore = BoundedSemaphore(procs)
+        pool = w.pool = self.instantiate(w.pool_cls, procs,
+                initargs=(w.app, w.hostname),
+                maxtasksperchild=w.max_tasks_per_child,
+                timeout=w.task_time_limit,
+                soft_timeout=w.task_soft_time_limit,
+                putlocks=w.pool_putlocks and not w.use_eventloop,
+                lost_worker_timeout=w.worker_lost_wait,
+                start_result_thread=not w.use_eventloop,
+                semaphore=semaphore)
         return pool
 
 
@@ -136,7 +141,9 @@ class Queues(abstract.Component):
             w.disable_rate_limits = True
         if w.disable_rate_limits:
             w.ready_queue = FastQueue()
-            if not w.pool_cls.requires_mediator:
+            if w.use_eventloop:
+                w.ready_queue.put = w.process_task_sem
+            elif not w.pool_cls.requires_mediator:
                 # just send task directly to pool, skip the mediator.
                 w.ready_queue.put = w.process_task
         else:
@@ -268,6 +275,9 @@ class WorkController(configurated):
         # makes sure all greenthreads have exited.
         self._shutdown_complete.wait()
 
+    def process_task_sem(self, req):
+        return self.semaphore.acquire(self.process_task, req)
+
     def process_task(self, req):
         """Process task by sending it to the pool of workers."""
         try:

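Combined with the Queues hunk above, the evented dispatch path becomes ready_queue.put -> process_task_sem -> semaphore.acquire(process_task, req). A rough sketch of that wiring, assuming the pool calls release() when a task finishes; that side is not visible in this diff, so StubPool and its on_task_done hook are hypothetical stand-ins:

    from celery.worker.hub import BoundedSemaphore  # added by this commit

    class StubPool(object):
        """Stand-in for the real pool, which now receives semaphore=."""

        def __init__(self, semaphore):
            self.semaphore = semaphore

        def apply_async(self, req):
            print("pool executing %s" % (req,))

        def on_task_done(self):
            # Assumed completion hook: the diff does not show where
            # the real pool calls release().
            self.semaphore.release()

    class StubWorker(object):

        def __init__(self, procs=1):
            self.semaphore = BoundedSemaphore(procs)
            self.pool = StubPool(self.semaphore)

        # In evented mode `w.ready_queue.put = w.process_task_sem`
        # points here, so every incoming request passes the semaphore:
        def process_task_sem(self, req):
            return self.semaphore.acquire(self.process_task, req)

        def process_task(self, req):
            self.pool.apply_async(req)

    w = StubWorker(procs=1)
    w.process_task_sem("task-1")   # dispatched to the pool
    w.process_task_sem("task-2")   # parked until a slot frees up
    w.pool.on_task_done()          # release() resumes task-2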
celery/worker/consumer.py (+3, -1)

@@ -338,6 +338,8 @@ class Consumer(object):
 
         self._does_info = logger.isEnabledFor(logging.INFO)
         self.strategies = {}
+        if self.use_eventloop:
+            self.hub = Hub(self.priority_timer)
 
     def update_strategies(self):
         S = self.strategies
@@ -390,7 +392,7 @@ class Consumer(object):
         on_poll_start = self.connection.transport.on_poll_start
 
         qos = self.qos
-        with Hub(self.priority_timer) as hub:
+        with self.hub as hub:
             update = hub.update
             fdmap = hub.fdmap
             poll = hub.poller.poll

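Constructing the hub once in __init__ (rather than anew inside consume_messages) lets it persist across the consumer's restarts. For readers unfamiliar with the pattern, here is a self-contained toy version of the poll/dispatch cycle a hub drives; the real Hub in celery/worker/hub.py wraps kombu's eventio poller rather than select.select, and everything below is illustrative, not committed code:

    import os
    import select

    fdmap = {}                          # fd -> callback, like hub.fdmap

    def add(fd, callback):
        fdmap[fd] = callback

    def loop_once(timeout=1.0):
        # Wait for any registered fd to become readable, then fire
        # the callback registered for it.
        rlist, _, _ = select.select(list(fdmap), [], [], timeout)
        for fd in rlist:
            fdmap[fd](fd)

    r, w = os.pipe()                    # POSIX-only demo fds
    add(r, lambda fd: os.read(fd, 1))   # drain one byte when readable
    os.write(w, b"x")
    loop_once()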
celery/worker/hub.py (+35, -0)

@@ -1,11 +1,45 @@
 from __future__ import absolute_import
 
+from collections import deque
+
 from kombu.utils import cached_property
 from kombu.utils.eventio import poll, POLL_READ, POLL_ERR
 
 from celery.utils.timer2 import Schedule
 
 
+class BoundedSemaphore(object):
+
+    def __init__(self, value=1):
+        self.initial_value = self.value = value
+        self._waiting = set()
+
+    def grow(self):
+        self.initial_value += 1
+
+    def shrink(self):
+        self.initial_value -= 1
+
+    def acquire(self, callback, *partial_args):
+        if self.value <= 0:
+            self._waiting.add((callback, partial_args))
+            return False
+        else:
+            self.value = max(self.value - 1, 0)
+            callback(*partial_args)
+            return True
+
+    def release(self):
+        self.value = min(self.value + 1, self.initial_value)
+        if self._waiting:
+            waiter, args = self._waiting.pop()
+            waiter(*args)
+
+
+    def clear(self):
+        pass
+
+
 class Hub(object):
     eventflags = POLL_READ | POLL_ERR
 
@@ -13,6 +47,7 @@ class Hub(object):
         self.fdmap = {}
         self.poller = poll()
         self.schedule = Schedule() if schedule is None else schedule
+        self._on_event = set()
 
     def __enter__(self):
         return self

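Note that grow() and shrink() adjust only the ceiling (initial_value), not the current counter, which suggests they are meant for resizing worker capacity at runtime; no caller appears in this diff. A small illustration of the effect:

    from celery.worker.hub import BoundedSemaphore

    def show(x):
        print("running %s" % (x,))

    sem = BoundedSemaphore(1)
    sem.acquire(show, "a")   # takes the only slot; counter drops to 0
    sem.grow()               # ceiling raised from 1 to 2
    sem.release()            # counter back to 1, capped at the new ceiling
    sem.release()            # counter reaches 2: the grown ceiling allows it
    sem.acquire(show, "b")   # runs immediately
    sem.acquire(show, "c")   # runs immediately: two slots are available now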
celery/worker/mediator.py (+3, -1)

@@ -39,7 +39,9 @@ class WorkerComponent(StartStopComponent):
         w.mediator = None
 
     def include_if(self, w):
-        return not w.disable_rate_limits or w.pool_cls.requires_mediator
+        return (not w.disable_rate_limits or
+                (w.pool_cls.requires_mediator and
+                 not w.use_eventloop))
 
     def create(self, w):
         m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,
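
With the parentheses made explicit above, include_if keeps the mediator whenever rate limits are enabled, and otherwise only for pools that require one while no event loop is in use. The same predicate over plain booleans, as a hypothetical self-check:

    def include_mediator(disable_rate_limits, requires_mediator, use_eventloop):
        # Same boolean logic as WorkerComponent.include_if above.
        return (not disable_rate_limits or
                (requires_mediator and not use_eventloop))

    assert include_mediator(False, False, True)    # rate limits on: keep it
    assert not include_mediator(True, True, True)  # evented: semaphore instead
    assert include_mediator(True, True, False)     # blocking pool still needs it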