Browse source code

100% coverage for celery.worker.control

Ask Solem 12 years ago
parent
commit
4b656c7ec3

+ 53 - 16
celery/tests/worker/test_control.py

@@ -16,7 +16,7 @@ from celery.utils.timer2 import Timer
 from celery.worker import WorkController as _WC
 from celery.worker import consumer
 from celery.worker import control
-from celery.worker import state
+from celery.worker import state as worker_state
 from celery.five import Queue as FastQueue
 from celery.worker.job import TaskRequest
 from celery.worker.state import revoked
@@ -36,7 +36,7 @@ class WorkController(object):
     autoscaler = None
 
     def stats(self):
-        return {'total': state.total_count}
+        return {'total': worker_state.total_count}
 
 
 class Consumer(consumer.Consumer):
@@ -152,6 +152,41 @@ class test_ControlPanel(AppCase):
         self.assertNotIn('task', evd.groups)
         self.assertIn('already disabled', panel.handle('disable_events')['ok'])
 
+    def test_clock(self):
+        consumer = Consumer(self.app)
+        panel = self.create_panel(consumer=consumer)
+        panel.state.app.clock.value = 313
+        x = panel.handle('clock')
+        self.assertEqual(x['clock'], 313)
+
+    def test_hello(self):
+        consumer = Consumer(self.app)
+        panel = self.create_panel(consumer=consumer)
+        panel.state.app.clock.value = 313
+        worker_state.revoked.add('revoked1')
+        try:
+            x = panel.handle('hello')
+            self.assertIn('revoked1', x['revoked'])
+            self.assertEqual(x['clock'], 314)  # incremented
+        finally:
+            worker_state.revoked.discard('revoked1')
+
+    def test_conf(self):
+        consumer = Consumer(self.app)
+        panel = self.create_panel(consumer=consumer)
+        self.app.conf.SOME_KEY6 = 'hello world'
+        x = panel.handle('dump_conf')
+        self.assertIn('SOME_KEY6', x)
+
+    def test_election(self):
+        consumer = Consumer(self.app)
+        panel = self.create_panel(consumer=consumer)
+        consumer.gossip = Mock()
+        panel.handle(
+            'election', {'id': 'id', 'topic': 'topic', 'action': 'action'},
+        )
+        consumer.gossip.election.assert_called_with('id', 'topic', 'action')
+
     def test_heartbeat(self):
         consumer = Consumer(self.app)
         panel = self.create_panel(consumer=consumer)
@@ -201,23 +236,23 @@ class test_ControlPanel(AppCase):
         self.assertIn('rate_limit=200', info)
 
     def test_stats(self):
-        prev_count, state.total_count = state.total_count, 100
+        prev_count, worker_state.total_count = worker_state.total_count, 100
         try:
             self.assertDictContainsSubset({'total': 100},
                                           self.panel.handle('stats'))
         finally:
-            state.total_count = prev_count
+            worker_state.total_count = prev_count
 
     def test_report(self):
         self.panel.handle('report')
 
     def test_active(self):
         r = TaskRequest(mytask.name, 'do re mi', (), {})
-        state.active_requests.add(r)
+        worker_state.active_requests.add(r)
         try:
             self.assertTrue(self.panel.handle('dump_active'))
         finally:
-            state.active_requests.discard(r)
+            worker_state.active_requests.discard(r)
 
     def test_pool_grow(self):
 
@@ -280,15 +315,15 @@ class test_ControlPanel(AppCase):
         self.assertIn('MyQueue', consumer.task_consumer.cancelled)
 
     def test_revoked(self):
-        state.revoked.clear()
-        state.revoked.add('a1')
-        state.revoked.add('a2')
+        worker_state.revoked.clear()
+        worker_state.revoked.add('a1')
+        worker_state.revoked.add('a2')
 
         try:
             self.assertEqual(sorted(self.panel.handle('dump_revoked')),
                              ['a1', 'a2'])
         finally:
-            state.revoked.clear()
+            worker_state.revoked.clear()
 
     def test_dump_schedule(self):
         consumer = Consumer(self.app)
@@ -298,12 +333,14 @@ class test_ControlPanel(AppCase):
         consumer.timer.schedule.enter(
             consumer.timer.Entry(lambda x: x, (r, )),
             datetime.now() + timedelta(seconds=10))
+        consumer.timer.schedule.enter(
+            consumer.timer.Entry(lambda x: x, (object(), )),
+            datetime.now() + timedelta(seconds=10))
         self.assertTrue(panel.handle('dump_schedule'))
 
     def test_dump_reserved(self):
-        from celery.worker import state
         consumer = Consumer(self.app)
-        state.reserved_requests.add(
+        worker_state.reserved_requests.add(
             TaskRequest(mytask.name, uuid(), args=(2, 2), kwargs={}),
         )
         try:
@@ -316,10 +353,10 @@ class test_ControlPanel(AppCase):
                  'hostname': socket.gethostname()},
                 response[0],
             )
-            state.reserved_requests.clear()
+            worker_state.reserved_requests.clear()
             self.assertFalse(panel.handle('dump_reserved'))
         finally:
-            state.reserved_requests.clear()
+            worker_state.reserved_requests.clear()
 
     def test_rate_limit_invalid_rate_limit_string(self):
         e = self.panel.handle('rate_limit', arguments=dict(
@@ -396,7 +433,7 @@ class test_ControlPanel(AppCase):
     def test_revoke_terminate(self):
         request = Mock()
         request.id = tid = uuid()
-        state.reserved_requests.add(request)
+        worker_state.reserved_requests.add(request)
         try:
             r = control.revoke(Mock(), tid, terminate=True)
             self.assertIn(tid, revoked)
@@ -406,7 +443,7 @@ class test_ControlPanel(AppCase):
             r = control.revoke(Mock(), uuid(), terminate=True)
             self.assertIn('not found', r['ok'])
         finally:
-            state.reserved_requests.discard(request)
+            worker_state.reserved_requests.discard(request)
 
     def test_autoscale(self):
         self.panel.state.consumer = Mock()

+ 1 - 1
celery/tests/worker/test_heartbeat.py

@@ -1,7 +1,7 @@
 from __future__ import absolute_import
 
 from celery.worker.heartbeat import Heart
-from celery.tests.utils import Case, sleepdeprived
+from celery.tests.utils import Case
 
 
 class MockDispatcher(object):

+ 1 - 1
celery/tests/worker/test_hub.py

@@ -238,7 +238,7 @@ class test_Hub(Case):
         self.assertTrue(hub.repr_events([
             (6, READ),
             (7, ERR),
-            (8, READ|ERR),
+            (8, READ | ERR),
             (9, WRITE),
             (10, 13213),
         ]))

+ 14 - 3
celery/utils/__init__.py

@@ -200,14 +200,23 @@ def strtobool(term, table={'false': False, 'no': False, '0': False,
     return term
 
 
-def jsonify(obj, builtin_types=(int, float, string_t)):
+def jsonify(obj,
+            builtin_types=(int, float, string_t), key=None,
+            keyfilter=None):
     """Transforms object making it suitable for json serialization"""
+    from kombu.abstract import Object as KombuDictType
+
+    if isinstance(obj, KombuDictType):
+        obj = obj.as_dict(recurse=True)
+
     if obj is None or isinstance(obj, builtin_types):
         return obj
     elif isinstance(obj, (tuple, list)):
         return [jsonify(v) for v in obj]
     elif isinstance(obj, dict):
-        return dict((k, jsonify(v)) for k, v in items(obj))
+        return dict((k, jsonify(v, key=k))
+                    for k, v in items(obj)
+                    if (keyfilter(k) if keyfilter else 1))
     elif isinstance(obj, datetime.datetime):
         # See "Date Time String Format" in the ECMA-262 specification.
         r = obj.isoformat()
@@ -226,7 +235,9 @@ def jsonify(obj, builtin_types=(int, float, string_t)):
     elif isinstance(obj, datetime.timedelta):
         return str(obj)
     else:
-        raise ValueError('Unsupported type: {0!r}'.format(type(obj)))
+        raise ValueError(
+            'Unsupported type: {0!r} {1!r} (parent: {2})'.format(
+                type(obj), obj, key))
 
 
 def gen_task_name(app, name, module_name):

+ 68 - 69
celery/worker/control.py

@@ -8,18 +8,15 @@
 """
 from __future__ import absolute_import
 
-import logging
 import tempfile
 
-from kombu.utils.encoding import safe_repr
-
 from celery.five import UserDict, items, StringIO
 from celery.platforms import signals as _signals
 from celery.utils import timeutils
 from celery.utils.log import get_logger
 from celery.utils import jsonify
 
-from . import state
+from . import state as worker_state
 from .state import revoked
 
 DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
@@ -36,15 +33,15 @@ class Panel(UserDict):
 
 
 @Panel.register
-def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
+def revoke(state, task_id, terminate=False, signal=None, **kwargs):
     """Revoke task by task id."""
     revoked.add(task_id)
     if terminate:
         signum = _signals.signum(signal or 'TERM')
-        for request in state.reserved_requests:
+        for request in worker_state.reserved_requests:
             if request.id == task_id:
                 logger.info('Terminating %s (%s)', task_id, signum)
-                request.terminate(panel.consumer.pool, signal=signum)
+                request.terminate(state.consumer.pool, signal=signum)
                 break
         else:
             return {'ok': 'terminate: task {0} not found'.format(task_id)}
@@ -55,13 +52,13 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
 
 
 @Panel.register
-def report(panel):
-    return {'ok': panel.app.bugreport()}
+def report(state):
+    return {'ok': state.app.bugreport()}
 
 
 @Panel.register
-def enable_events(panel):
-    dispatcher = panel.consumer.event_dispatcher
+def enable_events(state):
+    dispatcher = state.consumer.event_dispatcher
     if 'task' not in dispatcher.groups:
         dispatcher.groups.add('task')
         logger.info('Events of group {task} enabled by remote.')
@@ -70,8 +67,8 @@ def enable_events(panel):
 
 
 @Panel.register
-def disable_events(panel):
-    dispatcher = panel.consumer.event_dispatcher
+def disable_events(state):
+    dispatcher = state.consumer.event_dispatcher
     if 'task' in dispatcher.groups:
         dispatcher.groups.discard('task')
         logger.info('Events of group {task} disabled by remote.')
@@ -80,14 +77,14 @@ def disable_events(panel):
 
 
 @Panel.register
-def heartbeat(panel):
+def heartbeat(state):
     logger.debug('Heartbeat requested by remote.')
-    dispatcher = panel.consumer.event_dispatcher
-    dispatcher.send('worker-heartbeat', freq=5, **state.SOFTWARE_INFO)
+    dispatcher = state.consumer.event_dispatcher
+    dispatcher.send('worker-heartbeat', freq=5, **worker_state.SOFTWARE_INFO)
 
 
 @Panel.register
-def rate_limit(panel, task_name, rate_limit, **kwargs):
+def rate_limit(state, task_name, rate_limit, **kwargs):
     """Set new rate limit for a task type.
 
     See :attr:`celery.task.base.Task.rate_limit`.
@@ -103,13 +100,13 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
         return {'error': 'Invalid rate limit string: {0!r}'.format(exc)}
 
     try:
-        panel.app.tasks[task_name].rate_limit = rate_limit
+        state.app.tasks[task_name].rate_limit = rate_limit
     except KeyError:
         logger.error('Rate limit attempt for unknown task %s',
                      task_name, exc_info=True)
         return {'error': 'unknown task'}
 
-    panel.consumer.reset_rate_limits()
+    state.consumer.reset_rate_limits()
 
     if not rate_limit:
         logger.info('Rate limits disabled for tasks of type %s', task_name)
@@ -121,9 +118,9 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
 
 
 @Panel.register
-def time_limit(panel, task_name=None, hard=None, soft=None, **kwargs):
+def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
     try:
-        task = panel.app.tasks[task_name]
+        task = state.app.tasks[task_name]
     except KeyError:
         logger.error('Change time limit attempt for unknown task %s',
                      task_name, exc_info=True)
@@ -138,9 +135,9 @@ def time_limit(panel, task_name=None, hard=None, soft=None, **kwargs):
 
 
 @Panel.register
-def dump_schedule(panel, safe=False, **kwargs):
+def dump_schedule(state, safe=False, **kwargs):
     from celery.worker.job import Request
-    schedule = panel.consumer.timer.schedule
+    schedule = state.consumer.timer.schedule
     if not schedule.queue:
         return []
 
@@ -155,29 +152,26 @@ def dump_schedule(panel, safe=False, **kwargs):
 
 
 @Panel.register
-def dump_reserved(panel, safe=False, **kwargs):
-    reserved = state.reserved_requests - state.active_requests
+def dump_reserved(state, safe=False, **kwargs):
+    reserved = worker_state.reserved_requests - worker_state.active_requests
     if not reserved:
-        logger.debug('--Empty queue--')
         return []
-    if logger.isEnabledFor(logging.DEBUG):
-        logger.debug('* Dump of currently reserved tasks:\n%s',
-                     '\n'.join(safe_repr(id) for id in reserved))
     return [request.info(safe=safe) for request in reserved]
 
 
 @Panel.register
-def dump_active(panel, safe=False, **kwargs):
-    return [request.info(safe=safe) for request in state.active_requests]
+def dump_active(state, safe=False, **kwargs):
+    return [request.info(safe=safe)
+            for request in worker_state.active_requests]
 
 
 @Panel.register
-def stats(panel, **kwargs):
-    return panel.consumer.controller.stats()
+def stats(state, **kwargs):
+    return state.consumer.controller.stats()
 
 
 @Panel.register
-def objgraph(panel, num=200, max_depth=10, ):
+def objgraph(state, num=200, max_depth=10, ):  # pragma: no cover
     try:
         import objgraph
     except ImportError:
@@ -194,13 +188,13 @@ def objgraph(panel, num=200, max_depth=10, ):
 
 
 @Panel.register
-def memsample(panel, **kwargs):
+def memsample(state, **kwargs):  # pragma: no cover
     from celery.utils.debug import sample_mem
     return sample_mem()
 
 
 @Panel.register
-def memdump(panel, samples=10, **kwargs):
+def memdump(state, samples=10, **kwargs):  # pragma: no cover
     from celery.utils.debug import memdump
     out = StringIO()
     memdump(file=out)
@@ -208,23 +202,24 @@ def memdump(panel, samples=10, **kwargs):
 
 
 @Panel.register
-def clock(panel, **kwargs):
-    return {'clock': panel.app.clock.value}
+def clock(state, **kwargs):
+    return {'clock': state.app.clock.value}
 
 
 @Panel.register
-def dump_revoked(panel, **kwargs):
-    return list(state.revoked)
+def dump_revoked(state, **kwargs):
+    return list(worker_state.revoked)
 
 
 @Panel.register
-def hello(panel, **kwargs):
-    return {'revoked': state.revoked._data, 'clock': panel.app.clock.forward()}
+def hello(state, **kwargs):
+    return {'revoked': worker_state.revoked._data,
+            'clock': state.app.clock.forward()}
 
 
 @Panel.register
-def dump_tasks(panel, taskinfoitems=None, **kwargs):
-    tasks = panel.app.tasks
+def dump_tasks(state, taskinfoitems=None, **kwargs):
+    tasks = state.app.tasks
     taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
 
     def _extract_info(task):
@@ -240,40 +235,40 @@ def dump_tasks(panel, taskinfoitems=None, **kwargs):
 
 
 @Panel.register
-def ping(panel, **kwargs):
+def ping(state, **kwargs):
     return {'ok': 'pong'}
 
 
 @Panel.register
-def pool_grow(panel, n=1, **kwargs):
-    if panel.consumer.controller.autoscaler:
-        panel.consumer.controller.autoscaler.force_scale_up(n)
+def pool_grow(state, n=1, **kwargs):
+    if state.consumer.controller.autoscaler:
+        state.consumer.controller.autoscaler.force_scale_up(n)
     else:
-        panel.consumer.pool.grow(n)
+        state.consumer.pool.grow(n)
     return {'ok': 'spawned worker processes'}
 
 
 @Panel.register
-def pool_shrink(panel, n=1, **kwargs):
-    if panel.consumer.controller.autoscaler:
-        panel.consumer.controller.autoscaler.force_scale_down(n)
+def pool_shrink(state, n=1, **kwargs):
+    if state.consumer.controller.autoscaler:
+        state.consumer.controller.autoscaler.force_scale_down(n)
     else:
-        panel.consumer.pool.shrink(n)
+        state.consumer.pool.shrink(n)
     return {'ok': 'terminated worker processes'}
 
 
 @Panel.register
-def pool_restart(panel, modules=None, reload=False, reloader=None, **kwargs):
-    if panel.app.conf.CELERYD_POOL_RESTARTS:
-        panel.consumer.controller.reload(modules, reload, reloader=reloader)
+def pool_restart(state, modules=None, reload=False, reloader=None, **kwargs):
+    if state.app.conf.CELERYD_POOL_RESTARTS:
+        state.consumer.controller.reload(modules, reload, reloader=reloader)
         return {'ok': 'reload started'}
     else:
         raise ValueError('Pool restarts not enabled')
 
 
 @Panel.register
-def autoscale(panel, max=None, min=None):
-    autoscaler = panel.consumer.controller.autoscaler
+def autoscale(state, max=None, min=None):
+    autoscaler = state.consumer.controller.autoscaler
     if autoscaler:
         max_, min_ = autoscaler.update(max, min)
         return {'ok': 'autoscale now min={0} max={1}'.format(max_, min_)}
@@ -281,37 +276,41 @@ def autoscale(panel, max=None, min=None):
 
 
 @Panel.register
-def shutdown(panel, msg='Got shutdown from remote', **kwargs):
+def shutdown(state, msg='Got shutdown from remote', **kwargs):
     logger.warning(msg)
     raise SystemExit(msg)
 
 
 @Panel.register
-def add_consumer(panel, queue, exchange=None, exchange_type=None,
+def add_consumer(state, queue, exchange=None, exchange_type=None,
                  routing_key=None, **options):
-    panel.consumer.add_task_queue(queue, exchange, exchange_type,
+    state.consumer.add_task_queue(queue, exchange, exchange_type,
                                   routing_key, **options)
     return {'ok': 'add consumer {0}'.format(queue)}
 
 
 @Panel.register
-def cancel_consumer(panel, queue=None, **_):
-    panel.consumer.cancel_task_queue(queue)
+def cancel_consumer(state, queue=None, **_):
+    state.consumer.cancel_task_queue(queue)
     return {'ok': 'no longer consuming from {0}'.format(queue)}
 
 
 @Panel.register
-def active_queues(panel):
+def active_queues(state):
     """Returns the queues associated with each worker."""
     return [dict(queue.as_dict(recurse=True))
-            for queue in panel.consumer.task_consumer.queues]
+            for queue in state.consumer.task_consumer.queues]
+
+
+def _wanted_config_key(key):
+    return key.isupper() and not key.startswith('__')
 
 
 @Panel.register
-def dump_conf(panel, **kwargs):
-    return jsonify(dict(panel.app.conf))
+def dump_conf(state, **kwargs):
+    return jsonify(dict(state.app.conf), keyfilter=_wanted_config_key)
 
 
 @Panel.register
-def election(panel, id, topic, action=None, **kwargs):
-    panel.consumer.gossip.election(id, topic, action)
+def election(state, id, topic, action=None, **kwargs):
+    state.consumer.gossip.election(id, topic, action)

+ 0 - 2
celery/worker/hub.py

@@ -8,8 +8,6 @@
 """
 from __future__ import absolute_import
 
-from functools import wraps
-
 from kombu.utils import cached_property
 from kombu.utils import eventio
 from kombu.utils.eventio import READ, WRITE, ERR

+ 2 - 2
docs/history/changelog-1.0.rst

@@ -283,8 +283,8 @@ Remote control commands
         from celery.task.control import Panel
 
         @Panel.register
-        def reset_broker_connection(panel, **kwargs):
-            panel.consumer.reset_connection()
+        def reset_broker_connection(state, **kwargs):
+            state.consumer.reset_connection()
             return {"ok": "connection re-established"}
 
     With this module imported in the worker, you can launch the command

+ 2 - 2
docs/userguide/workers.rst

@@ -836,6 +836,6 @@ Here's an example control command that restarts the broker connection:
     from celery.worker.control import Panel
 
     @Panel.register
-    def reset_connection(panel):
-        panel.consumer.reset_connection()
+    def reset_connection(state):
+        state.consumer.reset_connection()
         return {'ok': 'connection reset'}