# queuelog.py

  1. import sys
  2. from multiprocessing.queues import SimpleQueue
  3. from multiprocessing.process import Process
  4. from multiprocessing.pool import Pool
  5. class Logwriter(Process):
  6. def start(self, log_queue, logfile="process.log"):
  7. self.log_queue = log_queue
  8. self.logfile = logfile
  9. super(Logwriter, self).start()
  10. def run(self):
  11. self.process_logs(self.log_queue, self.logfile)
  12. def process_logs(self, log_queue, logfile):
  13. need_to_close_fh = False
  14. logfh = logfile
  15. if isinstance(logfile, basestring):
  16. need_to_close_fh = True
  17. logfh = open(logfile, "a")
  18. logfh = open(logfile, "a")
  19. while 1:
  20. message = log_queue.get()
  21. if message is None: # received sentinel
  22. break
  23. logfh.write(message)
  24. log_queue.put(None) # cascade sentinel
  25. if need_to_close_fh:
  26. logfh.close()
  27. class QueueLogger(object):
  28. def __init__(self, log_queue, log_process):
  29. self.log_queue = log_queue
  30. self.log_process = log_process
  31. @classmethod
  32. def start(cls):
  33. log_queue = SimpleQueue()
  34. log_process = Logwriter()
  35. log_process.start(log_queue)
  36. return cls(log_queue, log_process)
  37. def write(self, message):
  38. self.log_queue.put(message)
  39. def stop(self):
  40. self.log_queue.put(None) # send sentinel
  41. def flush(self):
  42. pass
  43. def some_process_body():
  44. sys.stderr.write("Vandelay industries!\n")
  45. def setup_redirection():
  46. queue_logger = QueueLogger.start()
  47. sys.stderr = queue_logger
  48. return queue_logger
  49. def main():
  50. queue_logger = setup_redirection()
  51. queue_logger.write("ABCDEF\n")
  52. try:
  53. p = Pool(10)
  54. results = [p.apply_async(some_process_body) for i in xrange(20)]
  55. [result.get() for result in results]
  56. p.close()
  57. finally:
  58. queue_logger.stop()
  59. if __name__ == "__main__":
  60. main()