Kaynağa Gözat

Error while restarting using SIGHUP

Pidfile is removed only when interpreter exists. While restarting using
SIGHUP signal, obviously, interpreter doesn't exit, so pidfile is not
removed. We're fixing this by deleting pidfile in
WorkController._shutdown().
Ivan Virabyan 13 yıl önce
ebeveyn
işleme
e4400d4f58

+ 1 - 3
celery/apps/worker.py

@@ -216,15 +216,13 @@ class Worker(configurated):
         }
         }
 
 
     def run_worker(self):
     def run_worker(self):
-        if self.pidfile:
-            pidlock = platforms.create_pidlock(self.pidfile).acquire()
-            atexit.register(pidlock.release)
         worker = self.WorkController(app=self.app,
         worker = self.WorkController(app=self.app,
                                     hostname=self.hostname,
                                     hostname=self.hostname,
                                     ready_callback=self.on_consumer_ready,
                                     ready_callback=self.on_consumer_ready,
                                     embed_clockservice=self.embed_clockservice,
                                     embed_clockservice=self.embed_clockservice,
                                     autoscale=self.autoscale,
                                     autoscale=self.autoscale,
                                     autoreload=self.autoreload,
                                     autoreload=self.autoreload,
+                                    pidfile=self.pidfile,
                                     **self.confopts_as_dict())
                                     **self.confopts_as_dict())
         self.install_platform_tweaks(worker)
         self.install_platform_tweaks(worker)
         signals.worker_init.send(sender=worker)
         signals.worker_init.send(sender=worker)

+ 0 - 29
celery/tests/test_bin/test_celeryd.py

@@ -262,35 +262,6 @@ class test_Worker(AppCase):
         finally:
         finally:
             os.getuid = prev
             os.getuid = prev
 
 
-    @disable_stdouts
-    def test_use_pidfile(self):
-        from celery import platforms
-
-        class create_pidlock(object):
-            instance = [None]
-
-            def __init__(self, file):
-                self.file = file
-                self.instance[0] = self
-
-            def acquire(self):
-                self.acquired = True
-
-                class Object(object):
-                    def release(self):
-                        pass
-
-                return Object()
-
-        prev, platforms.create_pidlock = platforms.create_pidlock, \
-                                         create_pidlock
-        try:
-            worker = self.Worker(pidfile="pidfilelockfilepid")
-            worker.run_worker()
-            self.assertTrue(create_pidlock.instance[0].acquired)
-        finally:
-            platforms.create_pidlock = prev
-
     @disable_stdouts
     @disable_stdouts
     def test_redirect_stdouts(self):
     def test_redirect_stdouts(self):
         worker = self.Worker()
         worker = self.Worker()

+ 29 - 0
celery/tests/test_worker/__init__.py

@@ -737,6 +737,35 @@ class test_WorkController(AppCase):
         worker.logger = Mock()
         worker.logger = Mock()
         return worker
         return worker
 
 
+    def test_use_pidfile(self):
+        from celery import platforms
+
+        class create_pidlock(object):
+            instance = [None]
+
+            def __init__(self, file):
+                self.file = file
+                self.instance[0] = self
+
+            def acquire(self):
+                self.acquired = True
+                return self
+
+            def release(self):
+                self.acquired = False
+
+        prev, platforms.create_pidlock = platforms.create_pidlock, \
+                                         create_pidlock
+        try:
+            worker = self.create_worker(pidfile="pidfilelockfilepid")
+            worker.components = []
+            worker.start()
+            self.assertTrue(create_pidlock.instance[0].acquired)
+            worker.stop()
+            self.assertFalse(create_pidlock.instance[0].acquired)
+        finally:
+            platforms.create_pidlock = prev
+
     @patch("celery.platforms.signals")
     @patch("celery.platforms.signals")
     @patch("celery.platforms.set_mp_process_title")
     @patch("celery.platforms.set_mp_process_title")
     def test_process_initializer(self, set_mp_process_title, _signals):
     def test_process_initializer(self, set_mp_process_title, _signals):

+ 8 - 2
celery/worker/__init__.py

@@ -29,6 +29,7 @@ from ..app.abstract import configurated, from_config
 from ..exceptions import SystemTerminate
 from ..exceptions import SystemTerminate
 from ..utils.functional import noop
 from ..utils.functional import noop
 from ..utils.imports import qualname, reload_from_cwd
 from ..utils.imports import qualname, reload_from_cwd
+from .. import platforms
 
 
 from . import abstract
 from . import abstract
 from . import state
 from . import state
@@ -197,7 +198,7 @@ class WorkController(configurated):
 
 
     def __init__(self, loglevel=None, hostname=None, logger=None,
     def __init__(self, loglevel=None, hostname=None, logger=None,
             ready_callback=noop,
             ready_callback=noop,
-            queues=None, app=None, **kwargs):
+            queues=None, app=None, pidfile=None, **kwargs):
         self.app = app_or_default(app or self.app)
         self.app = app_or_default(app or self.app)
         self._shutdown_complete = threading.Event()
         self._shutdown_complete = threading.Event()
         self.setup_defaults(kwargs, namespace="celeryd")
         self.setup_defaults(kwargs, namespace="celeryd")
@@ -210,6 +211,8 @@ class WorkController(configurated):
         self.ready_callback = ready_callback
         self.ready_callback = ready_callback
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self._finalize_db = None
         self._finalize_db = None
+        self.pidfile = pidfile
+        self.pidlock = None
 
 
         # Initialize boot steps
         # Initialize boot steps
         self.pool_cls = _concurrency.get_implementation(self.pool_cls)
         self.pool_cls = _concurrency.get_implementation(self.pool_cls)
@@ -220,7 +223,8 @@ class WorkController(configurated):
     def start(self):
     def start(self):
         """Starts the workers main loop."""
         """Starts the workers main loop."""
         self._state = self.RUN
         self._state = self.RUN
-
+        if self.pidfile:
+            self.pidlock = platforms.create_pidlock(self.pidfile).acquire()
         try:
         try:
             for i, component in enumerate(self.components):
             for i, component in enumerate(self.components):
                 self.logger.debug("Starting %s...", qualname(component))
                 self.logger.debug("Starting %s...", qualname(component))
@@ -280,6 +284,8 @@ class WorkController(configurated):
 
 
         self._state = self.CLOSE
         self._state = self.CLOSE
 
 
+        if self.pidlock:
+            self.pidlock.release()
         for component in reversed(self.components):
         for component in reversed(self.components):
             self.logger.debug("%s %s...", what, qualname(component))
             self.logger.debug("%s %s...", what, qualname(component))
             stop = component.stop
             stop = component.stop