"""Publish Celery tasks from a pool of eventlet green threads.

Callers hand tasks to :class:`ProducerPool`, which queues them for a set of
producer green threads and returns a :class:`Receipt` that is completed once
the task has been published.
"""
from __future__ import with_statement

from eventlet import spawn_n, monkey_patch, Timeout
from eventlet.queue import LightQueue
from eventlet.event import Event

from celery import current_app

monkey_patch()


class Receipt(object):
    """Handle for one queued publish request.

    Attributes are documented inline; ``result`` stays ``None`` until
    :meth:`finished` has been called.
    """

    # Value passed to finished(); None until then.
    result = None

    def __init__(self, callback=None):
        """Create a receipt.

        :param callback: optional callable invoked with the result when
            :meth:`finished` is called.
        """
        # Store the supplied callback (previously it was discarded and the
        # attribute was always None, so callbacks never fired).
        self.callback = callback
        self.ready = Event()

    def finished(self, result):
        """Record *result*, invoke the callback if any, and signal ``ready``."""
        self.result = result
        if self.callback:
            self.callback(result)
        self.ready.send()

    def wait(self, timeout=None):
        """Block until ``ready`` has been signalled.

        :param timeout: passed to ``eventlet.Timeout``; ``None`` means the
            wait is not time-limited.
        :returns: whatever ``self.ready.wait()`` returns; the published
            result itself is available as ``self.result``.
        """
        with Timeout(timeout):
            return self.ready.wait()


class ProducerPool(object):
    """Fixed-size group of green threads that publish queued tasks."""

    # Receipt class used by apply_async; overridable on subclasses.
    Receipt = Receipt

    def __init__(self, size=20):
        """Create the pool.

        :param size: number of producer green threads to spawn once the
            pool is started.
        """
        self.size = size
        self.inqueue = LightQueue()
        # Both are populated lazily: _running on the first apply_async call,
        # _producers when _run executes.
        self._running = None
        self._producers = None

    def apply_async(self, task, args, kwargs, callback=None, **options):
        """Queue *task* for publishing and return its receipt.

        The producers are started on the first call.

        :param task: object whose ``apply_async`` will be called by a
            producer.
        :param args: positional arguments forwarded to ``task.apply_async``.
        :param kwargs: keyword arguments forwarded to ``task.apply_async``.
        :param callback: optional callable handed to the receipt.
        :param options: extra keyword options forwarded to
            ``task.apply_async``.
        :returns: a ``Receipt`` instance for this request.
        """
        if self._running is None:
            self._running = spawn_n(self._run)
        receipt = self.Receipt(callback)
        self.inqueue.put((task, args, kwargs, options, receipt))
        return receipt

    def _run(self):
        """Spawn ``self.size`` producer green threads."""
        # range() instead of xrange() so this also runs on Python 3.
        self._producers = [spawn_n(self._producer)
                           for _ in range(self.size)]

    def _producer(self):
        """Consume queued requests forever, publishing each one.

        Each producer creates its own broker connection and publisher and
        reuses them for every task it handles.
        """
        connection = current_app.broker_connection()
        publisher = current_app.amqp.TaskPublisher(connection)
        inqueue = self.inqueue

        # NOTE(review): an exception from task.apply_async propagates out of
        # this loop, so the receipt for that request is never finished.
        while True:
            task, args, kwargs, options, receipt = inqueue.get()
            result = task.apply_async(args, kwargs,
                                      publisher=publisher,
                                      **options)
            receipt.finished(result)