Browse Source

Make a common subclass for threads continously doing work.

Ask Solem 16 years ago
parent
commit
e7e48f455c
1 changed files with 53 additions and 41 deletions
  1. 53 41
      celery/worker/controllers.py

+ 53 - 41
celery/worker/controllers.py

@@ -10,7 +10,45 @@ import threading
 import time
 
 
-class Mediator(threading.Thread):
+class InfinityThread(threading.Thread):
+    """Thread running an infinite loop which for every iteration
+    calls its :meth:`on_iteration` method.
+
+    This also implements graceful shutdown of the thread by providing
+    the :meth:`stop` method.
+
+    """
+
+    def __init__(self):
+        super(InfinityThread, self).__init__()
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+
+    def run(self):
+        """This is the body of the thread.
+        
+        To start the thread use :meth:`start` instead.
+        
+        """
+        while True:
+            if self._shutdown.isSet():
+                break
+            self.on_iteration()
+        self._stopped.set() # indicate that we are stopped
+
+    def on_iteration(self):
+        """This is the method called for every iteration and must be 
+        implemented by every subclass of :class:`InfinityThread`."""
+        raise NotImplementedError(
+                "InfiniteThreads must implement on_iteration")
+    
+    def stop(self):
+        """Gracefully shutdown the thread."""
+        self._shutdown.set()
+        self._stopped.wait() # block until this thread is done
+
+
+class Mediator(InfinityThread):
     """Thread continuously sending tasks in the queue to the pool.
 
     .. attribute:: bucket_queue
@@ -26,33 +64,20 @@ class Mediator(threading.Thread):
 
     def __init__(self, bucket_queue, callback):
         super(Mediator, self).__init__()
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
         self.bucket_queue = bucket_queue
         self.callback = callback
 
-    def run(self):
-        """Run the mediator thread. Should not be used directly, use
-        :meth:`start` instead."""
-        while True:
-            if self._shutdown.isSet():
-                break
+    def on_iteration(self):
+        try:
             # This blocks until there's a message in the queue.
-            try:
-                task = self.bucket_queue.get(timeout=1)
-            except QueueEmpty:
-                pass
-            else:
-                self.callback(task)
-        self._stopped.set() # indicate that we are stopped
-
-    def stop(self):
-        """Gracefully shutdown the thread."""
-        self._shutdown.set()
-        self._stopped.wait() # block until this thread is done
+            task = self.bucket_queue.get(timeout=1)
+        except QueueEmpty:
+            pass
+        else:
+            self.callback(task)
 
 
-class PeriodicWorkController(threading.Thread):
+class PeriodicWorkController(InfinityThread):
     """A thread that continuously checks if there are
     :class:`celery.task.PeriodicTask` tasks waiting for execution,
     and executes them. It also finds tasks in the hold queue that is
@@ -65,24 +90,16 @@ class PeriodicWorkController(threading.Thread):
 
     def __init__(self, bucket_queue, hold_queue):
         super(PeriodicWorkController, self).__init__()
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
         self.hold_queue = hold_queue
         self.bucket_queue = bucket_queue
 
-    def run(self):
-        """Run the thread.
-
-        Should not be used directly, use :meth:`start` instead.
+    def on_iteration(self):
+        self.run_periodic_tasks()
+        self.process_hold_queue()
+        time.sleep(1)
 
-        """
-        while True:
-            if self._shutdown.isSet():
-                break
-            default_periodic_status_backend.run_periodic_tasks()
-            self.process_hold_queue()
-            time.sleep(1)
-        self._stopped.set() # indicate that we are stopped
+    def run_periodic_tasks(self):
+        default_periodic_status_backend.run_periodic_tasks()
 
     def process_hold_queue(self):
         """Finds paused tasks that are ready for execution and move
@@ -95,8 +112,3 @@ class PeriodicWorkController(threading.Thread):
             self.bucket_queue.put(task)
         else:
             self.hold_queue.put((task, eta))
-
-    def stop(self):
-        """Gracefully shutdown the thread."""
-        self._shutdown.set()
-        self._stopped.wait() # block until this thread is done