bulk_task_producer.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  from eventlet import spawn_n, monkey_patch, Timeout
  from eventlet.queue import LightQueue
  from eventlet.event import Event

  # Patch the standard library before celery is imported, so that modules
  # pulled in by celery bind to the patched (cooperative) versions instead
  # of the blocking originals.  Eventlet's documentation recommends calling
  # monkey_patch() as early as possible for exactly this reason.
  monkey_patch()

  from celery import current_app  # noqa: E402 -- must follow monkey_patch()
  class Receipt(object):
      """Handle for a task that has been queued on a :class:`ProducerPool`.

      A receipt is created when the task is queued and is completed by a
      producer greenlet calling :meth:`finished` once the task has been
      published.
      """

      # Value passed to :meth:`finished`; stays ``None`` until then.
      result = None

      def __init__(self, callback=None):
          """Create a pending receipt.

          :param callback: optional callable invoked as ``callback(result)``
              when :meth:`finished` is called.
          """
          # Keep the caller's callback; assigning ``None`` here would
          # silently discard it and the callback would never fire.
          self.callback = callback
          self.ready = Event()

      def finished(self, result):
          """Record *result*, run the callback (if any) and wake up waiters.

          :param result: value produced by publishing the task.
          """
          self.result = result
          try:
              if self.callback:
                  self.callback(result)
          finally:
              # Release waiters even when the callback raises, and hand
              # them the result so that ``wait()`` returns it.
              self.ready.send(result)

      def wait(self, timeout=None):
          """Block until :meth:`finished` has been called.

          :param timeout: seconds to wait before giving up; it is passed
              straight to :class:`eventlet.Timeout`.
          :returns: the result passed to :meth:`finished`.
          :raises eventlet.Timeout: if the receipt is not ready in time.
          """
          with Timeout(timeout):
              return self.ready.wait()
  class ProducerPool(object):
      """Publish tasks through a pool of producer greenlets.

      Calls to :meth:`apply_async` only enqueue the request and return a
      receipt; the producer greenlets take requests off the queue and
      publish them, each over its own broker connection.
      """

      # Receipt class instantiated by ``apply_async``; override in a
      # subclass to customise it.
      Receipt = Receipt

      def __init__(self, size=20):
          """Create the pool.

          :param size: number of producer greenlets started on first use.
          """
          self.size = size
          self.inqueue = LightQueue()
          # Both stay ``None`` until the first ``apply_async`` call.
          self._running = None
          self._producers = None

      def apply_async(self, task, args, kwargs, callback=None, **options):
          """Queue *task* for publishing and return its receipt.

          :param task: object whose ``apply_async`` method publishes it.
          :param args: positional arguments forwarded to the task.
          :param kwargs: keyword arguments forwarded to the task.
          :param callback: optional callable handed to the receipt.
          :param options: extra keyword options for ``task.apply_async``.
          :returns: a ``self.Receipt`` instance for the queued request.
          """
          # Start the producers lazily, on the first request.
          if self._running is None:
              self._running = spawn_n(self._run)
          receipt = self.Receipt(callback)
          self.inqueue.put((task, args, kwargs, options, receipt))
          return receipt

      def _run(self):
          """Spawn ``self.size`` producer greenlets."""
          self._producers = [
              spawn_n(self._producer) for _ in range(self.size)
          ]

      def _producer(self):
          """Take queued requests forever and publish each one.

          NOTE(review): an exception raised while publishing ends this
          greenlet and leaves that request's receipt unfinished -- confirm
          whether callers need to be notified of failures.
          """
          # The context manager releases the connection if the loop is
          # ever left through an exception, instead of leaking it.
          with current_app.connection() as connection:
              publisher = current_app.amqp.TaskProducer(connection)
              inqueue = self.inqueue
              while True:
                  task, args, kwargs, options, receipt = inqueue.get()
                  result = task.apply_async(args, kwargs,
                                            publisher=publisher,
                                            **options)
                  receipt.finished(result)