| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 | 
							- from eventlet import spawn_n, monkey_patch, Timeout
 
- from eventlet.queue import LightQueue
 
- from eventlet.event import Event
 
- monkey_patch()
 
class Receipt:
    """Handle for a single request queued on a :class:`ProducerPool`.

    The receipt is completed by calling :meth:`finished`, which stores the
    result, invokes the optional callback, and wakes every greenlet blocked
    in :meth:`wait`.
    """

    # Value passed to ``finished()``; stays ``None`` until then.
    result = None

    def __init__(self, callback=None):
        """Create a pending receipt.

        :param callback: optional callable invoked as ``callback(result)``
            when the receipt is completed.
        """
        self.callback = callback
        self.ready = Event()

    def finished(self, result):
        """Record *result*, run the callback, and wake all waiters.

        Any exception raised by the callback still propagates to the caller,
        but the event is signalled first so that waiters are never left
        blocked by a faulty callback.
        """
        self.result = result
        try:
            if self.callback:
                self.callback(result)
        finally:
            # Send the result through the event so ``wait()`` can return it.
            self.ready.send(result)

    def wait(self, timeout=None):
        """Block until the receipt is completed and return its result.

        :param timeout: seconds to wait; ``None`` waits indefinitely.
        :raises eventlet.Timeout: if *timeout* elapses first.
        """
        with Timeout(timeout):
            return self.ready.wait()
 
class ProducerPool:
    """Pool of greenlets that publish tasks using per-worker producers.

    Requests are placed on an in-memory queue by :meth:`apply_async` and
    consumed by ``size`` worker greenlets, each holding one producer obtained
    from ``app.producer_or_acquire()``.  The workers are started lazily on
    the first call to :meth:`apply_async`.

    Usage::

        >>> app = Celery(broker='amqp://')
        >>> ProducerPool(app)

    """

    # Receipt class instantiated for every request; override to customize.
    Receipt = Receipt

    def __init__(self, app, size=20):
        """Set up an idle pool.

        :param app: application object providing ``producer_or_acquire()``.
        :param size: number of worker greenlets to spawn.
        """
        self.app = app
        self.size = size
        self.inqueue = LightQueue()
        self._running = None
        self._producers = None

    def apply_async(self, task, args, kwargs, callback=None, **options):
        """Queue *task* for publishing and return its receipt.

        :param task: object whose ``apply_async`` performs the publish.
        :param args: positional arguments forwarded to the task.
        :param kwargs: keyword arguments forwarded to the task.
        :param callback: optional callable passed to the receipt.
        :param options: extra keyword options forwarded to
            ``task.apply_async``.
        :returns: a ``self.Receipt`` instance for this request.
        """
        if self._running is None:
            # First request: start the worker greenlets.
            self._running = spawn_n(self._run)
        receipt = self.Receipt(callback)
        self.inqueue.put((task, args, kwargs, options, receipt))
        return receipt

    def _run(self):
        """Spawn the ``size`` worker greenlets."""
        self._producers = [
            spawn_n(self._producer) for _ in range(self.size)
        ]

    def _producer(self):
        """Worker loop: take requests off the queue and publish them.

        A failed publish is forwarded to the receipt's event, so waiters see
        the exception instead of blocking forever, and the worker keeps
        serving later requests instead of dying.
        """
        inqueue = self.inqueue
        with self.app.producer_or_acquire() as producer:
            while 1:
                task, args, kwargs, options, receipt = inqueue.get()
                try:
                    result = task.apply_async(args, kwargs,
                                              producer=producer,
                                              **options)
                except Exception as exc:
                    # Wake waiters with the error; keep this worker alive.
                    receipt.ready.send_exception(exc)
                    continue
                receipt.finished(result)
 
 
  |