| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687 | 
							- from __future__ import absolute_import, unicode_literals
 
- import pytest
 
- import sys
 
- import socket
 
- from collections import defaultdict
 
- from datetime import datetime, timedelta
 
- from case import Mock, call, patch
 
- from kombu import pidbox
 
- from kombu.utils.uuid import uuid
 
- from celery.five import Queue as FastQueue
 
- from celery.utils.timer2 import Timer
 
- from celery.worker import WorkController as _WC  # noqa
 
- from celery.worker import consumer
 
- from celery.worker import control
 
- from celery.worker import state as worker_state
 
- from celery.worker.request import Request
 
- from celery.worker.state import revoked
 
- from celery.worker.pidbox import Pidbox, gPidbox
 
- from celery.utils.collections import AttributeDict
 
- hostname = socket.gethostname()
 
- class WorkController(object):
 
-     autoscaler = None
 
-     def stats(self):
 
-         return {'total': worker_state.total_count}
 
- class Consumer(consumer.Consumer):
 
-     def __init__(self, app):
 
-         self.app = app
 
-         self.buffer = FastQueue()
 
-         self.timer = Timer()
 
-         self.event_dispatcher = Mock()
 
-         self.controller = WorkController()
 
-         self.task_consumer = Mock()
 
-         self.prefetch_multiplier = 1
 
-         self.initial_prefetch_count = 1
 
-         from celery.concurrency.base import BasePool
 
-         self.pool = BasePool(10)
 
-         self.task_buckets = defaultdict(lambda: None)
 
-         self.hub = None
 
-     def call_soon(self, p, *args, **kwargs):
 
-         return p(*args, **kwargs)
 
- class test_Pidbox:
 
-     def test_shutdown(self):
 
-         with patch('celery.worker.pidbox.ignore_errors') as eig:
 
-             parent = Mock()
 
-             pbox = Pidbox(parent)
 
-             pbox._close_channel = Mock()
 
-             assert pbox.c is parent
 
-             pconsumer = pbox.consumer = Mock()
 
-             cancel = pconsumer.cancel
 
-             pbox.shutdown(parent)
 
-             eig.assert_called_with(parent, cancel)
 
-             pbox._close_channel.assert_called_with(parent)
 
- class test_Pidbox_green:
 
-     def test_stop(self):
 
-         parent = Mock()
 
-         g = gPidbox(parent)
 
-         stopped = g._node_stopped = Mock()
 
-         shutdown = g._node_shutdown = Mock()
 
-         close_chan = g._close_channel = Mock()
 
-         g.stop(parent)
 
-         shutdown.set.assert_called_with()
 
-         stopped.wait.assert_called_with()
 
-         close_chan.assert_called_with(parent)
 
-         assert g._node_stopped is None
 
-         assert g._node_shutdown is None
 
-         close_chan.reset()
 
-         g.stop(parent)
 
-         close_chan.assert_called_with(parent)
 
-     def test_resets(self):
 
-         parent = Mock()
 
-         g = gPidbox(parent)
 
-         g._resets = 100
 
-         g.reset()
 
-         assert g._resets == 101
 
-     def test_loop(self):
 
-         parent = Mock()
 
-         conn = parent.connect.return_value = self.app.connection_for_read()
 
-         drain = conn.drain_events = Mock()
 
-         g = gPidbox(parent)
 
-         parent.connection = Mock()
 
-         do_reset = g._do_reset = Mock()
 
-         call_count = [0]
 
-         def se(*args, **kwargs):
 
-             if call_count[0] > 2:
 
-                 g._node_shutdown.set()
 
-             g.reset()
 
-             call_count[0] += 1
 
-         drain.side_effect = se
 
-         g.loop(parent)
 
-         assert do_reset.call_count == 4
 
- class test_ControlPanel:
 
-     def setup(self):
 
-         self.panel = self.create_panel(consumer=Consumer(self.app))
 
-         @self.app.task(name='c.unittest.mytask', rate_limit=200, shared=False)
 
-         def mytask():
 
-             pass
 
-         self.mytask = mytask
 
-     def create_state(self, **kwargs):
 
-         kwargs.setdefault('app', self.app)
 
-         kwargs.setdefault('hostname', hostname)
 
-         kwargs.setdefault('tset', set)
 
-         return AttributeDict(kwargs)
 
-     def create_panel(self, **kwargs):
 
-         return self.app.control.mailbox.Node(
 
-             hostname=hostname,
 
-             state=self.create_state(**kwargs),
 
-             handlers=control.Panel.data,
 
-         )
 
-     def test_enable_events(self):
 
-         consumer = Consumer(self.app)
 
-         panel = self.create_panel(consumer=consumer)
 
-         evd = consumer.event_dispatcher
 
-         evd.groups = set()
 
-         panel.handle('enable_events')
 
-         assert not evd.groups
 
-         evd.groups = {'worker'}
 
-         panel.handle('enable_events')
 
-         assert 'task' in evd.groups
 
-         evd.groups = {'task'}
 
-         assert 'already enabled' in panel.handle('enable_events')['ok']
 
-     def test_disable_events(self):
 
-         consumer = Consumer(self.app)
 
-         panel = self.create_panel(consumer=consumer)
 
-         evd = consumer.event_dispatcher
 
-         evd.enabled = True
 
-         evd.groups = {'task'}
 
-         panel.handle('disable_events')
 
-         assert 'task' not in evd.groups
 
-         assert 'already disabled' in panel.handle('disable_events')['ok']
 
-     def test_clock(self):
 
-         consumer = Consumer(self.app)
 
-         panel = self.create_panel(consumer=consumer)
 
-         panel.state.app.clock.value = 313
 
-         x = panel.handle('clock')
 
-         assert x['clock'] == 313
 
-     def test_hello(self):
 
-         consumer = Consumer(self.app)
 
-         panel = self.create_panel(consumer=consumer)
 
-         panel.state.app.clock.value = 313
 
-         panel.state.hostname = 'elaine@vandelay.com'
 
-         worker_state.revoked.add('revoked1')
 
-         try:
 
-             assert panel.handle('hello', {
 
-                 'from_node': 'elaine@vandelay.com',
 
-             }) is None
 
-             x = panel.handle('hello', {
 
-                 'from_node': 'george@vandelay.com',
 
-             })
 
-             assert x['clock'] == 314  # incremented
 
-             x = panel.handle('hello', {
 
-                 'from_node': 'george@vandelay.com',
 
-                 'revoked': {'1234', '4567', '891'}
 
-             })
 
-             assert 'revoked1' in x['revoked']
 
-             assert '1234' in x['revoked']
 
-             assert '4567' in x['revoked']
 
-             assert '891' in x['revoked']
 
-             assert x['clock'] == 315  # incremented
 
-         finally:
 
-             worker_state.revoked.discard('revoked1')
 
-     def test_conf(self):
 
-         consumer = Consumer(self.app)
 
-         panel = self.create_panel(consumer=consumer)
 
-         panel.app = self.app
 
-         panel.app.finalize()
 
-         self.app.conf.some_key6 = 'hello world'
 
-         x = panel.handle('dump_conf')
 
-         assert 'some_key6' in x
 
-     def test_election(self):
 
-         consumer = Consumer(self.app)
 
-         panel = self.create_panel(consumer=consumer)
 
-         consumer.gossip = Mock()
 
-         panel.handle(
 
-             'election', {'id': 'id', 'topic': 'topic', 'action': 'action'},
 
-         )
 
-         consumer.gossip.election.assert_called_with('id', 'topic', 'action')
 
-     def test_election__no_gossip(self):
 
-         consumer = Mock(name='consumer')
 
-         consumer.gossip = None
 
-         panel = self.create_panel(consumer=consumer)
 
-         panel.handle(
 
-             'election', {'id': 'id', 'topic': 'topic', 'action': 'action'},
 
-         )
 
-     def test_heartbeat(self):
 
-         consumer = Consumer(self.app)
 
-         panel = self.create_panel(consumer=consumer)
 
-         event_dispatcher = consumer.event_dispatcher
 
-         event_dispatcher.enabled = True
 
-         panel.handle('heartbeat')
 
-         assert ('worker-heartbeat',) in event_dispatcher.send.call_args
 
-     def test_time_limit(self):
 
-         panel = self.create_panel(consumer=Mock())
 
-         r = panel.handle('time_limit', arguments=dict(
 
-             task_name=self.mytask.name, hard=30, soft=10))
 
-         assert self.mytask.time_limit == 30
 
-         assert self.mytask.soft_time_limit == 10
 
-         assert 'ok' in r
 
-         r = panel.handle('time_limit', arguments=dict(
 
-             task_name=self.mytask.name, hard=None, soft=None))
 
-         assert self.mytask.time_limit is None
 
-         assert self.mytask.soft_time_limit is None
 
-         assert 'ok' in r
 
-         r = panel.handle('time_limit', arguments=dict(
 
-             task_name='248e8afya9s8dh921eh928', hard=30))
 
-         assert 'error' in r
 
-     def test_active_queues(self):
 
-         import kombu
 
-         x = kombu.Consumer(self.app.connection_for_read(),
 
-                            [kombu.Queue('foo', kombu.Exchange('foo'), 'foo'),
 
-                             kombu.Queue('bar', kombu.Exchange('bar'), 'bar')],
 
-                            auto_declare=False)
 
-         consumer = Mock()
 
-         consumer.task_consumer = x
 
-         panel = self.create_panel(consumer=consumer)
 
-         r = panel.handle('active_queues')
 
-         assert list(sorted(q['name'] for q in r)) == ['bar', 'foo']
 
-     def test_active_queues__empty(self):
 
-         consumer = Mock(name='consumer')
 
-         panel = self.create_panel(consumer=consumer)
 
-         consumer.task_consumer = None
 
-         assert not panel.handle('active_queues')
 
-     def test_dump_tasks(self):
 
-         info = '\n'.join(self.panel.handle('dump_tasks'))
 
-         assert 'mytask' in info
 
-         assert 'rate_limit=200' in info
 
-     def test_dump_tasks2(self):
 
-         prev, control.DEFAULT_TASK_INFO_ITEMS = (
 
-             control.DEFAULT_TASK_INFO_ITEMS, [])
 
-         try:
 
-             info = '\n'.join(self.panel.handle('dump_tasks'))
 
-             assert 'mytask' in info
 
-             assert 'rate_limit=200' not in info
 
-         finally:
 
-             control.DEFAULT_TASK_INFO_ITEMS = prev
 
-     def test_stats(self):
 
-         prev_count, worker_state.total_count = worker_state.total_count, 100
 
-         try:
 
-             assert self.panel.handle('stats')['total'] == 100
 
-         finally:
 
-             worker_state.total_count = prev_count
 
-     def test_report(self):
 
-         self.panel.handle('report')
 
-     def test_active(self):
 
-         r = Request(
 
-             self.TaskMessage(self.mytask.name, 'do re mi'),
 
-             app=self.app,
 
-         )
 
-         worker_state.active_requests.add(r)
 
-         try:
 
-             assert self.panel.handle('dump_active')
 
-         finally:
 
-             worker_state.active_requests.discard(r)
 
-     def test_pool_grow(self):
 
-         class MockPool(object):
 
-             def __init__(self, size=1):
 
-                 self.size = size
 
-             def grow(self, n=1):
 
-                 self.size += n
 
-             def shrink(self, n=1):
 
-                 self.size -= n
 
-             @property
 
-             def num_processes(self):
 
-                 return self.size
 
-         consumer = Consumer(self.app)
 
-         consumer.prefetch_multiplier = 8
 
-         consumer.qos = Mock(name='qos')
 
-         consumer.pool = MockPool(1)
 
-         panel = self.create_panel(consumer=consumer)
 
-         panel.handle('pool_grow')
 
-         assert consumer.pool.size == 2
 
-         consumer.qos.increment_eventually.assert_called_with(8)
 
-         assert consumer.initial_prefetch_count == 16
 
-         panel.handle('pool_shrink')
 
-         assert consumer.pool.size == 1
 
-         consumer.qos.decrement_eventually.assert_called_with(8)
 
-         assert consumer.initial_prefetch_count == 8
 
-         panel.state.consumer = Mock()
 
-         panel.state.consumer.controller = Mock()
 
-         sc = panel.state.consumer.controller.autoscaler = Mock()
 
-         panel.handle('pool_grow')
 
-         sc.force_scale_up.assert_called()
 
-         panel.handle('pool_shrink')
 
-         sc.force_scale_down.assert_called()
 
-     def test_add__cancel_consumer(self):
 
-         class MockConsumer(object):
 
-             queues = []
 
-             canceled = []
 
-             consuming = False
 
-             hub = Mock(name='hub')
 
-             def add_queue(self, queue):
 
-                 self.queues.append(queue.name)
 
-             def consume(self):
 
-                 self.consuming = True
 
-             def cancel_by_queue(self, queue):
 
-                 self.canceled.append(queue)
 
-             def consuming_from(self, queue):
 
-                 return queue in self.queues
 
-         consumer = Consumer(self.app)
 
-         consumer.task_consumer = MockConsumer()
 
-         panel = self.create_panel(consumer=consumer)
 
-         panel.handle('add_consumer', {'queue': 'MyQueue'})
 
-         assert 'MyQueue' in consumer.task_consumer.queues
 
-         assert consumer.task_consumer.consuming
 
-         panel.handle('add_consumer', {'queue': 'MyQueue'})
 
-         panel.handle('cancel_consumer', {'queue': 'MyQueue'})
 
-         assert 'MyQueue' in consumer.task_consumer.canceled
 
-     def test_revoked(self):
 
-         worker_state.revoked.clear()
 
-         worker_state.revoked.add('a1')
 
-         worker_state.revoked.add('a2')
 
-         try:
 
-             assert sorted(self.panel.handle('dump_revoked')) == ['a1', 'a2']
 
-         finally:
 
-             worker_state.revoked.clear()
 
-     def test_dump_schedule(self):
 
-         consumer = Consumer(self.app)
 
-         panel = self.create_panel(consumer=consumer)
 
-         assert not panel.handle('dump_schedule')
 
-         r = Request(
 
-             self.TaskMessage(self.mytask.name, 'CAFEBABE'),
 
-             app=self.app,
 
-         )
 
-         consumer.timer.schedule.enter_at(
 
-             consumer.timer.Entry(lambda x: x, (r,)),
 
-             datetime.now() + timedelta(seconds=10))
 
-         consumer.timer.schedule.enter_at(
 
-             consumer.timer.Entry(lambda x: x, (object(),)),
 
-             datetime.now() + timedelta(seconds=10))
 
-         assert panel.handle('dump_schedule')
 
-     def test_dump_reserved(self):
 
-         consumer = Consumer(self.app)
 
-         req = Request(
 
-             self.TaskMessage(self.mytask.name, args=(2, 2)), app=self.app,
 
-         )  # ^ need to keep reference for reserved_tasks WeakSet.
 
-         worker_state.task_reserved(req)
 
-         try:
 
-             panel = self.create_panel(consumer=consumer)
 
-             response = panel.handle('dump_reserved', {'safe': True})
 
-             assert response[0]['name'] == self.mytask.name
 
-             assert response[0]['hostname'] == socket.gethostname()
 
-             worker_state.reserved_requests.clear()
 
-             assert not panel.handle('dump_reserved')
 
-         finally:
 
-             worker_state.reserved_requests.clear()
 
-     def test_rate_limit_invalid_rate_limit_string(self):
 
-         e = self.panel.handle('rate_limit', arguments=dict(
 
-             task_name='tasks.add', rate_limit='x1240301#%!'))
 
-         assert 'Invalid rate limit string' in e.get('error')
 
-     def test_rate_limit(self):
 
-         class xConsumer(object):
 
-             reset = False
 
-             def reset_rate_limits(self):
 
-                 self.reset = True
 
-         consumer = xConsumer()
 
-         panel = self.create_panel(app=self.app, consumer=consumer)
 
-         task = self.app.tasks[self.mytask.name]
 
-         panel.handle('rate_limit', arguments=dict(task_name=task.name,
 
-                                                   rate_limit='100/m'))
 
-         assert task.rate_limit == '100/m'
 
-         assert consumer.reset
 
-         consumer.reset = False
 
-         panel.handle('rate_limit', arguments=dict(
 
-             task_name=task.name,
 
-             rate_limit=0,
 
-         ))
 
-         assert task.rate_limit == 0
 
-         assert consumer.reset
 
-     def test_rate_limit_nonexistant_task(self):
 
-         self.panel.handle('rate_limit', arguments={
 
-             'task_name': 'xxxx.does.not.exist',
 
-             'rate_limit': '1000/s'})
 
-     def test_unexposed_command(self):
 
-         with pytest.raises(KeyError):
 
-             self.panel.handle('foo', arguments={})
 
-     def test_revoke_with_name(self):
 
-         tid = uuid()
 
-         m = {
 
-             'method': 'revoke',
 
-             'destination': hostname,
 
-             'arguments': {
 
-                 'task_id': tid,
 
-                 'task_name': self.mytask.name,
 
-             },
 
-         }
 
-         self.panel.handle_message(m, None)
 
-         assert tid in revoked
 
-     def test_revoke_with_name_not_in_registry(self):
 
-         tid = uuid()
 
-         m = {
 
-             'method': 'revoke',
 
-             'destination': hostname,
 
-             'arguments': {
 
-                 'task_id': tid,
 
-                 'task_name': 'xxxxxxxxx33333333388888',
 
-             },
 
-         }
 
-         self.panel.handle_message(m, None)
 
-         assert tid in revoked
 
-     def test_revoke(self):
 
-         tid = uuid()
 
-         m = {
 
-             'method': 'revoke',
 
-             'destination': hostname,
 
-             'arguments': {
 
-                 'task_id': tid,
 
-             },
 
-         }
 
-         self.panel.handle_message(m, None)
 
-         assert tid in revoked
 
-         m = {
 
-             'method': 'revoke',
 
-             'destination': 'does.not.exist',
 
-             'arguments': {
 
-                 'task_id': tid + 'xxx',
 
-             },
 
-         }
 
-         self.panel.handle_message(m, None)
 
-         assert tid + 'xxx' not in revoked
 
-     def test_revoke_terminate(self):
 
-         request = Mock()
 
-         request.id = tid = uuid()
 
-         state = self.create_state()
 
-         state.consumer = Mock()
 
-         worker_state.task_reserved(request)
 
-         try:
 
-             r = control.revoke(state, tid, terminate=True)
 
-             assert tid in revoked
 
-             assert request.terminate.call_count
 
-             assert 'terminate:' in r['ok']
 
-             # unknown task id only revokes
 
-             r = control.revoke(state, uuid(), terminate=True)
 
-             assert 'tasks unknown' in r['ok']
 
-         finally:
 
-             worker_state.task_ready(request)
 
-     def test_autoscale(self):
 
-         self.panel.state.consumer = Mock()
 
-         self.panel.state.consumer.controller = Mock()
 
-         sc = self.panel.state.consumer.controller.autoscaler = Mock()
 
-         sc.update.return_value = 10, 2
 
-         m = {'method': 'autoscale',
 
-              'destination': hostname,
 
-              'arguments': {'max': '10', 'min': '2'}}
 
-         r = self.panel.handle_message(m, None)
 
-         assert 'ok' in r
 
-         self.panel.state.consumer.controller.autoscaler = None
 
-         r = self.panel.handle_message(m, None)
 
-         assert 'error' in r
 
-     def test_ping(self):
 
-         m = {'method': 'ping',
 
-              'destination': hostname}
 
-         r = self.panel.handle_message(m, None)
 
-         assert r == {'ok': 'pong'}
 
-     def test_shutdown(self):
 
-         m = {'method': 'shutdown',
 
-              'destination': hostname}
 
-         with pytest.raises(SystemExit):
 
-             self.panel.handle_message(m, None)
 
-     def test_panel_reply(self):
 
-         replies = []
 
-         class _Node(pidbox.Node):
 
-             def reply(self, data, exchange, routing_key, **kwargs):
 
-                 replies.append(data)
 
-         panel = _Node(
 
-             hostname=hostname,
 
-             state=self.create_state(consumer=Consumer(self.app)),
 
-             handlers=control.Panel.data,
 
-             mailbox=self.app.control.mailbox,
 
-         )
 
-         r = panel.dispatch('ping', reply_to={
 
-             'exchange': 'x',
 
-             'routing_key': 'x',
 
-         })
 
-         assert r == {'ok': 'pong'}
 
-         assert replies[0] == {panel.hostname: {'ok': 'pong'}}
 
-     def test_pool_restart(self):
 
-         consumer = Consumer(self.app)
 
-         consumer.controller = _WC(app=self.app)
 
-         consumer.controller.consumer = consumer
 
-         consumer.controller.pool.restart = Mock()
 
-         consumer.reset_rate_limits = Mock(name='reset_rate_limits()')
 
-         consumer.update_strategies = Mock(name='update_strategies()')
 
-         consumer.event_dispatcher = Mock(name='evd')
 
-         panel = self.create_panel(consumer=consumer)
 
-         assert panel.state.consumer.controller.consumer is consumer
 
-         panel.app = self.app
 
-         _import = panel.app.loader.import_from_cwd = Mock()
 
-         _reload = Mock()
 
-         with pytest.raises(ValueError):
 
-             panel.handle('pool_restart', {'reloader': _reload})
 
-         self.app.conf.worker_pool_restarts = True
 
-         panel.handle('pool_restart', {'reloader': _reload})
 
-         consumer.controller.pool.restart.assert_called()
 
-         consumer.reset_rate_limits.assert_called_with()
 
-         consumer.update_strategies.assert_called_with()
 
-         _reload.assert_not_called()
 
-         _import.assert_not_called()
 
-         consumer.controller.pool.restart.side_effect = NotImplementedError()
 
-         panel.handle('pool_restart', {'reloader': _reload})
 
-         consumer.controller.consumer = None
 
-         panel.handle('pool_restart', {'reloader': _reload})
 
-     @patch('celery.worker.logger.debug')
 
-     def test_pool_restart_import_modules(self, _debug):
 
-         consumer = Consumer(self.app)
 
-         consumer.controller = _WC(app=self.app)
 
-         consumer.controller.consumer = consumer
 
-         consumer.controller.pool.restart = Mock()
 
-         consumer.reset_rate_limits = Mock(name='reset_rate_limits()')
 
-         consumer.update_strategies = Mock(name='update_strategies()')
 
-         panel = self.create_panel(consumer=consumer)
 
-         panel.app = self.app
 
-         assert panel.state.consumer.controller.consumer is consumer
 
-         _import = consumer.controller.app.loader.import_from_cwd = Mock()
 
-         _reload = Mock()
 
-         self.app.conf.worker_pool_restarts = True
 
-         with patch('sys.modules'):
 
-             panel.handle('pool_restart', {
 
-                 'modules': ['foo', 'bar'],
 
-                 'reloader': _reload,
 
-             })
 
-         consumer.controller.pool.restart.assert_called()
 
-         consumer.reset_rate_limits.assert_called_with()
 
-         consumer.update_strategies.assert_called_with()
 
-         _reload.assert_not_called()
 
-         _import.assert_has_calls([call('bar'), call('foo')], any_order=True)
 
-         assert _import.call_count == 2
 
-     def test_pool_restart_reload_modules(self):
 
-         consumer = Consumer(self.app)
 
-         consumer.controller = _WC(app=self.app)
 
-         consumer.controller.consumer = consumer
 
-         consumer.controller.pool.restart = Mock()
 
-         consumer.reset_rate_limits = Mock(name='reset_rate_limits()')
 
-         consumer.update_strategies = Mock(name='update_strategies()')
 
-         panel = self.create_panel(consumer=consumer)
 
-         panel.app = self.app
 
-         _import = panel.app.loader.import_from_cwd = Mock()
 
-         _reload = Mock()
 
-         self.app.conf.worker_pool_restarts = True
 
-         with patch.dict(sys.modules, {'foo': None}):
 
-             panel.handle('pool_restart', {
 
-                 'modules': ['foo'],
 
-                 'reload': False,
 
-                 'reloader': _reload,
 
-             })
 
-             consumer.controller.pool.restart.assert_called()
 
-             _reload.assert_not_called()
 
-             _import.assert_not_called()
 
-             _import.reset_mock()
 
-             _reload.reset_mock()
 
-             consumer.controller.pool.restart.reset_mock()
 
-             panel.handle('pool_restart', {
 
-                 'modules': ['foo'],
 
-                 'reload': True,
 
-                 'reloader': _reload,
 
-             })
 
-             consumer.controller.pool.restart.assert_called()
 
-             _reload.assert_called()
 
-             _import.assert_not_called()
 
-     def test_query_task(self):
 
-         consumer = Consumer(self.app)
 
-         consumer.controller = _WC(app=self.app)
 
-         consumer.controller.consumer = consumer
 
-         panel = self.create_panel(consumer=consumer)
 
-         panel.app = self.app
 
-         req1 = Request(
 
-             self.TaskMessage(self.mytask.name, args=(2, 2)),
 
-             app=self.app,
 
-         )
 
-         worker_state.task_reserved(req1)
 
-         try:
 
-             assert not panel.handle('query_task', {'ids': {'1daa'}})
 
-             ret = panel.handle('query_task', {'ids': {req1.id}})
 
-             assert req1.id in ret
 
-             assert ret[req1.id][0] == 'reserved'
 
-             worker_state.active_requests.add(req1)
 
-             try:
 
-                 ret = panel.handle('query_task', {'ids': {req1.id}})
 
-                 assert ret[req1.id][0] == 'active'
 
-             finally:
 
-                 worker_state.active_requests.clear()
 
-             ret = panel.handle('query_task', {'ids': {req1.id}})
 
-             assert ret[req1.id][0] == 'reserved'
 
-         finally:
 
-             worker_state.reserved_requests.clear()
 
 
  |