[events] State now orders events by logical clock

Ask Solem 12 years ago
parent commit d2d198a2bc
3 changed files with 83 additions and 31 deletions
  1. +5 -1    celery/events/__init__.py
  2. +2 -3    celery/events/cursesmon.py
  3. +76 -27  celery/events/state.py
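
The point of the commit: events are now stamped with and ordered by a Lamport
logical clock instead of wall-clock timestamps, which drift between machines.
The clock itself is kombu's LamportClock: senders tick it with forward()
before publishing, and receivers merge a remote value with adjust(). A
minimal sketch of those semantics (the values shown are illustrative):

    from kombu.clocks import LamportClock

    clock = LamportClock()

    # Sender side: tick the clock and stamp the outgoing event with the value.
    event_clock = clock.forward()     # local value is now 1

    # Receiver side: merge the remote value so that anything sent afterwards
    # is ordered after the event just observed.
    clock.adjust(event_clock)         # value becomes max(local, remote) + 1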

+ 5 - 1
celery/events/__init__.py

@@ -151,9 +151,12 @@ class EventDispatcher(object):
             if groups and group_from(type) not in groups:
                 return
 
+            clock = self.clock.forward()
+
+
             with self.mutex:
                 event = Event(type, hostname=self.hostname,
-                                    clock=self.clock.forward(),
+                                    clock=clock,
                                     utcoffset=utcoffset(),
                                     pid=self.pid, **fields)
                 try:
@@ -247,6 +250,7 @@ class EventReceiver(ConsumerMixin):
     def event_from_message(self, body, localize=True, now=time.time):
         type = body.get('type', '').lower()
         self.adjust_clock(body.get('clock') or 0)
+
         if localize:
             try:
                 offset, timestamp = _TZGETTER(body)
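
With the dispatcher change above, the clock is ticked once per send, before
the dispatcher mutex is taken, and the resulting value is reused inside it;
successive events from one process therefore carry strictly increasing clock
fields, and event_from_message() adjusts the receiver's clock for every
message it sees. A sketch of the sending side (the broker URL and event type
are illustrative):

    from celery import Celery

    app = Celery(broker='amqp://guest@localhost//')

    with app.connection() as conn:
        d = app.events.Dispatcher(conn)
        d.send('worker-custom', note='first')    # clock == n
        d.send('worker-custom', note='second')   # clock == n + 1
        d.close()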

+ 2 - 3
celery/events/cursesmon.py

@@ -452,7 +452,7 @@ class CursesMonitor(object):  # pragma: no cover
 
     @property
     def tasks(self):
-        return self.state.tasks_by_timestamp()[:self.limit]
+        return list(self.state.tasks_by_time(limit=self.limit))
 
     @property
     def workers(self):
@@ -489,8 +489,7 @@ def capture_events(app, state, display):  # pragma: no cover
                 recv = app.events.Receiver(conn, handlers={'*': state.event})
                 display.resetscreen()
                 display.init_screen()
-                with recv.consumer():
-                    recv.drain_events(timeout=1, ignore_timeouts=True)
+                recv.capture()
             except conn.connection_errors + conn.channel_errors as exc:
                 print('Connection lost: {0!r}'.format(exc), file=sys.stderr)
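
The monitor now uses EventReceiver.capture(), which wraps the previous
consumer()/drain_events() loop, and reads tasks through the new
tasks_by_time() generator, which yields (uuid, task) pairs newest-first and
deduplicated by uuid. A sketch of the consuming pattern (broker URL and
limits are illustrative):

    from celery import Celery
    from celery.events.state import State

    app = Celery(broker='amqp://guest@localhost//')
    state = State()

    with app.connection() as conn:
        recv = app.events.Receiver(conn, handlers={'*': state.event})
        recv.capture(limit=100)   # handle 100 events, then return

    # Newest first, ordered by logical clock rather than timestamp:
    for uuid, task in state.tasks_by_time(limit=10):
        print(uuid, task.name, task.state)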
 

+ 76 - 27
celery/events/state.py

@@ -21,6 +21,8 @@ from __future__ import absolute_import
 import threading
 
 from heapq import heappush
+from itertools import islice
+from operator import itemgetter
 from time import time
 
 from kombu.utils import kwdict
@@ -52,6 +54,40 @@ def heartbeat_expires(timestamp, freq=60,
     return timestamp + freq * (expire_window / 1e2)
 
 
+class _lamportinfo(tuple):
+    __slots__ = ()
+
+    def __new__(cls, clock, timestamp, id, obj):
+        return tuple.__new__(cls, (clock, timestamp, id, obj))
+
+    def __repr__(self):
+        return '_lamport(clock={0}, timestamp={1}, id={2} {3!r})'.format(*self)
+
+    def __getnewargs__(self):
+        return tuple(self)
+
+    def __lt__(self, other):
+        # 0: clock 1: timestamp 2: process id
+        try:
+            A, B = self[0], other[0]
+            # uses logical clock value first
+            if A and B:  # use logical clock if available
+                if A == B:  # equal clocks use lower process id
+                    return self[2] < other[2]
+                return A < B
+            return self[1] < other[1]  # ... or use timestamp
+        except IndexError:
+            return NotImplemented
+    __gt__ = lambda self, other: other < self
+    __le__ = lambda self, other: not other < self
+    __ge__ = lambda self, other: not self < other
+
+    clock = property(itemgetter(0))
+    timestamp = property(itemgetter(1))
+    id = property(itemgetter(2))
+    obj = property(itemgetter(3))
+
+
 class Element(AttributeDict):
     """Base class for worker state elements."""
 
@@ -60,6 +96,7 @@ class Worker(Element):
     """Worker State."""
     heartbeat_max = 4
     expire_window = HEARTBEAT_EXPIRE_WINDOW
+    pid = None
 
     def __init__(self, **fields):
         fields.setdefault('freq', 60)
@@ -88,7 +125,7 @@ class Worker(Element):
         if drift > HEARTBEAT_DRIFT_MAX:
             warn(DRIFT_WARNING, self.hostname, drift)
         heartbeats, hbmax = self.heartbeats, self.heartbeat_max
-        if received and received > heartbeats[-1]:
+        if not heartbeats or (received and received > heartbeats[-1]):
             heappush(heartbeats, received)
             if len(heartbeats) > hbmax:
                 heartbeats[:] = heartbeats[hbmax:]
@@ -109,6 +146,10 @@ class Worker(Element):
     def alive(self):
         return bool(self.heartbeats and time() < self.heartbeat_expires)
 
+    @property
+    def id(self):
+        return '{0.hostname}.{0.pid}'.format(self)
+
 
 class Task(Element):
     """Task State."""
@@ -136,7 +177,8 @@ class Task(Element):
                      revoked=False, args=None, kwargs=None, eta=None,
                      expires=None, retries=None, worker=None, result=None,
                      exception=None, timestamp=None, runtime=None,
-                     traceback=None, exchange=None, routing_key=None)
+                     traceback=None, exchange=None, routing_key=None,
+                     clock=0)
 
     def __init__(self, **fields):
         super(Task, self).__init__(**dict(self._defaults, **fields))
@@ -150,7 +192,7 @@ class Task(Element):
 
         """
         if self.worker:
-            self.worker.on_heartbeat(timestamp=timestamp)
+            self.worker.update_heartbeat(fields['local_received'], timestamp)
         if state != states.RETRY and self.state != states.RETRY and \
                 states.state(state) < states.state(self.state):
             # this state logically happens-before the current state, so merge.
@@ -232,8 +274,11 @@ class State(object):
 
     def __init__(self, callback=None,
             max_workers_in_memory=5000, max_tasks_in_memory=10000):
-        self.workers = LRUCache(limit=max_workers_in_memory)
-        self.tasks = LRUCache(limit=max_tasks_in_memory)
+        self.max_workers_in_memory = max_workers_in_memory
+        self.max_tasks_in_memory = max_tasks_in_memory
+        self.workers = LRUCache(limit=self.max_workers_in_memory)
+        self.tasks = LRUCache(limit=self.max_tasks_in_memory)
+        self._taskheap = []
         self.event_callback = callback
         self.group_handlers = {
             'worker': self.worker_event,
@@ -262,6 +307,7 @@ class State(object):
             self.tasks.update(in_progress)
         else:
             self.tasks.clear()
+        self._taskheap[:] = []
 
     def _clear(self, ready=True):
         self.workers.clear()
@@ -313,8 +359,18 @@ class State(object):
         uuid = fields.pop('uuid')
         hostname = fields.pop('hostname')
         worker, _ = self.get_or_create_worker(hostname)
-        worker.update_heartbeat(fields['local_received'], fields['timestamp'])
         task, created = self.get_or_create_task(uuid)
+        task.worker = worker
+
+        clock = 0 if type == 'sent' else fields.get('clock')
+
+        taskheap = self._taskheap
+        timestamp = fields['timestamp']
+        heappush(taskheap, _lamportinfo(clock, timestamp, worker.id, task))
+        curcount = len(self.tasks)
+        if len(taskheap) > self.max_tasks_in_memory * 2:
+            taskheap[:] = taskheap[curcount:]
+
         handler = getattr(task, 'on_' + type, None)
         if type == 'received':
             self.task_count += 1
@@ -322,7 +378,6 @@ class State(object):
             handler(**fields)
         else:
             task.on_unknown_event(type, **fields)
-        task.worker = worker
         return created
 
     def event(self, event):
@@ -343,18 +398,14 @@ class State(object):
             if limit and index + 1 >= limit:
                 break
 
-    def tasks_by_timestamp(self, limit=None):
-        """Get tasks by timestamp.
-
-        Returns a list of `(uuid, task)` tuples.
-
-        """
-        return self._sort_tasks_by_time(self.itertasks(limit))
-
-    def _sort_tasks_by_time(self, tasks):
-        """Sort task items by time."""
-        return sorted(tasks, key=lambda t: t[1].timestamp,
-                      reverse=True)
+    def tasks_by_time(self, limit=None):
+        seen = set()
+        for evtup in islice(reversed(self._taskheap), 0, limit):
+            uuid = evtup[3].uuid
+            if uuid not in seen:
+                yield uuid, evtup[3]
+                seen.add(uuid)
+    tasks_by_timestamp = tasks_by_time
 
     def tasks_by_type(self, name, limit=None):
         """Get all tasks by type.
@@ -362,11 +413,9 @@ class State(object):
         Returns a list of `(uuid, task)` tuples.
 
         """
-        sorted_tasks = self._sort_tasks_by_time((uuid, task)
-                for uuid, task in self.tasks.iteritems()
-                    if task.name == name)
-
-        return sorted_tasks[0:limit or None]
+        return islice(((tup[3].uuid, tup[3])
+                            for tup in self._taskheap
+                                if tup[3].name == name), 0, limit)
 
     def tasks_by_worker(self, hostname, limit=None):
         """Get all tasks by worker.
@@ -374,9 +423,9 @@ class State(object):
         Returns a list of `(uuid, task)` tuples.
 
         """
-        return self._sort_tasks_by_time((uuid, task)
-                for uuid, task in self.itertasks(limit)
-                    if task.worker.hostname == hostname)
+        return islice(((tup[3].uuid, tup[3])
+                        for tup in self._taskheap
+                            if tup[3].worker.hostname == hostname), 0, limit)
 
     def task_types(self):
         """Returns a list of all seen task types."""