forking.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. #
  2. # Module for starting a process object using os.fork() or CreateProcess()
  3. #
  4. # multiprocessing/forking.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # All rights reserved.
  8. #
  9. # Redistribution and use in source and binary forms, with or without
  10. # modification, are permitted provided that the following conditions
  11. # are met:
  12. #
  13. # 1. Redistributions of source code must retain the above copyright
  14. # notice, this list of conditions and the following disclaimer.
  15. # 2. Redistributions in binary form must reproduce the above copyright
  16. # notice, this list of conditions and the following disclaimer in the
  17. # documentation and/or other materials provided with the distribution.
  18. # 3. Neither the name of author nor the names of any contributors may be
  19. # used to endorse or promote products derived from this software
  20. # without specific prior written permission.
  21. #
  22. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
  23. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  26. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  27. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  28. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  29. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  30. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  31. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  32. # SUCH DAMAGE.
  33. #
  34. from __future__ import absolute_import
  35. import os
  36. import sys
  37. from multiprocessing import current_process
  38. from multiprocessing import forking as _forking
  39. from multiprocessing import process
  40. from pickle import load, dump as _dump, HIGHEST_PROTOCOL
  41. Popen = _forking.Popen
  42. def dump(obj, file, protocol=None):
  43. _forking.ForkingPickler(file, protocol).dump(obj)
  44. if sys.platform != "win32":
  45. import threading
  46. class Popen(_forking.Popen): # noqa
  47. _tls = threading.local()
  48. returncode = None
  49. def __init__(self, process_obj):
  50. self.force_execv = process_obj.force_execv
  51. if self.force_execv:
  52. sys.stdout.flush()
  53. sys.stderr.flush()
  54. r, w = os.pipe()
  55. self.sentinel = r
  56. from_parent_fd, to_child_fd = os.pipe()
  57. cmd = get_command_line() + [str(from_parent_fd)]
  58. self.pid = os.fork()
  59. if self.pid == 0:
  60. os.close(r)
  61. os.close(to_child_fd)
  62. os.execv(sys.executable, cmd)
  63. # send information to child
  64. prep_data = get_preparation_data(process_obj._name)
  65. os.close(from_parent_fd)
  66. to_child = os.fdopen(to_child_fd, 'wb')
  67. Popen._tls.process_handle = self.pid
  68. try:
  69. dump(prep_data, to_child, HIGHEST_PROTOCOL)
  70. dump(process_obj, to_child, HIGHEST_PROTOCOL)
  71. finally:
  72. del(Popen._tls.process_handle)
  73. to_child.close()
  74. else:
  75. super(Popen, self).__init__(process_obj)
  76. @staticmethod
  77. def thread_is_spawning():
  78. return getattr(Popen._tls, "process_handle", None) is not None
  79. @staticmethod
  80. def duplicate_for_child(handle):
  81. return handle
  82. def is_forking(argv):
  83. if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
  84. assert len(argv) == 3
  85. return True
  86. return False
  87. def freeze_support():
  88. if is_forking(sys.argv):
  89. main()
  90. sys.exit()
  91. def get_command_line():
  92. if current_process()._identity == () and is_forking(sys.argv):
  93. raise RuntimeError(
  94. "Can't start new process while bootstrapping another")
  95. if getattr(sys, "frozen", False):
  96. return [sys.executable, '--multiprocessing-fork']
  97. else:
  98. prog = """\
  99. from celery.concurrency.processes.forking import main; main()"""
  100. return [sys.executable, '-c', prog, '--multiprocessing-fork']
  101. def main():
  102. assert is_forking(sys.argv)
  103. fd = int(sys.argv[-1])
  104. from_parent = os.fdopen(fd, 'rb')
  105. current_process()._inheriting = True
  106. preparation_data = load(from_parent)
  107. _forking.prepare(preparation_data)
  108. # Huge hack to make logging before Process.run work.
  109. loglevel = os.environ.get("_MP_FORK_LOGLEVEL_")
  110. logfile = os.environ.get("_MP_FORK_LOGFILE_") or None
  111. format = os.environ.get("_MP_FORK_LOGFORMAT_")
  112. if loglevel:
  113. from multiprocessing import util
  114. import logging
  115. logger = util.get_logger()
  116. logger.setLevel(int(loglevel))
  117. if not logger.handlers:
  118. logger._rudimentary_setup = True
  119. logfile = logfile or sys.__stderr__
  120. if hasattr(logfile, "write"):
  121. handler = logging.StreamHandler(logfile)
  122. else:
  123. handler = logging.FileHandler(logfile)
  124. formatter = logging.Formatter(
  125. format or util.DEFAULT_LOGGING_FORMAT)
  126. handler.setFormatter(formatter)
  127. logger.addHandler(handler)
  128. self = load(from_parent)
  129. current_process()._inheriting = False
  130. exitcode = self._bootstrap()
  131. exit(exitcode)
  132. def get_preparation_data(name):
  133. from multiprocessing.util import _logger, _log_to_stderr
  134. d = dict(name=name,
  135. sys_path=sys.path,
  136. sys_argv=sys.argv,
  137. log_to_stderr=_log_to_stderr,
  138. orig_dir=process.ORIGINAL_DIR,
  139. authkey=process.current_process().authkey)
  140. if _logger is not None:
  141. d["log_level"] = _logger.getEffectiveLevel()
  142. main_path = getattr(sys.modules['__main__'], '__file__', None)
  143. if not main_path and sys.argv[0] not in ('', '-c'):
  144. main_path = sys.argv[0]
  145. if main_path is not None:
  146. if not os.path.isabs(main_path) \
  147. and process.ORIGINAL_DIR is not None:
  148. main_path = os.path.join(process.ORIGINAL_DIR, main_path)
  149. d["main_path"] = os.path.normpath(main_path)
  150. return d
  151. from _multiprocessing import Connection
  152. def reduce_connection(conn):
  153. if not Popen.thread_is_spawning():
  154. raise RuntimeError("blabla")
  155. return type(conn), (Popen.duplicate_for_child(conn.fileno()),
  156. conn.readable, conn.writable)
  157. _forking.ForkingPickler.register(Connection, reduce_connection)
  158. _forking.Popen = Popen
  159. else:
  160. from multiprocessing.forking import freeze_support