Ask Solem 12 роки тому
батько
коміт
81dab6d296

+ 14 - 3
celery/events/__init__.py

@@ -48,6 +48,10 @@ def Event(type, _fields=None, **fields):
     return event
 
 
+def domain(type):
+    return type.split('-', 1)
+
+
 class EventDispatcher(object):
     """Send events as messages.
 
@@ -69,10 +73,11 @@ class EventDispatcher(object):
     You need to :meth:`close` this after use.
 
     """
+    DISABLED_TRANSPORTS = set(['sql'])
 
     def __init__(self, connection=None, hostname=None, enabled=True,
             channel=None, buffer_while_offline=True, app=None,
-            serializer=None):
+            serializer=None, domains=None):
         self.app = app_or_default(app or self.app)
         self.connection = connection
         self.channel = channel
@@ -85,10 +90,12 @@ class EventDispatcher(object):
         self.on_enabled = set()
         self.on_disabled = set()
         self.tzoffset = [-time.timezone, -time.altzone]
-
-        self.enabled = enabled
+        self.domains = set(domains or [])
         if not connection and channel:
             self.connection = channel.connection.client
+        self.enabled = enabled
+        if self.connection.transport.driver_type in self.DISABLED_TRANSPORTS:
+            self.enabled = False
         if self.enabled:
             self.enable()
 
@@ -127,6 +134,10 @@ class EventDispatcher(object):
 
         """
         if self.enabled:
+            domains = self.domains
+            if domains and domain(type) not in domains:
+                return
+
             with self.mutex:
                 event = Event(type, hostname=self.hostname,
                                     clock=self.app.clock.forward(),

+ 5 - 2
celery/events/state.py

@@ -275,8 +275,11 @@ class State(object):
         hostname = fields.pop('hostname', None)
         if hostname:
             worker = self.get_or_create_worker(hostname)
-            handler = getattr(worker, 'on_' + type, None)
-            if handler:
+            try:
+                handler = self.__dict__['on_' + type]
+            except KeyError:
+                pass
+            else:
                 handler(**fields)
 
     def task_event(self, type, fields):

+ 29 - 1
celery/worker/consumer.py

@@ -10,9 +10,12 @@ up and running.
 """
 from __future__ import absolute_import
 
+import kombu
 import logging
 import socket
 
+from functools import partial
+
 from kombu.common import QoS, ignore_errors
 from kombu.syn import _detect_environment
 from kombu.utils.encoding import safe_repr
@@ -119,6 +122,7 @@ class Consumer(object):
             'celery.worker.consumer:Tasks',
             'celery.worker.consumer:Evloop',
             'celery.worker.consumer:Agent',
+            'celery.worker.consumer:Gossip',
         ]
 
         def shutdown(self, parent):
@@ -382,13 +386,15 @@ class Events(bootsteps.StartStopStep):
 
     def __init__(self, c, send_events=None, **kwargs):
         self.send_events = send_events
+        self.domains = None if self.send_events else ['worker']
         c.event_dispatcher = None
 
     def start(self, c):
         # Flush events sent while connection was down.
         prev = c.event_dispatcher
         dis = c.event_dispatcher = c.app.events.Dispatcher(
-            c.connection, hostname=c.hostname, enabled=self.send_events,
+            c.connection, hostname=c.hostname,
+            enabled=self.send_events, domains=self.domains,
         )
         if prev:
             dis.copy_buffer(prev)
@@ -467,6 +473,28 @@ class Agent(bootsteps.StartStopStep):
         return agent
 
 
+class Gossip(bootsteps.ConsumerStep):
+    label = 'gossip'
+    requires = (Connection, )
+
+    def __init__(self, c, **kwargs):
+        self.Receiver = c.app.events.Receiver
+        self.hostname = c.hostname
+
+        self.state = c.cluster = c.app.events.State()
+        self.update_state = self.state.worker_event
+
+    def get_consumers(self, channel):
+        events = self.Receiver(channel, routing_key='worker.#')
+        events.process = self.on_event
+        return events.get_consumers(partial(kombu.Consumer, channel), channel)
+
+    def on_event(self, type, event):
+        if event['hostname'] != self.hostname:
+            print('Got event: %r %r' % (type, event))
+            self.update_state(type, event)
+
+
 class Evloop(bootsteps.StartStopStep):
     label = 'event loop'
     last = True

+ 4 - 3
celery/worker/control.py

@@ -62,7 +62,8 @@ def report(panel):
 @Panel.register
 def enable_events(panel):
     dispatcher = panel.consumer.event_dispatcher
-    if not dispatcher.enabled:
+    if 'task' not in dispatcher.domains:
+        dispatcher.domains.add('task')
         dispatcher.enable()
         dispatcher.send('worker-online')
         logger.info('Events enabled by remote.')
@@ -73,9 +74,9 @@ def enable_events(panel):
 @Panel.register
 def disable_events(panel):
     dispatcher = panel.consumer.event_dispatcher
-    if dispatcher.enabled:
+    if 'task' in dispatcher.domains:
+        dispatcher.domains.discard('task')
         dispatcher.send('worker-offline')
-        dispatcher.disable()
         logger.info('Events disabled by remote.')
         return {'ok': 'events disabled'}
     return {'ok': 'events already disabled'}