|
@@ -68,28 +68,51 @@ def task_ready(request):
|
|
|
|
|
|
|
|
|
if os.environ.get("CELERY_BENCH"): # pragma: no cover
|
|
|
+ import atexit
|
|
|
+
|
|
|
from time import time
|
|
|
+ from billiard import current_process
|
|
|
|
|
|
all_count = 0
|
|
|
+ bench_first = None
|
|
|
bench_start = None
|
|
|
+ bench_last = None
|
|
|
bench_every = int(os.environ.get("CELERY_BENCH_EVERY", 1000))
|
|
|
+ bench_sample = []
|
|
|
__reserved = task_reserved
|
|
|
__ready = task_ready
|
|
|
|
|
|
+ if current_process()._name == 'MainProcess':
|
|
|
+ @atexit.register
|
|
|
+ def on_shutdown():
|
|
|
+ if bench_first is not None and bench_last is not None:
|
|
|
+ print("\n- Time spent in benchmark: %r" % (
|
|
|
+ bench_last - bench_first))
|
|
|
+ print("- Avg: %s" % (sum(bench_sample) / len(bench_sample)))
|
|
|
+
|
|
|
def task_reserved(request): # noqa
|
|
|
global bench_start
|
|
|
+ global bench_first
|
|
|
+ now = None
|
|
|
if bench_start is None:
|
|
|
- bench_start = time()
|
|
|
+ bench_start = now = time()
|
|
|
+ if bench_first is None:
|
|
|
+ bench_first = now
|
|
|
+
|
|
|
return __reserved(request)
|
|
|
|
|
|
def task_ready(request): # noqa
|
|
|
- global all_count, bench_start
|
|
|
+ global all_count
|
|
|
+ global bench_start
|
|
|
+ global bench_last
|
|
|
all_count += 1
|
|
|
if not all_count % bench_every:
|
|
|
- print("* Time spent processing %s tasks (since first "
|
|
|
- "task received): ~%.4fs\n" % (
|
|
|
- bench_every, time() - bench_start))
|
|
|
- bench_start = None
|
|
|
+ now = time()
|
|
|
+ diff = now - bench_start
|
|
|
+ print("- Time spent processing %s tasks (since first "
|
|
|
+ "task received): ~%.4fs\n" % (bench_every, diff))
|
|
|
+ bench_start, bench_last = None, now
|
|
|
+ bench_sample.append(diff)
|
|
|
|
|
|
return __ready(request)
|
|
|
|