ソースを参照

eventlet/gevent: Use kombu.async.timer directly

Ask Solem 11 年 前
コミット
75e5acca94
2 ファイル変更11 行追加36 行削除
  1. 8 20
      celery/concurrency/eventlet.py
  2. 3 16
      celery/concurrency/gevent.py

+ 8 - 20
celery/concurrency/eventlet.py

@@ -28,9 +28,10 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
             import warnings
             warnings.warn(RuntimeWarning(W_RACE % side))
 
+from kombu.async import timer as _timer
+
 
 from celery import signals
-from celery.utils import timer2
 
 from . import base
 
@@ -41,12 +42,12 @@ def apply_target(target, args=(), kwargs={}, callback=None,
                              pid=getpid())
 
 
-class Schedule(timer2.Schedule):
+class Timer(_timer.Timer):
 
     def __init__(self, *args, **kwargs):
         from eventlet.greenthread import spawn_after
         from greenlet import GreenletExit
-        super(Schedule, self).__init__(*args, **kwargs)
+        super(Timer, self).__init__(*args, **kwargs)
 
         self.GreenletExit = GreenletExit
         self._spawn_after = spawn_after
@@ -81,28 +82,15 @@ class Schedule(timer2.Schedule):
             except (KeyError, self.GreenletExit):
                 pass
 
-    @property
-    def queue(self):
-        return self._queue
-
-
-class Timer(timer2.Timer):
-    Schedule = Schedule
-
-    def ensure_started(self):
-        pass
-
-    def stop(self):
-        self.schedule.clear()
-
     def cancel(self, tref):
         try:
             tref.cancel()
-        except self.schedule.GreenletExit:
+        except self.GreenletExit:
             pass
 
-    def start(self):
-        pass
+    @property
+    def queue(self):
+        return self._queue
 
 
 class TaskPool(base.BasePool):

+ 3 - 16
celery/concurrency/gevent.py

@@ -15,7 +15,7 @@ try:
 except ImportError:  # pragma: no cover
     Timeout = None  # noqa
 
-from celery.utils import timer2
+from kombu.async import timer as _timer
 
 from .base import apply_target, BasePool
 
@@ -35,7 +35,7 @@ def apply_timeout(target, args=(), kwargs={}, callback=None,
         return timeout_callback(False, timeout)
 
 
-class Schedule(timer2.Schedule):
+class Timer(_timer.Timer):
 
     def __init__(self, *args, **kwargs):
         from gevent.greenlet import Greenlet, GreenletExit
@@ -45,7 +45,7 @@ class Schedule(timer2.Schedule):
 
         self._Greenlet = _Greenlet
         self._GreenletExit = GreenletExit
-        super(Schedule, self).__init__(*args, **kwargs)
+        super(Timer, self).__init__(*args, **kwargs)
         self._queue = set()
 
     def _enter(self, eta, priority, entry):
@@ -78,19 +78,6 @@ class Schedule(timer2.Schedule):
         return self._queue
 
 
-class Timer(timer2.Timer):
-    Schedule = Schedule
-
-    def ensure_started(self):
-        pass
-
-    def stop(self):
-        self.schedule.clear()
-
-    def start(self):
-        pass
-
-
 class TaskPool(BasePool):
     Timer = Timer