Browse Source

timer data now namedtuple

Ask Solem 11 years ago
parent
commit
a2be71511b
2 changed files with 22 additions and 18 deletions
  1. 11 8
      celery/utils/timer2.py
  2. 11 10
      celery/worker/control.py

+ 11 - 8
celery/utils/timer2.py

@@ -13,6 +13,7 @@ import os
 import sys
 import threading
 
+from collections import namedtuple
 from datetime import datetime
 from functools import wraps
 from itertools import count
@@ -39,6 +40,8 @@ logger = get_logger('timer2')
 
 __all__ = ['Entry', 'Schedule', 'Timer', 'to_timestamp']
 
+scheduled = namedtuple('scheduled', ('eta', 'priority', 'entry'))
+
 
 class Entry(object):
     if not IS_PYPY:  # pragma: no cover
@@ -140,7 +143,7 @@ class Schedule(object):
         return self._enter(eta, priority, entry)
 
     def _enter(self, eta, priority, entry):
-        heapq.heappush(self._queue, (eta, priority, entry))
+        heapq.heappush(self._queue, scheduled(eta, priority, entry))
         return entry
 
     def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
@@ -186,20 +189,21 @@ class Schedule(object):
 
         while 1:
             if queue:
-                eta, priority, entry = verify = queue[0]
-                now = nowfun()
+                eventA = queue[0]
+                now, eta = nowfun(), eventA[0]
 
                 if now < eta:
                     yield min(eta - now, max_interval), None
                 else:
-                    event = pop(queue)
+                    eventB = pop(queue)
 
-                    if event is verify:
+                    if eventB is eventA:
+                        entry = eventA[2]
                         if not entry.cancelled:
                             yield None, entry
                         continue
                     else:
-                        push(queue, event)
+                        push(queue, eventB)
             else:
                 yield None, None
 
@@ -208,8 +212,7 @@ class Schedule(object):
         return not self._queue
 
     def clear(self):
-        self._queue[:] = []  # used because we can't replace the object
-                             # and the operation is atomic.
+        self._queue[:] = []  # atomic, without creating a new list.
 
     def info(self):
         return ({'eta': eta, 'priority': priority, 'item': item}

+ 11 - 10
celery/worker/control.py

@@ -21,6 +21,7 @@ from celery.utils import jsonify
 
 from . import state as worker_state
 from .state import revoked
+from .job import Request
 
 __all__ = ['Panel']
 DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
@@ -168,18 +169,18 @@ def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
 
 @Panel.register
 def dump_schedule(state, safe=False, **kwargs):
-    from celery.worker.job import Request
-    schedule = state.consumer.timer.schedule
-    if not schedule.queue:
-        return []
 
     def prepare_entries():
-        for entry in schedule.info():
-            item = entry['item']
-            if item.args and isinstance(item.args[0], Request):
-                yield {'eta': entry['eta'],
-                       'priority': entry['priority'],
-                       'request': item.args[0].info(safe=safe)}
+        for waiting in state.consumer.timer.schedule.queue:
+            try:
+                arg0 = waiting.entry.args[0]
+            except (IndexError, TypeError):
+                continue
+            else:
+                if isinstance(arg0, Request):
+                    yield {'eta': waiting.eta,
+                           'priority': waiting.priority,
+                           'request': arg0.info(safe=safe)}
     return list(prepare_entries())