| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 | 
							- from __future__ import with_statement
 
- from eventlet import spawn_n, monkey_patch, Timeout
 
- from eventlet.queue import LightQueue
 
- from eventlet.event import Event
 
- from celery.messaging import establish_connection, TaskPublisher
 
- monkey_patch()
 
class Receipt(object):
    """Handle for a single task submitted through a producer pool.

    A receipt is handed back to the submitter immediately. Once the task
    has been published, :meth:`finished` is called with the publish result,
    which stores it in :attr:`result`, invokes the optional callback and
    wakes up anybody blocked in :meth:`wait`.
    """

    #: Value passed to :meth:`finished`; ``None`` until then.
    result = None

    def __init__(self, callback=None):
        """Create a pending receipt.

        :param callback: optional callable invoked as ``callback(result)``
            when :meth:`finished` is called.
        """
        # Keep the caller's callback; it used to be overwritten with None,
        # so callbacks were silently never invoked.
        self.callback = callback
        # ``finished`` and ``wait`` operate on ``ready``.  ``event`` is kept
        # as an alias because that is the attribute name the constructor
        # used to set.
        self.ready = self.event = Event()

    def finished(self, result):
        """Record ``result``, run the callback and wake up waiters.

        :param result: value to store and hand to the callback/waiters.
        """
        self.result = result
        try:
            if self.callback:
                self.callback(result)
        finally:
            # Always signal, even if the callback raised, so that waiters
            # are not left blocked.  The result is sent along so that
            # ``wait`` returns it.
            self.ready.send(result)

    def wait(self, timeout=None):
        """Block until :meth:`finished` has been called.

        :param timeout: value passed to ``eventlet.Timeout`` to bound the
            wait; ``None`` (the default) is passed through unchanged.
        :returns: the value sent by :meth:`finished`.
        """
        with Timeout(timeout):
            return self.ready.wait()
 
class ProducerPool(object):
    """Pool of green threads that publish tasks taken from a shared queue.

    Tasks are queued by :meth:`apply_async`; each producer green thread
    owns its own connection and publisher and publishes queued tasks one
    at a time, completing the matching receipt after each publish.
    """

    #: Class instantiated for every submitted task.  ``apply_async`` looks
    #: this up on the instance, so it must exist as a class attribute
    #: (it was previously missing); subclasses may override it.
    Receipt = Receipt

    def __init__(self, size=20):
        """Create the pool; no green threads are started yet.

        :param size: number of producer green threads to spawn on first use.
        """
        self.size = size
        self.inqueue = LightQueue()
        # Set by the first ``apply_async`` call; ``None`` means the
        # producers have not been started yet.
        self._running = None
        self._producers = None

    def apply_async(self, task, args, kwargs, callback=None, **options):
        """Queue ``task`` for publishing and return its receipt.

        :param task: object whose ``apply_async`` is called by a producer.
        :param args: positional arguments forwarded to ``task.apply_async``.
        :param kwargs: keyword arguments forwarded to ``task.apply_async``.
        :param callback: optional callable handed to the receipt.
        :param options: extra keyword options forwarded to
            ``task.apply_async``.
        :returns: a :attr:`Receipt` instance for the queued task.
        """
        # Start the producers lazily on the first submission.
        if self._running is None:
            self._running = spawn_n(self._run)
        receipt = self.Receipt(callback)
        self.inqueue.put((task, args, kwargs, options, receipt))
        return receipt

    def _run(self):
        """Spawn ``self.size`` producer green threads."""
        self._producers = [spawn_n(self._producer)
                           for _ in range(self.size)]

    def _producer(self):
        """Publish queued tasks forever using a dedicated publisher."""
        connection = establish_connection()
        publisher = TaskPublisher(connection)
        inqueue = self.inqueue

        while 1:
            task, args, kwargs, options, receipt = inqueue.get()
            result = task.apply_async(args, kwargs,
                                      publisher=publisher,
                                      **options)
            receipt.finished(result)
 
 
  |