Browse Source

Don't use threading.Timer for timeouts.

Ask Solem 15 years ago
parent
commit
f28398b939
5 changed files with 34 additions and 73 deletions
  1. 14 16
      celery/backends/base.py
  2. 16 16
      celery/result.py
  3. 2 24
      celery/supervisor.py
  4. 1 16
      celery/tests/test_supervisor.py
  5. 1 1
      celery/tests/test_worker_job.py

+ 14 - 16
celery/backends/base.py

@@ -1,7 +1,6 @@
 """celery.backends.base"""
 import time
 import operator
-import threading
 from functools import partial as curry
 from celery.serialization import pickle
 
@@ -172,21 +171,20 @@ class BaseBackend(object):
 
         """
 
-        def on_timeout():
-            raise TimeoutError("The operation timed out.")
-
-        timeout_timer = threading.Timer(timeout, on_timeout)
-        timeout_timer.start()
-        try:
-            while True:
-                status = self.get_status(task_id)
-                if status == "DONE":
-                    return self.get_result(task_id)
-                elif status == "FAILURE":
-                    raise self.get_result(task_id)
-                time.sleep(0.5) # avoid hammering the CPU checking status.
-        finally:
-            timeout_timer.cancel()
+        sleep_inbetween = 0.5
+        time_elapsed = 0.0
+
+        while True:
+            status = self.get_status(task_id)
+            if status == "DONE":
+                return self.get_result(task_id)
+            elif status == "FAILURE":
+                raise self.get_result(task_id)
+            # avoid hammering the CPU checking status.
+            time.sleep(sleep_inbetween) 
+            time_elapsed += sleep_inbetween
+            if timeout and time_elapsed >= timeout:
+                raise TimeoutError("The operation timed out.")
 
     def process_cleanup(self):
         """Cleanup actions to do at the end of a task worker process.

+ 16 - 16
celery/result.py

@@ -6,7 +6,7 @@ Asynchronous result types.
 from celery.backends import default_backend
 from celery.datastructures import PositionQueue
 from itertools import imap
-import threading
+import time
 
 
 class TimeoutError(Exception):
@@ -269,26 +269,26 @@ class TaskSetResult(object):
 
         """
 
+        time_start = time.time()
+
         def on_timeout():
             raise TimeoutError("The operation timed out.")
 
-        timeout_timer = threading.Timer(timeout, on_timeout)
         results = PositionQueue(length=self.total)
 
-        timeout_timer.start()
-        try:
-            while True:
-                for position, pending_result in enumerate(self.subtasks):
-                    if pending_result.status == "DONE":
-                        results[position] = pending_result.result
-                    elif pending_result.status == "FAILURE":
-                        raise pending_result.result
-                if results.full():
-                    # Make list copy, so the returned type is not a position
-                    # queue.
-                    return list(results)
-        finally:
-            timeout_timer.cancel()
+        while True:
+            for position, pending_result in enumerate(self.subtasks):
+                if pending_result.status == "DONE":
+                    results[position] = pending_result.result
+                elif pending_result.status == "FAILURE":
+                    raise pending_result.result
+            if results.full():
+                # Make list copy, so the returned type is not a position
+                # queue.
+                return list(results)
+            else:
+                if time.time() >= time_start + timeout:
+                    on_timeout()
 
     @property
     def total(self):

+ 2 - 24
celery/supervisor.py

@@ -1,9 +1,7 @@
 import multiprocessing
-import threading
 import time
 from multiprocessing import TimeoutError
 
-PING_TIMEOUT = 30 # seconds
 JOIN_TIMEOUT = 2
 CHECK_INTERVAL = 2
 MAX_RESTART_FREQ = 3
@@ -14,12 +12,6 @@ class MaxRestartsExceededError(Exception):
     """Restarts exceeded the maximum restart frequency."""
 
 
-def raise_ping_timeout(msg):
-    """Raises :exc:`multiprocessing.TimeoutError`, for use in
-    :class:`threading.Timer` callbacks."""
-    raise TimeoutError("Supervised: Timed out while pinging process.")
-
-
 class OFASupervisor(object):
     """Process supervisor using the `one_for_all`_ strategy.
 
@@ -76,14 +68,13 @@ class OFASupervisor(object):
     Process = multiprocessing.Process
 
     def __init__(self, target, args=None, kwargs=None,
-            ping_timeout=PING_TIMEOUT, join_timeout=JOIN_TIMEOUT,
+            join_timeout=JOIN_TIMEOUT,
             max_restart_freq = MAX_RESTART_FREQ,
             max_restart_freq_time=MAX_RESTART_FREQ_TIME,
             check_interval=CHECK_INTERVAL):
         self.target = target
         self.args = args or []
         self.kwargs = kwargs or {}
-        self.ping_timeout = ping_timeout
         self.join_timeout = join_timeout
         self.check_interval = check_interval
         self.max_restart_freq = max_restart_freq
@@ -121,7 +112,7 @@ class OFASupervisor(object):
                 self.restarts_in_frame = 0
 
                 try:
-                    proc_is_alive = self._is_alive(process)
+                    proc_is_alive = process.is_alive()
                 except TimeoutError:
                     proc_is_alive = False
 
@@ -132,16 +123,3 @@ class OFASupervisor(object):
                 restart_frame += self.check_interval
         finally:
             process.join()
-
-    def _is_alive(self, process):
-        """Sends a ping to the target process to see if it's alive.
-
-        :rtype bool:
-
-        """
-        timeout_timer = threading.Timer(self.ping_timeout, raise_ping_timeout)
-        try:
-            alive = process.is_alive()
-        finally:
-            timeout_timer.cancel()
-        return alive

+ 1 - 16
celery/tests/test_supervisor.py

@@ -1,5 +1,5 @@
 import unittest
-from celery.supervisor import raise_ping_timeout, OFASupervisor
+from celery.supervisor import OFASupervisor
 from celery.supervisor import TimeoutError, MaxRestartsExceededError
 
 
@@ -42,27 +42,12 @@ class MockProcess(object):
         self._joined = True
 
 
-class TestDiv(unittest.TestCase):
-
-    def test_raise_ping_timeout(self):
-        self.assertRaises(TimeoutError, raise_ping_timeout, "timed out")
-
-
 class TestOFASupervisor(unittest.TestCase):
 
     def test_init(self):
         s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={})
         s.Process = MockProcess
 
-    def test__is_alive(self):
-        s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={})
-        s.Process = MockProcess
-        proc = MockProcess(target_one, [2, 4, 8], {})
-        proc.start()
-        self.assertTrue(s._is_alive(proc))
-        proc.alive = False
-        self.assertFalse(s._is_alive(proc))
-
     def test_start(self):
         MockProcess.alive = False
         s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={},

+ 1 - 1
celery/tests/test_worker_job.py

@@ -177,7 +177,7 @@ class TestTaskWrapper(unittest.TestCase):
     def test_execute_ack(self):
         tid = gen_unique_id()
         tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"},
-                        on_acknowledge=on_ack)
+                        on_ack=on_ack)
         self.assertEquals(tw.execute(), 256)
         meta = TaskMeta.objects.get(task_id=tid)
         self.assertTrue(scratch["ACK"])