Sfoglia il codice sorgente

Autoreloader fixes

Ask Solem 13 anni fa
parent
commit
8058082c36

+ 2 - 9
celery/apps/worker.py

@@ -17,7 +17,6 @@ from ..app import app_or_default
 from ..app.abstract import configurated, from_config
 from ..exceptions import ImproperlyConfigured, SystemTerminate
 from ..utils import isatty, LOG_LEVELS, cry, qualname
-from ..utils.functional import maybe_list
 from ..worker import WorkController
 
 try:
@@ -96,6 +95,7 @@ class Worker(configurated):
         self.include = [] if include is None else include
         self.pidfile = pidfile
         self.autoscale = None
+        self.autoreload = autoreload
         if autoscale:
             max_c, _, min_c = autoscale.partition(",")
             self.autoscale = [int(max_c), min_c and int(min_c) or 0]
@@ -108,13 +108,6 @@ class Worker(configurated):
         if isinstance(self.include, basestring):
             self.include = self.include.split(",")
 
-        self.autoreload = autoreload
-        if autoreload:
-            imports = list(self.include)
-            imports.extend(maybe_list(
-                self.app.conf.get("CELERY_IMPORTS") or ()))
-            self.autoreload = set(imports)
-
         if not isinstance(self.loglevel, int):
             try:
                 self.loglevel = LOG_LEVELS[self.loglevel.upper()]
@@ -163,7 +156,7 @@ class Worker(configurated):
         self.loader = self.app.loader
         self.settings = self.app.conf
         for module in self.include:
-            self.loader.import_from_cwd(module)
+            self.loader.import_task_module(module)
 
     def redirect_stdouts_to_logger(self):
         self.app.log.setup(self.loglevel, self.logfile,

+ 2 - 0
celery/loaders/base.py

@@ -63,6 +63,7 @@ class BaseLoader(object):
     def __init__(self, app=None, **kwargs):
         from ..app import app_or_default
         self.app = app_or_default(app)
+        self.task_modules = set()
 
     def now(self):
         return datetime.utcnow()
@@ -85,6 +86,7 @@ class BaseLoader(object):
         pass
 
     def import_task_module(self, module):
+        self.task_modules.add(module)
         return self.import_from_cwd(module)
 
     def import_module(self, module, package=None):

+ 24 - 20
celery/tests/test_worker/test_worker_control.py

@@ -15,11 +15,12 @@ from celery.task import task
 from celery.registry import tasks
 from celery.utils import uuid
 from celery.utils.timer2 import Timer
+from celery.worker import WorkController as _WC
+from celery.worker import control
+from celery.worker import state
 from celery.worker.buckets import FastQueue
 from celery.worker.job import TaskRequest
-from celery.worker import state
 from celery.worker.state import revoked
-from celery.worker import control
 from celery.worker.control import Panel
 from celery.tests.utils import Case
 
@@ -379,58 +380,61 @@ class test_ControlPanel(Case):
 
     def test_pool_restart(self):
         consumer = Consumer()
-        consumer.pool.restart = Mock()
+        consumer.controller = _WC()
+        consumer.controller.pool.restart = Mock()
         panel = self.create_panel(consumer=consumer)
         panel.app = self.app
         _import = panel.app.loader.import_from_cwd = Mock()
         _reload = Mock()
 
-        panel.handle("pool_restart", {"reload": _reload})
-        self.assertTrue(consumer.pool.restart.called)
+        panel.handle("pool_restart", {"reloader": _reload})
+        self.assertTrue(consumer.controller.pool.restart.called)
         self.assertFalse(_reload.called)
         self.assertFalse(_import.called)
 
     def test_pool_restart_import_modules(self):
         consumer = Consumer()
-        consumer.pool.restart = Mock()
+        consumer.controller = _WC()
+        consumer.controller.pool.restart = Mock()
         panel = self.create_panel(consumer=consumer)
         panel.app = self.app
-        _import = panel.app.loader.import_from_cwd = Mock()
+        _import = consumer.controller.app.loader.import_from_cwd = Mock()
         _reload = Mock()
 
-        panel.handle("pool_restart", {"imports": ["foo", "bar"],
-                                      "reload": _reload})
+        panel.handle("pool_restart", {"modules": ["foo", "bar"],
+                                      "reloader": _reload})
 
-        self.assertTrue(consumer.pool.restart.called)
+        self.assertTrue(consumer.controller.pool.restart.called)
         self.assertFalse(_reload.called)
         self.assertEqual([(("foo",), {}), (("bar",), {})],
                           _import.call_args_list)
 
     def test_pool_restart_relaod_modules(self):
         consumer = Consumer()
-        consumer.pool.restart = Mock()
+        consumer.controller = _WC()
+        consumer.controller.pool.restart = Mock()
         panel = self.create_panel(consumer=consumer)
         panel.app = self.app
         _import = panel.app.loader.import_from_cwd = Mock()
         _reload = Mock()
 
         with patch.dict(sys.modules, {"foo": None}):
-            panel.handle("pool_restart", {"imports": ["foo"],
-                                          "reload_imports": False,
-                                          "reload": _reload})
+            panel.handle("pool_restart", {"modules": ["foo"],
+                                          "reload": False,
+                                          "reloader": _reload})
 
-            self.assertTrue(consumer.pool.restart.called)
+            self.assertTrue(consumer.controller.pool.restart.called)
             self.assertFalse(_reload.called)
             self.assertFalse(_import.called)
 
             _import.reset_mock()
             _reload.reset_mock()
-            consumer.pool.restart.reset_mock()
+            consumer.controller.pool.restart.reset_mock()
 
-            panel.handle("pool_restart", {"imports": ["foo"],
-                                          "reload_imports": True,
-                                          "reload": _reload})
+            panel.handle("pool_restart", {"modules": ["foo"],
+                                          "reload": True,
+                                          "reloader": _reload})
 
-            self.assertTrue(consumer.pool.restart.called)
+            self.assertTrue(consumer.controller.pool.restart.called)
             self.assertTrue(_reload.called)
             self.assertFalse(_import.called)

+ 5 - 0
celery/utils/compat.py

@@ -15,6 +15,11 @@ from __future__ import absolute_import
 ############## py3k #########################################################
 import sys
 
+try:
+    reload = reload                     # noqa
+except NameError:
+    from imp import reload              # noqa
+
 try:
     from UserList import UserList       # noqa
 except ImportError:

+ 15 - 0
celery/worker/__init__.py

@@ -31,6 +31,7 @@ from ..app.abstract import configurated, from_config
 from ..exceptions import SystemTerminate
 from ..log import SilenceRepeated
 from ..utils import noop, qualname
+from ..utils.compat import reload as _reload
 
 from . import state
 from .buckets import TaskBucket, FastQueue
@@ -295,6 +296,20 @@ class WorkController(configurated):
         self._state = self.TERMINATE
         self._shutdown_complete.set()
 
+    def reload(self, modules=None, reload=False, reloader=None):
+        reloader = _reload if reloader is None else reloader
+        modules = self.app.loader.task_modules if modules is None else modules
+        imp = self.app.loader.import_from_cwd
+
+        for module in set(modules or ()):
+            if module not in sys.modules:
+                self.logger.debug("importing module %s" % (module, ))
+                imp(module)
+            elif reload:
+                self.logger.debug("reloading module %s" % (module, ))
+                reloader(sys.modules[module])
+        self.pool.restart()
+
     def on_timer_error(self, einfo):
         self.logger.error("Timer error: %r", einfo[1], exc_info=einfo)
 

+ 12 - 9
celery/worker/autoreload.py

@@ -17,7 +17,6 @@ import time
 
 from collections import defaultdict
 
-from .. import current_app
 from ..abstract import StartStopComponent
 from ..utils.threads import bgThread, Event
 
@@ -39,6 +38,7 @@ class WorkerComponent(StartStopComponent):
 
     def create(self, w):
         w.autoreloader = self.instantiate(w.autoreloader_cls,
+                                          controller=w,
                                           modules=w.autoreload,
                                           logger=w.logger)
         return w.autoreloader
@@ -185,17 +185,21 @@ class Autoreloader(bgThread):
     """Tracks changes in modules and fires reload commands"""
     Monitor = Monitor
 
-    def __init__(self, modules, monitor_cls=None, logger=None, **kwargs):
+    def __init__(self, controller, modules=None, monitor_cls=None,
+            logger=None, **options):
         super(Autoreloader, self).__init__()
-        self.daemon = True
+        self.controller = controller
+        app = self.controller.app
+        self.modules = app.loader.task_modules if modules is None else modules
         self.logger = logger
-        files = [sys.modules[m].__file__ for m in modules]
+        self.options = options
         self.Monitor = monitor_cls or self.Monitor
-        self._monitor = self.Monitor(files, self.on_change,
-                shutdown_event=self._is_shutdown, **kwargs)
-        self._hashes = dict([(f, file_hash(f)) for f in files])
 
     def body(self):
+        files = [sys.modules[m].__file__ for m in self.modules]
+        self._monitor = self.Monitor(files, self.on_change,
+                shutdown_event=self._is_shutdown, **self.options)
+        self._hashes = dict([(f, file_hash(f)) for f in files])
         try:
             self._monitor.start()
         except OSError, exc:
@@ -217,8 +221,7 @@ class Autoreloader(bgThread):
             self._reload(map(self._module_name, modified))
 
     def _reload(self, modules):
-        current_app.control.broadcast("pool_restart",
-                arguments={"imports": modules, "reload_modules": True})
+        self.controller.reload(modules, reload=True)
 
     def stop(self):
         self._monitor.stop()

+ 3 - 18
celery/worker/control.py

@@ -24,11 +24,6 @@ from ..utils.encoding import safe_repr
 from . import state
 from .state import revoked
 
-try:
-    reload
-except NameError:
-    from imp import reload
-
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
 
 
@@ -244,19 +239,9 @@ def pool_shrink(panel, n=1, **kwargs):
 
 
 @Panel.register
-def pool_restart(panel, imports=None, reload_imports=False,
-                 reload=reload, **kwargs):
-    imports = set(imports or [])
-    for m in imports:
-        if m not in sys.modules:
-            panel.app.loader.import_from_cwd(m)
-            panel.logger.debug("imported %s module" % m)
-        elif reload_imports:
-            reload(sys.modules[m])
-            panel.logger.debug("reloaded %s module" % m)
-
-    panel.consumer.pool.restart()
-    return {"ok": "started restarting worker processes"}
+def pool_restart(panel, modules=None, reload=False, reloader=None, **kwargs):
+    panel.consumer.controller.reload(modules, reload, reloader=reloader)
+    return {"ok": "reload started"}
 
 
 @Panel.register

+ 17 - 13
docs/userguide/workers.rst

@@ -367,29 +367,33 @@ modules.  This command does not interrupt executing tasks.
 Example
 ~~~~~~~
 
-Runnig the following command will result in the `foo` and `bar` modules
+Running the following command will result in the `foo` and `bar` modules
 being imported by the worker processes:
 
 .. code-block:: python
 
     >>> from celery.task.control import broadcast
-    >>> broadcast("pool_restart", arguments={"imports":["foo", "bar"]})
+    >>> broadcast("pool_restart", arguments={"modules": ["foo", "bar"]})
 
-If you want to reload all modules you can use:
+Use the ``reload`` argument to reload modules it has already imported:
 
 .. code-block:: python
 
-    >>> from celery.task.control import broadcast
-    >>> from celery import current_app
-    >>> modules = current_app.conf.CELERY_IMPORTS
-    >>> broadcast("pool_restart",
-                  arguments={"imports":modules, "reload_modules":True})
-
-`imports` argument is a list of modules to modify. `reload_modules`
-specifies whether to reload modules if they are previously imported.
-By default `reload_modules` is `False`. The `pool_restart` command uses the
+    >>> broadcast("pool_restart", arguments={"modules": ["foo"],
+                                             "reload": True})
+
+If you don't specify any modules then all known tasks modules will
+be imported/reloaded:
+
+.. code-block:: python
+
+    >>> broadcast("pool_restart", arguments={"reload": True})
+
+The ``modules`` argument is a list of modules to modify. ``reload``
+specifies whether to reload modules if they have previously been imported.
+By default ``reload`` is disabled. The `pool_restart` command uses the
 Python :func:`reload` function to reload modules, or you can provide
-your own custom reloader.
+your own custom reloader by passing the ``reloader`` argument.
 
 .. note::