bulk_task_producer.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. from __future__ import with_statement
  2. from eventlet import spawn_n, monkey_patch, Timeout
  3. from eventlet.queue import LightQueue
  4. from eventlet.event import Event
  5. from celery import current_app
  6. monkey_patch()
  7. class Receipt(object):
  8. result = None
  9. def __init__(self, callback=None):
  10. self.callback = None
  11. self.ready = Event()
  12. def finished(self, result):
  13. self.result = result
  14. if self.callback:
  15. self.callback(result)
  16. self.ready.send()
  17. def wait(self, timeout=None):
  18. with Timeout(timeout):
  19. return self.ready.wait()
  20. class ProducerPool(object):
  21. Receipt = Receipt
  22. def __init__(self, size=20):
  23. self.size = size
  24. self.inqueue = LightQueue()
  25. self._running = None
  26. self._producers = None
  27. def apply_async(self, task, args, kwargs, callback=None, **options):
  28. if self._running is None:
  29. self._running = spawn_n(self._run)
  30. receipt = self.Receipt(callback)
  31. self.inqueue.put((task, args, kwargs, options, receipt))
  32. return receipt
  33. def _run(self):
  34. self._producers = [spawn_n(self._producer)
  35. for _ in xrange(self.size)]
  36. def _producer(self):
  37. connection = current_app.connection()
  38. publisher = current_app.amqp.TaskProducer(connection)
  39. inqueue = self.inqueue
  40. while 1:
  41. task, args, kwargs, options, receipt = inqueue.get()
  42. result = task.apply_async(args, kwargs,
  43. publisher=publisher,
  44. **options)
  45. receipt.finished(result)