bench_worker.py

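"""Benchmark of Celery worker task throughput.

``apply`` publishes ``n`` tasks to the ``bench.worker`` queue, ``work`` starts
a worker that consumes them and prints timings every 5000 tasks, and ``both``
does the two in sequence.
"""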
import os
import sys
import time

os.environ["NOSETPS"] = "yes"

import anyjson
JSONIMP = os.environ.get("JSONIMP")
if JSONIMP:
    anyjson.force_implementation(JSONIMP)

print("anyjson implementation: %r" % (anyjson.implementation.name, ))

from celery import Celery

DEFAULT_ITS = 20000

celery = Celery(__name__)
celery.conf.update(BROKER_TRANSPORT="librabbitmq",
                   BROKER_POOL_LIMIT=10,
                   CELERYD_POOL="solo",
                   CELERY_PREFETCH_MULTIPLIER=0,
                   CELERY_DISABLE_RATE_LIMITS=True,
                   CELERY_DEFAULT_DELIVERY_MODE=1,
                   CELERY_QUEUES={
                       "bench.worker": {
                           "exchange": "bench.worker",
                           "routing_key": "bench.worker",
                           "no_ack": True,
                           #"exchange_durable": False,
                           #"queue_durable": False,
                           "auto_delete": True,
                       }
                   },
                   CELERY_TASK_SERIALIZER="json",
                   CELERY_DEFAULT_QUEUE="bench.worker",
                   CELERY_BACKEND=None,
                   )  # CELERY_MESSAGE_COMPRESSION="zlib")
def tdiff(then):
    return time.time() - then


@celery.task(cur=0, time_start=None, queue="bench.worker")
def it(_, n):
    i = it.cur  # use internal counter, as ordering can be skewed
                # by previous runs, or the broker.
    if i and not i % 5000:
        print >> sys.stderr, "(%s so far: %ss)" % (i, tdiff(it.subt))
        it.subt = time.time()
    if not i:
        it.subt = it.time_start = time.time()
    elif i == n - 1:
        total = tdiff(it.time_start)
        print >> sys.stderr, "(%s so far: %ss)" % (i, tdiff(it.subt))
        print("-- process %s tasks: %ss total, %s tasks/s" % (
              n, total, n / (total + .0)))
        sys.exit()
    it.cur += 1
def bench_apply(n=DEFAULT_ITS):
    time_start = time.time()
    celery.TaskSet(it.subtask((i, n)) for i in xrange(n)).apply_async()
    print("-- apply %s tasks: %ss" % (n, time.time() - time_start, ))


def bench_work(n=DEFAULT_ITS, loglevel="CRITICAL"):
    loglevel = os.environ.get("BENCH_LOGLEVEL") or loglevel
    if loglevel:
        celery.log.setup_logging_subsystem(loglevel=loglevel)
    worker = celery.WorkController(concurrency=15,
                                   queues=["bench.worker"])
    try:
        print("STARTING WORKER")
        worker.start()
    except SystemExit:
        assert sum(worker.state.total_count.values()) == n + 1


def bench_both(n=DEFAULT_ITS):
    bench_apply(n)
    bench_work(n)
def main(argv=sys.argv):
    n = DEFAULT_ITS
    if len(argv) < 2:
        print("Usage: %s [apply|work|both] [n=20k]" % (
              os.path.basename(argv[0]), ))
        return sys.exit(1)
    try:
        try:
            n = int(argv[2])
        except IndexError:
            pass
        return {"apply": bench_apply,
                "work": bench_work,
                "both": bench_both}[argv[1]](n=n)
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()
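# Usage sketch (assumes a RabbitMQ broker reachable through the librabbitmq
# transport configured above; the task counts below are illustrative):
#
#   python bench_worker.py apply 20000    # publish 20000 "it" tasks
#   python bench_worker.py work 20000     # start a worker and consume them
#   python bench_worker.py both 20000     # publish, then consume
#
# The JSONIMP environment variable selects the anyjson backend, and
# BENCH_LOGLEVEL overrides the worker's log level.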