Ask Solem пре 9 година
родитељ
комит
7d545d8906
2 измењених фајлова са 43 додато и 34 уклоњено
  1. 30 23
      celery/worker/control.py
  2. 13 11
      celery/worker/request.py

+ 30 - 23
celery/worker/control.py

@@ -31,6 +31,14 @@ DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
 logger = get_logger(__name__)
 
 
+def ok(value):
+    return {'ok': value}
+
+
+def nok(value):
+    return {'error': value}
+
+
 class Panel(UserDict):
     data = dict()  # Global registry.
 
@@ -90,17 +98,17 @@ def revoke(state, task_id, terminate=False, signal=None, **kwargs):
                     break
 
         if not terminated:
-            return {'ok': 'terminate: tasks unknown'}
-        return {'ok': 'terminate: {0}'.format(', '.join(terminated))}
+            return ok('terminate: tasks unknown')
+        return ok('terminate: {0}'.format(', '.join(terminated)))
 
     idstr = ', '.join(task_ids)
     logger.info('Tasks flagged as revoked: %s', idstr)
-    return {'ok': 'tasks {0} flagged as revoked'.format(idstr)}
+    return ok('tasks {0} flagged as revoked'.format(idstr))
 
 
 @Panel.register
 def report(state):
-    return {'ok': state.app.bugreport()}
+    return ok(state.app.bugreport())
 
 
 @Panel.register
@@ -109,8 +117,8 @@ def enable_events(state):
     if dispatcher.groups and 'task' not in dispatcher.groups:
         dispatcher.groups.add('task')
         logger.info('Events of group {task} enabled by remote.')
-        return {'ok': 'task events enabled'}
-    return {'ok': 'task events already enabled'}
+        return ok('task events enabled')
+    return ok('task events already enabled')
 
 
 @Panel.register
@@ -119,8 +127,8 @@ def disable_events(state):
     if 'task' in dispatcher.groups:
         dispatcher.groups.discard('task')
         logger.info('Events of group {task} disabled by remote.')
-        return {'ok': 'task events disabled'}
-    return {'ok': 'task events already disabled'}
+        return ok('task events disabled')
+    return ok('task events already disabled')
 
 
 @Panel.register
@@ -144,24 +152,24 @@ def rate_limit(state, task_name, rate_limit, **kwargs):
     try:
         timeutils.rate(rate_limit)
     except ValueError as exc:
-        return {'error': 'Invalid rate limit string: {0!r}'.format(exc)}
+        return nok('Invalid rate limit string: {0!r}'.format(exc))
 
     try:
         state.app.tasks[task_name].rate_limit = rate_limit
     except KeyError:
         logger.error('Rate limit attempt for unknown task %s',
                      task_name, exc_info=True)
-        return {'error': 'unknown task'}
+        return nok('unknown task')
 
     state.consumer.reset_rate_limits()
 
     if not rate_limit:
         logger.info('Rate limits disabled for tasks of type %s', task_name)
-        return {'ok': 'rate limit disabled successfully'}
+        return ok('rate limit disabled successfully')
 
     logger.info('New rate limit for tasks of type %s: %s.',
                 task_name, rate_limit)
-    return {'ok': 'new rate limit set successfully'}
+    return ok('new rate limit set successfully')
 
 
 @Panel.register
@@ -171,14 +179,14 @@ def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
     except KeyError:
         logger.error('Change time limit attempt for unknown task %s',
                      task_name, exc_info=True)
-        return {'error': 'unknown task'}
+        return nok('unknown task')
 
     task.soft_time_limit = soft
     task.time_limit = hard
 
     logger.info('New time limits for tasks of type %s: soft=%s hard=%s',
                 task_name, soft, hard)
-    return {'ok': 'time limits set successfully'}
+    return ok('time limits set successfully')
 
 
 @Panel.register
@@ -295,7 +303,7 @@ def dump_tasks(state, taskinfoitems=None, builtins=False, **kwargs):
 
 @Panel.register
 def ping(state, **kwargs):
-    return {'ok': 'pong'}
+    return ok('pong')
 
 
 @Panel.register
@@ -305,7 +313,7 @@ def pool_grow(state, n=1, **kwargs):
     else:
         state.consumer.pool.grow(n)
         state.consumer._update_prefetch_count(n)
-    return {'ok': 'pool will grow'}
+    return ok('pool will grow')
 
 
 @Panel.register
@@ -315,14 +323,14 @@ def pool_shrink(state, n=1, **kwargs):
     else:
         state.consumer.pool.shrink(n)
         state.consumer._update_prefetch_count(-n)
-    return {'ok': 'pool will shrink'}
+    return ok('pool will shrink')
 
 
 @Panel.register
 def pool_restart(state, modules=None, reload=False, reloader=None, **kwargs):
     if state.app.conf.worker_pool_restarts:
         state.consumer.controller.reload(modules, reload, reloader=reloader)
-        return {'ok': 'reload started'}
+        return ok('reload started')
     else:
         raise ValueError('Pool restarts not enabled')
 
@@ -332,7 +340,7 @@ def autoscale(state, max=None, min=None):
     autoscaler = state.consumer.controller.autoscaler
     if autoscaler:
         max_, min_ = autoscaler.update(max, min)
-        return {'ok': 'autoscale now min={0} max={1}'.format(max_, min_)}
+        return ok('autoscale now min={0} max={1}'.format(max_, min_))
     raise ValueError('Autoscale not enabled')
 
 
@@ -349,7 +357,7 @@ def add_consumer(state, queue, exchange=None, exchange_type=None,
         state.consumer.add_task_queue,
         queue, exchange, exchange_type, routing_key, **options
     )
-    return {'ok': 'add consumer {0}'.format(queue)}
+    return ok('add consumer {0}'.format(queue))
 
 
 @Panel.register
@@ -357,7 +365,7 @@ def cancel_consumer(state, queue=None, **_):
     state.consumer.call_soon(
         state.consumer.cancel_task_queue, queue,
     )
-    return {'ok': 'no longer consuming from {0}'.format(queue)}
+    return ok('no longer consuming from {0}'.format(queue))
 
 
 @Panel.register
@@ -370,8 +378,7 @@ def active_queues(state):
 
 
 def _wanted_config_key(key):
-    return (isinstance(key, string_t) and
-            not key.startswith('__'))
+    return isinstance(key, string_t) and not key.startswith('__')
 
 
 @Panel.register

+ 13 - 11
celery/worker/request.py

@@ -394,17 +394,19 @@ class Request(object):
             self.send_event('task-rejected', requeue=requeue)
 
     def info(self, safe=False):
-        return {'id': self.id,
-                'name': self.name,
-                'args': self.argsrepr,
-                'kwargs': self.kwargsrepr,
-                'type': self.type,
-                'body': self.body,
-                'hostname': self.hostname,
-                'time_start': self.time_start,
-                'acknowledged': self.acknowledged,
-                'delivery_info': self.delivery_info,
-                'worker_pid': self.worker_pid}
+        return {
+            'id': self.id,
+            'name': self.name,
+            'args': self.argsrepr,
+            'kwargs': self.kwargsrepr,
+            'type': self.type,
+            'body': self.body,
+            'hostname': self.hostname,
+            'time_start': self.time_start,
+            'acknowledged': self.acknowledged,
+            'delivery_info': self.delivery_info,
+            'worker_pid': self.worker_pid,
+        }
 
     def __str__(self):
         return ' '.join([