Browse Source

Logical clock ordering tests

Ask Solem 11 years ago
parent
commit
c9d5d01fc1
2 changed files with 68 additions and 2 deletions
  1. 2 2
      celery/events/state.py
  2. 66 0
      celery/tests/events/test_state.py

+ 2 - 2
celery/events/state.py

@@ -272,8 +272,8 @@ class Task(AttributeDict):
         self.revoked = timestamp
         self.update(states.REVOKED, timestamp, fields)
 
-    def on_unknown_event(self, type, timestamp=None, **fields):
-        self.update(type.upper(), timestamp, fields)
+    def on_unknown_event(self, shortype, timestamp=None, **fields):
+        self.update(shortype.upper(), timestamp, fields)
 
     def info(self, fields=None, extra=[]):
         """Information about this task suitable for on-screen display."""

+ 66 - 0
celery/tests/events/test_state.py

@@ -2,6 +2,7 @@ from __future__ import absolute_import
 
 import pickle
 
+from random import shuffle
 from time import time
 from itertools import count
 from mock import Mock, patch
@@ -16,6 +17,7 @@ from celery.events.state import (
     HEARTBEAT_DRIFT_MAX,
     _lamportinfo
 )
+from celery.five import range
 from celery.utils import uuid
 from celery.tests.case import Case
 
@@ -26,6 +28,7 @@ class replay(object):
         self.state = state
         self.rewind()
         self.setup()
+        self.current_clock = 0
 
     def setup(self):
         pass
@@ -33,6 +36,7 @@ class replay(object):
     def next_event(self):
         ev = self.events[next(self.position)]
         ev['local_received'] = ev['timestamp']
+        self.current_clock = ev.get('clock') or self.current_clock + 1
         return ev
 
     def __iter__(self):
@@ -91,6 +95,47 @@ class ev_task_states(replay):
                   runtime=0.1234, hostname='utest1'),
         ]
 
+def QTEV(type, uuid, hostname, clock, timestamp=None):
+    """Quick task event."""
+    return Event('task-{0}'.format(type), uuid=uuid, hostname=hostname,
+            clock=clock, timestamp=timestamp or time())
+
+
+class ev_logical_clock_ordering(replay):
+
+    def __init__(self, state, offset=0, uids=None):
+        self.offset = offset or 0
+        self.uids = self.setuids(uids)
+        super(ev_logical_clock_ordering, self).__init__(state)
+
+    def setuids(self, uids):
+        uids = self.tA, self.tB, self.tC = uids or [uuid(), uuid(), uuid()]
+        return uids
+
+    def setup(self):
+        offset = self.offset
+        tA, tB, tC = self.uids
+        self.events = [
+            QTEV('received', tA, 'w1', clock=offset + 1),
+            QTEV('received', tB, 'w2', clock=offset + 1),
+            QTEV('started', tA, 'w1', clock=offset + 3),
+            QTEV('received', tC, 'w2', clock=offset + 3),
+            QTEV('started', tB, 'w2', clock=offset + 5),
+            QTEV('retried', tA, 'w1', clock=offset + 7),
+            QTEV('succeeded', tB, 'w2', clock=offset + 9),
+            QTEV('started', tC, 'w2', clock=offset + 10),
+            QTEV('received', tA, 'w3', clock=offset + 13),
+            QTEV('succeded', tC, 'w2', clock=offset + 12),
+            QTEV('started', tA, 'w3', clock=offset + 14),
+            QTEV('succeeded', tA, 'w3', clock=offset + 16),
+        ]
+
+    def rewind_with_offset(self, offset, uids=None):
+        self.offset = offset
+        self.uids = self.setuids(uids or self.uids)
+        self.setup()
+        self.rewind()
+
 
 class ev_snapshot(replay):
 
@@ -228,6 +273,27 @@ class test_State(Case):
     def test_pickleable(self):
         self.assertTrue(pickle.loads(pickle.dumps(State())))
 
+    def test_task_logical_clock_ordering(self):
+        state = State()
+        r = ev_logical_clock_ordering(state)
+        tA, tB, tC = r.uids
+        r.play()
+        now = list(state.tasks_by_time())
+        self.assertEqual(now[0][0], tA)
+        self.assertEqual(now[1][0], tC)
+        self.assertEqual(now[2][0], tB)
+        for _ in range(1000):
+            shuffle(r.uids)
+            tA, tB, tC = r.uids
+            r.rewind_with_offset(r.current_clock + 1, r.uids)
+            r.play()
+        print('tA={0} tb={1}, tC={2}'.format(*r.uids))
+        now = list(state.tasks_by_time())
+        print(now)
+        self.assertEqual(now[0][0], tA)
+        self.assertEqual(now[1][0], tC)
+        self.assertEqual(now[2][0], tB)
+
     def test_worker_online_offline(self):
         r = ev_worker_online_offline(State())
         next(r)