import os import sys import time os.environ.update( NOSETPS='yes', USE_FAST_LOCALS='yes', ) import anyjson JSONIMP = os.environ.get('JSONIMP') if JSONIMP: anyjson.force_implementation(JSONIMP) print('anyjson implementation: %r' % (anyjson.implementation.name, )) from celery import Celery, group DEFAULT_ITS = 40000 BROKER_TRANSPORT = os.environ.get('BROKER', 'librabbitmq') if hasattr(sys, 'pypy_version_info'): BROKER_TRANSPORT = 'pyamqp' celery = Celery(__name__) celery.conf.update(BROKER_TRANSPORT=BROKER_TRANSPORT, BROKER_POOL_LIMIT=10, CELERYD_POOL='solo', CELERYD_PREFETCH_MULTIPLIER=0, CELERY_DISABLE_RATE_LIMITS=True, CELERY_DEFAULT_DELIVERY_MODE=1, CELERY_QUEUES = { 'bench.worker': { 'exchange': 'bench.worker', 'routing_key': 'bench.worker', 'no_ack': True, 'exchange_durable': False, 'queue_durable': False, 'auto_delete': True, } }, CELERY_TASK_SERIALIZER='json', CELERY_DEFAULT_QUEUE='bench.worker', CELERY_BACKEND=None, )#CELERY_MESSAGE_COMPRESSION='zlib') def tdiff(then): return time.time() - then @celery.task(cur=0, time_start=None, queue='bench.worker', bare=True) def it(_, n): i = it.cur # use internal counter, as ordering can be skewed # by previous runs, or the broker. if i and not i % 5000: print >> sys.stderr, '(%s so far: %ss)' % (i, tdiff(it.subt)) it.subt = time.time() if not i: it.subt = it.time_start = time.time() elif i == n - 1: total = tdiff(it.time_start) print >> sys.stderr, '(%s so far: %ss)' % (i, tdiff(it.subt)) print('-- process %s tasks: %ss total, %s tasks/s} ' % ( n, total, n / (total + .0))) sys.exit() it.cur += 1 def bench_apply(n=DEFAULT_ITS): time_start = time.time() group(it.s(i, n) for i in xrange(n))() print('-- apply %s tasks: %ss' % (n, time.time() - time_start, )) def bench_work(n=DEFAULT_ITS, loglevel='CRITICAL'): loglevel = os.environ.get('BENCH_LOGLEVEL') or loglevel if loglevel: celery.log.setup_logging_subsystem(loglevel=loglevel) worker = celery.WorkController(concurrency=15, queues=['bench.worker']) try: print('STARTING WORKER') worker.start() except SystemExit: raise assert sum(worker.state.total_count.values()) == n + 1 def bench_both(n=DEFAULT_ITS): bench_apply(n) bench_work(n) def main(argv=sys.argv): n = DEFAULT_ITS if len(argv) < 2: print('Usage: %s [apply|work|both] [n=20k]' % ( os.path.basename(argv[0]), )) return sys.exit(1) try: try: n = int(argv[2]) except IndexError: pass return {'apply': bench_apply, 'work': bench_work, 'both': bench_both}[argv[1]](n=n) except: raise if __name__ == '__main__': main()