Przeglądaj źródła

Adds autoreloader component

Mher Movsisyan 13 lat temu
rodzic
commit
145d6cce54

+ 1 - 0
celery/app/defaults.py

@@ -151,6 +151,7 @@ NAMESPACES = {
     },
     "CELERYD": {
         "AUTOSCALER": Option("celery.worker.autoscale.Autoscaler"),
+        "AUTORELOADER": Option("celery.worker.autoreload.AutoReloader"),
         "BOOT_STEPS": Option((), type="tuple"),
         "CONCURRENCY": Option(0, type="int"),
         "ETA_SCHEDULER": Option(None, type="string"),

+ 10 - 1
celery/apps/worker.py

@@ -17,6 +17,7 @@ from ..app import app_or_default
 from ..app.abstract import configurated, from_config
 from ..exceptions import ImproperlyConfigured, SystemTerminate
 from ..utils import isatty, LOG_LEVELS, cry, qualname
+from ..utils.functional import maybe_list
 from ..worker import WorkController
 
 try:
@@ -79,7 +80,7 @@ class Worker(configurated):
 
     def __init__(self, hostname=None, discard=False, embed_clockservice=False,
             queues=None, include=None, app=None, pidfile=None,
-            autoscale=None, **kwargs):
+            autoscale=None, autoreload=False, **kwargs):
         self.app = app = app_or_default(app)
         self.setup_defaults(kwargs, namespace="celeryd")
         if not self.concurrency:
@@ -107,6 +108,13 @@ class Worker(configurated):
         if isinstance(self.include, basestring):
             self.include = self.include.split(",")
 
+        self.autoreload = autoreload
+        if autoreload:
+            imports = list(self.include)
+            imports.extend(maybe_list(
+                self.app.conf.get("CELERY_IMPORTS") or ()))
+            self.autoreload = set(imports)
+
         if not isinstance(self.loglevel, int):
             try:
                 self.loglevel = LOG_LEVELS[self.loglevel.upper()]
@@ -219,6 +227,7 @@ class Worker(configurated):
                                     ready_callback=self.on_consumer_ready,
                                     embed_clockservice=self.embed_clockservice,
                                     autoscale=self.autoscale,
+                                    autoreload=self.autoreload,
                                     **self.confopts_as_dict())
         self.install_platform_tweaks(worker)
         worker.start()

+ 3 - 0
celery/bin/celeryd.py

@@ -181,6 +181,9 @@ class WorkerCommand(Command):
                      "max_concurrency,min_concurrency. Example: "
                      "--autoscale=10,3 (always keep 3 processes, "
                      "but grow to 10 if necessary)."),
+            Option('--autoreload', dest="autoreload",
+                    action="store_true", default=False,
+                help="Enable autoreloading."),
         )
 
 

+ 2 - 0
celery/worker/__init__.py

@@ -40,6 +40,7 @@ TERMINATE = 0x3
 class Namespace(abstract.Namespace):
     name = "worker"
     builtin_boot_steps = ("celery.worker.autoscale",
+                          "celery.worker.autoreload",
                           "celery.worker.consumer",
                           "celery.worker.mediator")
 
@@ -144,6 +145,7 @@ class WorkController(configurated):
     eta_scheduler_cls = from_config("eta_scheduler")
     eta_scheduler_precision = from_config()
     autoscaler_cls = from_config("autoscaler")
+    autoreloader_cls = from_config("autoreloader")
     schedule_filename = from_config()
     scheduler_cls = from_config("celerybeat_scheduler")
     task_time_limit = from_config()

+ 32 - 4
celery/worker/autoreload.py

@@ -13,10 +13,27 @@ import sys
 import time
 import select
 import hashlib
+import threading
 
 from collections import defaultdict
 
 from .. import current_app
+from ..abstract import StartStopComponent
+
+
+class WorkerComponent(StartStopComponent):
+    name = "worker.autoreloader"
+    requires = ("pool", )
+
+    def __init__(self, w, **kwargs):
+        self.enabled = w.autoreload
+        w.autoreloader = None
+
+    def create(self, w):
+        w.autoreloader = self.instantiate(w.autoreloader_cls,
+                                          modules=w.autoreload,
+                                          logger=w.logger)
+        return w.autoreloader
 
 
 def file_hash(filename, algorithm='md5'):
@@ -151,11 +168,16 @@ else:
     _monitor_cls = StatMonitor
 
 
-class AutoReloader(object):
+class AutoReloader(threading.Thread):
     """Tracks changes in modules and fires reload commands"""
-    def __init__(self, modules, monitor_cls=_monitor_cls, *args, **kwargs):
-        self._monitor = monitor_cls(modules, self.on_change, *args, **kwargs)
-        self._hashes = dict([(f, file_hash(f)) for f in modules])
+    def __init__(self, modules, monitor_cls=_monitor_cls, logger=None,
+                 *args, **kwargs):
+        super(AutoReloader, self).__init__()
+        self.daemon = True
+        self.logger = logger
+        files = [sys.modules[m].__file__ for m in modules]
+        self._monitor = monitor_cls(files, self.on_change, *args, **kwargs)
+        self._hashes = dict([(f, file_hash(f)) for f in files])
 
     def start(self):
         self._monitor.start()
@@ -168,6 +190,8 @@ class AutoReloader(object):
                 modified.append(f)
                 self._hashes[f] = fhash
         if modified:
+            self.logger.debug("Detected modified modules: %s" %\
+                    map(self._module_name, modified))
             self._reload(map(self._module_name, modified))
 
     def _reload(self, modules):
@@ -177,3 +201,7 @@ class AutoReloader(object):
     @classmethod
     def _module_name(cls, path):
         return os.path.splitext(os.path.basename(path))[0]
+
+    def stop(self):
+        if hasattr(self._monitor, 'stop'):
+            self._monitor.stop()