Browse Source

Adds missing commands to app.control

Ask Solem 8 years ago
parent
commit
9ac8de10ad
3 changed files with 98 additions and 5 deletions
  1. 50 2
      celery/app/control.py
  2. 2 3
      celery/worker/control.py
  3. 46 0
      t/unit/app/test_control.py

+ 50 - 2
celery/app/control.py

@@ -205,6 +205,18 @@ class Control(object):
                                          'terminate': terminate,
                                          'signal': signal}, **kwargs)
 
+    def terminate(self, task_id,
+                  destination=None, signal=TERM_SIGNAME, **kwargs):
+        """Tell all (or specific) workers to terminate a task by id.
+
+        See Also:
+            This is just a shortcut to :meth:`revoke` with the terminate
+            argument enabled.
+        """
+        return self.revoke(
+            task_id,
+            destination=destination, terminate=True, signal=signal, **kwargs)
+
     def ping(self, destination=None, timeout=1, **kwargs):
         """Ping all (or specific) workers.
 
@@ -326,12 +338,48 @@ class Control(object):
     def autoscale(self, max, min, destination=None, **kwargs):
         """Change worker(s) autoscale setting.
 
-        Supports the same arguments as :meth:`broadcast`.
-
+        See Also:
+            Supports the same arguments as :meth:`broadcast`.
         """
         return self.broadcast(
             'autoscale', {'max': max, 'min': min}, destination, **kwargs)
 
+    def shutdown(self, destination=None, **kwargs):
+        """Shutdown worker(s).
+
+        See Also:
+            Supports the same arguments as :meth:`broadcast`
+        """
+        return self.broadcast(
+            'shutdown', {}, destination, **kwargs)
+
+    def pool_restart(self, modules=None, reload=False, reloader=None,
+                     destination=None, **kwargs):
+        """Restart the execution pools of all or specific workers.
+
+        Keyword Arguments:
+            modules (Sequence[str]): List of modules to reload.
+            reload (bool): Flag to enable module reloading.  Default is False.
+            reloader (Any):  Function to reload a module.
+            destination (Sequence[str]): List of worker names to send this
+                command to.
+
+        See Also:
+            Supports the same arguments as :meth:`broadcast`
+        """
+        return self.broadcast(
+            'pool_restart',
+            {'modules': modules, 'reload': reload, 'reloader': reloader},
+            destination, **kwargs)
+
+    def heartbeat(self, destination=None, **kwargs):
+        """Tell worker(s) to send a heartbeat immediately.
+
+        See Also:
+            Supports the same arguments as :meth:`broadcast`
+        """
+        return self.broadcast('heartbeat', {}, destination, **kwargs)
+
     def broadcast(self, command, arguments=None, destination=None,
                   connection=None, reply=False, timeout=1, limit=None,
                   callback=None, channel=None, **extra_kwargs):

+ 2 - 3
celery/worker/control.py

@@ -43,9 +43,8 @@ def nok(value):
 class Panel(UserDict):
     """Global registry of remote control commands."""
 
-    data = dict()  # global dict.
-    meta = dict()
-    by_alias = dict()
+    data = {}      # global dict.
+    meta = {}      # -"-
 
     @classmethod
     def register(cls, *args, **kwargs):

+ 46 - 0
t/unit/app/test_control.py

@@ -8,6 +8,32 @@ from vine.utils import wraps
 from celery import uuid
 from celery.app import control
 from celery.exceptions import DuplicateNodenameWarning
+from celery.five import items
+
+
+def _info_for_commandclass(type_):
+    from celery.worker.control import Panel
+    return [
+        (name, info)
+        for name, info in items(Panel.meta)
+        if info.type == type_
+    ]
+
+
+def test_client_implements_all_commands(app):
+    commands = _info_for_commandclass('control')
+    assert commands
+    for name, info in commands:
+        assert getattr(app.control, name)
+
+
+def test_inspect_implements_all_commands(app):
+    inspect = app.control.inspect()
+    commands = _info_for_commandclass('inspect')
+    assert commands
+    for name, info in commands:
+        if info.type == 'inspect':
+            assert getattr(inspect, name)
 
 
 class MockMailbox(Mailbox):
@@ -204,6 +230,26 @@ class test_Broadcast:
         self.control.cancel_consumer('foo')
         assert 'cancel_consumer' in MockMailbox.sent
 
+    @with_mock_broadcast
+    def test_shutdown(self):
+        self.control.shutdown()
+        assert 'shutdown' in MockMailbox.sent
+
+    @with_mock_broadcast
+    def test_heartbeat(self):
+        self.control.heartbeat()
+        assert 'heartbeat' in MockMailbox.sent
+
+    @with_mock_broadcast
+    def test_pool_restart(self):
+        self.control.pool_restart()
+        assert 'pool_restart' in MockMailbox.sent
+
+    @with_mock_broadcast
+    def test_terminate(self):
+        self.control.terminate('124')
+        assert 'revoke' in MockMailbox.sent
+
     @with_mock_broadcast
     def test_enable_events(self):
         self.control.enable_events()