Bladeren bron

ETA tasks now working with evenlet using spawn_after_local instead of timer2.Timer

Ask Solem 14 jaren geleden
bovenliggende
commit
e6451a6117
5 gewijzigde bestanden met toevoegingen van 85 en 6 verwijderingen
  1. 1 1
      celery/app/defaults.py
  2. 3 0
      celery/concurrency/base.py
  3. 74 1
      celery/concurrency/evlet.py
  4. 2 1
      celery/utils/timer2.py
  5. 5 3
      celery/worker/__init__.py

+ 1 - 1
celery/app/defaults.py

@@ -97,7 +97,7 @@ NAMESPACES = {
     "CELERYD": {
         "AUTOSCALER": Option("celery.worker.controllers.Autoscaler"),
         "CONCURRENCY": Option(0, type="int"),
-        "ETA_SCHEDULER": Option("celery.utils.timer2.Timer"),
+        "ETA_SCHEDULER": Option(None, type="str"),
         "ETA_SCHEDULER_PRECISION": Option(1.0, type="float"),
         "FORCE_HIJACK_ROOT_LOGGER": Option(False, type="bool"),
         "CONSUMER": Option("celery.worker.consumer.Consumer"),

+ 3 - 0
celery/concurrency/base.py

@@ -4,6 +4,7 @@ import traceback
 from celery import log
 from celery.datastructures import ExceptionInfo
 from celery.utils.functional import partial
+from celery.utils import timer2
 
 
 def apply_target(target, args=(), kwargs={}, callback=None,
@@ -18,6 +19,8 @@ class BasePool(object):
     CLOSE = 0x2
     TERMINATE = 0x3
 
+    Timer = timer2.Timer
+
     signal_safe = True
 
     _state = None

+ 74 - 1
celery/concurrency/evlet.py

@@ -1,11 +1,82 @@
+import sys
+
+from time import time
+
 from eventlet import GreenPool
-from eventlet import spawn
+from eventlet.greenthread import spawn, spawn_after_local
+from greenlet import GreenletExit
 
 from celery.concurrency.base import apply_target, BasePool
+from celery.utils import timer2
+
+
+class Schedule(timer2.Schedule):
+
+    def __init__(self, *args, **kwargs):
+        super(Schedule, self).__init__(*args, **kwargs)
+        self._queue = set()
+
+    def enter(self, entry, eta=None, priority=0):
+        try:
+            timer2.to_timestamp(eta)
+        except OverflowError:
+            if not self.handle_error(sys.exc_info()):
+                raise
+
+        now = time()
+        if eta is None:
+            eta = now
+        secs = eta - now
+
+        g = spawn_after_local(secs, entry)
+        self._queue.add(g)
+        g.link(self._entry_exit, entry)
+        g.entry = entry
+        g.eta = eta
+        g.priority = priority
+        g.cancelled = False
+
+        return g
+
+    def _entry_exit(self, g, entry):
+        try:
+            try:
+                g.wait()
+            except GreenletExit:
+                entry.cancel()
+                g.cancelled = True
+        finally:
+            self._queue.discard(g)
+
+    def clear(self):
+        queue = self._queue
+        while queue:
+            try:
+                queue.pop().cancel()
+            except KeyError:
+                pass
+
+    @property
+    def queue(self):
+        return [(g.eta, g.priority, g.entry) for g in self._queue]
+
+
+class Timer(timer2.Timer):
+    Schedule = Schedule
+
+    def ensure_started(self):
+        pass
+
+    def stop(self):
+        self.schedule.clear()
+
+    def start(self):
+        pass
 
 
 class TaskPool(BasePool):
     Pool = GreenPool
+    Timer = Timer
 
     signal_safe = False
 
@@ -31,3 +102,5 @@ class TaskPool(BasePool):
         eventlet.monkey_patch()
         eventlet.debug.hub_prevent_multiple_readers(False)
 TaskPool.on_import()
+
+

+ 2 - 1
celery/utils/timer2.py

@@ -131,13 +131,14 @@ class Schedule(object):
 
 class Timer(Thread):
     Entry = Entry
+    Schedule = Schedule
 
     running = False
     on_tick = None
     _timer_count = count(1).next
 
     def __init__(self, schedule=None, on_error=None, on_tick=None, **kwargs):
-        self.schedule = schedule or Schedule(on_error=on_error)
+        self.schedule = schedule or self.Schedule(on_error=on_error)
         self.on_tick = on_tick or self.on_tick
 
         Thread.__init__(self)

+ 5 - 3
celery/worker/__init__.py

@@ -82,9 +82,6 @@ class WorkController(object):
     #: processing.
     ready_queue = None
 
-    #: Instance of :class:`celery.worker.controllers.ScheduleController`.
-    schedule_controller = None
-
     #: Instance of :class:`celery.worker.controllers.Mediator`.
     mediator = None
 
@@ -122,6 +119,7 @@ class WorkController(object):
         self.mediator_cls = mediator_cls or conf.CELERYD_MEDIATOR
         self.eta_scheduler_cls = eta_scheduler_cls or \
                                     conf.CELERYD_ETA_SCHEDULER
+
         self.autoscaler_cls = autoscaler_cls or \
                                     conf.CELERYD_AUTOSCALER
         self.schedule_filename = schedule_filename or \
@@ -179,6 +177,10 @@ class WorkController(object):
                                 timeout=self.task_time_limit,
                                 soft_timeout=self.task_soft_time_limit,
                                 putlocks=self.pool_putlocks)
+        if not self.eta_scheduler_cls:
+            # Default Timer is set by the pool, as e.g. eventlet
+            # needs a custom implementation.
+            self.eta_scheduler_cls = self.pool.Timer
 
         if autoscale:
             self.autoscaler = instantiate(self.autoscaler_cls, self.pool,