|
@@ -18,7 +18,7 @@ from celery.app.defaults import DEFAULTS
|
|
|
from celery.bootsteps import RUN, CLOSE, TERMINATE, StartStopStep
|
|
|
from celery.concurrency.base import BasePool
|
|
|
from celery.datastructures import AttributeDict
|
|
|
-from celery.exceptions import SystemTerminate
|
|
|
+from celery.exceptions import SystemTerminate, TaskRevokedError
|
|
|
from celery.five import Empty, range, Queue as FastQueue
|
|
|
from celery.task import task as task_dec
|
|
|
from celery.task import periodic_task as periodic_task_dec
|
|
@@ -29,6 +29,7 @@ from celery.worker import consumer
|
|
|
from celery.worker.consumer import Consumer as __Consumer
|
|
|
from celery.worker.hub import READ, ERR
|
|
|
from celery.worker.job import Request
|
|
|
+from celery.utils import worker_direct
|
|
|
from celery.utils.serialization import pickle
|
|
|
from celery.utils.timer2 import Timer
|
|
|
|
|
@@ -829,6 +830,40 @@ class test_WorkController(AppCase):
|
|
|
worker.blueprint.shutdown_complete.set()
|
|
|
return worker
|
|
|
|
|
|
+ def test_on_consumer_ready(self):
|
|
|
+ self.worker.on_consumer_ready(Mock())
|
|
|
+
|
|
|
+ def test_setup_queues_worker_direct(self):
|
|
|
+ self.app.conf.CELERY_WORKER_DIRECT = True
|
|
|
+ _qs, self.app.amqp.__dict__['queues'] = self.app.amqp.queues, Mock()
|
|
|
+ try:
|
|
|
+ self.worker.setup_queues({})
|
|
|
+ self.app.amqp.queues.select_add.assert_called_with(
|
|
|
+ worker_direct(self.worker.hostname),
|
|
|
+ )
|
|
|
+ finally:
|
|
|
+ self.app.amqp.queues = _qs
|
|
|
+ self.app.conf.CELERY_WORKER_DIRECT = False
|
|
|
+
|
|
|
+ def test_send_worker_shutdown(self):
|
|
|
+ with patch('celery.signals.worker_shutdown') as ws:
|
|
|
+ self.worker._send_worker_shutdown()
|
|
|
+ ws.send.assert_called_with(sender=self.worker)
|
|
|
+
|
|
|
+ def test_process_task_revoked_release_semaphore(self):
|
|
|
+ self.worker._quick_release = Mock()
|
|
|
+ req = Mock()
|
|
|
+ req.execute_using_pool.side_effect = TaskRevokedError
|
|
|
+ self.worker._process_task(req)
|
|
|
+ self.worker._quick_release.assert_called_with()
|
|
|
+
|
|
|
+ delattr(self.worker, '_quick_release')
|
|
|
+ self.worker._process_task(req)
|
|
|
+
|
|
|
+ def test_shutdown_no_blueprint(self):
|
|
|
+ self.worker.blueprint = None
|
|
|
+ self.worker._shutdown()
|
|
|
+
|
|
|
@patch('celery.platforms.create_pidlock')
|
|
|
def test_use_pidfile(self, create_pidlock):
|
|
|
create_pidlock.return_value = Mock()
|