|
@@ -104,6 +104,7 @@ class WorkController(object):
|
|
|
concurrency = conf.CELERYD_CONCURRENCY
|
|
|
logfile = conf.CELERYD_LOG_FILE
|
|
|
_state = None
|
|
|
+ _running = 0
|
|
|
|
|
|
def __init__(self, concurrency=None, logfile=None, loglevel=None,
|
|
|
send_events=conf.SEND_EVENTS, hostname=None,
|
|
@@ -184,13 +185,13 @@ class WorkController(object):
|
|
|
self._state = "RUN"
|
|
|
|
|
|
try:
|
|
|
- for component in self.components:
|
|
|
+ for i, component in enumerate(self.components):
|
|
|
self.logger.debug("Starting thread %s..." % \
|
|
|
component.__class__.__name__)
|
|
|
+ self._running = i
|
|
|
component.start()
|
|
|
finally:
|
|
|
self.stop()
|
|
|
-
|
|
|
def process_task(self, wrapper):
|
|
|
"""Process task by sending it to the pool of workers."""
|
|
|
try:
|
|
@@ -207,6 +208,8 @@ class WorkController(object):
|
|
|
"""Gracefully shutdown the worker server."""
|
|
|
if self._state != "RUN":
|
|
|
return
|
|
|
+ if self._running != len(self.components):
|
|
|
+ return
|
|
|
|
|
|
signals.worker_shutdown.send(sender=self)
|
|
|
for component in reversed(self.components):
|
|
@@ -219,7 +222,6 @@ class WorkController(object):
|
|
|
|
|
|
def terminate(self):
|
|
|
"""Not so gracefully shutdown the worker server."""
|
|
|
- return self.stop()
|
|
|
if self._state != "RUN":
|
|
|
return
|
|
|
|