|
@@ -25,6 +25,7 @@ from billiard.util import Finalize
|
|
|
from kombu.syn import detect_environment
|
|
|
|
|
|
from celery import bootsteps
|
|
|
+from celery.bootsteps import RUN, TERMINATE
|
|
|
from celery import concurrency as _concurrency
|
|
|
from celery import platforms
|
|
|
from celery import signals
|
|
@@ -88,7 +89,6 @@ class WorkController(object):
|
|
|
self.on_after_init(**kwargs)
|
|
|
|
|
|
self._finalize = [
|
|
|
- Finalize(self, self.stop, exitpriority=1),
|
|
|
Finalize(self, self._send_worker_shutdown, exitpriority=10),
|
|
|
]
|
|
|
self.setup_instance(**self.prepare_args(**kwargs))
|
|
@@ -217,12 +217,6 @@ class WorkController(object):
|
|
|
except Exception as exc:
|
|
|
logger.critical('Internal error: %r\n%s',
|
|
|
exc, traceback.format_exc(), exc_info=True)
|
|
|
- except SystemTerminate:
|
|
|
- self.terminate()
|
|
|
- raise
|
|
|
- except BaseException as exc:
|
|
|
- self.stop()
|
|
|
- raise exc
|
|
|
|
|
|
def signal_consumer_close(self):
|
|
|
try:
|
|
@@ -236,15 +230,17 @@ class WorkController(object):
|
|
|
|
|
|
def stop(self, in_sighandler=False):
|
|
|
"""Graceful shutdown of the worker server."""
|
|
|
- self.signal_consumer_close()
|
|
|
- if not in_sighandler or self.pool.signal_safe:
|
|
|
- self._shutdown(warm=True)
|
|
|
+ if self.blueprint.state == RUN:
|
|
|
+ self.signal_consumer_close()
|
|
|
+ if not in_sighandler or self.pool.signal_safe:
|
|
|
+ self._shutdown(warm=True)
|
|
|
|
|
|
def terminate(self, in_sighandler=False):
|
|
|
"""Not so graceful shutdown of the worker server."""
|
|
|
- self.signal_consumer_close()
|
|
|
- if not in_sighandler or self.pool.signal_safe:
|
|
|
- self._shutdown(warm=False)
|
|
|
+ if self.blueprint.state != TERMINATE:
|
|
|
+ self.signal_consumer_close()
|
|
|
+ if not in_sighandler or self.pool.signal_safe:
|
|
|
+ self._shutdown(warm=False)
|
|
|
|
|
|
def _shutdown(self, warm=True):
|
|
|
# if blueprint does not exist it means that we had an
|