# bulk_task_producer.py
  1. from __future__ import absolute_import, unicode_literals
  2. from eventlet import spawn_n, monkey_patch, Timeout
  3. from eventlet.queue import LightQueue
  4. from eventlet.event import Event
  5. monkey_patch()
  6. class Receipt(object):
  7. result = None
  8. def __init__(self, callback=None):
  9. self.callback = callback
  10. self.ready = Event()
  11. def finished(self, result):
  12. self.result = result
  13. if self.callback:
  14. self.callback(result)
  15. self.ready.send()
  16. def wait(self, timeout=None):
  17. with Timeout(timeout):
  18. return self.ready.wait()
  19. class ProducerPool(object):
  20. """Usage::
  21. >>> app = Celery(broker='amqp://')
  22. >>> ProducerPool(app)
  23. """
  24. Receipt = Receipt
  25. def __init__(self, app, size=20):
  26. self.app = app
  27. self.size = size
  28. self.inqueue = LightQueue()
  29. self._running = None
  30. self._producers = None
  31. def apply_async(self, task, args, kwargs, callback=None, **options):
  32. if self._running is None:
  33. self._running = spawn_n(self._run)
  34. receipt = self.Receipt(callback)
  35. self.inqueue.put((task, args, kwargs, options, receipt))
  36. return receipt
  37. def _run(self):
  38. self._producers = [
  39. spawn_n(self._producer) for _ in range(self.size)
  40. ]
  41. def _producer(self):
  42. inqueue = self.inqueue
  43. with self.app.producer_or_acquire() as producer:
  44. while 1:
  45. task, args, kwargs, options, receipt = inqueue.get()
  46. result = task.apply_async(args, kwargs,
  47. producer=producer,
  48. **options)
  49. receipt.finished(result)