| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 | 
							- """
 
- Worker Controller Threads
 
- """
 
- import time
 
- import threading
 
- from Queue import Empty as QueueEmpty
 
- from celery import conf
 
- from celery import log
 
class Mediator(threading.Thread):
    """Thread continuously sending tasks in the queue to the pool.

    .. attribute:: ready_queue

        The task queue, a :class:`Queue.Queue` instance.

    .. attribute:: callback

        The callback used to process tasks retrieved from the
        :attr:`ready_queue`.

    .. attribute:: logger

        Logger used for debug output. Falls back to
        ``log.get_default_logger()`` when no logger is given.

    """

    def __init__(self, ready_queue, callback, logger=None):
        """Set up the mediator as a daemon thread (it is not started here).

        :param ready_queue: queue object tasks are fetched from.
        :param callback: callable invoked with each non-revoked task.
        :keyword logger: optional logger; a default one is looked up
            when omitted.

        """
        threading.Thread.__init__(self)
        self.logger = logger or log.get_default_logger()
        self.ready_queue = ready_queue
        self.callback = callback
        # Set by stop() to ask the run() loop to exit.
        self._shutdown = threading.Event()
        # Set by run() once its loop has exited (normally or by exception).
        self._stopped = threading.Event()
        self.setDaemon(True)

    def move(self):
        """Fetch at most one task from the queue and pass it to the callback.

        Waits up to one second for a task. If the queue stays empty the
        method sleeps another 0.2 seconds and returns. Tasks for which
        ``task.revoked()`` is true are dropped without calling the callback.

        """
        try:
            # Wait at most one second so the run() loop can re-check the
            # shutdown flag regularly instead of blocking indefinitely.
            task = self.ready_queue.get(timeout=1)
        except QueueEmpty:
            time.sleep(0.2)
        else:
            if task.revoked():
                return

            self.logger.debug(
                "Mediator: Running callback for task: %s[%s]" % (
                    task.task_name, task.task_id))
            self.callback(task)  # execute

    def run(self):
        """Move tasks until :meth:`stop` requests shutdown.

        Exceptions raised while moving a task are not caught here; they
        propagate and end the thread.

        """
        try:
            while not self._shutdown.isSet():
                self.move()
        finally:
            # Always signal termination, even when move() raised, so that
            # stop() cannot block forever waiting on a dead thread.
            self._stopped.set()

    def stop(self):
        """Gracefully shutdown the thread.

        Blocks until :meth:`run` has finished. ``_stopped`` is only ever
        set by :meth:`run`, so calling this on a mediator whose ``run``
        never executes will block.

        """
        self._shutdown.set()
        self._stopped.wait()  # block until this thread is done
        # NOTE(review): a huge finite timeout is used instead of None,
        # presumably to keep the join interruptible on Python 2 -- confirm.
        self.join(1e100)
 
 
  |