Browse Source

Improve worker benchmark

Ask Solem 13 years ago
parent
commit
f75e9de9a0
2 changed files with 72 additions and 42 deletions
  1. 0 42
      funtests/bench/worker.py
  2. 72 0
      funtests/benchmarks/bench_worker.py

+ 0 - 42
funtests/bench/worker.py

@@ -1,42 +0,0 @@
-import time
-
-from celery import Celery
-
-celery = Celery()
-celery.conf.update(BROKER_TRANSPORT="memory",
-                   BROKER_POOL_LIMIT=1,
-                   CELERY_PREFETCH_MULTIPLIER=0,
-                   CELERY_DISABLE_RATE_LIMITS=True,
-                   CELERY_BACKEND=None)
-
-
-def bench_consumer(n=10000):
-    from celery.worker import WorkController
-    from celery.worker import state
-
-    worker = WorkController(app=celery, pool_cls="solo")
-    time_start = [None]
-
-    @celery.task()
-    def it(i):
-        if not i:
-            time_start[0] = time.time()
-        elif i == n - 1:
-            print(time.time() - time_start[0])
-
-    @celery.task()
-    def shutdown_worker():
-        raise SystemExit()
-
-    for i in xrange(n):
-        it.delay(i)
-    shutdown_worker.delay()
-
-    try:
-        worker.start()
-    except SystemExit:
-        assert sum(state.total_count.values()) == n + 1
-
-
-if __name__ == "__main__":
-    bench_consumer()

+ 72 - 0
funtests/benchmarks/bench_worker.py

@@ -0,0 +1,72 @@
+import os
+import sys
+import time
+
+from celery import Celery
+
+celery = Celery(__name__)
+celery.conf.update(BROKER_TRANSPORT="amqp",
+                   BROKER_POOL_LIMIT=10,
+                   CELERY_PREFETCH_MULTIPLIER=0,
+                   CELERY_DISABLE_RATE_LIMITS=True,
+                   CELERY_BACKEND=None)
+
+
+@celery.task()
+def shutdown_worker():
+    print("SHUTTING DOWN")
+    raise SystemExit()
+
+
+@celery.task(cur=0, time_start=None, queue="bench.worker")
+def it(_, n):
+    i = it.cur  # use internal counter, as ordering can be skewed
+                # by previous runs, or the broker.
+    if i and not i % 1000:
+        print >> sys.stderr, i
+    if not i:
+        it.time_start = time.time()
+    elif i == n - 1:
+        print("consume: %s" % (time.time() - it.time_start, ))
+        shutdown_worker.delay()
+    it.cur += 1
+
+
+def bench_apply(n=50000):
+    time_start = time.time()
+    celery.TaskSet(it.subtask((i, n)) for i in xrange(n)).apply_async()
+    print("apply: %s" % (time.time() - time_start, ))
+
+
+def bench_consume(n=50000):
+    from celery.worker import WorkController
+    from celery.worker import state
+
+    import logging
+    #celery.log.setup_logging_subsystem(loglevel=logging.DEBUG)
+    worker = celery.WorkController(pool_cls="solo",
+                                   queues=["bench.worker"])
+
+    try:
+        print("STARTING WORKER")
+        worker.start()
+    except SystemExit:
+        assert sum(state.total_count.values()) == n + 1
+
+
+def bench_both(n=50000):
+    bench_apply(n)
+    bench_consumer(n)
+
+
+def main(argv=sys.argv):
+    if len(argv) < 2:
+        print("Usage: %s [apply|consume|both]" % (os.path.basename(argv[0]), ))
+        return sys.exit(1)
+    return {"apply": bench_apply,
+            "consume": bench_consume,
+            "both": bench_both}[argv[1]]()
+
+
+if __name__ == "__main__":
+    main()