Browse Source

Must keep maxtasks * 4 events in heap

Ask Solem 11 years ago
parent
commit
19b9d01ea3
2 changed files with 8 additions and 2 deletions
  1. 7 2
      celery/events/state.py
  2. 1 0
      celery/tests/events/test_state.py

+ 7 - 2
celery/events/state.py

@@ -366,6 +366,7 @@ class State(object):
     Task = Task
     event_count = 0
     task_count = 0
+    heap_multiplier = 4
 
     def __init__(self, callback=None,
                  workers=None, tasks=None, taskheap=None,
@@ -463,7 +464,11 @@ class State(object):
         tfields = itemgetter('uuid', 'hostname', 'timestamp',
                              'local_received', 'clock')
         taskheap = self._taskheap
-        maxtasks = self.max_tasks_in_memory * 2
+        # Removing events from task heap is an O(n) operation,
+        # so easier to just account for the common number of events
+        # for each task (PENDING->RECEIVED->STARTED->final)
+        #: an O(n) operation
+        max_events_in_heap = self.max_tasks_in_memory * self.heap_multiplier
         add_type = self._seen_types.add
         tasks, Task = self.tasks, self.Task
         workers, Worker = self.workers, self.Worker
@@ -517,7 +522,7 @@ class State(object):
                 origin = hostname if is_client_event else worker.id
                 heappush(taskheap,
                          timetuple(clock, timestamp, origin, ref(task)))
-                if len(taskheap) > maxtasks:
+                if len(taskheap) > max_events_in_heap:
                     heappop(taskheap)
                 if subject == 'received':
                     self.task_count += 1

+ 1 - 0
celery/tests/events/test_state.py

@@ -483,6 +483,7 @@ class test_State(AppCase):
 
     def test_limits_maxtasks(self):
         s = State(max_tasks_in_memory=1)
+        s.heap_multiplier = 2
         s.event({
             'type': 'task-unknown-event-xxx',
             'foo': 'bar',