Explorar o código

Events now use a logical clock. Merged from gossip branch

Ask Solem %!s(int64=14) %!d(string=hai) anos
pai
achega
99d8674e8e
Modificáronse 2 ficheiros con 65 adicións e 4 borrados
  1. 58 0
      celery/app/base.py
  2. 7 4
      celery/events/__init__.py

+ 58 - 0
celery/app/base.py

@@ -11,6 +11,7 @@ Application Base Class.
 import platform as _platform
 
 from copy import deepcopy
+from threading import Lock
 
 from kombu.utils import cached_property
 
@@ -20,6 +21,62 @@ from celery.utils import instantiate, lpmerge
 from celery.utils.functional import wraps
 
 
+class LamportClock(object):
+    """Lamports logical clock.
+
+    From Wikipedia:
+
+    "A Lamport logical clock is a monotonically incrementing software counter
+    maintained in each process.  It follows some simple rules:
+
+        * A process increments its counter before each event in that process;
+        * When a process sends a message, it includes its counter value with
+          the message;
+        * On receiving a message, the receiver process sets its counter to be
+          greater than the maximum of its own value and the received value
+          before it considers the message received.
+
+    Conceptually, this logical clock can be thought of as a clock that only
+    has meaning in relation to messages moving between processes.  When a
+    process receives a message, it resynchronizes its logical clock with
+    the sender.
+
+    .. seealso::
+
+        http://en.wikipedia.org/wiki/Lamport_timestamps
+        http://en.wikipedia.org/wiki/Lamport's_Distributed_
+            Mutual_Exclusion_Algorithm
+
+    *Usage*
+
+    When sending a message use :meth:`forward` to increment the clock,
+    when receiving a message use :meth:`adjust` to sync with
+    the timestamp of the incoming message.
+
+    """
+    #: The clocks current value.
+    value = 0
+
+    def __init__(self, initial_value=0):
+        self.value = initial_value
+        self._mutex = Lock()
+
+    def adjust(self, other):
+        self._mutex.acquire()
+        try:
+            self.value = max(self.value, other) + 1
+        finally:
+            self._mutex.release()
+
+    def forward(self):
+        self._mutex.acquire()
+        try:
+            self.value += 1
+        finally:
+            self._mutex.release()
+        return self.value
+
+
 class BaseApp(object):
     """Base class for apps."""
     SYSTEM = _platform.system()
@@ -46,6 +103,7 @@ class BaseApp(object):
         self.set_as_current = set_as_current
         self.accept_magic_kwargs = accept_magic_kwargs
         self.on_init()
+        self.clock = LamportClock()
 
     def on_init(self):
         """Called at the end of the constructor."""

+ 7 - 4
celery/events/__init__.py

@@ -15,9 +15,8 @@ event_exchange = Exchange("celeryev", type="topic")
 
 
 def create_event(type, fields):
-    std = {"type": type,
-           "timestamp": fields.get("timestamp") or time.time()}
-    return dict(fields, **std)
+    return dict(fields, type=type,
+                        timestamp=fields.get("timestamp") or time.time())
 
 
 def Event(type, **fields):
@@ -93,7 +92,8 @@ class EventDispatcher(object):
             return
 
         self._lock.acquire()
-        event = Event(type, hostname=self.hostname, **fields)
+        event = Event(type, hostname=self.hostname,
+                            clock=self.app.clock.forward(), **fields)
         try:
             try:
                 self.publisher.publish(event,
@@ -214,6 +214,9 @@ class EventReceiver(object):
 
     def _receive(self, body, message):
         type = body.pop("type").lower()
+        clock = body.get("clock")
+        if clock:
+            self.app.clock.adjust(clock)
         self.process(type, create_event(type, body))