Parcourir la source

Merge branch 'ivirabyan/master'

Conflicts:
	celery/apps/worker.py
	celery/tests/bin/test_celeryd.py
	celery/worker/__init__.py
Ask Solem il y a 13 ans
Parent
commit
d3192eb5c1

+ 1 - 2
celery/apps/worker.py

@@ -183,13 +183,12 @@ class Worker(configurated):
         }
 
     def run_worker(self):
-        if self.pidfile:
-            platforms.create_pidlock(self.pidfile)
         worker = self.WorkController(app=self.app,
                     hostname=self.hostname,
                     ready_callback=self.on_consumer_ready, beat=self.beat,
                     autoscale=self.autoscale, autoreload=self.autoreload,
                     no_execv=self.no_execv,
+                    pidfile=self.pidfile,
                     **self.confopts_as_dict())
         self.install_platform_tweaks(worker)
         signals.worker_init.send(sender=worker)

+ 0 - 7
celery/tests/bin/test_celeryd.py

@@ -231,13 +231,6 @@ class test_Worker(AppCase):
         finally:
             os.getuid = prev
 
-    @disable_stdouts
-    @patch("celery.platforms.create_pidlock")
-    def test_use_pidfile(self, create_pidlock):
-        worker = self.Worker(pidfile="pidfilelockfilepid")
-        worker.run_worker()
-        self.assertTrue(create_pidlock.called)
-
     @disable_stdouts
     def test_redirect_stdouts(self):
         worker = self.Worker()

+ 10 - 0
celery/tests/worker/test_worker.py

@@ -789,6 +789,16 @@ class test_WorkController(AppCase):
         worker._shutdown_complete.set()
         return worker
 
+    @patch("celery.platforms.create_pidlock")
+    def test_use_pidfile(self, create_pidlock):
+        create_pidlock.return_value = Mock()
+        worker = self.create_worker(pidfile="pidfilelockfilepid")
+        worker.components = []
+        worker.start()
+        self.assertTrue(create_pidlock.called)
+        worker.stop()
+        self.assertTrue(worker.pidlock.release.called)
+
     @patch("celery.platforms.signals")
     @patch("celery.platforms.set_mp_process_title")
     def test_process_initializer(self, set_mp_process_title, _signals):

+ 8 - 2
celery/worker/__init__.py

@@ -24,6 +24,7 @@ from billiard import forking_enable
 from kombu.utils.finalize import Finalize
 
 from celery import concurrency as _concurrency
+from celery import platforms
 from celery.app import app_or_default, set_default_app
 from celery.app.abstract import configurated, from_config
 from celery.exceptions import SystemTerminate
@@ -207,7 +208,7 @@ class WorkController(configurated):
     _running = 0
 
     def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
-            queues=None, app=None, **kwargs):
+            queues=None, app=None, pidfile=None, **kwargs):
         self.app = app_or_default(app or self.app)
 
         # all new threads start without a current app, so if an app is not
@@ -227,6 +228,8 @@ class WorkController(configurated):
         self.hostname = hostname or socket.gethostname()
         self.ready_callback = ready_callback
         self._finalize = Finalize(self, self.stop, exitpriority=1)
+        self.pidfile = pidfile
+        self.pidlock = None
 
         # Initialize boot steps
         self.pool_cls = _concurrency.get_implementation(self.pool_cls)
@@ -236,7 +239,8 @@ class WorkController(configurated):
     def start(self):
         """Starts the workers main loop."""
         self._state = self.RUN
-
+        if self.pidfile:
+            self.pidlock = platforms.create_pidlock(self.pidfile)
         try:
             for i, component in enumerate(self.components):
                 logger.debug("Starting %s...", qualname(component))
@@ -303,6 +307,8 @@ class WorkController(configurated):
         self.priority_timer.stop()
         self.consumer.close_connection()
 
+        if self.pidlock:
+            self.pidlock.release()
         self._state = self.TERMINATE
         self._shutdown_complete.set()