|
@@ -22,7 +22,8 @@ from celery.exceptions import SystemTerminate
|
|
|
from celery.task import task as task_dec
|
|
|
from celery.task import periodic_task as periodic_task_dec
|
|
|
from celery.utils import uuid
|
|
|
-from celery.worker import WorkController, Queues, Timers, EvLoop, Pool
|
|
|
+from celery.worker import WorkController
|
|
|
+from celery.worker.components import Queues, Timers, EvLoop, Pool
|
|
|
from celery.worker.buckets import FastQueue
|
|
|
from celery.worker.job import Request
|
|
|
from celery.worker.consumer import BlockingConsumer
|
|
@@ -752,12 +753,17 @@ class test_WorkController(AppCase):
|
|
|
def setup(self):
|
|
|
self.worker = self.create_worker()
|
|
|
from celery import worker
|
|
|
+ from celery.worker import components
|
|
|
self._logger = worker.logger
|
|
|
+ self._comp_logger = components.logger
|
|
|
self.logger = worker.logger = Mock()
|
|
|
+ self.comp_logger = components.logger = Mock()
|
|
|
|
|
|
def teardown(self):
|
|
|
from celery import worker
|
|
|
+ from celery.worker import components
|
|
|
worker.logger = self._logger
|
|
|
+ components.logger = self._comp_logger
|
|
|
|
|
|
def create_worker(self, **kw):
|
|
|
worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
|
|
@@ -851,14 +857,14 @@ class test_WorkController(AppCase):
|
|
|
raise KeyError('foo')
|
|
|
except KeyError as exc:
|
|
|
Timers(worker).on_timer_error(exc)
|
|
|
- msg, args = self.logger.error.call_args[0]
|
|
|
+ msg, args = self.comp_logger.error.call_args[0]
|
|
|
self.assertIn('KeyError', msg % args)
|
|
|
|
|
|
def test_on_timer_tick(self):
|
|
|
worker = WorkController(concurrency=1, loglevel=10)
|
|
|
|
|
|
Timers(worker).on_timer_tick(30.0)
|
|
|
- xargs = self.logger.debug.call_args[0]
|
|
|
+ xargs = self.comp_logger.debug.call_args[0]
|
|
|
fmt, arg = xargs[0], xargs[1]
|
|
|
self.assertEqual(30.0, arg)
|
|
|
self.assertIn('Next eta %s secs', fmt)
|
|
@@ -959,11 +965,11 @@ class test_WorkController(AppCase):
|
|
|
|
|
|
def test_process_task_sem(self):
|
|
|
worker = self.worker
|
|
|
- worker.semaphore = Mock()
|
|
|
+ worker._quick_acquire = Mock()
|
|
|
|
|
|
req = Mock()
|
|
|
worker.process_task_sem(req)
|
|
|
- worker.semaphore.acquire.assert_called_with(worker.process_task, req)
|
|
|
+ worker._quick_acquire.assert_called_with(worker.process_task, req)
|
|
|
|
|
|
def test_signal_consumer_close(self):
|
|
|
worker = self.worker
|