|
@@ -4,6 +4,8 @@ import time
|
|
|
|
|
|
from celery import Celery
|
|
|
|
|
|
+DEFAULT_ITS = 5000
|
|
|
+
|
|
|
celery = Celery(__name__)
|
|
|
celery.conf.update(BROKER_TRANSPORT="amqp",
|
|
|
BROKER_POOL_LIMIT=10,
|
|
@@ -12,12 +14,6 @@ celery.conf.update(BROKER_TRANSPORT="amqp",
|
|
|
CELERY_BACKEND=None)
|
|
|
|
|
|
|
|
|
-@celery.task()
|
|
|
-def shutdown_worker():
|
|
|
- print("SHUTTING DOWN")
|
|
|
- raise SystemExit()
|
|
|
-
|
|
|
-
|
|
|
@celery.task(cur=0, time_start=None, queue="bench.worker")
|
|
|
def it(_, n):
|
|
|
i = it.cur # use internal counter, as ordering can be skewed
|
|
@@ -28,21 +24,21 @@ def it(_, n):
|
|
|
it.time_start = time.time()
|
|
|
elif i == n - 1:
|
|
|
print("consume: %s" % (time.time() - it.time_start, ))
|
|
|
- shutdown_worker.delay()
|
|
|
+ sys.exit()
|
|
|
it.cur += 1
|
|
|
|
|
|
|
|
|
-def bench_apply(n=50000):
|
|
|
+def bench_apply(n=DEFAULT_ITS):
|
|
|
time_start = time.time()
|
|
|
celery.TaskSet(it.subtask((i, n)) for i in xrange(n)).apply_async()
|
|
|
print("apply: %s" % (time.time() - time_start, ))
|
|
|
|
|
|
|
|
|
-def bench_consume(n=50000):
|
|
|
+def bench_consume(n=DEFAULT_ITS):
|
|
|
from celery.worker import WorkController
|
|
|
from celery.worker import state
|
|
|
|
|
|
- import logging
|
|
|
+ #import logging
|
|
|
#celery.log.setup_logging_subsystem(loglevel=logging.DEBUG)
|
|
|
worker = celery.WorkController(pool_cls="solo",
|
|
|
queues=["bench.worker"])
|
|
@@ -54,7 +50,7 @@ def bench_consume(n=50000):
|
|
|
assert sum(state.total_count.values()) == n + 1
|
|
|
|
|
|
|
|
|
-def bench_both(n=50000):
|
|
|
+def bench_both(n=DEFAULT_ITS):
|
|
|
bench_apply(n)
|
|
|
bench_consumer(n)
|
|
|
|