|
@@ -46,6 +46,7 @@ from kombu.utils.eventio import SELECT_BAD_FD
|
|
from celery.five import Counter, items, values
|
|
from celery.five import Counter, items, values
|
|
from celery.utils.log import get_logger
|
|
from celery.utils.log import get_logger
|
|
from celery.utils.text import truncate
|
|
from celery.utils.text import truncate
|
|
|
|
+from celery.worker import state as worker_state
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
logger = get_logger(__name__)
|
|
error, debug = logger.error, logger.debug
|
|
error, debug = logger.error, logger.debug
|
|
@@ -536,14 +537,20 @@ class AsynPool(_pool.Pool):
|
|
get_job = self._cache.__getitem__
|
|
get_job = self._cache.__getitem__
|
|
write_stats = self.write_stats
|
|
write_stats = self.write_stats
|
|
is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR
|
|
is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR
|
|
|
|
+ revoked_tasks = worker_state.revoked
|
|
|
|
|
|
precalc = {ACK: self._create_payload(ACK, (0, )),
|
|
precalc = {ACK: self._create_payload(ACK, (0, )),
|
|
NACK: self._create_payload(NACK, (0, ))}
|
|
NACK: self._create_payload(NACK, (0, ))}
|
|
|
|
|
|
def _put_back(job):
|
|
def _put_back(job):
|
|
# puts back at the end of the queue
|
|
# puts back at the end of the queue
|
|
- if job not in outbound: # XXX slow, should find another way
|
|
|
|
- outbound.appendleft(job)
|
|
|
|
|
|
+ if job._terminated or job.correlation_id in revoked_tasks:
|
|
|
|
+ job._set_terminated('process already gone')
|
|
|
|
+ else:
|
|
|
|
+ # XXX linear lookup, should find a better way,
|
|
|
|
+ # but this happens rarely and is here to protect against races.
|
|
|
|
+ if job not in outbound:
|
|
|
|
+ outbound.appendleft(job)
|
|
self._put_back = _put_back
|
|
self._put_back = _put_back
|
|
|
|
|
|
def on_poll_start():
|
|
def on_poll_start():
|