"""Bulk task publishing through a pool of eventlet green threads.

Tasks handed to :meth:`ProducerPool.apply_async` are placed on an in-memory
queue and published by a fixed number of producer green threads.  Each call
returns a :class:`Receipt` that can be waited on for the publish result.
"""
from eventlet import spawn_n, monkey_patch, Timeout
from eventlet.queue import LightQueue
from eventlet.event import Event

from celery import current_app

monkey_patch()


class Receipt(object):
    """Handle for one queued publish request.

    :param callback: optional callable invoked with the publish result
        when :meth:`finished` is called.
    """

    # Value passed to finished(); stays None until then.
    result = None

    def __init__(self, callback=None):
        # Keep the caller's callback; it was previously discarded (always
        # set to None), so callbacks passed to apply_async never fired.
        self.callback = callback
        self.ready = Event()

    def finished(self, result):
        """Record *result*, run the callback (if any) and wake waiters."""
        self.result = result
        if self.callback:
            self.callback(result)
        # Send the result through the event so that wait() returns it
        # instead of always returning None.
        self.ready.send(result)

    def wait(self, timeout=None):
        """Block until :meth:`finished` has been called.

        :param timeout: seconds to wait before ``eventlet.Timeout`` is
            raised; ``None`` waits without a time limit.
        :returns: the result that was passed to :meth:`finished`.
        """
        with Timeout(timeout):
            return self.ready.wait()


class ProducerPool(object):
    """Pool of green threads that publish tasks taken from a shared queue.

    :param size: number of producer green threads to spawn (default 20).
    """

    # Class used to build receipts; can be overridden on a subclass.
    Receipt = Receipt

    def __init__(self, size=20):
        self.size = size
        self.inqueue = LightQueue()
        self._running = None
        self._producers = None

    def apply_async(self, task, args, kwargs, callback=None, **options):
        """Queue *task* for publishing and return its receipt.

        The producers are started lazily on the first call.

        :param task: object whose ``apply_async`` is called by a producer.
        :param args: positional arguments forwarded to the task.
        :param kwargs: keyword arguments forwarded to the task.
        :param callback: optional callable invoked with the publish result.
        :param options: extra keyword options forwarded to
            ``task.apply_async``.
        :returns: a :class:`Receipt` for this request.
        """
        if self._running is None:
            self._running = spawn_n(self._run)
        receipt = self.Receipt(callback)
        self.inqueue.put((task, args, kwargs, options, receipt))
        return receipt

    def _run(self):
        """Spawn ``self.size`` producer green threads."""
        self._producers = [
            spawn_n(self._producer) for _ in range(self.size)
        ]

    def _producer(self):
        """Publish queued tasks forever, reusing one publisher.

        The connection and ``TaskProducer`` are obtained once from
        ``current_app`` and then used for every task this green thread
        takes off the queue.
        """
        connection = current_app.connection()
        publisher = current_app.amqp.TaskProducer(connection)
        inqueue = self.inqueue

        while True:
            task, args, kwargs, options, receipt = inqueue.get()
            result = task.apply_async(args, kwargs,
                                      publisher=publisher,
                                      **options)
            receipt.finished(result)