Procházet zdrojové kódy

celeryd: Can now enable/disable events using remote control:

    >>> from celery.task.control import broadcast
    >>> broadcast("enable_events")
    >>> broadcast("disable_events")
Ask Solem před 15 roky
rodič
revize
f4ac0307df
2 změnil soubory, kde provedl 31 přidání a 4 odebrání
  1. 13 4
      celery/events/__init__.py
  2. 18 0
      celery/worker/control/builtins.py

+ 13 - 4
celery/events/__init__.py

@@ -35,16 +35,25 @@ class EventDispatcher(object):
 
     """
 
-    def __init__(self, connection, hostname=None, enabled=True,
-            publisher=None):
+    def __init__(self, connection, hostname=None, enabled=True):
         self.connection = connection
         self.hostname = hostname or socket.gethostname()
         self.enabled = enabled
         self._lock = threading.Lock()
-
         self.publisher = None
+
         if self.enabled:
-            self.publisher = publisher or EventPublisher(self.connection)
+            self.enable()
+
+    def enable(self):
+        self.enabled = True
+        self.publisher = EventPublisher(self.connection)
+
+    def disable(self):
+        self.enabled = False
+        if self.publisher is not None:
+            self.publisher.close()
+            self.publisher = None
 
     def send(self, type, **fields):
         """Send event.

+ 18 - 0
celery/worker/control/builtins.py

@@ -26,6 +26,24 @@ def revoke(panel, task_id, task_name=None, **kwargs):
     return True
 
 
+@Panel.register
+def enable_events(panel):
+    dispatcher = panel.listener.event_dispatcher
+    dispatcher.enable()
+    dispatcher.send("worker-online")
+    panel.logger.warn("Events enabled by remote.")
+    return {"ok": "events enabled"}
+
+
+@Panel.register
+def disable_events(panel):
+    dispatcher = panel.listener.event_dispatcher
+    dispatcher.send("worker-offline")
+    dispatcher.disable()
+    panel.logger.warn("Events disabled by remote.")
+    return {"ok": "events disabled"}
+
+
 @Panel.register
 def rate_limit(panel, task_name, rate_limit, **kwargs):
     """Set new rate limit for a task type.