| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 | from __future__ import with_statementfrom eventlet import spawn_n, monkey_patch, Timeoutfrom eventlet.queue import LightQueuefrom eventlet.event import Eventfrom celery.messaging import establish_connection, TaskPublishermonkey_patch()class Receipt(object):    result = None    def __init__(self, callback=None):        self.callback = None        self.event = Event()    def finished(self, result):        self.result = result        if self.callback:            self.callback(result)        self.ready.send()    def wait(self, timeout=None):        with Timeout(timeout):            return self.ready.wait()class ProducerPool(object):    def __init__(self, size=20):        self.size = size        self.inqueue = LightQueue()        self._running = None        self._producers = None    def apply_async(self, task, args, kwargs, callback=None, **options):        if self._running is None:            self._running = spawn_n(self._run)        receipt = self.Receipt(callback)        self.inqueue.put((task, args, kwargs, options, receipt))        return receipt    def _run(self):        self._producers = [spawn_n(self._producer)                                for _ in xrange(self.size)]    def _producer(self):        connection = establish_connection()        publisher = TaskPublisher(connection)        inqueue = self.inqueue        while 1:            task, args, kwargs, options, receipt = inqueue.get()            result = task.apply_async(args, kwargs,                                      publisher=publisher,                                      **options)            receipt.finished(result)
 |