eventloop: AIO version of multiprocessing Supervisor, and disable TaskHandler thread

Ask Solem 13 years ago
parent
commit
8092efe8f7

+ 10 - 0
celery/concurrency/base.py

@@ -46,6 +46,12 @@ class BasePool(object):
     _state = None
     _pool = None
 
+    #: only used by multiprocessing pool
+    on_process_started = None
+
+    #: only used by multiprocessing pool
+    on_process_down = None
+
     def __init__(self, limit=None, putlocks=True, **options):
         self.limit = limit
         self.putlocks = putlocks
@@ -124,3 +130,7 @@ class BasePool(object):
     @property
     def eventmap(self):
         return {}
+
+    @property
+    def timers(self):
+        return {}
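
The two `on_process_*` slots and the `timers` mapping give the event loop a uniform interface to any pool: `timers` maps a handler to an interval in seconds, and the callbacks fire when the multiprocessing pool gains or loses a worker process. A minimal sketch of a subclass filling these in (the `maintain` handler and its 5-second interval are illustrative, not from the commit):

    class ExamplePool(BasePool):

        def maintain(self):
            # hypothetical periodic housekeeping handler
            pass

        @property
        def timers(self):
            # handler -> interval in seconds; the consumer turns each
            # pair into a timer entry (see consumer.py below)
            return {self.maintain: 5.0}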

+ 23 - 0
celery/concurrency/processes/__init__.py

@@ -104,6 +104,25 @@ class TaskPool(BasePool):
                 "put-guarded-by-semaphore": self.putlocks,
                 "timeouts": (self._pool.soft_timeout, self._pool.timeout)}
 
+    def set_on_process_started(self, callback):
+        self._pool.on_process_started = callback
+
+    def _get_on_process_started(self):
+        return self._pool.on_process_started
+
+    def _set_on_process_started(self, fun):
+        self._pool.on_process_started = fun
+    on_process_started = property(_get_on_process_started,
+                                  _set_on_process_started)
+
+    def _get_on_process_down(self):
+        return self._pool.on_process_down
+
+    def _set_on_process_down(self, fun):
+        self._pool.on_process_down = fun
+    on_process_down = property(_get_on_process_down,
+                               _set_on_process_down)
+
     @property
     def num_processes(self):
         return self._pool._processes
@@ -111,3 +130,7 @@ class TaskPool(BasePool):
     @property
     def eventmap(self):
         return self._pool.eventmap
+
+    @property
+    def timers(self):
+        return self._pool.timers
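
The old-style `property(fget, fset)` pairs simply forward both attributes to the underlying multiprocessing pool, so callers can assign callbacks on the TaskPool without reaching into `_pool` directly. A hedged usage sketch (`pool` stands for a started TaskPool; the callback body is illustrative):

    def announce(proc):
        # invoked by the pool for each newly started worker process
        print("process up: %r" % (proc, ))

    pool.on_process_started = announce
    assert pool._pool.on_process_started is announce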

+ 5 - 2
celery/events/__init__.py

@@ -35,7 +35,7 @@ event_exchange = Exchange("celeryev", type="topic")
 
 def get_exchange(conn):
     ex = copy(event_exchange)
-    if "redis" in conn.transport_cls:
+    if "redis" in type(conn.transport).__module__:
         # quick hack for #436
         ex.type = "fanout"
     return ex
@@ -103,7 +103,10 @@ class EventDispatcher(object):
         self.close()
 
     def get_exchange(self):
-        return get_exchange(self.connection)
+        if self.connection:
+            return get_exchange(self.connection)
+        else:
+            return get_exchange(self.channel.connection.client)
 
     def enable(self):
         self.publisher = Producer(self.channel or self.connection,
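
Two small robustness fixes here: the redis check for #436 now inspects the module name of the instantiated transport, which is always a plain string even when `transport_cls` holds a Transport class rather than the "redis" alias, and `get_exchange` falls back to the channel's client connection when the dispatcher was built from a channel alone. A sketch of the module-name check, assuming kombu's usual module layout and an installed redis client library (both assumptions, not stated in the diff):

    from kombu import Connection

    conn = Connection("redis://localhost:6379//")
    mod = type(conn.transport).__module__   # e.g. "kombu.transport.redis"
    exchange_type = "fanout" if "redis" in mod else "topic"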

+ 4 - 1
celery/worker/__init__.py

@@ -91,6 +91,7 @@ class Pool(abstract.StartStopComponent):
             w.max_concurrency, w.min_concurrency = w.autoscale
 
     def create(self, w):
+        threaded = not w.use_eventloop
         forking_enable(w.no_execv or not w.force_execv)
         pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
                             initargs=(w.app, w.hostname),
@@ -99,7 +100,9 @@ class Pool(abstract.StartStopComponent):
                             soft_timeout=w.task_soft_time_limit,
                             putlocks=w.pool_putlocks,
                             lost_worker_timeout=w.worker_lost_wait,
-                            start_result_thread=not w.use_eventloop)
+                            with_task_thread=threaded,
+                            with_result_thread=threaded,
+                            with_supervisor_thread=threaded)
         return pool
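
A single `threaded` flag now gates all three of the pool's helper threads instead of just the result thread. When the worker runs on the event loop they are all disabled and the hub takes over their duties; the mapping below is my reading of the commit, not code from it:

    threaded = not w.use_eventloop
    # with_task_thread=False       -> tasks are written to child processes
    #                                 from the event loop itself
    # with_result_thread=False     -> results arrive via poll()-able fds
    #                                 registered in the hub
    # with_supervisor_thread=False -> maintain_pool runs from hub timers
    #                                 and process sentinels (consumer.py)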
 
 

+ 13 - 0
celery/worker/consumer.py

@@ -372,7 +372,20 @@ class Consumer(object):
             transport = self.connection.transport
             on_poll_start = transport.on_poll_start
 
+            self.task_consumer.callbacks.append(fire_timers)
+
             update_fds(self.connection.eventmap, self.pool.eventmap)
+            for handler, interval in self.pool.timers.iteritems():
+                self.timer.apply_interval(interval * 1000.0, handler)
+
+            def on_process_started(w):
+                hub.add(w._popen.sentinel, self.pool._pool.maintain_pool)
+            self.pool.on_process_started = on_process_started
+
+            def on_process_down(w):
+                hub.remove(w._popen.sentinel)
+            self.pool.on_process_down = on_process_down
+
             transport.on_poll_init(hub.poller)
 
             while self._state != CLOSE and self.connection:
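
`_popen.sentinel` is the file descriptor multiprocessing exposes per child; it becomes readable when the child exits, so registering it with the hub turns an abrupt worker death into a poll event that runs `maintain_pool` immediately instead of waiting for the supervisor's next sweep (note also that `apply_interval` takes milliseconds, hence the `* 1000.0`). A standalone sketch of the sentinel trick, assuming Python 3 on Unix with the public `Process.sentinel` attribute:

    import multiprocessing
    import select

    def _work():
        pass

    if __name__ == "__main__":
        proc = multiprocessing.Process(target=_work)
        proc.start()
        # the sentinel becomes readable once the child exits
        select.select([proc.sentinel], [], [])
        print("child gone; run pool maintenance here")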

+ 8 - 6
celery/worker/hub.py

@@ -20,12 +20,14 @@ class Hub(object):
     def __exit__(self, *exc_info):
         return self.close()
 
-    def fire_timers(self, min_delay=10, max_delay=10):
-        while 1:
-            delay, entry = self.scheduler.next()
-            if entry is None:
-                break
-            self.timer.apply_entry(entry)
+    def fire_timers(self, min_delay=1, max_delay=10, max_timers=10):
+        delay = None
+        if self.timer._queue:
+            for i in xrange(max_timers):
+                delay, entry = self.scheduler.next()
+                if entry is None:
+                    break
+                self.timer.apply_entry(entry)
         return min(max(delay, min_delay), max_delay)
 
     def add(self, fd, callback, flags=None):
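
The rewritten `fire_timers` bounds the work done per loop iteration (at most `max_timers` entries) and clamps the returned poll timeout into [min_delay, max_delay]. With an empty timer queue `delay` stays None, which Python 2's `max(None, min_delay)` quietly resolves to `min_delay`; spelled out explicitly, the clamp behaves like this sketch (values illustrative):

    def clamp(delay, min_delay=1, max_delay=10):
        # None sorts below any number in Python 2; made explicit here
        if delay is None:
            delay = min_delay
        return min(max(delay, min_delay), max_delay)

    assert clamp(None) == 1   # idle queue: poll at the floor interval
    assert clamp(0.2) == 1    # imminent timers are still rate-limited
    assert clamp(42) == 10    # never block longer than max_delay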