Browse Source

Remote control query task used by stresstests

Ask Solem 11 years ago
parent
commit
de6c096745

+ 5 - 2
celery/app/builtins.py

@@ -292,10 +292,13 @@ def add_chain_task(app):
             tasks[0].apply_async()
             return result
 
-        def apply(self, args=(), kwargs={}, signature=maybe_signature, **options):
+        def apply(self, args=(), kwargs={}, signature=maybe_signature,
+                  **options):
             last, fargs = None, args  # fargs passed to first task only
             for task in kwargs['tasks']:
-                res = signature(task).clone(fargs).apply(last and (last.get(), ))
+                res = signature(task).clone(fargs).apply(
+                    last and (last.get(), ),
+                )
                 res.parent, last, fargs = last, res, None
             return last
     return Chain

+ 3 - 0
celery/app/control.py

@@ -101,6 +101,9 @@ class Inspect(object):
     def active_queues(self):
         return self._request('active_queues')
 
+    def query_task(self, ids):
+        """Issue the ``query_task`` inspect request for the given ``ids``."""
+        command = 'query_task'
+        return self._request(command, ids=ids)
+
     def conf(self):
         return self._request('dump_conf')
 

+ 21 - 1
celery/worker/control.py

@@ -39,7 +39,7 @@ class Panel(UserDict):
 
 def _find_requests_by_id(ids, requests):
     found, total = 0, len(ids)
-    for request in worker_state.reserved_requests:
+    for request in requests:
         if request.id in ids:
             yield request
             found += 1
@@ -47,6 +47,26 @@ def _find_requests_by_id(ids, requests):
                 break
 
 
+@Panel.register
+def query_task(state, ids, **kwargs):
+    """Return info for the reserved/active requests matching ``ids``.
+
+    The result maps request id to a ``(state, info)`` tuple; the
+    ``'active'`` entry wins when an id is in both collections.
+    """
+    ids = maybe_list(ids)
+    reqs = dict((req.id, ('reserved', req.info()))
+                for req in _find_requests_by_id(
+                    ids, worker_state.reserved_requests))
+    reqs.update(dict(
+        (req.id, ('active', req.info()))
+        for req in _find_requests_by_id(
+            ids, worker_state.active_requests,
+        )
+    ))
+    return reqs
+
+
 @Panel.register
 def revoke(state, task_id, terminate=False, signal=None, **kwargs):
     """Revoke task by task id."""

+ 1 - 1
funtests/benchmarks/bench_worker.py

@@ -83,7 +83,7 @@ def bench_work(n=DEFAULT_ITS, loglevel='CRITICAL'):
     if loglevel:
         app.log.setup_logging_subsystem(loglevel=loglevel)
     worker = app.WorkController(concurrency=15,
-                                   queues=['bench.worker'])
+                                queues=['bench.worker'])
 
     try:
         print('STARTING WORKER')

+ 3 - 0
funtests/stress/stress/__main__.py

@@ -32,6 +32,9 @@ class Stress(Command):
                    help='Number of times to repeat the test suite'),
             Option('-g', '--group', default='all',
                    help='Specify test group (all|green)'),
+            Option('--diag', default=False, action='store_true',
+                   help='Enable diagnostics (slow)',
+            )
         )
 
 

+ 1 - 2
funtests/stress/stress/data.py

@@ -2,7 +2,6 @@
 from __future__ import absolute_import
 
 import json
-import sys
 
 from celery.utils.debug import humanbytes
 from celery.utils.imports import qualname
@@ -60,7 +59,7 @@ class JSONEncoder(json.JSONEncoder):
 
 def decode_hook(d):
     try:
-        meta = d['py/obj']
+        d = d['py/obj']
     except KeyError:
         return d
     type_registry[d['type']](**d['attrs'])

+ 67 - 0
funtests/stress/stress/fbi.py

@@ -0,0 +1,67 @@
+from __future__ import absolute_import, print_function
+
+import socket
+import sys
+
+from contextlib import contextmanager
+
+from celery import states
+
+
+class FBI(object):
+    """Optional event-based diagnostics for the stress suite.
+
+    While enabled, :meth:`investigation` consumes events into
+    ``self.state`` so that :meth:`diag` can report the last recorded
+    state of the tasks it is asked about.
+    """
+
+    def __init__(self, app):
+        self.app = app
+        self.receiver = None
+        self.state = self.app.events.State()
+        # Set by investigation() once the event consumer is running.
+        self.connection = None
+        self.enabled = False
+
+    def enable(self, enabled):
+        """Turn diagnostics on or off (``enabled`` is stored as given)."""
+        self.enabled = enabled
+
+    @contextmanager
+    def investigation(self):
+        """Context manager that consumes events while the body runs.
+
+        When diagnostics are enabled this yields the instance itself and
+        drains pending events via :meth:`ffwd` on exit.  When disabled
+        nothing is set up and ``None`` is yielded.
+        """
+        if self.enabled:
+            with self.app.connection() as conn:
+                receiver = self.app.events.Receiver(
+                    conn, handlers={'*': self.state.event},
+                )
+                with receiver.consumer_context() as (conn, _, _):
+                    self.connection = conn
+                    try:
+                        yield self
+                    finally:
+                        self.ffwd()
+        else:
+            yield
+
+    def ffwd(self):
+        """Drain events until ``drain_events`` raises ``socket.error``.
+
+        Does nothing when no connection has been set up yet, so that
+        :meth:`diag` is safe to call outside of :meth:`investigation`.
+        """
+        if self.connection is None:
+            return
+        while True:
+            try:
+                self.connection.drain_events(timeout=1)
+            except socket.error:
+                # presumably the timeout raised when no event arrives
+                # within one second -- TODO confirm against kombu.
+                break
+
+    def state_of(self, tid):
+        """Return a one-line description of the recorded state of ``tid``."""
+        try:
+            task = self.state.tasks[tid]
+        except KeyError:
+            return 'No events for {0}'.format(tid)
+
+        if task.state in states.READY_STATES:
+            return 'Task {0.uuid} completed with {0.state}'.format(task)
+        elif task.state in states.UNREADY_STATES:
+            return 'Task {0.uuid} waiting in {0.state} state'.format(task)
+        else:
+            return 'Task {0.uuid} in other state {0.state}'.format(task)
+
+    def query(self, ids):
+        """Send the ``query_task`` inspect command for ``ids``."""
+        # Pass the ``ids`` argument, not the ``id`` builtin.
+        return self.app.control.inspect().query_task(ids)
+
+    def diag(self, ids, file=sys.stderr):
+        """Print the recorded state of every task in ``ids`` to ``file``.
+
+        No-op unless diagnostics are enabled.
+        """
+        if self.enabled:
+            self.ffwd()
+            for tid in ids:
+                print(self.state_of(tid), file=file)
+            # Worker-side query is left disabled:
+            # print(self.query(ids), file=file)

+ 48 - 38
funtests/stress/stress/suite.py

@@ -23,6 +23,7 @@ from .app import (
     sleeping_ignore_limits, segfault,
 )
 from .data import BIG, SMALL
+from .fbi import FBI
 
 BANNER = """\
 Celery stress-suite v{version}
@@ -90,6 +91,7 @@ class Suite(object):
         self.block_timeout = block_timeout
         self.progress = None
         self.speaker = Speaker()
+        self.fbi = FBI(app)
 
         self.groups = {
             'all': testgroup(
@@ -114,11 +116,15 @@ class Suite(object):
         }
 
     def run(self, names=None, iterations=50, offset=0,
-            numtests=None, list_all=False, repeat=0, group='all', **kw):
+            numtests=None, list_all=False, repeat=0, group='all',
+            diag=False, **kw):
+        self.fbi.enable(diag)
         tests = self.filtertests(group, names)[offset:numtests or None]
         if list_all:
             return print(self.testlist(tests))
         print(self.banner(tests))
+        print('+ Enabling events')
+        self.app.control.enable_events()
         it = count() if repeat == Inf else range(int(repeat) or 1)
         for i in it:
             marker(
@@ -159,46 +165,48 @@ class Suite(object):
         )
 
     def manyshort(self):
-        self.join(group(add.s(i, i) for i in range(1000))(), timeout=10, propagate=True)
+        self.join(group(add.s(i, i) for i in range(1000))(),
+                  timeout=10, propagate=True)
 
     def runtest(self, fun, n=50, index=0, repeats=1):
         print('{0}: [[[{1}({2})]]]'.format(repeats, fun.__name__, n))
         with blockdetection(self.block_timeout):
-            runtime = elapsed = monotonic()
-            i = 0
-            failed = False
-            self.progress = Progress(
-                fun, i, n, index, repeats, elapsed, runtime, 0,
-            )
-            _marker.delay(pstatus(self.progress))
-            try:
-                for i in range(n):
-                    runtime = monotonic()
-                    self.progress = Progress(
-                        fun, i + 1, n, index, repeats, runtime, elapsed, 0,
-                    )
-                    try:
-                        fun()
-                    except StopSuite:
-                        raise
-                    except Exception as exc:
-                        print('-> {0!r}'.format(exc))
-                        print(pstatus(self.progress))
-                    else:
-                        print(pstatus(self.progress))
-            except Exception:
-                failed = True
-                self.speaker.beep()
-                raise
-            finally:
-                print('{0} {1} iterations in {2}s'.format(
-                    'failed after' if failed else 'completed',
-                    i + 1, humanize_seconds(monotonic() - elapsed),
-                ))
-                if not failed:
-                    self.progress = Progress(
-                        fun, i + 1, n, index, repeats, runtime, elapsed, 1,
-                    )
+            with self.fbi.investigation():
+                runtime = elapsed = monotonic()
+                i = 0
+                failed = False
+                self.progress = Progress(
+                    fun, i, n, index, repeats, elapsed, runtime, 0,
+                )
+                _marker.delay(pstatus(self.progress))
+                try:
+                    for i in range(n):
+                        runtime = monotonic()
+                        self.progress = Progress(
+                            fun, i + 1, n, index, repeats, runtime, elapsed, 0,
+                        )
+                        try:
+                            fun()
+                        except StopSuite:
+                            raise
+                        except Exception as exc:
+                            print('-> {0!r}'.format(exc))
+                            print(pstatus(self.progress))
+                        else:
+                            print(pstatus(self.progress))
+                except Exception:
+                    failed = True
+                    self.speaker.beep()
+                    raise
+                finally:
+                    print('{0} {1} iterations in {2}s'.format(
+                        'failed after' if failed else 'completed',
+                        i + 1, humanize_seconds(monotonic() - elapsed),
+                    ))
+                    if not failed:
+                        self.progress = Progress(
+                            fun, i + 1, n, index, repeats, runtime, elapsed, 1,
+                        )
 
     def termbysig(self):
         self._evil_groupmember(kill)
@@ -272,8 +280,10 @@ class Suite(object):
                 self.speaker.beep()
                 marker(
                     'Still waiting for {0}/{1}: [{2}]: {3!r}'.format(
-                        len(received), len(r), ','.join(waiting_for), exc), '!',
+                        len(received), len(r),
+                        ','.join(waiting_for), exc), '!',
                 )
+                self.fbi.diag(waiting_for)
             except self.connerrors as exc:
                 self.speaker.beep()
                 marker('join: connection lost: {0!r}'.format(exc), '!')