from __future__ import print_function

import os
import sys
import time

os.environ.update(
    NOSETPS='yes',
    USE_FAST_LOCALS='yes',
)

import anyjson
JSONIMP = os.environ.get('JSONIMP')
if JSONIMP:
    anyjson.force_implementation(JSONIMP)

print('anyjson implementation: {0!r}'.format(anyjson.implementation.name))

from celery import Celery, group
from celery.five import range

DEFAULT_ITS = 40000

BROKER_TRANSPORT = os.environ.get('BROKER', 'librabbitmq')
if hasattr(sys, 'pypy_version_info'):
    BROKER_TRANSPORT = 'pyamqp'

celery = Celery(__name__)
celery.conf.update(BROKER_TRANSPORT=BROKER_TRANSPORT,
                   BROKER_POOL_LIMIT=10,
                   CELERYD_POOL='solo',
                   CELERYD_PREFETCH_MULTIPLIER=0,
                   CELERY_DISABLE_RATE_LIMITS=True,
                   CELERY_DEFAULT_DELIVERY_MODE=1,
                   CELERY_QUEUES={
                       'bench.worker': {
                           'exchange': 'bench.worker',
                           'routing_key': 'bench.worker',
                           'no_ack': True,
                           'exchange_durable': False,
                           'queue_durable': False,
                           'auto_delete': True,
                       }
                   },
                   CELERY_TASK_SERIALIZER='json',
                   CELERY_DEFAULT_QUEUE='bench.worker',
                   CELERY_BACKEND=None,
                   )  # CELERY_MESSAGE_COMPRESSION='zlib'


def tdiff(then):
    return time.time() - then


@celery.task(cur=0, time_start=None, queue='bench.worker', bare=True)
def it(_, n):
    i = it.cur  # use internal counter, as ordering can be skewed
                # by previous runs, or the broker.
    if i and not i % 5000:
        print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
        it.subt = time.time()
    if not i:
        it.subt = it.time_start = time.time()
    elif i == n - 1:
        total = tdiff(it.time_start)
        print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
        print('-- process {0} tasks: {1}s total, {2} tasks/s'.format(
            n, total, n / (total + .0)))
        sys.exit()
    it.cur += 1


def bench_apply(n=DEFAULT_ITS):
    # Publish n tasks as a group and report how long publishing took.
    time_start = time.time()
    group(it.s(i, n) for i in range(n))()
    print('-- apply {0} tasks: {1}s'.format(n, time.time() - time_start))


def bench_work(n=DEFAULT_ITS, loglevel='CRITICAL'):
    # Start a worker that consumes the previously published tasks.
    loglevel = os.environ.get('BENCH_LOGLEVEL') or loglevel
    if loglevel:
        celery.log.setup_logging_subsystem(loglevel=loglevel)
    worker = celery.WorkController(concurrency=15,
                                   queues=['bench.worker'])
    try:
        print('STARTING WORKER')
        worker.start()
    except SystemExit:
        # The last task calls sys.exit(); check that every task was
        # processed before re-raising.
        assert sum(worker.state.total_count.values()) == n + 1
        raise


def bench_both(n=DEFAULT_ITS):
    bench_apply(n)
    bench_work(n)


def main(argv=sys.argv):
    n = DEFAULT_ITS
    if len(argv) < 2:
        print('Usage: {0} [apply|work|both] [n={1}]'.format(
            os.path.basename(argv[0]), DEFAULT_ITS))
        return sys.exit(1)
    try:
        n = int(argv[2])
    except IndexError:
        pass
    return {'apply': bench_apply,
            'work': bench_work,
            'both': bench_both}[argv[1]](n=n)


if __name__ == '__main__':
    main()
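
# Example invocations (a sketch, assuming this file is saved as
# bench_worker.py and a broker matching BROKER_TRANSPORT is running;
# the filename is not specified in the source):
#
#   $ python bench_worker.py apply        # publish DEFAULT_ITS tasks
#   $ python bench_worker.py work         # consume them with a worker
#   $ python bench_worker.py both 10000   # publish and consume 10000 tasks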