Переглянути джерело

Stresstests: CSTRESS_TRANS enables transient messages

Ask Solem 10 роки тому
батько
коміт
7ced601d88
2 змінених файлів з 21 додано та 12 видалено
  1. 13 10
      funtests/benchmarks/bench_worker.py
  2. 8 2
      funtests/stress/stress/templates.py

+ 13 - 10
funtests/benchmarks/bench_worker.py

@@ -2,7 +2,6 @@ from __future__ import print_function, unicode_literals
 
 import os
 import sys
-import time
 
 os.environ.update(
     NOSETPS='yes',
@@ -11,6 +10,7 @@ os.environ.update(
 
 from celery import Celery, group
 from celery.five import range
+from kombu.five import monotonic
 
 DEFAULT_ITS = 40000
 
@@ -24,7 +24,6 @@ app.conf.update(
     BROKER_POOL_LIMIT=10,
     CELERYD_POOL='solo',
     CELERYD_PREFETCH_MULTIPLIER=0,
-    CELERY_DISABLE_RATE_LIMITS=True,
     CELERY_DEFAULT_DELIVERY_MODE=1,
     CELERY_QUEUES={
         'bench.worker': {
@@ -43,7 +42,7 @@ app.conf.update(
 
 
 def tdiff(then):
-    return time.time() - then
+    return monotonic() - then
 
 
 @app.task(cur=0, time_start=None, queue='bench.worker', bare=True)
@@ -53,23 +52,27 @@ def it(_, n):
     i = it.cur
     if i and not i % 5000:
         print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
-        it.subt = time.time()
+        it.subt = monotonic()
     if not i:
-        it.subt = it.time_start = time.time()
-    elif i == n - 1:
+        it.subt = it.time_start = monotonic()
+    elif i > n - 2:
         total = tdiff(it.time_start)
         print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
         print('-- process {0} tasks: {1}s total, {2} tasks/s} '.format(
             n, total, n / (total + .0),
         ))
-        sys.exit()
+        import os
+        os._exit()
     it.cur += 1
 
 
 def bench_apply(n=DEFAULT_ITS):
-    time_start = time.time()
-    group(it.s(i, n) for i in range(n))()
-    print('-- apply {0} tasks: {1}s'.format(n, time.time() - time_start))
+    time_start = monotonic()
+    task = it._get_current_object()
+    with app.producer_or_acquire() as producer:
+        [task.apply_async((i, n), producer=producer) for i in range(n)]
+    #group(s(i, n) for i in range(n))()
+    print('-- apply {0} tasks: {1}s'.format(n, monotonic() - time_start))
 
 
 def bench_work(n=DEFAULT_ITS, loglevel='CRITICAL'):

+ 8 - 2
funtests/stress/stress/templates.py

@@ -6,7 +6,9 @@ from celery.five import items
 from kombu import Exchange, Queue
 from kombu.utils import symbol_by_name
 
-CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', 'c.stress')
+CSTRESS_TRANS = os.environ.get('CSTRESS_TRANS', False)
+default_queue = 'c.stress.trans' if CSTRESS_TRANS else 'c.stress'
+CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', default_queue)
 
 templates = {}
 
@@ -57,7 +59,9 @@ class default(object):
     CELERY_QUEUES = [
         Queue(CSTRESS_QUEUE,
               exchange=Exchange(CSTRESS_QUEUE),
-              routing_key=CSTRESS_QUEUE),
+              routing_key=CSTRESS_QUEUE,
+              durable=not CSTRESS_TRANS,
+              no_ack=CSTRESS_TRANS),
     ]
     CELERY_MAX_CACHED_RESULTS = -1
     BROKER_URL = os.environ.get('CSTRESS_BROKER', 'amqp://')
@@ -69,6 +73,8 @@ class default(object):
         'interval_step': 0.1,
     }
     CELERY_TASK_PROTOCOL = 2
+    if CSTRESS_TRANS:
+        CELERY_DEFAULT_DELIVERY_MODE = 1
 
 
 @template()