|
@@ -7,6 +7,7 @@ from collections import deque
|
|
|
from datetime import datetime, timedelta
|
|
|
from Queue import Empty
|
|
|
|
|
|
+from billiard.exceptions import WorkerLostError
|
|
|
from kombu.exceptions import StdChannelError
|
|
|
from kombu.transport.base import Message
|
|
|
from kombu.connection import BrokerConnection
|
|
@@ -21,7 +22,7 @@ from celery.exceptions import SystemTerminate
|
|
|
from celery.task import task as task_dec
|
|
|
from celery.task import periodic_task as periodic_task_dec
|
|
|
from celery.utils import uuid
|
|
|
-from celery.worker import WorkController, Queues, Timers
|
|
|
+from celery.worker import WorkController, Queues, Timers, EvLoop, Pool
|
|
|
from celery.worker.buckets import FastQueue
|
|
|
from celery.worker.job import Request
|
|
|
from celery.worker.consumer import BlockingConsumer
|
|
@@ -952,6 +953,24 @@ class test_WorkController(AppCase):
|
|
|
self.assertTrue(worker.mediator)
|
|
|
self.assertNotEqual(worker.ready_queue.put, worker.process_task)
|
|
|
|
|
|
+ def test_process_task_sem(self):
|
|
|
+ worker = self.worker
|
|
|
+ worker.semaphore = Mock()
|
|
|
+
|
|
|
+ req = Mock()
|
|
|
+ worker.process_task_sem(req)
|
|
|
+ worker.semaphore.acquire.assert_called_with(worker.process_task, req)
|
|
|
+
|
|
|
+ def test_signal_consumer_close(self):
|
|
|
+ worker = self.worker
|
|
|
+ worker.consumer = Mock()
|
|
|
+
|
|
|
+ worker.signal_consumer_close()
|
|
|
+ worker.consumer.close.assert_called_with()
|
|
|
+
|
|
|
+ worker.consumer.close.side_effect = AttributeError()
|
|
|
+ worker.signal_consumer_close()
|
|
|
+
|
|
|
def test_start__stop(self):
|
|
|
worker = self.worker
|
|
|
worker._shutdown_complete.set()
|
|
@@ -964,6 +983,16 @@ class test_WorkController(AppCase):
|
|
|
for component in worker.components:
|
|
|
self.assertTrue(w.stop.call_count)
|
|
|
|
|
|
+ # Doesn't close pool if no pool.
|
|
|
+ worker.start()
|
|
|
+ worker.pool = None
|
|
|
+ worker.stop()
|
|
|
+
|
|
|
+ # test that stop of None is not attempted
|
|
|
+ worker.components[-1] = None
|
|
|
+ worker.start()
|
|
|
+ worker.stop()
|
|
|
+
|
|
|
def test_component_raises(self):
|
|
|
worker = self.worker
|
|
|
comp = Mock()
|
|
@@ -998,3 +1027,69 @@ class test_WorkController(AppCase):
|
|
|
w.pool_cls.rlimit_safe = False
|
|
|
Queues(w).create(w)
|
|
|
self.assertTrue(w.disable_rate_limits)
|
|
|
+
|
|
|
+ def test_Queues_pool_no_sem(self):
|
|
|
+ w = Mock()
|
|
|
+ w.pool_cls.uses_semaphore = False
|
|
|
+ Queues(w).create(w)
|
|
|
+ self.assertIs(w.ready_queue.put, w.process_task)
|
|
|
+
|
|
|
+ def test_EvLoop_crate(self):
|
|
|
+ w = Mock()
|
|
|
+ x = EvLoop(w)
|
|
|
+ hub = x.create(w)
|
|
|
+ self.assertTrue(w.timer.max_interval)
|
|
|
+ self.assertIs(w.hub, hub)
|
|
|
+
|
|
|
+ def test_Pool_crate_threaded(self):
|
|
|
+ w = Mock()
|
|
|
+ w.pool_cls = Mock()
|
|
|
+ w.use_eventloop = False
|
|
|
+ pool = Pool(w)
|
|
|
+ pool.create(w)
|
|
|
+
|
|
|
+ def test_Pool_create(self):
|
|
|
+ from celery.worker.hub import BoundedSemaphore
|
|
|
+ w = Mock()
|
|
|
+ w.hub = Mock()
|
|
|
+ w.hub.on_init = []
|
|
|
+ w.pool_cls = Mock()
|
|
|
+ P = w.pool_cls.return_value = Mock()
|
|
|
+ P.timers = {Mock(): 30}
|
|
|
+ w.use_eventloop = True
|
|
|
+ pool = Pool(w)
|
|
|
+ pool.create(w)
|
|
|
+ self.assertIsInstance(w.semaphore, BoundedSemaphore)
|
|
|
+ self.assertTrue(w.hub.on_init)
|
|
|
+
|
|
|
+ hub = Mock()
|
|
|
+ w.hub.on_init[0](hub)
|
|
|
+
|
|
|
+ cbs = w.pool.init_callbacks.call_args[1]
|
|
|
+ w = Mock()
|
|
|
+ cbs["on_process_up"](w)
|
|
|
+ hub.add_reader.assert_called_with(w.sentinel, P.maintain_pool)
|
|
|
+
|
|
|
+ cbs["on_process_down"](w)
|
|
|
+ hub.remove.assert_called_with(w.sentinel)
|
|
|
+
|
|
|
+ result = Mock()
|
|
|
+ tref = result._tref
|
|
|
+
|
|
|
+ cbs["on_timeout_cancel"](result)
|
|
|
+ tref.cancel.assert_called_with()
|
|
|
+ cbs["on_timeout_cancel"](result) # no more tref
|
|
|
+
|
|
|
+ cbs["on_timeout_set"](result, 10, 20)
|
|
|
+ tsoft, callback = hub.timer.apply_after.call_args[0]
|
|
|
+ callback()
|
|
|
+
|
|
|
+ cbs["on_timeout_set"](result, 10, None)
|
|
|
+ tsoft, callback = hub.timer.apply_after.call_args[0]
|
|
|
+ callback()
|
|
|
+ cbs["on_timeout_set"](result, None, 10)
|
|
|
+ cbs["on_timeout_set"](result, None, None)
|
|
|
+
|
|
|
+ P.did_start_ok.return_value = False
|
|
|
+ with self.assertRaises(WorkerLostError):
|
|
|
+ pool.on_poll_init(P, hub)
|