Browse Source

Embedded celerybeat is now running as a separate process instead of as a
thread.

Ask Solem 15 years ago
parent
commit
712c33ab9f
4 changed files with 64 additions and 19 deletions
  1. 42 9
      celery/beat.py
  2. 0 1
      celery/result.py
  3. 18 3
      celery/tests/test_beat.py
  4. 4 6
      celery/worker/__init__.py

+ 42 - 9
celery/beat.py

@@ -1,6 +1,7 @@
 import time
 import shelve
 import threading
+import multiprocessing
 from datetime import datetime
 from UserDict import UserDict
 
@@ -198,6 +199,7 @@ class ClockService(object):
             self._stopped.set()
 
     def stop(self, wait=False):
+        self.logger.info("ClockService: Shutting down...")
         self._shutdown.set()
         wait and self._stopped.wait() # block until shutdown done.
 
@@ -218,15 +220,46 @@ class ClockService(object):
         return self._scheduler
 
 
-class ClockServiceThread(threading.Thread):
+def EmbeddedClockService(*args, **kwargs):
+    """Return embedded clock service.
 
-    def __init__(self, *args, **kwargs):
-        self.clockservice = ClockService(*args, **kwargs)
-        threading.Thread.__init__(self)
-        self.setDaemon(True)
+    :keyword thread: Run threaded instead of as a separate process.
+        Default is ``False``.
 
-    def run(self):
-        self.clockservice.start()
+    """
+
+    class _Threaded(threading.Thread):
+        """Embedded clock service using threading."""
+
+        def __init__(self, *args, **kwargs):
+            super(_Threaded, self).__init__()
+            self.clockservice = ClockService(*args, **kwargs)
+            self.setDaemon(True)
+
+        def run(self):
+            self.clockservice.start()
+
+        def stop(self):
+            self.clockservice.stop(wait=True)
+
+    class _Process(multiprocessing.Process):
+        """Embedded clock service using multiprocessing."""
+
+        def __init__(self, *args, **kwargs):
+            super(_Process, self).__init__()
+            self.clockservice = ClockService(*args, **kwargs)
+            self.daemon = True
+
+        def run(self):
+            self.clockservice.start()
+
+        def stop(self):
+            self.clockservice.stop()
+
+    if kwargs.pop("thread", False):
+        # Need short max interval to be able to stop thread
+        # in reasonable time.
+        kwargs.setdefault("max_interval", 1)
+        return _Threaded(*args, **kwargs)
 
-    def stop(self):
-        self.clockservice.stop(wait=True)
+    return _Process(*args, **kwargs)

+ 0 - 1
celery/result.py

@@ -142,7 +142,6 @@ class BaseAsyncResult(object):
         return self.backend.get_status(self.task_id)
 
 
-
 class AsyncResult(BaseAsyncResult):
     """Pending task result using the default backend.
 

+ 18 - 3
celery/tests/test_beat.py

@@ -199,10 +199,25 @@ class TestClockService(unittest.TestCase):
         self.assertTrue(s._shutdown.isSet())
 
 
-class TestClockServiceThread(unittest.TestCase):
+class TestEmbeddedClockService(unittest.TestCase):
 
-    def test_start_stop(self):
-        s = beat.ClockServiceThread()
+    def test_start_stop_process(self):
+        s = beat.EmbeddedClockService()
+        from multiprocessing import Process
+        self.assertTrue(isinstance(s, Process))
+        self.assertTrue(isinstance(s.clockservice, beat.ClockService))
+        s.clockservice = MockClockService()
+
+        s.run()
+        self.assertTrue(s.clockservice.started)
+
+        s.stop()
+        self.assertTrue(s.clockservice.stopped)
+
+    def test_start_stop_threaded(self):
+        s = beat.EmbeddedClockService(thread=True)
+        from threading import Thread
+        self.assertTrue(isinstance(s, Thread))
         self.assertTrue(isinstance(s.clockservice, beat.ClockService))
         s.clockservice = MockClockService()
 

+ 4 - 6
celery/worker/__init__.py

@@ -13,7 +13,7 @@ from celery import registry
 from celery import platform
 from celery import signals
 from celery.log import setup_logger, _hijack_multiprocessing_logger
-from celery.beat import ClockServiceThread
+from celery.beat import EmbeddedClockService
 
 from celery.worker.pool import TaskPool
 from celery.worker.buckets import TaskBucket
@@ -133,11 +133,9 @@ class WorkController(object):
         self.scheduler = ScheduleController(self.eta_schedule,
                                             logger=self.logger)
 
-        # Need a tight loop interval when embedded so the program
-        # can be stopped in a sensible short time.
-        self.clockservice = self.embed_clockservice and ClockServiceThread(
-                                logger=self.logger,
-                                max_interval=1) or None
+        self.clockservice = None
+        if self.embed_clockservice:
+            self.clockservice = EmbeddedClockService(logger=self.logger)
 
         prefetch_count = concurrency * conf.CELERYD_PREFETCH_MULTIPLIER
         self.listener = CarrotListener(self.ready_queue,