bulk_task_producer.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. from eventlet import spawn_n, monkey_patch, Timeout
  2. from eventlet.queue import LightQueue
  3. from eventlet.event import Event
  4. monkey_patch()
  5. class Receipt(object):
  6. result = None
  7. def __init__(self, callback=None):
  8. self.callback = callback
  9. self.ready = Event()
  10. def finished(self, result):
  11. self.result = result
  12. if self.callback:
  13. self.callback(result)
  14. self.ready.send()
  15. def wait(self, timeout=None):
  16. with Timeout(timeout):
  17. return self.ready.wait()
  18. class ProducerPool(object):
  19. """Usage::
  20. >>> app = Celery(broker='amqp://')
  21. >>> ProducerPool(app)
  22. """
  23. Receipt = Receipt
  24. def __init__(self, app, size=20):
  25. self.app = app
  26. self.size = size
  27. self.inqueue = LightQueue()
  28. self._running = None
  29. self._producers = None
  30. def apply_async(self, task, args, kwargs, callback=None, **options):
  31. if self._running is None:
  32. self._running = spawn_n(self._run)
  33. receipt = self.Receipt(callback)
  34. self.inqueue.put((task, args, kwargs, options, receipt))
  35. return receipt
  36. def _run(self):
  37. self._producers = [
  38. spawn_n(self._producer) for _ in range(self.size)
  39. ]
  40. def _producer(self):
  41. inqueue = self.inqueue
  42. with self.app.producer_or_acquire() as producer:
  43. while 1:
  44. task, args, kwargs, options, receipt = inqueue.get()
  45. result = task.apply_async(args, kwargs,
  46. producer=producer,
  47. **options)
  48. receipt.finished(result)