Prechádzať zdrojové kódy

Improve worker benchmark

Ask Solem 13 rokov pred
rodič
commit
1e64e96d0a
1 zmenil súbory, kde vykonal 23 pridanie a 12 odobranie
  1. 23 12
      funtests/benchmarks/bench_worker.py

+ 23 - 12
funtests/benchmarks/bench_worker.py

@@ -2,15 +2,28 @@ import os
 import sys
 import time
 
+#import anyjson
+#anyjson.force_implementation("cjson")
+
 from celery import Celery
 
-DEFAULT_ITS = 5000
+DEFAULT_ITS = 20000
 
 celery = Celery(__name__)
-celery.conf.update(BROKER_TRANSPORT="amqp",
+celery.conf.update(#BROKER_TRANSPORT="librabbitmq",
                    BROKER_POOL_LIMIT=10,
                    CELERY_PREFETCH_MULTIPLIER=0,
                    CELERY_DISABLE_RATE_LIMITS=True,
+                   #CELERY_DEFAULT_DELIVERY_MODE="transient",
+                   CELERY_QUEUES = {
+                       "bench.worker": {
+                           "exchange": "bench.worker",
+                           "routing_key": "bench.worker",
+                           #"no_ack": True,
+                        }
+                   },
+                   CELERY_TASK_SERIALIZER="json",
+                   CELERY_DEFAULT_QUEUE="bench.worker",
                    CELERY_BACKEND=None)
 
 
@@ -19,11 +32,11 @@ def it(_, n):
     i = it.cur  # use internal counter, as ordering can be skewed
                 # by previous runs, or the broker.
     if i and not i % 1000:
-        print >> sys.stderr, i
+        print >> sys.stderr, "(%s so far)"% (i, )
     if not i:
         it.time_start = time.time()
     elif i == n - 1:
-        print("consume: %s" % (time.time() - it.time_start, ))
+        print("-- process %s tasks: %ss" % (n, time.time() - it.time_start, ))
         sys.exit()
     it.cur += 1
 
@@ -31,16 +44,14 @@ def it(_, n):
 def bench_apply(n=DEFAULT_ITS):
     time_start = time.time()
     celery.TaskSet(it.subtask((i, n)) for i in xrange(n)).apply_async()
-    print("apply: %s" % (time.time() - time_start, ))
+    print("-- apply %s tasks: %ss" % (n, time.time() - time_start, ))
 
 
-def bench_consume(n=DEFAULT_ITS):
+def bench_work(n=DEFAULT_ITS):
     from celery.worker import WorkController
     from celery.worker import state
 
-    #import logging
-    #celery.log.setup_logging_subsystem(loglevel=logging.DEBUG)
-    worker = celery.WorkController(pool_cls="solo",
+    worker = celery.WorkController(concurrency=15, #pool_cls="solo",
                                    queues=["bench.worker"])
 
     try:
@@ -52,15 +63,15 @@ def bench_consume(n=DEFAULT_ITS):
 
 def bench_both(n=DEFAULT_ITS):
     bench_apply(n)
-    bench_consumer(n)
+    bench_work(n)
 
 
 def main(argv=sys.argv):
     if len(argv) < 2:
-        print("Usage: %s [apply|consume|both]" % (os.path.basename(argv[0]), ))
+        print("Usage: %s [apply|work|both]" % (os.path.basename(argv[0]), ))
         return sys.exit(1)
     return {"apply": bench_apply,
-            "consume": bench_consume,
+            "work": bench_work,
             "both": bench_both}[argv[1]]()