Browse Source

Embedded beat must set app for thread/process. Closes #2594

Ask Solem 10 years ago
parent
commit
a84d67ce33
3 changed files with 15 additions and 11 deletions
  1. 12 8
      celery/beat.py
  2. 2 2
      celery/tests/app/test_beat.py
  3. 1 1
      celery/worker/components.py

+ 12 - 8
celery/beat.py

@@ -535,13 +535,15 @@ class Service(object):
 class _Threaded(Thread):
     """Embedded task scheduler using threading."""
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, app, **kwargs):
         super(_Threaded, self).__init__()
-        self.service = Service(*args, **kwargs)
+        self.app = app
+        self.service = Service(app, **kwargs)
         self.daemon = True
         self.name = 'Beat'
 
     def run(self):
+        self.app.set_current()
         self.service.start()
 
     def stop(self):
@@ -555,9 +557,10 @@ except NotImplementedError:     # pragma: no cover
 else:
     class _Process(Process):    # noqa
 
-        def __init__(self, *args, **kwargs):
+        def __init__(self, app, **kwargs):
             super(_Process, self).__init__()
-            self.service = Service(*args, **kwargs)
+            self.app = app
+            self.service = Service(app, **kwargs)
             self.name = 'Beat'
 
         def run(self):
@@ -565,6 +568,8 @@ else:
             platforms.close_open_fds([
                 sys.__stdin__, sys.__stdout__, sys.__stderr__,
             ] + list(iter_open_logger_fds()))
+            self.app.set_default()
+            self.app.set_current()
             self.service.start(embedded_process=True)
 
         def stop(self):
@@ -572,7 +577,7 @@ else:
             self.terminate()
 
 
-def EmbeddedService(*args, **kwargs):
+def EmbeddedService(app, max_interval=None, **kwargs):
     """Return embedded clock service.
 
     :keyword thread: Run threaded instead of as a separate process.
@@ -582,6 +587,5 @@ def EmbeddedService(*args, **kwargs):
     if kwargs.pop('thread', False) or _Process is None:
         # Need short max interval to be able to stop thread
         # in reasonable time.
-        kwargs.setdefault('max_interval', 1)
-        return _Threaded(*args, **kwargs)
-    return _Process(*args, **kwargs)
+        return _Threaded(app, max_interval=1, **kwargs)
+    return _Process(app, max_interval=max_interval, **kwargs)

+ 2 - 2
celery/tests/app/test_beat.py

@@ -478,7 +478,7 @@ class test_EmbeddedService(AppCase):
 
         from billiard.process import Process
 
-        s = beat.EmbeddedService(app=self.app)
+        s = beat.EmbeddedService(self.app)
         self.assertIsInstance(s, Process)
         self.assertIsInstance(s.service, beat.Service)
         s.service = MockService()
@@ -499,7 +499,7 @@ class test_EmbeddedService(AppCase):
         self.assertTrue(s._popen.terminated)
 
     def test_start_stop_threaded(self):
-        s = beat.EmbeddedService(thread=True, app=self.app)
+        s = beat.EmbeddedService(self.app, thread=True)
         from threading import Thread
         self.assertIsInstance(s, Thread)
         self.assertIsInstance(s.service, beat.Service)

+ 1 - 1
celery/worker/components.py

@@ -203,7 +203,7 @@ class Beat(bootsteps.StartStopStep):
         from celery.beat import EmbeddedService
         if w.pool_cls.__module__.endswith(('gevent', 'eventlet')):
             raise ImproperlyConfigured(ERR_B_GREEN)
-        b = w.beat = EmbeddedService(app=w.app,
+        b = w.beat = EmbeddedService(w.app,
                                      schedule_filename=w.schedule_filename,
                                      scheduler_cls=w.scheduler_cls)
         return b