Browse Source

Fixes --autoreload with kqueue. Closes celery/kombu#267

Ask Solem 11 years ago
parent
commit
7b9d982495
3 changed files with 17 additions and 5 deletions
  1. 5 1
      celery/concurrency/prefork.py
  2. 3 0
      celery/worker/__init__.py
  3. 9 4
      celery/worker/autoreload.py

+ 5 - 1
celery/concurrency/prefork.py

@@ -20,6 +20,7 @@ from celery._state import set_default_app, _set_task_join_will_block
 from celery.app import trace
 from celery.concurrency.base import BasePool
 from celery.five import items
+from celery.utils.functional import noop
 from celery.utils.log import get_logger
 
 from .asynpool import AsynPool
@@ -131,7 +132,10 @@ class TaskPool(BasePool):
         self.grow = P.grow
         self.shrink = P.shrink
         self.flush = getattr(P, 'flush', None)  # FIXME add to billiard
-        self.restart = P.restart
+
+    def restart(self):
+        self._pool.restart()
+        self._pool.apply_async(noop)
 
     def did_start_ok(self):
         return self._pool.did_start_ok()

+ 3 - 0
celery/worker/__init__.py

@@ -280,6 +280,9 @@ class WorkController(object):
             elif reload:
                 logger.debug('reloading module %s', module)
                 reload_from_cwd(sys.modules[module], reloader)
+
+        self.consumer.update_strategies()
+        self.consumer.reset_rate_limits()
         self.pool.restart()
 
     def info(self):

+ 9 - 4
celery/worker/autoreload.py

@@ -135,11 +135,14 @@ class KQueueMonitor(BaseMonitor):
         self.fdmap = {}
 
     def register_with_event_loop(self, hub):
-        self.add_events(hub.poller)
-        hub.poller.on_file_change = self.handle_event
+        from kombu.utils.eventio import _kqueue
+        self._kq = _kqueue()
+        self.add_events(self._kq)
+        self._kq.on_file_change = self.handle_event
+        hub.add_reader(self._kq._kqueue, self._kq.poll, 0)
 
     def on_event_loop_close(self, hub):
-        self.close(hub.poller)
+        self.close(self._kq)
 
     def add_events(self, poller):
         for f in self.filemap:
@@ -224,7 +227,9 @@ class InotifyMonitor(_ProcessEvent):
 
 
 def default_implementation():
-    if sys.platform.startswith('linux') and pyinotify:
+    if hasattr(select, 'kqueue'):
+        return 'kqueue'
+    elif sys.platform.startswith('linux') and pyinotify:
         return 'inotify'
     else:
         return 'stat'