bench_worker.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import os
  2. import sys
  3. import time
  4. from celery import Celery
  5. celery = Celery(__name__)
  6. celery.conf.update(BROKER_TRANSPORT="amqp",
  7. BROKER_POOL_LIMIT=10,
  8. CELERY_PREFETCH_MULTIPLIER=0,
  9. CELERY_DISABLE_RATE_LIMITS=True,
  10. CELERY_BACKEND=None)
  11. @celery.task()
  12. def shutdown_worker():
  13. print("SHUTTING DOWN")
  14. raise SystemExit()
  15. @celery.task(cur=0, time_start=None, queue="bench.worker")
  16. def it(_, n):
  17. i = it.cur # use internal counter, as ordering can be skewed
  18. # by previous runs, or the broker.
  19. if i and not i % 1000:
  20. print >> sys.stderr, i
  21. if not i:
  22. it.time_start = time.time()
  23. elif i == n - 1:
  24. print("consume: %s" % (time.time() - it.time_start, ))
  25. shutdown_worker.delay()
  26. it.cur += 1
  27. def bench_apply(n=50000):
  28. time_start = time.time()
  29. celery.TaskSet(it.subtask((i, n)) for i in xrange(n)).apply_async()
  30. print("apply: %s" % (time.time() - time_start, ))
  31. def bench_consume(n=50000):
  32. from celery.worker import WorkController
  33. from celery.worker import state
  34. import logging
  35. #celery.log.setup_logging_subsystem(loglevel=logging.DEBUG)
  36. worker = celery.WorkController(pool_cls="solo",
  37. queues=["bench.worker"])
  38. try:
  39. print("STARTING WORKER")
  40. worker.start()
  41. except SystemExit:
  42. assert sum(state.total_count.values()) == n + 1
  43. def bench_both(n=50000):
  44. bench_apply(n)
  45. bench_consumer(n)
  46. def main(argv=sys.argv):
  47. if len(argv) < 2:
  48. print("Usage: %s [apply|consume|both]" % (os.path.basename(argv[0]), ))
  49. return sys.exit(1)
  50. return {"apply": bench_apply,
  51. "consume": bench_consume,
  52. "both": bench_both}[argv[1]]()
  53. if __name__ == "__main__":
  54. main()