bulk_task_producer.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. from __future__ import with_statement
  2. from eventlet import spawn_n, monkey_patch, Timeout
  3. from eventlet.queue import LightQueue
  4. from eventlet.event import Event
  5. from celery.messaging import establish_connection, TaskPublisher
  6. monkey_patch()
  7. class Receipt(object):
  8. result = None
  9. def __init__(self, callback=None):
  10. self.callback = None
  11. self.event = Event()
  12. def finished(self, result):
  13. self.result = result
  14. if self.callback:
  15. self.callback(result)
  16. self.ready.send()
  17. def wait(self, timeout=None):
  18. with Timeout(timeout):
  19. return self.ready.wait()
  20. class ProducerPool(object):
  21. def __init__(self, size=20):
  22. self.size = size
  23. self.inqueue = LightQueue()
  24. self._running = None
  25. self._producers = None
  26. def apply_async(self, task, args, kwargs, callback=None, **options):
  27. if self._running is None:
  28. self._running = spawn_n(self._run)
  29. receipt = self.Receipt(callback)
  30. self.inqueue.put((task, args, kwargs, options, receipt))
  31. return receipt
  32. def _run(self):
  33. self._producers = [spawn_n(self._producer)
  34. for _ in xrange(self.size)]
  35. def _producer(self):
  36. connection = establish_connection()
  37. publisher = TaskPublisher(connection)
  38. inqueue = self.inqueue
  39. while 1:
  40. task, args, kwargs, options, receipt = inqueue.get()
  41. result = task.apply_async(args, kwargs,
  42. publisher=publisher,
  43. **options)
  44. receipt.finished(result)