Ver Fonte

Reorganizes celery.worker.control module by topic

Ask Solem há 8 anos atrás
pai
commit
e5f7eca9f5
1 ficheiros alterados com 138 adições e 117 exclusões
  1. 138 117
      celery/worker/control.py

+ 138 - 117
celery/worker/control.py

@@ -27,6 +27,7 @@ from .request import Request
 from .state import revoked
 
 __all__ = ['Panel']
+
 DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
 logger = get_logger(__name__)
 
@@ -40,7 +41,7 @@ def nok(value):
 
 
 class Panel(UserDict):
-    data = dict()  # Global registry.
+    data = dict()  # global registry.
 
     @classmethod
     def register(cls, method, name=None):
@@ -48,6 +49,34 @@ class Panel(UserDict):
         return method
 
 
+# -- App
+
+@Panel.register
+def report(state):
+    return ok(state.app.bugreport())
+
+
+@Panel.register
+def dump_conf(state, with_defaults=False, **kwargs):
+    return jsonify(state.app.conf.table(with_defaults=with_defaults),
+                   keyfilter=_wanted_config_key,
+                   unknown_type_filter=safe_repr)
+
+
+def _wanted_config_key(key):
+    return isinstance(key, string_t) and not key.startswith('__')
+
+
+# -- Task
+
+@Panel.register
+def query_task(state, ids, **kwargs):
+    return {
+        req.id: (_state_of_task(req), req.info())
+        for req in _find_requests_by_id(maybe_list(ids))
+    }
+
+
 def _find_requests_by_id(ids,
                          get_request=worker_state.requests.__getitem__):
     for task_id in ids:
@@ -67,14 +96,6 @@ def _state_of_task(request,
     return 'ready'
 
 
-@Panel.register
-def query_task(state, ids, **kwargs):
-    return {
-        req.id: (_state_of_task(req), req.info())
-        for req in _find_requests_by_id(maybe_list(ids))
-    }
-
-
 @Panel.register
 def revoke(state, task_id, terminate=False, signal=None, **kwargs):
     """Revoke task by task id."""
@@ -103,38 +124,6 @@ def revoke(state, task_id, terminate=False, signal=None, **kwargs):
     return ok('tasks {0} flagged as revoked'.format(idstr))
 
 
-@Panel.register
-def report(state):
-    return ok(state.app.bugreport())
-
-
-@Panel.register
-def enable_events(state):
-    dispatcher = state.consumer.event_dispatcher
-    if dispatcher.groups and 'task' not in dispatcher.groups:
-        dispatcher.groups.add('task')
-        logger.info('Events of group {task} enabled by remote.')
-        return ok('task events enabled')
-    return ok('task events already enabled')
-
-
-@Panel.register
-def disable_events(state):
-    dispatcher = state.consumer.event_dispatcher
-    if 'task' in dispatcher.groups:
-        dispatcher.groups.discard('task')
-        logger.info('Events of group {task} disabled by remote.')
-        return ok('task events disabled')
-    return ok('task events already disabled')
-
-
-@Panel.register
-def heartbeat(state):
-    logger.debug('Heartbeat requested by remote.')
-    dispatcher = state.consumer.event_dispatcher
-    dispatcher.send('worker-heartbeat', freq=5, **worker_state.SOFTWARE_INFO)
-
-
 @Panel.register
 def rate_limit(state, task_name, rate_limit, **kwargs):
     """Set new rate limit for a task type.
@@ -186,21 +175,89 @@ def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
     return ok('time limits set successfully')
 
 
+# -- Events
+
+
+@Panel.register
+def clock(state, **kwargs):
+    return {'clock': state.app.clock.value}
+
+
+@Panel.register
+def election(state, id, topic, action=None, **kwargs):
+    if state.consumer.gossip:
+        state.consumer.gossip.election(id, topic, action)
+
+
+@Panel.register
+def enable_events(state):
+    dispatcher = state.consumer.event_dispatcher
+    if dispatcher.groups and 'task' not in dispatcher.groups:
+        dispatcher.groups.add('task')
+        logger.info('Events of group {task} enabled by remote.')
+        return ok('task events enabled')
+    return ok('task events already enabled')
+
+
+@Panel.register
+def disable_events(state):
+    dispatcher = state.consumer.event_dispatcher
+    if 'task' in dispatcher.groups:
+        dispatcher.groups.discard('task')
+        logger.info('Events of group {task} disabled by remote.')
+        return ok('task events disabled')
+    return ok('task events already disabled')
+
+
+@Panel.register
+def heartbeat(state):
+    logger.debug('Heartbeat requested by remote.')
+    dispatcher = state.consumer.event_dispatcher
+    dispatcher.send('worker-heartbeat', freq=5, **worker_state.SOFTWARE_INFO)
+
+
+# -- Worker
+
+@Panel.register
+def hello(state, from_node, revoked=None, **kwargs):
+    if from_node != state.hostname:
+        logger.info('sync with %s', from_node)
+        if revoked:
+            worker_state.revoked.update(revoked)
+        return {
+            'revoked': worker_state.revoked._data,
+            'clock': state.app.clock.forward(),
+        }
+
+
+@Panel.register
+def ping(state, **kwargs):
+    return ok('pong')
+
+
+@Panel.register
+def stats(state, **kwargs):
+    return state.consumer.controller.stats()
+
+
 @Panel.register
 def dump_schedule(state, safe=False, **kwargs):
+    return list(_iter_schedule_requests(state.consumer.timer, safe=safe))
+
 
-    def prepare_entries():
-        for waiting in state.consumer.timer.schedule.queue:
-            try:
-                arg0 = waiting.entry.args[0]
-            except (IndexError, TypeError):
-                continue
-            else:
-                if isinstance(arg0, Request):
-                    yield {'eta': arg0.eta.isoformat() if arg0.eta else None,
-                           'priority': waiting.priority,
-                           'request': arg0.info(safe=safe)}
-    return list(prepare_entries())
+def _iter_schedule_requests(timer, safe=False, Request=Request):
+    for waiting in timer.schedule.queue:
+        try:
+            arg0 = waiting.entry.args[0]
+        except (IndexError, TypeError):
+            continue
+        else:
+            if isinstance(arg0, Request):
+                yield {
+                    'eta': arg0.eta.isoformat() if arg0.eta else None,
+                    'priority': waiting.priority,
+                    'request': arg0.info(safe=safe),
+                }
 
 
 @Panel.register
@@ -221,9 +278,32 @@ def dump_active(state, safe=False, **kwargs):
 
 
 @Panel.register
-def stats(state, **kwargs):
-    return state.consumer.controller.stats()
+def dump_revoked(state, **kwargs):
+    return list(worker_state.revoked)
+
+
+@Panel.register
+def dump_tasks(state, taskinfoitems=None, builtins=False, **kwargs):
+    reg = state.app.tasks
+    taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
 
+    tasks = reg if builtins else (
+        task for task in reg if not task.startswith('celery.'))
+
+    def _extract_info(task):
+        fields = {
+            field: str(getattr(task, field, None)) for field in taskinfoitems
+            if getattr(task, field, None) is not None
+        }
+        if fields:
+            info = ['='.join(f) for f in items(fields)]
+            return '{0} [{1}]'.format(task.name, ' '.join(info))
+        return task.name
+
+    return [_extract_info(reg[task]) for task in sorted(tasks)]
+
+
+# -- Debugging
 
 @Panel.register
 def objgraph(state, num=200, max_depth=10, type='Request'):  # pragma: no cover
@@ -256,51 +336,7 @@ def memdump(state, samples=10, **kwargs):  # pragma: no cover
     memdump(file=out)
     return out.getvalue()
 
-
-@Panel.register
-def clock(state, **kwargs):
-    return {'clock': state.app.clock.value}
-
-
-@Panel.register
-def dump_revoked(state, **kwargs):
-    return list(worker_state.revoked)
-
-
-@Panel.register
-def hello(state, from_node, revoked=None, **kwargs):
-    if from_node != state.hostname:
-        logger.info('sync with %s', from_node)
-        if revoked:
-            worker_state.revoked.update(revoked)
-        return {'revoked': worker_state.revoked._data,
-                'clock': state.app.clock.forward()}
-
-
-@Panel.register
-def dump_tasks(state, taskinfoitems=None, builtins=False, **kwargs):
-    reg = state.app.tasks
-    taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS
-
-    tasks = reg if builtins else (
-        task for task in reg if not task.startswith('celery.'))
-
-    def _extract_info(task):
-        fields = {
-            field: str(getattr(task, field, None)) for field in taskinfoitems
-            if getattr(task, field, None) is not None
-        }
-        if fields:
-            info = ['='.join(f) for f in items(fields)]
-            return '{0} [{1}]'.format(task.name, ' '.join(info))
-        return task.name
-
-    return [_extract_info(reg[task]) for task in sorted(tasks)]
-
-
-@Panel.register
-def ping(state, **kwargs):
-    return ok('pong')
+# -- Pool
 
 
 @Panel.register
@@ -347,6 +383,8 @@ def shutdown(state, msg='Got shutdown from remote', **kwargs):
     raise WorkerShutdown(msg)
 
 
+# -- Queues
+
 @Panel.register
 def add_consumer(state, queue, exchange=None, exchange_type=None,
                  routing_key=None, **options):
@@ -372,20 +410,3 @@ def active_queues(state):
         return [dict(queue.as_dict(recurse=True))
                 for queue in state.consumer.task_consumer.queues]
     return []
-
-
-def _wanted_config_key(key):
-    return isinstance(key, string_t) and not key.startswith('__')
-
-
-@Panel.register
-def dump_conf(state, with_defaults=False, **kwargs):
-    return jsonify(state.app.conf.table(with_defaults=with_defaults),
-                   keyfilter=_wanted_config_key,
-                   unknown_type_filter=safe_repr)
-
-
-@Panel.register
-def election(state, id, topic, action=None, **kwargs):
-    if state.consumer.gossip:
-        state.consumer.gossip.election(id, topic, action)