bench_worker.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import os
  2. import sys
  3. import time
  4. from celery import Celery
  5. DEFAULT_ITS = 5000
  6. celery = Celery(__name__)
  7. celery.conf.update(BROKER_TRANSPORT="amqp",
  8. BROKER_POOL_LIMIT=10,
  9. CELERY_PREFETCH_MULTIPLIER=0,
  10. CELERY_DISABLE_RATE_LIMITS=True,
  11. CELERY_BACKEND=None)
  12. @celery.task(cur=0, time_start=None, queue="bench.worker")
  13. def it(_, n):
  14. i = it.cur # use internal counter, as ordering can be skewed
  15. # by previous runs, or the broker.
  16. if i and not i % 1000:
  17. print >> sys.stderr, i
  18. if not i:
  19. it.time_start = time.time()
  20. elif i == n - 1:
  21. print("consume: %s" % (time.time() - it.time_start, ))
  22. sys.exit()
  23. it.cur += 1
  24. def bench_apply(n=DEFAULT_ITS):
  25. time_start = time.time()
  26. celery.TaskSet(it.subtask((i, n)) for i in xrange(n)).apply_async()
  27. print("apply: %s" % (time.time() - time_start, ))
  28. def bench_consume(n=DEFAULT_ITS):
  29. from celery.worker import WorkController
  30. from celery.worker import state
  31. #import logging
  32. #celery.log.setup_logging_subsystem(loglevel=logging.DEBUG)
  33. worker = celery.WorkController(pool_cls="solo",
  34. queues=["bench.worker"])
  35. try:
  36. print("STARTING WORKER")
  37. worker.start()
  38. except SystemExit:
  39. assert sum(state.total_count.values()) == n + 1
  40. def bench_both(n=DEFAULT_ITS):
  41. bench_apply(n)
  42. bench_consumer(n)
  43. def main(argv=sys.argv):
  44. if len(argv) < 2:
  45. print("Usage: %s [apply|consume|both]" % (os.path.basename(argv[0]), ))
  46. return sys.exit(1)
  47. return {"apply": bench_apply,
  48. "consume": bench_consume,
  49. "both": bench_both}[argv[1]]()
  50. if __name__ == "__main__":
  51. main()