Browse Source

Gossip revival

Ask Solem 12 years ago
parent
commit
3e21670d7d

+ 35 - 16
celery/events/__init__.py

@@ -17,6 +17,7 @@ import threading
 from collections import deque
 from contextlib import contextmanager
 from copy import copy
+from operator import itemgetter
 
 from kombu import Exchange, Queue, Producer
 from kombu.mixins import ConsumerMixin
@@ -24,9 +25,12 @@ from kombu.utils import cached_property
 
 from celery.app import app_or_default
 from celery.utils import uuid
+from celery.utils.timeutils import adjust_timestamp, utcoffset
 
 event_exchange = Exchange('celeryev', type='topic')
 
+_TZGETTER = itemgetter('utcoffset', 'timestamp')
+
 
 def get_exchange(conn):
     ex = copy(event_exchange)
@@ -48,8 +52,8 @@ def Event(type, _fields=None, **fields):
     return event
 
 
-def domain(type):
-    return type.split('-', 1)
+def group_from(type):
+    """Return the group part of an event type name.
+
+    The group is everything before the first ``-`` (``"task"`` for
+    ``"task-received"``); a name without a dash is returned unchanged.
+    """
+    group, _, _ = type.partition('-')
+    return group
 
 
 class EventDispatcher(object):
@@ -60,9 +64,10 @@ class EventDispatcher(object):
     :keyword hostname: Hostname to identify ourselves as,
         by default uses the hostname returned by :func:`socket.gethostname`.
 
-    :keyword domains: List of domains to send events for.  :meth:`send` will
-        ignore send requests to domains not in this list.
-        If this is :const:`None`, all events will be sent.
+    :keyword groups: List of groups to send events for.  :meth:`send` will
+        ignore send requests to groups not in this list.
+        If this is :const:`None`, all events will be sent. Example groups
+        include ``"task"`` and ``"worker"``.
 
     :keyword enabled: Set to :const:`False` to not actually publish any events,
         making :meth:`send` a noop operation.
@@ -81,7 +86,7 @@ class EventDispatcher(object):
 
     def __init__(self, connection=None, hostname=None, enabled=True,
             channel=None, buffer_while_offline=True, app=None,
-            serializer=None, domains=None):
+            serializer=None, groups=None):
         self.app = app_or_default(app or self.app)
         self.connection = connection
         self.channel = channel
@@ -93,8 +98,7 @@ class EventDispatcher(object):
         self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
         self.on_enabled = set()
         self.on_disabled = set()
-        self.tzoffset = [-time.timezone, -time.altzone]
-        self.domains = set(domains or [])
+        self.groups = set(groups or [])
         if not connection and channel:
             self.connection = channel.connection.client
         self.enabled = enabled
@@ -102,6 +106,7 @@ class EventDispatcher(object):
             self.enabled = False
         if self.enabled:
             self.enable()
+        self.headers = {'hostname': self.hostname}
 
     def __enter__(self):
         return self
@@ -138,17 +143,18 @@ class EventDispatcher(object):
 
         """
         if self.enabled:
-            domains = self.domains
-            if domains and domain(type) not in domains:
+            groups = self.groups
+            if groups and group_from(type) not in groups:
                 return
 
             with self.mutex:
                 event = Event(type, hostname=self.hostname,
                                     clock=self.app.clock.forward(),
-                                    tzoffset=self.tzoffset, **fields)
+                                    utcoffset=utcoffset(), **fields)
                 try:
                     self.publisher.publish(event,
-                                           routing_key=type.replace('-', '.'))
+                                           routing_key=type.replace('-', '.'),
+                                           headers=self.headers)
                 except Exception as exc:
                     if not self.buffer_while_offline:
                         raise
@@ -196,6 +202,7 @@ class EventReceiver(ConsumerMixin):
                            routing_key=self.routing_key,
                            auto_delete=True,
                            durable=False)
+        self.adjust_clock = self.app.clock.adjust
 
     def get_exchange(self):
         return get_exchange(self.connection)
@@ -232,12 +239,24 @@ class EventReceiver(ConsumerMixin):
                                    connection=self.connection,
                                    channel=channel)
 
-    def _receive(self, body, message):
-        type = body.pop('type').lower()
+    def event_from_message(self, body, localize=True):
+        """Build a ``(type, event)`` pair from a decoded message body.
+
+        :param body: Mapping holding the event fields.
+        :keyword localize: When true and the body carries both
+            ``utcoffset`` and ``timestamp``, the timestamp stored in
+            ``body`` is replaced by the result of :func:`adjust_timestamp`.
+
+        The clock is adjusted whenever the body has a truthy ``clock``
+        field.  Bodies without a ``timestamp`` are accepted.
+        """
+        type = body.get('type', '').lower()
         clock = body.get('clock')
         if clock:
-            self.app.clock.adjust(clock)
-        self.process(type, Event(type, body))
+            self.adjust_clock(clock)
+        if localize:
+            try:
+                offset, timestamp = _TZGETTER(body)
+            except KeyError:
+                # One of the two fields is missing: keep the body as sent.
+                pass
+            else:
+                body['timestamp'] = adjust_timestamp(timestamp, offset)
+        return type, Event(type, body)
+
+    def _receive(self, body, message):
+        """Convert the message body to an event and hand it to ``process``."""
+        type, event = self.event_from_message(body)
+        self.process(type, event)
 
 
 class Events(object):

+ 29 - 20
celery/events/state.py

@@ -33,6 +33,7 @@ from celery.datastructures import AttributeDict, LRUCache
 # then the worker is considered to be offline.
 HEARTBEAT_EXPIRE_WINDOW = 200
 
+from datetime import datetime
 
 def heartbeat_expires(timestamp, freq=60,
         expire_window=HEARTBEAT_EXPIRE_WINDOW):
@@ -75,7 +76,7 @@ class Worker(Element):
                 self.heartbeats = self.heartbeats[self.heartbeat_max:]
 
     def __repr__(self):
-        return '<Worker: {0.hostname} (0.status_string)'.format(self)
+        # Both placeholders are attribute lookups on ``self``.
+        return '<Worker: {0.hostname} ({0.status_string})'.format(self)
 
     @property
     def status_string(self):
@@ -88,7 +89,7 @@ class Worker(Element):
 
     @property
     def alive(self):
-        return (self.heartbeats and time() < self.heartbeat_expires)
+        """``True`` when heartbeats have been recorded and the current
+        time is still below ``heartbeat_expires``; ``False`` otherwise."""
+        if not self.heartbeats:
+            return False
+        return bool(time() < self.heartbeat_expires)
 
 
 class Task(Element):
@@ -216,8 +217,10 @@ class State(object):
         self.workers = LRUCache(limit=max_workers_in_memory)
         self.tasks = LRUCache(limit=max_tasks_in_memory)
         self.event_callback = callback
-        self.group_handlers = {'worker': self.worker_event,
-                               'task': self.task_event}
+        self.group_handlers = {
+            'worker': self.worker_event,
+            'task': self.task_event,
+        }
         self._mutex = threading.Lock()
 
     def freeze_while(self, fun, *args, **kwargs):
@@ -253,41 +256,46 @@ class State(object):
             return self._clear(ready)
 
     def get_or_create_worker(self, hostname, **kwargs):
-        """Get or create worker by hostname."""
+        """Get or create worker by hostname.
+
+        Returns tuple of ``(worker, was_created)``.
+        """
         try:
             worker = self.workers[hostname]
+            # Known hostname: pass the extra fields to ``worker.update``.
             worker.update(kwargs)
+            return worker, False
         except KeyError:
+            # Unknown hostname: store a new Worker built from the fields.
             worker = self.workers[hostname] = Worker(
                     hostname=hostname, **kwargs)
-        return worker
+            return worker, True
 
     def get_or_create_task(self, uuid):
-        """Get or create task by uuid."""
+        """Get or create task by uuid.
+
+        Returns tuple of ``(task, was_created)``: the flag is ``False``
+        for a task that was already known and ``True`` for a new one,
+        matching :meth:`get_or_create_worker`.
+        """
         try:
-            return self.tasks[uuid]
+            return self.tasks[uuid], False
         except KeyError:
             task = self.tasks[uuid] = Task(uuid=uuid)
-            return task
+            return task, True
 
     def worker_event(self, type, fields):
         """Process worker event."""
-        hostname = fields.pop('hostname', None)
-        if hostname:
-            worker = self.get_or_create_worker(hostname)
-            try:
-                handler = self.__dict__['on_' + type]
-            except KeyError:
-                pass
-            else:
+        # Returns ``(worker, created)``.  An event without a ``hostname``
+        # field is ignored and the method falls through, returning ``None``.
+        try:
+            hostname = fields['hostname']
+        except KeyError:
+            pass
+        else:
+            worker, created = self.get_or_create_worker(hostname)
+            # Dispatch to ``worker.on_<type>`` when such a handler exists;
+            # ``hostname`` is still part of ``fields`` at this point.
+            handler = getattr(worker, 'on_' + type, None)
+            if handler:
                 handler(**fields)
+            return worker, created
 
     def task_event(self, type, fields):
         """Process task event."""
         uuid = fields.pop('uuid')
         hostname = fields.pop('hostname')
-        worker = self.get_or_create_worker(hostname)
-        task = self.get_or_create_task(uuid)
+        worker, _ = self.get_or_create_worker(hostname)
+        task, created = self.get_or_create_task(uuid)
         handler = getattr(task, 'on_' + type, None)
         if type == 'received':
             self.task_count += 1
@@ -296,6 +304,7 @@ class State(object):
         else:
             task.on_unknown_event(type, **fields)
         task.worker = worker
+        return created
 
     def event(self, event):
         with self._mutex:
@@ -304,8 +313,8 @@ class State(object):
     def _dispatch_event(self, event):
         self.event_count += 1
         event = kwdict(event)
-        group, _, type = event.pop('type').partition('-')
-        self.group_handlers[group](type, event)
+        group, _, subject = event.pop('type').partition('-')
+        self.group_handlers[group](subject, event)
         if self.event_callback:
             self.event_callback(self, event)
 

+ 36 - 0
celery/utils/sysinfo.py

@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import os
+
+from math import ceil
+
+from kombu.utils import cached_property
+
+
+def load_average():
+    """Return the values of ``os.getloadavg()`` as a tuple, each rounded
+    up to two decimal places."""
+    return tuple(ceil(value * 100.0) / 100.0 for value in os.getloadavg())
+
+
+class df(object):
+    """Disk usage figures for the filesystem containing ``path``.
+
+    All figures are derived from ``os.statvfs`` on the absolute path,
+    exposed through the ``stat`` attribute.
+    """
+
+    def __init__(self, path):
+        self.path = path
+
+    @property
+    def total_blocks(self):
+        """``f_blocks * f_frsize`` divided by 1024."""
+        return self.stat.f_blocks * self.stat.f_frsize / 1024
+
+    @property
+    def available(self):
+        """``f_bavail * f_frsize`` divided by 1024."""
+        return self.stat.f_bavail * self.stat.f_frsize / 1024
+
+    @property
+    def capacity(self):
+        """Used share of ``used + available`` blocks as an integer percent.
+
+        Returns ``0`` when the filesystem reports no used and no
+        available blocks, instead of raising :exc:`ZeroDivisionError`.
+        """
+        avail = self.stat.f_bavail
+        used = self.stat.f_blocks - self.stat.f_bfree
+        usable = used + avail
+        if not usable:
+            return 0
+        return int(ceil(used * 100.0 / usable + 0.5))
+
+    @cached_property
+    def stat(self):
+        return os.statvfs(os.path.abspath(self.path))

+ 14 - 0
celery/utils/timeutils.py

@@ -335,3 +335,17 @@ class ffwd(object):
             'hour': self.hour, 'minute': self.minute,
             'second': self.second, 'microsecond': self.microsecond,
         }, **extra)
+
+
+# Local offsets in seconds with the sign of ``time.timezone`` and
+# ``time.altzone`` flipped.
+__timezone__ = -_time.timezone
+__altzone__ = -_time.altzone
+
+
+def utcoffset():
+    """Return the local UTC offset in whole hours.
+
+    The DST offset (``__altzone__``) is used only while DST is in effect
+    for the current local time; otherwise the standard offset is used.
+    The two offsets are never added together.
+    """
+    # ``time.daylight`` only says a DST zone is *defined*; ``tm_isdst``
+    # says whether it applies right now.
+    if _time.localtime().tm_isdst > 0:
+        return __altzone__ // 3600
+    return __timezone__ // 3600
+
+
+def adjust_timestamp(ts, offset, here=utcoffset):
+    """Shift ``ts`` by the difference, in hours, between ``offset`` and
+    the offset returned by ``here()``."""
+    hours_apart = offset - here()
+    return ts - hours_apart * 3600

+ 51 - 13
celery/worker/consumer.py

@@ -385,8 +385,8 @@ class Events(bootsteps.StartStopStep):
     requires = (Connection, )
 
     def __init__(self, c, send_events=None, **kwargs):
-        self.send_events = send_events
-        self.domains = None if self.send_events else ['worker']
+        # ``self.send_events`` is now always true; the ``send_events``
+        # argument only selects the groups: ``None`` when it is true,
+        # otherwise just ``worker``.
+        self.send_events = True
+        self.groups = None if send_events else ['worker']
         c.event_dispatcher = None
 
     def start(self, c):
@@ -394,7 +394,7 @@ class Events(bootsteps.StartStopStep):
         prev = c.event_dispatcher
         dis = c.event_dispatcher = c.app.events.Dispatcher(
             c.connection, hostname=c.hostname,
-            enabled=self.send_events, domains=self.domains,
+            enabled=self.send_events, groups=self.groups,
         )
         if prev:
             dis.copy_buffer(prev)
@@ -477,22 +477,60 @@ class Gossip(bootsteps.ConsumerStep):
     label = 'gossip'
     requires = (Connection, )
 
-    def __init__(self, c, **kwargs):
+    def __init__(self, c, interval=5.0, **kwargs):
+        # ``interval`` is multiplied by 1000 and passed to
+        # ``timer.apply_interval`` by register_timer().
         self.Receiver = c.app.events.Receiver
         self.hostname = c.hostname
 
-        self.state = c.cluster = c.app.events.State()
+        self.timer = c.timer
+        # The same State instance is also exposed as ``c.gossip``.
+        self.state = c.gossip = c.app.events.State()
+        self.interval = interval
+        # Checked by register_timer() before it schedules periodic().
+        self._tref = None
         self.update_state = self.state.worker_event
 
+    def on_node_join(self, worker):
+        """Log at info level that ``worker`` joined."""
+        info('{0.hostname} joined the party'.format(worker))
+
+    def on_node_leave(self, worker):
+        """Log at info level that ``worker`` left."""
+        info('{0.hostname} left'.format(worker))
+
+    def on_node_lost(self, worker):
+        """Log a warning naming the worker that stopped being alive."""
+        warning('{0.hostname} went missing!'.format(worker))
+
+    def register_timer(self):
+        """Schedule :meth:`periodic` on the timer.
+
+        A previously registered entry is cancelled first, so calling this
+        more than once does not leave several timers running.
+        """
+        if self._tref is not None:
+            self._tref.cancel()
+        # Keep the handle so the next call can cancel this entry.
+        self._tref = self.timer.apply_interval(self.interval * 1000.0,
+                                               self.periodic)
+
+    def periodic(self):
+        """Remove workers that are no longer alive from the gossip state.
+
+        :meth:`on_node_lost` is called for each such worker, and the
+        worker is removed even if that callback raises.
+        """
+        workers = self.state.workers
+        # Iterate over a snapshot: entries are popped inside the loop and
+        # the live mapping must not change size while being iterated.
+        for worker in list(workers.itervalues()):
+            if not worker.alive:
+                try:
+                    self.on_node_lost(worker)
+                finally:
+                    workers.pop(worker.hostname, None)
+
     def get_consumers(self, channel):
-        events = self.Receiver(channel, routing_key='worker.#')
-        events.process = self.on_event
-        return events.get_consumers(partial(kombu.Consumer, channel), channel)
-
-    def on_event(self, type, event):
-        if event['hostname'] != self.hostname:
-            print('Got event: %r %r' % (type, event))
-            self.update_state(type, event)
+        # Register the periodic check whenever consumers are set up.
+        self.register_timer()
+        ev = self.Receiver(channel, routing_key='worker.#')
+        # Consume from the receiver's queue directly; on_message gets the
+        # receiver's body-to-event converter as its first argument.
+        return [kombu.Consumer(channel,
+                    queues=[ev.queue],
+                    on_message=partial(self.on_message, ev.event_from_message),
+                    no_ack=True)]
+
+    def on_message(self, prepare, message):
+        """Update the gossip state from a worker event message.
+
+        ``prepare`` converts the message payload into ``(type, event)``.
+        Messages whose hostname equals our own are ignored.
+        """
+        # Prefer the header; fall back to the payload field.
+        hostname = (message.headers.get('hostname') or
+                    message.payload['hostname'])
+        if hostname != self.hostname:
+            type, event = prepare(message.payload)
+            group, _, subject = type.partition('-')
+            worker, created = self.update_state(subject, event)
+            if subject == 'offline':
+                try:
+                    self.on_node_leave(worker)
+                finally:
+                    # Forget the worker even if the callback raised.
+                    self.state.workers.pop(worker.hostname, None)
+            elif created or subject == 'online':
+                self.on_node_join(worker)
 
 
 class Evloop(bootsteps.StartStopStep):

+ 3 - 0
celery/worker/heartbeat.py

@@ -9,6 +9,8 @@
 """
 from __future__ import absolute_import
 
+from celery.utils.sysinfo import load_average
+
 from .state import SOFTWARE_INFO, active_requests, total_count
 
 
@@ -36,6 +38,7 @@ class Heart(object):
         return self.eventer.send(event, freq=self.interval,
                                  active=len(active_requests),
                                  processed=sum(total_count.itervalues()),
+                                 loadavg=load_average(),
                                  **SOFTWARE_INFO)
 
     def start(self):

+ 1 - 1
celery/worker/state.py

@@ -41,7 +41,7 @@ reserved_requests = set()
 #: set of currently active :class:`~celery.worker.job.Request`'s.
 active_requests = set()
 
-#: count of tasks executed by the worker, sorted by type.
+#: count of tasks accepted by the worker, sorted by type.
 total_count = defaultdict(int)
 
 #: the list of currently revoked tasks.  Persistent if statedb set.