Преглед на файлове

Remove celery.timer and replace with threading.Timer instead.

Ask Solem преди 16 години
родител
ревизия
aef7f13a1c
променени са 4 файла, в които са добавени 51 реда и са изтрити 107 реда
  1. 22 10
      celery/backends/base.py
  2. 28 17
      celery/result.py
  3. 1 1
      celery/task/base.py
  4. 0 79
      celery/timer.py

+ 22 - 10
celery/backends/base.py

@@ -1,13 +1,17 @@
 """celery.backends.base"""
 import time
 
-from celery.timer import TimeoutTimer
+import threading
 try:
     import cPickle as pickle
 except ImportError:
     import pickle
 
 
+class TimeoutError(Exception):
+    """The operation timed out."""
+
+
 def find_nearest_pickleable_exception(exc):
     """With an exception instance, iterate over its super classes (by mro)
     and find the first super exception that is pickleable. It does
@@ -83,6 +87,7 @@ class BaseBackend(object):
 
     capabilities = []
     UnpickleableExceptionWrapper = UnpickleableExceptionWrapper
+    TimeoutError = TimeoutError
 
     def store_result(self, task_id, result, status):
         """Store the result and status of a task."""
@@ -168,15 +173,22 @@ class BaseBackend(object):
         longer than ``timeout`` seconds.
 
         """
-        timeout_timer = TimeoutTimer(timeout)
-        while True:
-            status = self.get_status(task_id)
-            if status == "DONE":
-                return self.get_result(task_id)
-            elif status == "FAILURE":
-                raise self.get_result(task_id)
-            time.sleep(0.5) # avoid hammering the CPU checking status.
-            timeout_timer.tick()
+
+        def on_timeout():
+            raise TimeoutError("The operation timed out.")
+
+        timeout_timer = threading.Timer(timeout, on_timeout)
+        timeout_timer.start()
+        try:
+            while True:
+                status = self.get_status(task_id)
+                if status == "DONE":
+                    return self.get_result(task_id)
+                elif status == "FAILURE":
+                    raise self.get_result(task_id)
+                time.sleep(0.5) # avoid hammering the CPU checking status.
+        finally:
+            timeout_timer.cancel()
 
     def process_cleanup(self):
         """Cleanup actions to do at the end of a task worker process.

+ 28 - 17
celery/result.py

@@ -5,8 +5,12 @@ Asynchronous result types.
 """
 from celery.backends import default_backend
 from celery.datastructures import PositionQueue
-from celery.timer import TimeoutTimer
 from itertools import imap
+import threading
+
+
+class TimeoutError(Exception):
+    """The operation timed out."""
 
 
 class BaseAsyncResult(object):
@@ -28,6 +32,8 @@ class BaseAsyncResult(object):
 
     """
 
+    TimeoutError = TimeoutError
+
     def __init__(self, task_id, backend):
         self.task_id = task_id
         self.backend = backend
@@ -50,7 +56,7 @@ class BaseAsyncResult(object):
         :keyword timeout: How long to wait in seconds, before the
             operation times out.
 
-        :raises celery.timer.TimeoutError: if ``timeout`` is not ``None`` and
+        :raises TimeoutError: if ``timeout`` is not ``None`` and
             the result does not arrive within ``timeout`` seconds.
 
         If the remote call raised an exception then that
@@ -253,7 +259,7 @@ class TaskSetResult(object):
         :keyword timeout: The time in seconds, how long
             it will wait for results, before the operation times out.
 
-        :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
+        :raises TimeoutError: if ``timeout`` is not ``None``
             and the operation takes longer than ``timeout`` seconds.
 
         If any of the tasks raises an exception, the exception
@@ -262,23 +268,28 @@ class TaskSetResult(object):
         :returns: list of return values for all tasks in the taskset.
 
         """
-        timeout_timer = TimeoutTimer(timeout)
-        results = PositionQueue(length=self.total)
 
-        while True:
-            for position, pending_result in enumerate(self.subtasks):
-                if pending_result.status == "DONE":
-                    results[position] = pending_result.result
-                elif pending_result.status == "FAILURE":
-                    raise pending_result.result
-            if results.full():
-                # Make list copy, so the returned type is not a position
-                # queue.
-                return list(results)
+        def on_timeout():
+            raise TimeoutError("The operation timed out.")
 
-            # This raises TimeoutError when timed out.
-            timeout_timer.tick()
+        timeout_timer = threading.Timer(timeout, on_timeout)
+        results = PositionQueue(length=self.total)
 
+        timeout_timer.start()
+        try:
+            while True:
+                for position, pending_result in enumerate(self.subtasks):
+                    if pending_result.status == "DONE":
+                        results[position] = pending_result.result
+                    elif pending_result.status == "FAILURE":
+                        raise pending_result.result
+                if results.full():
+                    # Make list copy, so the returned type is not a position
+                    # queue.
+                    return list(results)
+        finally:
+            timeout_timer.cancel()
+    
     @property
     def total(self):
         """The total number of tasks in the :class:`celery.task.TaskSet`."""

+ 1 - 1
celery/task/base.py

@@ -309,7 +309,7 @@ class TaskSet(object):
         :keyword timeout: The time in seconds, how long
             it will wait for results, before the operation times out.
 
-        :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
+        :raises TimeoutError: if ``timeout`` is not ``None``
             and the operation takes longer than ``timeout`` seconds.
 
         If any of the tasks raises an exception, the exception

+ 0 - 79
celery/timer.py

@@ -1,79 +0,0 @@
-"""
-
-Managing time and events
-
-"""
-import time
-
-
-class TimeoutError(Exception):
-    """The event has timed out."""
-
-
-class EventTimer(object):
-    """Do something at an interval.
-
-    .. attribute:: interval
-
-        How often we call the event (in seconds).
-
-    .. attribute:: event
-
-        The event callable to run every ``interval`` seconds.
-
-    .. attribute:: last_triggered
-
-        The last time, in unix timestamp format, the event was executed.
-
-    """
-
-    def __init__(self, event, interval=None):
-        self.event = event
-        self.interval = interval
-        self.last_triggered = None
-
-    def tick(self):
-        """Run a event timer clock tick.
-
-        When the interval has run, the event will be triggered.
-        If interval is not set, the event will never be triggered.
-
-        """
-        if not self.interval: # never trigger if no interval.
-            return
-        if not self.last_triggered or \
-                time.time() > self.last_triggered + self.interval:
-            self.event()
-            self.last_triggered = time.time()
-
-
-class TimeoutTimer(object):
-    """A timer that raises :exc:`TimeoutError` exception when the
-    time has run out.
-
-    .. attribute:: timeout
-
-        The timeout in seconds.
-
-    .. attribute:: time_start
-
-        The time when the timeout timer instance was constructed.
-
-    """
-
-    def __init__(self, timeout, timeout_msg="The operation timed out"):
-        self.timeout = timeout
-        self.timeout_msg = timeout_msg
-        self.time_start = time.time()
-
-    def tick(self):
-        """Run a timeout timer clock tick.
-
-        :raises TimeoutError: when :attr:`timeout` seconds has passed.
-            If :attr:`timeout` is not set, it will never time out.
-
-        """
-        if not self.timeout:
-            return
-        if time.time() > self.time_start + self.timeout:
-            raise TimeoutError(self.timeout_msg)