| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 | 
							
- from eventlet import spawn_n, monkey_patch, Timeout
 
- from eventlet.queue import LightQueue
 
- from eventlet.event import Event
 
- from celery import current_app
 
- monkey_patch()
 
class Receipt(object):
    """Handle for a single queued publish request.

    A receipt is created when a task is queued and is completed by calling
    :meth:`finished`. Callers can either supply a callback or block in
    :meth:`wait` until the request has been processed.
    """

    # Value handed to finished(); remains None until then.
    result = None

    def __init__(self, callback=None):
        """Create a pending receipt.

        :param callback: optional callable invoked with the result when
            :meth:`finished` is called.
        """
        # Keep the caller's callback. This used to be hard-coded to None,
        # which silently discarded every callback passed in.
        self.callback = callback
        self.ready = Event()

    def finished(self, result):
        """Record ``result``, run the callback (if any) and wake waiters.

        :param result: value to store on the receipt and pass to the callback.
        """
        self.result = result
        try:
            if self.callback:
                self.callback(result)
        finally:
            # Signal the event even when the callback raises, so greenlets
            # blocked in wait() are not left hanging. The result is sent
            # through the event so wait() can return it.
            self.ready.send(result)

    def wait(self, timeout=None):
        """Block until :meth:`finished` has been called.

        :param timeout: seconds to wait; ``None`` waits without a time limit.
        :returns: the result passed to :meth:`finished`.
        :raises eventlet.Timeout: if ``timeout`` expires first.
        """
        with Timeout(timeout):
            return self.ready.wait()
 
class ProducerPool(object):
    """Pool of green threads that publish tasks taken from a shared queue.

    Requests are queued by :meth:`apply_async`; ``size`` producer green
    threads pull them off the queue and publish them, each through its own
    connection and task producer.
    """

    # Receipt class instantiated for every queued request.
    Receipt = Receipt

    def __init__(self, size=20):
        """Set up an idle pool.

        :param size: number of producer green threads to spawn once the
            pool is started.
        """
        self._producers = None
        self._running = None
        self.inqueue = LightQueue()
        self.size = size

    def apply_async(self, task, args, kwargs, callback=None, **options):
        """Queue ``task`` for publishing and return its receipt.

        The pool is started lazily on the first call.

        :param task: object whose ``apply_async`` is called by a producer.
        :param args: positional arguments forwarded to ``task.apply_async``.
        :param kwargs: keyword arguments forwarded to ``task.apply_async``.
        :param callback: passed to the receipt constructor.
        :param options: extra keyword options forwarded to
            ``task.apply_async``.
        :returns: the receipt created for this request.
        """
        if self._running is None:
            self._running = spawn_n(self._run)
        ticket = self.Receipt(callback)
        job = (task, args, kwargs, options, ticket)
        self.inqueue.put(job)
        return ticket

    def _run(self):
        """Spawn ``self.size`` producer green threads."""
        started = []
        for _ in range(self.size):
            started.append(spawn_n(self._producer))
        self._producers = started

    def _producer(self):
        """Publish queued requests forever using a dedicated connection."""
        conn = current_app.connection()
        task_publisher = current_app.amqp.TaskProducer(conn)
        # Bind the queue once; the loop below never returns.
        jobs = self.inqueue

        while True:
            job = jobs.get()
            task, args, kwargs, options, ticket = job
            outcome = task.apply_async(
                args, kwargs, publisher=task_publisher, **options
            )
            ticket.finished(outcome)
 
 
  |