Преглед на файлове

Eventloop now runs without Pool TimeoutHandler thread

Ask Solem преди 13 години
родител
ревизия
312d95033e
променени са 4 файла, в които са добавени 56 реда и са изтрити 33 реда
  1. 9 6
      celery/concurrency/base.py
  2. 10 15
      celery/concurrency/processes/__init__.py
  3. 1 0
      celery/worker/__init__.py
  4. 36 12
      celery/worker/consumer.py

+ 9 - 6
celery/concurrency/base.py

@@ -46,12 +46,6 @@ class BasePool(object):
     _state = None
     _pool = None
 
-    #: only used by multiprocessing pool
-    on_process_up = None
-
-    #: only used by multiprocessing pool
-    on_process_down = None
-
     #: only used by multiprocessing pool
     uses_semaphore = False
 
@@ -76,6 +70,12 @@ class BasePool(object):
     def on_terminate(self):
         pass
 
+    def on_soft_timeout(self, job):
+        pass
+
+    def on_hard_timeout(self, job):
+        pass
+
     def terminate_job(self, pid):
         raise NotImplementedError(
                 "%s does not implement kill_job" % (self.__class__, ))
@@ -103,6 +103,9 @@ class BasePool(object):
     def on_close(self):
         pass
 
+    def init_callbacks(self, **kwargs):
+        pass
+
     def apply_async(self, target, args=[], kwargs={}, **options):
         """Equivalent of the :func:`apply` built-in function.
 

+ 10 - 15
celery/concurrency/processes/__init__.py

@@ -108,24 +108,19 @@ class TaskPool(BasePool):
                 "put-guarded-by-semaphore": self.putlocks,
                 "timeouts": (self._pool.soft_timeout, self._pool.timeout)}
 
-    def set_on_process_up(self, callback):
-        self._pool.on_process_up
+    def init_callbacks(self, **kwargs):
+        for k, v in kwargs.iteritems():
+            setattr(self._pool, k, v)
 
-    def _get_on_process_up(self):
-        return self._pool.on_process_up
+    def handle_timeouts(self):
+        if self._pool._timeout_handler:
+            self._pool._timeout_handler.handle_event()
 
-    def _set_on_process_up(self, fun):
-        self._pool.on_process_up = fun
-    on_process_up = property(_get_on_process_up,
-                             _set_on_process_up)
+    def on_soft_timeout(self, job):
+        self._pool._timeout_handler.on_soft_timeout(job)
 
-    def _get_on_process_down(self):
-        return self._pool.on_process_down
-
-    def _set_on_process_down(self, fun):
-        self._pool.on_process_down = fun
-    on_process_down = property(_get_on_process_down,
-                               _set_on_process_down)
+    def on_hard_timeout(self, job):
+        self._pool._timeout_handler.on_hard_timeout(job)
 
     @property
     def num_processes(self):

+ 1 - 0
celery/worker/__init__.py

@@ -108,6 +108,7 @@ class Pool(abstract.StartStopComponent):
                             with_task_thread=threaded,
                             with_result_thread=threaded,
                             with_supervisor_thread=threaded,
+                            with_timeout_thread=threaded,
                             max_restarts=max_restarts,
                             semaphore=semaphore)
         return pool

+ 36 - 12
celery/worker/consumer.py

@@ -80,7 +80,7 @@ import logging
 import socket
 import threading
 
-from time import sleep
+from time import sleep, time
 from Queue import Empty
 
 from billiard.exceptions import WorkerLostError
@@ -371,6 +371,9 @@ class Consumer(object):
             fdmap = hub.fdmap
             poll = hub.poller.poll
             fire_timers = hub.fire_timers
+            apply_interval = hub.timer.apply_interval
+            apply_after = hub.timer.apply_after
+            apply_at = hub.timer.apply_at
             scheduled = hub.timer._queue
             transport = self.connection.transport
             on_poll_start = transport.on_poll_start
@@ -404,15 +407,36 @@ class Consumer(object):
 
             update_readers(self.connection.eventmap, self.pool.readers)
             for handler, interval in self.pool.timers.iteritems():
-                self.timer.apply_interval(interval * 1000.0, handler)
+                apply_interval(interval * 1000.0, handler)
 
-            def on_process_up(w):
-                hub.add(w.sentinel, self.pool._pool.maintain_pool)
-            self.pool.on_process_up = on_process_up
+            def on_timeout_set(R, soft, hard):
 
-            def on_process_down(w):
-                hub.remove(w.sentinel)
-            self.pool.on_process_down = on_process_down
+                def on_soft_timeout():
+                    if hard:
+                        R._tref = apply_at(time() + (hard - soft),
+                                        self.pool.on_hard_timeout, (R, ))
+                    self.pool.on_soft_timeout(R)
+                if soft:
+                    R._tref = apply_after(soft * 1000.0, on_soft_timeout)
+                elif hard:
+                    R._tref = apply_after(hard * 1000.0,
+                            self.pool_on_hard_timeout, (R, ))
+
+
+            def on_timeout_cancel(result):
+                try:
+                    result._tref.cancel()
+                    delattr(result, "_tref")
+                except AttributeError:
+                    pass
+
+            self.pool.init_callbacks(
+                on_process_up=lambda w: hub.add_reader(w.sentinel,
+                    self.pool._pool.maintain_pool),
+                on_process_down=lambda w: hub.remove(w.sentinel),
+                on_timeout_set=on_timeout_set,
+                on_timeout_cancel=on_timeout_cancel,
+            )
 
             transport.on_poll_init(hub.poller)
             self.task_consumer.callbacks = [on_task_received]
@@ -436,8 +460,8 @@ class Consumer(object):
 
                 update_readers(on_poll_start())
                 if fdmap:
-                    for timeout in (time_to_sleep, 0.001):
-                        for fileno, event in poll(timeout) or ():
+                    #for timeout in (time_to_sleep, 0.001):
+                        for fileno, event in poll(time_to_sleep) or ():
                             try:
                                 fdmap[fileno](fileno, event)
                             except Empty:
@@ -445,8 +469,8 @@ class Consumer(object):
                             except socket.error:
                                 if self._state != CLOSE:  # pragma: no cover
                                     raise
-                        if buffer:
-                            flush_buffer()
+                        #if buffer:
+                        #    flush_buffer()
                 else:
                     sleep(min(time_to_sleep, 0.1))