Procházet zdrojové kódy

Tests passing for beat

Ask Solem před 11 roky
rodič
revize
b3cd3fcec4
2 změnil soubory, kde provedl 18 přidání a 24 odebrání
  1. 15 15
      celery/beat.py
  2. 3 9
      celery/tests/app/test_beat.py

+ 15 - 15
celery/beat.py

@@ -199,13 +199,6 @@ class Scheduler(object):
                     'options': {'expires': 12 * 3600}}
         self.update_from_dict(entries)
 
-    def maybe_due(self, entry, producer=None):
-        is_due, next_time_to_run = entry.is_due()
-
-        if is_due:
-            self.apply_entry(entry, producer=producer, advance=True)
-        return next_time_to_run
-
     def apply_entry(self, entry, producer=None):
         info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
         try:
@@ -216,20 +209,26 @@ class Scheduler(object):
         else:
             debug('%s sent. id->%s', entry.task, result.id)
 
+    def is_due(self, entry):
+        return entry.is_due()
+
     def tick(self, event_t=event_t, min=min,
-             heappop=heapq.heappop, heappush=heapq.heappush):
+             heappop=heapq.heappop, heappush=heapq.heappush,
+             heapify=heapq.heapify):
         """Run a tick, that is one iteration of the scheduler.
 
         Executes all due tasks.
 
         """
+        max_interval = self.max_interval
         H = self._heap
         if H is None:
-            H = self._heap = [event_t(e.is_due()[1], 5, e)
+            H = self._heap = [event_t(e.is_due()[1] or 0, 5, e)
                               for e in values(self.schedule)]
+            heapify(H)
         event = H[0]
         entry = event[2]
-        is_due, next_time_to_run = entry.is_due()
+        is_due, next_time_to_run = self.is_due(entry)
         if is_due:
             verify = heappop(H)
             if verify is event:
@@ -239,8 +238,8 @@ class Scheduler(object):
                 return 0
             else:
                 heappush(H, verify)
-                return min(verify[0], self.max_interval)
-        return min(next_time_to_run, self.max_interval)
+                return min(verify[0], max_interval)
+        return min(next_time_to_run or max_interval, max_interval)
 
     def should_sync(self):
         return (
@@ -477,9 +476,10 @@ class Service(object):
         try:
             while not self._is_shutdown.is_set():
                 interval = self.scheduler.tick()
-                debug('beat: Waking up %s.',
-                      humanize_seconds(interval, prefix='in '))
-                time.sleep(interval)
+                if interval:
+                    debug('beat: Waking up %s.',
+                        humanize_seconds(interval, prefix='in '))
+                    time.sleep(interval)
         except (KeyboardInterrupt, SystemExit):
             self._is_shutdown.set()
         finally:

+ 3 - 9
celery/tests/app/test_beat.py

@@ -123,7 +123,7 @@ class mSchedulerSchedulingError(mScheduler):
 
 class mSchedulerRuntimeError(mScheduler):
 
-    def maybe_due(self, *args, **kwargs):
+    def is_due(self, *args, **kwargs):
         raise RuntimeError('dict modified while itervalues')
 
 
@@ -273,22 +273,16 @@ class test_Scheduler(AppCase):
                       schedule=always_due,
                       args=(1, 2),
                       kwargs={'foo': 'bar'})
-        self.assertEqual(scheduler.tick(), 1)
+        self.assertEqual(scheduler.tick(), 0)
 
     @patch('celery.beat.error')
     def test_due_tick_SchedulingError(self, error):
         scheduler = mSchedulerSchedulingError(app=self.app)
         scheduler.add(name='test_due_tick_SchedulingError',
                       schedule=always_due)
-        self.assertEqual(scheduler.tick(), 1)
+        self.assertEqual(scheduler.tick(), 0)
         self.assertTrue(error.called)
 
-    def test_due_tick_RuntimeError(self):
-        scheduler = mSchedulerRuntimeError(app=self.app)
-        scheduler.add(name='test_due_tick_RuntimeError',
-                      schedule=always_due)
-        self.assertEqual(scheduler.tick(), scheduler.max_interval)
-
     def test_pending_tick(self):
         scheduler = mScheduler(app=self.app)
         scheduler.add(name='test_pending_tick',