Quellcode durchsuchen

Revert billiard API changes. (Issue #1129)

Ask Solem vor 11 Jahren
Ursprung
Commit
ff6770e42c
1 geänderte Dateien mit 10 neuen und 8 gelöschten Zeilen
  1. 10 8
      celery/concurrency/processes.py

+ 10 - 8
celery/concurrency/processes.py

@@ -549,24 +549,22 @@ class TaskPool(BasePool):
 
     def _create_timelimit_handlers(self, hub, now=time):
         apply_after = hub.timer.apply_after
-        on_soft_timeout = self.on_soft_timeout
-        on_hard_timeout = self.on_hard_timeout
         trefs = self._tref_for_id = WeakValueDictionary()
 
-        def on_timeout_set(job, soft, hard):
+        def on_timeout_set(R, soft, hard):
             if soft:
-                trefs[job] = apply_after(
+                trefs[R._job] = apply_after(
                     soft * 1000.0,
-                    self._on_soft_timeout, (job, soft, hard, hub),
+                    self._on_soft_timeout, (R._job, soft, hard, hub),
                 )
             elif hard:
-                trefs[job] = apply_after(
+                trefs[R._job] = apply_after(
                     hard * 1000.0,
-                    self._on_hard_timeout, (job, )
+                    self._on_hard_timeout, (R._job, )
                 )
         self._pool.on_timeout_set = on_timeout_set
 
-        def on_timeout_cancel(job):
+        def _discard_tref(job):
             try:
                 tref = trefs.pop(job)
                 tref.cancel()
@@ -577,6 +575,10 @@ class TaskPool(BasePool):
                 gc.collect()
             except (KeyError, AttributeError):
                 pass  # out of scope
+        self._discard_tref = _discard_tref
+
+        def on_timeout_cancel(R):
+            _discard_tref(R._job)
         self._pool.on_timeout_cancel = on_timeout_cancel
 
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):