浏览代码

Use a heapq to avoid calling all entries for every iteration

Ask Solem 11 年之前
父节点
当前提交
dc4634298f
共有 1 个文件被更改,包括 45 次插入28 次删除
  1. 45 28
      celery/beat.py

+ 45 - 28
celery/beat.py

@@ -9,12 +9,14 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
 import errno
 import errno
+import heapq
 import os
 import os
 import time
 import time
 import shelve
 import shelve
 import sys
 import sys
 import traceback
 import traceback
 
 
+from collections import namedtuple
 from threading import Event, Thread
 from threading import Event, Thread
 
 
 from billiard import Process, ensure_multiprocessing
 from billiard import Process, ensure_multiprocessing
@@ -34,6 +36,8 @@ from .utils.log import get_logger, iter_open_logger_fds
 __all__ = ['SchedulingError', 'ScheduleEntry', 'Scheduler',
 __all__ = ['SchedulingError', 'ScheduleEntry', 'Scheduler',
            'PersistentScheduler', 'Service', 'EmbeddedService']
            'PersistentScheduler', 'Service', 'EmbeddedService']
 
 
+event_t = namedtuple('event_t', ('time', 'priority', 'entry'))
+
 logger = get_logger(__name__)
 logger = get_logger(__name__)
 debug, info, error, warning = (logger.debug, logger.info,
 debug, info, error, warning = (logger.debug, logger.info,
                                logger.error, logger.warning)
                                logger.error, logger.warning)
@@ -173,6 +177,7 @@ class Scheduler(object):
                              or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
                              or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
                              or self.max_interval)
                              or self.max_interval)
         self.Publisher = Publisher or app.amqp.TaskProducer
         self.Publisher = Publisher or app.amqp.TaskProducer
+        self._heap = None
         if not lazy:
         if not lazy:
             self.setup_schedule()
             self.setup_schedule()
 
 
@@ -191,32 +196,45 @@ class Scheduler(object):
         is_due, next_time_to_run = entry.is_due()
         is_due, next_time_to_run = entry.is_due()
 
 
         if is_due:
         if is_due:
-            info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
-            try:
-                result = self.apply_async(entry, publisher=publisher)
-            except Exception as exc:
-                error('Message Error: %s\n%s',
-                      exc, traceback.format_stack(), exc_info=True)
-            else:
-                debug('%s sent. id->%s', entry.task, result.id)
+            self.apply_entry(entry, producer=publisher, advance=True)
         return next_time_to_run
         return next_time_to_run
 
 
-    def tick(self):
+    def apply_entry(self, entry, producer=None):
+        info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
+        try:
+            result = self.apply_async(entry, producer=producer, advance=False)
+        except Exception as exc:
+            error('Message Error: %s\n%s',
+                  exc, traceback.format_stack(), exc_info=True)
+        else:
+            debug('%s sent. id->%s', entry.task, result.id)
+
+    def tick(self, event_t=event_t, min=min,
+             heappop=heapq.heappop, heappush=heapq.heappush):
         """Run a tick, that is one iteration of the scheduler.
         """Run a tick, that is one iteration of the scheduler.
 
 
         Executes all due tasks.
         Executes all due tasks.
 
 
         """
         """
-        remaining_times = []
-        try:
-            for entry in values(self.schedule):
-                next_time_to_run = self.maybe_due(entry, self.publisher)
-                if next_time_to_run:
-                    remaining_times.append(next_time_to_run)
-        except RuntimeError:
-            pass
-
-        return min(remaining_times + [self.max_interval])
+        H = self._heap
+        if H is None:
+            H = self._heap = [event_t(e.is_due()[1], 5, e)
+                              for e in values(self.schedule)]
+        print('HEAP: %r' % (H, ))
+        event = H[0]
+        entry = event[2]
+        is_due, next_time_to_run = entry.is_due()
+        if is_due:
+            verify = heappop(H)
+            if verify is event:
+                next_entry = self.reserve(entry)
+                self.apply_entry(entry, producer=self.publisher)
+                heappush(H, event_t(next_time_to_run, event[1], next_entry))
+                return 0
+            else:
+                heappush(H, verify)
+                return min(verify[0], self.max_interval)
+        return min(next_time_to_run, self.max_interval)
 
 
     def should_sync(self):
     def should_sync(self):
         return (not self._last_sync or
         return (not self._last_sync or
@@ -226,22 +244,22 @@ class Scheduler(object):
         new_entry = self.schedule[entry.name] = next(entry)
         new_entry = self.schedule[entry.name] = next(entry)
         return new_entry
         return new_entry
 
 
-    def apply_async(self, entry, publisher=None, **kwargs):
+    def apply_async(self, entry, producer=None, advance=True, **kwargs):
         # Update timestamps and run counts before we actually execute,
         # Update timestamps and run counts before we actually execute,
         # so we have that done if an exception is raised (doesn't schedule
         # so we have that done if an exception is raised (doesn't schedule
         # forever.)
         # forever.)
-        entry = self.reserve(entry)
+        entry = self.reserve(entry) if advance else entry
         task = self.app.tasks.get(entry.task)
         task = self.app.tasks.get(entry.task)
 
 
         try:
         try:
             if task:
             if task:
-                result = task.apply_async(entry.args, entry.kwargs,
-                                          publisher=publisher,
-                                          **entry.options)
-            else:
-                result = self.send_task(entry.task, entry.args, entry.kwargs,
-                                        publisher=publisher,
+                return task.apply_async(entry.args, entry.kwargs,
+                                        producer=producer,
                                         **entry.options)
                                         **entry.options)
+            else:
+                return self.send_task(entry.task, entry.args, entry.kwargs,
+                                      producer=producer,
+                                      **entry.options)
         except Exception as exc:
         except Exception as exc:
             reraise(SchedulingError, SchedulingError(
             reraise(SchedulingError, SchedulingError(
                 "Couldn't apply scheduled task {0.name}: {exc}".format(
                 "Couldn't apply scheduled task {0.name}: {exc}".format(
@@ -249,7 +267,6 @@ class Scheduler(object):
         finally:
         finally:
             if self.should_sync():
             if self.should_sync():
                 self._do_sync()
                 self._do_sync()
-        return result
 
 
     def send_task(self, *args, **kwargs):
     def send_task(self, *args, **kwargs):
         return self.app.send_task(*args, **kwargs)
         return self.app.send_task(*args, **kwargs)