worker.py 953 B

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. import time
  2. from celery import Celery
  3. celery = Celery()
  4. celery.conf.update(BROKER_TRANSPORT="memory",
  5. BROKER_POOL_LIMIT=1,
  6. CELERY_PREFETCH_MULTIPLIER=0,
  7. CELERY_DISABLE_RATE_LIMITS=True,
  8. CELERY_BACKEND=None)
  9. def bench_consumer(n=10000):
  10. from celery.worker import WorkController
  11. from celery.worker import state
  12. worker = WorkController(app=celery, pool_cls="solo")
  13. time_start = [None]
  14. @celery.task()
  15. def it(i):
  16. if not i:
  17. time_start[0] = time.time()
  18. elif i == n - 1:
  19. print(time.time() - time_start[0])
  20. @celery.task()
  21. def shutdown_worker():
  22. raise SystemExit()
  23. for i in xrange(n):
  24. it.delay(i)
  25. shutdown_worker.delay()
  26. try:
  27. worker.start()
  28. except SystemExit:
  29. assert sum(state.total_count.values()) == n + 1
  30. if __name__ == "__main__":
  31. bench_consumer()