Browse Source

Database access can't happen at Thread.__init__, renamed InfinityThread ->
BackgroundThread and added on_start() + on_stop() handlers that is run just
before the infinite loop and when the thread is shutdown. The initialization
of periodic task meta entries in the database is moved to on_start() in
PeriodicWorkController.

Ask Solem 16 years ago
parent
commit
358b463111
2 changed files with 26 additions and 12 deletions
  1. 7 6
      celery/tests/test_worker_controllers.py
  2. 19 6
      celery/worker/controllers.py

+ 7 - 6
celery/tests/test_worker_controllers.py

@@ -5,7 +5,7 @@ from Queue import Queue, Empty
 from datetime import datetime, timedelta
 
 from celery.worker.controllers import Mediator, PeriodicWorkController
-from celery.worker.controllers import InfinityThread
+from celery.worker.controllers import BackgroundThread
 
 
 class MockTask(object):
@@ -16,26 +16,27 @@ class MockTask(object):
         self.value = value
 
 
-class MyInfinityThread(InfinityThread):
+class MyBackgroundThread(BackgroundThread):
 
     def on_iteration(self):
         import time
         time.sleep(1)
 
 
-class TestInfinityThread(unittest.TestCase):
+class TestBackgroundThread(unittest.TestCase):
 
     def test_on_iteration(self):
-        self.assertRaises(NotImplementedError, InfinityThread().on_iteration)
+        self.assertRaises(NotImplementedError,
+                BackgroundThread().on_iteration)
 
     def test_run(self):
-        t = MyInfinityThread()
+        t = MyBackgroundThread()
         t._shutdown.set()
         t.run()
         self.assertTrue(t._stopped.isSet())
 
     def test_start_stop(self):
-        t = MyInfinityThread()
+        t = MyBackgroundThread()
         t.start()
         self.assertFalse(t._shutdown.isSet())
         self.assertFalse(t._stopped.isSet())

+ 19 - 6
celery/worker/controllers.py

@@ -11,7 +11,7 @@ import threading
 import time
 
 
-class InfinityThread(threading.Thread):
+class BackgroundThread(threading.Thread):
     """Thread running an infinite loop which for every iteration
     calls its :meth:`on_iteration` method.
 
@@ -22,7 +22,7 @@ class InfinityThread(threading.Thread):
     is_infinite = True
 
     def __init__(self):
-        super(InfinityThread, self).__init__()
+        super(BackgroundThread, self).__init__()
         self._shutdown = threading.Event()
         self._stopped = threading.Event()
         self.setDaemon(True)
@@ -33,25 +33,37 @@ class InfinityThread(threading.Thread):
         To start the thread use :meth:`start` instead.
 
         """
+        self.on_start()
+
         while self.is_infinite:
             if self._shutdown.isSet():
                 break
             self.on_iteration()
         self._stopped.set() # indicate that we are stopped
 
+    def on_start(self):
+        """This handler is run at thread start, just before the infinite
+        loop."""
+        pass
+
     def on_iteration(self):
         """This is the method called for every iteration and must be
-        implemented by every subclass of :class:`InfinityThread`."""
+        implemented by every subclass of :class:`BackgroundThread`."""
         raise NotImplementedError(
                 "InfiniteThreads must implement on_iteration")
 
+    def on_stop(self):
+        """This handler is run when the thread is shutdown."""
+        pass
+
     def stop(self):
         """Gracefully shutdown the thread."""
+        self.on_stop()
         self._shutdown.set()
         self._stopped.wait() # block until this thread is done
 
 
-class Mediator(InfinityThread):
+class Mediator(BackgroundThread):
     """Thread continuously sending tasks in the queue to the pool.
 
     .. attribute:: bucket_queue
@@ -85,7 +97,7 @@ class Mediator(InfinityThread):
             self.callback(task)
 
 
-class PeriodicWorkController(InfinityThread):
+class PeriodicWorkController(BackgroundThread):
     """A thread that continuously checks if there are
     :class:`celery.task.PeriodicTask` tasks waiting for execution,
     and executes them. It also finds tasks in the hold queue that is
@@ -101,7 +113,8 @@ class PeriodicWorkController(InfinityThread):
         self.hold_queue = hold_queue
         self.bucket_queue = bucket_queue
 
-        # Do backend-specific periodic task initialization.
+    def on_start(self):
+        """Do backend-specific periodic task initialization."""
         default_periodic_status_backend.init_periodic_tasks()
 
     def on_iteration(self):