Browse Source

Fixes after timer changes (requires kombu master)

Ask Solem 11 years ago
parent
commit
dd6ff8fd6a

+ 2 - 3
celery/concurrency/processes.py

@@ -347,12 +347,11 @@ class AsynPool(_pool.Pool):
         def on_timeout_set(R, soft, hard):
             if soft:
                 trefs[R._job] = call_later(
-                    soft * 1000.0, self._on_soft_timeout,
-                    R._job, soft, hard, hub,
+                    soft, self._on_soft_timeout, R._job, soft, hard, hub,
                 )
             elif hard:
                 trefs[R._job] = call_later(
-                    hard * 1000.0, self._on_hard_timeout, R._job,
+                    hard, self._on_hard_timeout, R._job,
                 )
         self.on_timeout_set = on_timeout_set
 

+ 3 - 2
celery/contrib/batches.py

@@ -203,8 +203,9 @@ class Batches(Task):
             put_buffer(request)
 
             if self._tref is None:     # first request starts flush timer.
-                self._tref = timer.apply_interval(self.flush_interval * 1000.0,
-                                                  flush_buffer)
+                self._tref = timer.call_repeatedly(
+                    self.flush_interval, flush_buffer,
+                )
 
             if not next(self._count) % self.flush_every:
                 flush_buffer()

+ 4 - 4
celery/events/snapshot.py

@@ -47,10 +47,10 @@ class Polaroid(object):
         self.maxrate = maxrate and TokenBucket(rate(maxrate))
 
     def install(self):
-        self._tref = self.timer.apply_interval(self.freq * 1000.0,
-                                               self.capture)
-        self._ctref = self.timer.apply_interval(self.cleanup_freq * 1000.0,
-                                                self.cleanup)
+        self._tref = self.timer.call_repeatedly(self.freq, self.capture)
+        self._ctref = self.timer.call_repeatedly(
+            self.cleanup_freq, self.cleanup,
+        )
 
     def on_shutter(self, state):
         pass

+ 5 - 0
celery/tests/case.py

@@ -15,6 +15,7 @@ import os
 import platform
 import re
 import sys
+import threading
 import time
 import warnings
 
@@ -309,6 +310,7 @@ class AppCase(Case):
         return UnitApp(*args, **kwargs)
 
     def setUp(self):
+        self._threads_at_setup = list(threading.enumerate())
         from celery import _state
         self._current_app = current_app()
         self._default_app = _state.default_app
@@ -353,6 +355,9 @@ class AppCase(Case):
         if self.app is not self._current_app:
             self.app.close()
         self.app = None
+        self.assertEqual(
+            self._threads_at_setup, list(threading.enumerate()),
+        )
 
     def _get_test_name(self):
         return '.'.join([self.__class__.__name__, self._testMethodName])

+ 1 - 1
celery/tests/events/test_snapshot.py

@@ -21,7 +21,7 @@ class TRef(object):
 class MockTimer(object):
     installed = []
 
-    def apply_interval(self, msecs, fun, *args, **kwargs):
+    def call_repeatedly(self, secs, fun, *args, **kwargs):
         self.installed.append(fun)
         return TRef()
 timer = MockTimer()

+ 10 - 11
celery/tests/utils/test_timer2.py

@@ -46,7 +46,6 @@ class test_Schedule(Case):
 
     def test_handle_error(self):
         from datetime import datetime
-        to_timestamp = timer2.to_timestamp
         scratch = [None]
 
         def on_error(exc_info):
@@ -78,7 +77,7 @@ class test_Timer(Case):
             def set_done():
                 done[0] = True
 
-            t.apply_after(300, set_done)
+            t.call_after(0.3, set_done)
             mss = 0
             while not done[0]:
                 if mss >= 2.0:
@@ -90,9 +89,9 @@ class test_Timer(Case):
 
     def test_exit_after(self):
         t = timer2.Timer()
-        t.apply_after = Mock()
-        t.exit_after(300, priority=10)
-        t.apply_after.assert_called_with(300, sys.exit, 10)
+        t.call_after = Mock()
+        t.exit_after(0.3, priority=10)
+        t.call_after.assert_called_with(0.3, sys.exit, 10)
 
     def test_ensure_started_not_started(self):
         t = timer2.Timer()
@@ -101,25 +100,25 @@ class test_Timer(Case):
         t.ensure_started()
         self.assertFalse(t.start.called)
 
-    def test_apply_interval(self):
+    def test_call_repeatedly(self):
         t = timer2.Timer()
         try:
             t.schedule.enter_after = Mock()
 
             myfun = Mock()
             myfun.__name__ = 'myfun'
-            t.apply_interval(30, myfun)
+            t.call_repeatedly(0.03, myfun)
 
             self.assertEqual(t.schedule.enter_after.call_count, 1)
             args1, _ = t.schedule.enter_after.call_args_list[0]
-            msec1, tref1, _ = args1
-            self.assertEqual(msec1, 30)
+            sec1, tref1, _ = args1
+            self.assertEqual(sec1, 0.03)
             tref1()
 
             self.assertEqual(t.schedule.enter_after.call_count, 2)
             args2, _ = t.schedule.enter_after.call_args_list[1]
-            msec2, tref2, _ = args2
-            self.assertEqual(msec2, 30)
+            sec2, tref2, _ = args2
+            self.assertEqual(sec2, 0.03)
             tref2.cancelled = True
             tref2()
 

+ 3 - 5
celery/tests/worker/test_consumer.py

@@ -88,8 +88,8 @@ class test_Consumer(AppCase):
             bucket.expected_time.return_value = 3.33
             c._limit_task(request, bucket, 4)
             bucket.can_consume.assert_called_with(4)
-            c.timer.apply_after.assert_called_with(
-                3.33 * 1000.0, c._limit_task, (request, bucket, 4),
+            c.timer.call_after.assert_called_with(
+                3.33, c._limit_task, (request, bucket, 4),
             )
             bucket.expected_time.assert_called_with(4)
             self.assertFalse(reserved.called)
@@ -397,9 +397,7 @@ class test_Gossip(AppCase):
         c = self.Consumer()
         g = Gossip(c)
         g.register_timer()
-        c.timer.apply_interval.assert_called_with(
-            g.interval * 1000.0, g.periodic,
-        )
+        c.timer.call_repeatedly.assert_called_with(g.interval, g.periodic)
         tref = g._tref
         g.register_timer()
         tref.cancel.assert_called_with()

+ 2 - 2
celery/tests/worker/test_heartbeat.py

@@ -31,7 +31,7 @@ class MockDispatcherRaising(object):
 
 class MockTimer(object):
 
-    def apply_interval(self, msecs, fun, args=(), kwargs={}):
+    def call_repeatedly(self, secs, fun, args=(), kwargs={}):
 
         class entry(tuple):
             cancelled = False
@@ -39,7 +39,7 @@ class MockTimer(object):
             def cancel(self):
                 self.cancelled = True
 
-        return entry((msecs, fun, args, kwargs))
+        return entry((secs, fun, args, kwargs))
 
     def cancel(self, entry):
         entry.cancel()

+ 2 - 2
celery/tests/worker/test_strategy.py

@@ -44,7 +44,7 @@ class test_default_strategy(AppCase):
         def was_scheduled(self):
             assert not self.was_reserved()
             assert not self.was_rate_limited()
-            return self.consumer.timer.apply_at.called
+            return self.consumer.timer.call_at.called
 
         def event_sent(self):
             return self.consumer.event_dispatcher.send.call_args
@@ -55,7 +55,7 @@ class test_default_strategy(AppCase):
             if self.was_rate_limited():
                 return self.consumer._limit_task.call_args[0][0]
             if self.was_scheduled():
-                return self.consumer.timer.apply_at.call_args[0][0]
+                return self.consumer.timer.call_at.call_args[0][0]
             raise ValueError('request not handled')
 
     @contextmanager

+ 12 - 8
celery/utils/timer2.py

@@ -8,7 +8,6 @@
 """
 from __future__ import absolute_import
 
-import heapq
 import os
 import sys
 import threading
@@ -17,7 +16,6 @@ from itertools import count
 from time import sleep
 
 from celery.five import THREAD_TIMEOUT_MAX
-from celery.utils.timeutils import timedelta_seconds, timezone
 from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
 
 TIMER_DEBUG = os.environ.get('TIMER_DEBUG')
@@ -110,20 +108,20 @@ class Timer(threading.Thread):
     def enter(self, entry, eta, priority=None):
         return self._do_enter('enter_at', entry, eta, priority=priority)
 
-    def apply_at(self, *args, **kwargs):
+    def call_at(self, *args, **kwargs):
         return self._do_enter('call_at', *args, **kwargs)
 
     def enter_after(self, *args, **kwargs):
         return self._do_enter('enter_after', *args, **kwargs)
 
-    def apply_after(self, *args, **kwargs):
+    def call_after(self, *args, **kwargs):
         return self._do_enter('call_after', *args, **kwargs)
 
-    def apply_interval(self, *args, **kwargs):
+    def call_repeatedly(self, *args, **kwargs):
         return self._do_enter('call_repeatedly', *args, **kwargs)
 
-    def exit_after(self, msecs, priority=10):
-        self.apply_after(msecs, sys.exit, priority)
+    def exit_after(self, secs, priority=10):
+        self.call_after(secs, sys.exit, priority)
 
     def cancel(self, tref):
         tref.cancel()
@@ -132,7 +130,13 @@ class Timer(threading.Thread):
         self.schedule.clear()
 
     def empty(self):
-        return not len(self.schedule)
+        return not len(self)
+
+    def __len__(self):
+        return len(self.schedule)
+
+    def __nonzero__(self):
+        return True
 
     @property
     def queue(self):

+ 3 - 5
celery/worker/consumer.py

@@ -226,8 +226,8 @@ class Consumer(object):
     def _limit_task(self, request, bucket, tokens):
         if not bucket.can_consume(tokens):
             hold = bucket.expected_time(tokens)
-            self.timer.apply_after(
-                hold * 1000.0, self._limit_task, (request, bucket, tokens),
+            self.timer.call_after(
+                hold, self._limit_task, (request, bucket, tokens),
             )
         else:
             task_reserved(request)
@@ -644,9 +644,7 @@ class Gossip(bootsteps.ConsumerStep):
     def register_timer(self):
         if self._tref is not None:
             self._tref.cancel()
-        self._tref = self.timer.apply_interval(
-            self.interval * 1000.0, self.periodic,
-        )
+        self._tref = self.timer.call_repeatedly(self.interval, self.periodic)
 
     def periodic(self):
         workers = self.state.workers

+ 2 - 2
celery/worker/heartbeat.py

@@ -47,8 +47,8 @@ class Heart(object):
     def start(self):
         if self.eventer.enabled:
             self._send('worker-online')
-            self.tref = self.timer.apply_interval(
-                self.interval * 1000.0, self._send, ('worker-heartbeat', ),
+            self.tref = self.timer.call_repeatedly(
+                self.interval, self._send, ('worker-heartbeat', ),
             )
 
     def stop(self):

+ 2 - 4
celery/worker/strategy.py

@@ -34,7 +34,7 @@ def default(task, app, consumer,
     _does_info = logger.isEnabledFor(logging.INFO)
     events = eventer and eventer.enabled
     send_event = eventer.send
-    timer_apply_at = consumer.timer.apply_at
+    call_at = consumer.timer.call_at
     apply_eta_task = consumer.apply_eta_task
     rate_limits_enabled = not consumer.disable_rate_limits
     bucket = consumer.task_buckets[task.name]
@@ -74,9 +74,7 @@ def default(task, app, consumer,
                 req.acknowledge()
             else:
                 consumer.qos.increment_eventually()
-                timer_apply_at(
-                    eta, apply_eta_task, (req, ), priority=6,
-                )
+                call_at(eta, apply_eta_task, (req, ), priority=6)
         else:
             if rate_limits_enabled:
                 if bucket:

+ 9 - 10
docs/userguide/extending.rst

@@ -152,7 +152,7 @@ Attributes
 
 .. attribute:: timer
 
-    :class:`Timer <celery.utils.timer2.Schedule` used to schedule functions.
+    :class:`~kombu.async.timer.Timer` used to schedule functions.
 
     Your bootstep must require the Timer bootstep to use this.
 
@@ -230,8 +230,8 @@ Another example could use the timer to wake up at regular intervals:
 
         def start(self, worker):
             # run every 30 seconds.
-            self.tref = worker.timer.apply_interval(
-                30000.0, self.detect, (worker, ),
+            self.tref = worker.timer.call_repeatedly(
+                30.0, self.detect, (worker, ), priority=10,
             )
 
         def stop(self, worker):
@@ -669,12 +669,11 @@ will take some time so other transports still use a threading-based solution.
 Timer - Scheduling events
 -------------------------
 
-.. method:: timer.apply_after(msecs, callback, args=(), kwargs=(),
-                              priority=0)
-
+.. method:: timer.call_after(secs, callback, args=(), kwargs=(),
+                             priority=0)
 
-.. method:: timer.apply_interval(msecs, callback, args=(), kwargs=(),
-                                priority=0)
+.. method:: timer.call_repeatedly(secs, callback, args=(), kwargs=(),
+                                  priority=0)
 
-.. method:: timer.apply_at(eta, callback, args=(), kwargs=(),
-                           priority=0)
+.. method:: timer.call_at(eta, callback, args=(), kwargs=(),
+                          priority=0)