소스 검색

Merge pull request #2384 from willtrking/master

Add switch to reverse order of tasks_by_time, with test
Ask Solem Hoel 10 년 전
부모
커밋
954e4c3047
2개의 변경된 파일29개의 추가작업 그리고 6개의 파일을 삭제
  1. 10 6
      celery/events/state.py
  2. 19 0
      celery/tests/events/test_state.py

+ 10 - 6
celery/events/state.py

@@ -601,11 +601,15 @@ class State(object):
             if limit and index + 1 >= limit:
                 break
 
-    def tasks_by_time(self, limit=None):
+    def tasks_by_time(self, limit=None, reverse=True):
         """Generator giving tasks ordered by time,
         in ``(uuid, Task)`` tuples."""
+        _heap = self._taskheap
+        if reverse:
+            _heap = reversed(_heap)
+
         seen = set()
-        for evtup in islice(reversed(self._taskheap), 0, limit):
+        for evtup in islice(_heap, 0, limit):
             task = evtup[3]()
             if task is not None:
                 uuid = task.uuid
@@ -614,24 +618,24 @@ class State(object):
                     seen.add(uuid)
     tasks_by_timestamp = tasks_by_time
 
-    def tasks_by_type(self, name, limit=None):
+    def tasks_by_type(self, name, limit=None, reverse=True):
         """Get all tasks by type.
 
         Return a list of ``(uuid, Task)`` tuples.
 
         """
         return islice(
-            ((uuid, task) for uuid, task in self.tasks_by_time()
+            ((uuid, task) for uuid, task in self.tasks_by_time(reverse=reverse)
              if task.name == name),
             0, limit,
         )
 
-    def tasks_by_worker(self, hostname, limit=None):
+    def tasks_by_worker(self, hostname, limit=None, reverse=True):
         """Get all tasks by worker.
 
         """
         return islice(
-            ((uuid, task) for uuid, task in self.tasks_by_time()
+            ((uuid, task) for uuid, task in self.tasks_by_time(reverse=reverse)
              if task.worker.hostname == hostname),
             0, limit,
         )

+ 19 - 0
celery/tests/events/test_state.py

@@ -318,6 +318,25 @@ class test_State(AppCase):
         self.assertEqual(now[1][0], tC)
         self.assertEqual(now[2][0], tB)
 
+    def test_task_descending_clock_ordering(self):
+        state = State()
+        r = ev_logical_clock_ordering(state)
+        tA, tB, tC = r.uids
+        r.play()
+        now = list(state.tasks_by_time(reverse=False))
+        self.assertEqual(now[0][0], tB)
+        self.assertEqual(now[1][0], tC)
+        self.assertEqual(now[2][0], tA)
+        for _ in range(1000):
+            shuffle(r.uids)
+            tA, tB, tC = r.uids
+            r.rewind_with_offset(r.current_clock + 1, r.uids)
+            r.play()
+        now = list(state.tasks_by_time(reverse=False))
+        self.assertEqual(now[0][0], tB)
+        self.assertEqual(now[1][0], tC)
+        self.assertEqual(now[2][0], tA)
+
     def test_worker_online_offline(self):
         r = ev_worker_online_offline(State())
         next(r)