bench_worker.py

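"""Celery worker benchmark.

Usage::

    python bench_worker.py apply [n]   # publish n benchmark tasks
    python bench_worker.py work [n]    # start a worker and consume them
    python bench_worker.py both [n]    # publish, then consume

``n`` defaults to ``DEFAULT_ITS`` (40000); tasks are sent on the
transient ``bench.worker`` queue.
"""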

from __future__ import absolute_import, print_function, unicode_literals

import os
import sys
from time import monotonic

os.environ.update(
    NOSETPS='yes',
    USE_FAST_LOCALS='yes',
)

from celery import Celery  # noqa

DEFAULT_ITS = 40000

BROKER_TRANSPORT = os.environ.get('BROKER', 'librabbitmq')
if hasattr(sys, 'pypy_version_info'):
    BROKER_TRANSPORT = 'pyamqp'

app = Celery('bench_worker')
app.conf.update(
    broker_transport=BROKER_TRANSPORT,
    broker_pool_limit=10,
    celeryd_pool='solo',
    celeryd_prefetch_multiplier=0,  # no prefetch limit
    default_delivery_mode=1,        # transient (non-persistent) messages
    queues={
        'bench.worker': {
            'exchange': 'bench.worker',
            'routing_key': 'bench.worker',
            'no_ack': True,
            'exchange_durable': False,
            'queue_durable': False,
            'auto_delete': True,
        }
    },
    task_serializer='json',
    default_queue='bench.worker',
    result_backend=None,
)


def tdiff(then):
    # Seconds elapsed since `then`.
    return monotonic() - then


# Extra keyword arguments to ``app.task`` become attributes on the
# generated task (``it.cur``, ``it.time_start``).
@app.task(cur=0, time_start=None, queue='bench.worker', bare=True)
def it(_, n):
    # use internal counter, as ordering can be skewed
    # by previous runs, or the broker.
    i = it.cur
    if i and not i % 5000:
        print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
        it.subt = monotonic()
    if not i:
        it.subt = it.time_start = monotonic()
    elif i > n - 2:
        total = tdiff(it.time_start)
        print('({0} so far: {1}s)'.format(i, tdiff(it.subt)), file=sys.stderr)
        print('-- process {0} tasks: {1}s total, {2} tasks/s'.format(
            n, total, n / (total + .0),
        ))
        os._exit(0)
    it.cur += 1


def bench_apply(n=DEFAULT_ITS):
    time_start = monotonic()
    task = it._get_current_object()
    with app.producer_or_acquire() as producer:
        [task.apply_async((i, n), producer=producer) for i in range(n)]
    print('-- apply {0} tasks: {1}s'.format(n, monotonic() - time_start))


def bench_work(n=DEFAULT_ITS, loglevel='CRITICAL'):
    loglevel = os.environ.get('BENCH_LOGLEVEL') or loglevel
    if loglevel:
        app.log.setup_logging_subsystem(loglevel=loglevel)
    worker = app.WorkController(concurrency=15,
                                queues=['bench.worker'])
    try:
        print('-- starting worker')
        worker.start()
    except SystemExit:
        raise
    assert sum(worker.state.total_count.values()) == n + 1


def bench_both(n=DEFAULT_ITS):
    bench_apply(n)
    bench_work(n)


def main(argv=sys.argv):
    n = DEFAULT_ITS
    if len(argv) < 2:
        print('Usage: {0} [apply|work|both] [n={1}]'.format(
            os.path.basename(argv[0]), DEFAULT_ITS,
        ))
        return sys.exit(1)
    try:
        n = int(argv[2])
    except IndexError:
        pass
    return {'apply': bench_apply,
            'work': bench_work,
            'both': bench_both}[argv[1]](n=n)


if __name__ == '__main__':
    main()