| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 | 
							- #
 
- # Module for starting a process object using os.fork() or CreateProcess()
 
- #
 
- # multiprocessing/forking.py
 
- #
 
- # Copyright (c) 2006-2008, R Oudkerk
 
- # All rights reserved.
 
- #
 
- # Redistribution and use in source and binary forms, with or without
 
- # modification, are permitted provided that the following conditions
 
- # are met:
 
- #
 
- # 1. Redistributions of source code must retain the above copyright
 
- #    notice, this list of conditions and the following disclaimer.
 
- # 2. Redistributions in binary form must reproduce the above copyright
 
- #    notice, this list of conditions and the following disclaimer in the
 
- #    documentation and/or other materials provided with the distribution.
 
- # 3. Neither the name of author nor the names of any contributors may be
 
- #    used to endorse or promote products derived from this software
 
- #    without specific prior written permission.
 
- #
 
- # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
 
- # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
- # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 
- # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 
- # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 
- # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 
- # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
- # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 
- # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 
- # SUCH DAMAGE.
 
- #
 
- from __future__ import absolute_import
 
- import os
 
- import sys
 
- from multiprocessing import current_process
 
- from multiprocessing import forking as _forking
 
- from multiprocessing import process
 
- from pickle import load, dump as _dump, HIGHEST_PROTOCOL
 
- Popen = _forking.Popen
 
- def dump(obj, file, protocol=None):
 
-     _forking.ForkingPickler(file, protocol).dump(obj)
 
- if sys.platform != "win32":
 
-     import threading
 
-     class Popen(_forking.Popen):  # noqa
 
-         _tls = threading.local()
 
-         returncode = None
 
-         def __init__(self, process_obj):
 
-             self.force_execv = process_obj.force_execv
 
-             if self.force_execv:
 
-                 sys.stdout.flush()
 
-                 sys.stderr.flush()
 
-                 r, w = os.pipe()
 
-                 self.sentinel = r
 
-                 from_parent_fd, to_child_fd = os.pipe()
 
-                 cmd = get_command_line() + [str(from_parent_fd)]
 
-                 self.pid = os.fork()
 
-                 if self.pid == 0:
 
-                     os.close(r)
 
-                     os.close(to_child_fd)
 
-                     os.execv(sys.executable, cmd)
 
-                 # send information to child
 
-                 prep_data = get_preparation_data(process_obj._name)
 
-                 os.close(from_parent_fd)
 
-                 to_child = os.fdopen(to_child_fd, 'wb')
 
-                 Popen._tls.process_handle = self.pid
 
-                 try:
 
-                     dump(prep_data, to_child, HIGHEST_PROTOCOL)
 
-                     dump(process_obj, to_child, HIGHEST_PROTOCOL)
 
-                 finally:
 
-                     del(Popen._tls.process_handle)
 
-                     to_child.close()
 
-             else:
 
-                 super(Popen, self).__init__(process_obj)
 
-         @staticmethod
 
-         def thread_is_spawning():
 
-             return getattr(Popen._tls, "process_handle", None) is not None
 
-         @staticmethod
 
-         def duplicate_for_child(handle):
 
-             return handle
 
-     def is_forking(argv):
 
-         if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
 
-             assert len(argv) == 3
 
-             return True
 
-         return False
 
-     def freeze_support():
 
-         if is_forking(sys.argv):
 
-             main()
 
-             sys.exit()
 
-     def get_command_line():
 
-         if current_process()._identity == () and is_forking(sys.argv):
 
-             raise RuntimeError(
 
-                 "Can't start new process while bootstrapping another")
 
-         if getattr(sys, "frozen", False):
 
-             return [sys.executable, '--multiprocessing-fork']
 
-         else:
 
-             prog = """\
 
- from celery.concurrency.processes.forking import main; main()"""
 
-             return [sys.executable, '-c', prog, '--multiprocessing-fork']
 
-     def main():
 
-         assert is_forking(sys.argv)
 
-         fd = int(sys.argv[-1])
 
-         from_parent = os.fdopen(fd, 'rb')
 
-         current_process()._inheriting = True
 
-         preparation_data = load(from_parent)
 
-         _forking.prepare(preparation_data)
 
-         # Huge hack to make logging before Process.run work.
 
-         loglevel = os.environ.get("_MP_FORK_LOGLEVEL_")
 
-         logfile = os.environ.get("_MP_FORK_LOGFILE_") or None
 
-         format = os.environ.get("_MP_FORK_LOGFORMAT_")
 
-         if loglevel:
 
-             from multiprocessing import util
 
-             import logging
 
-             logger = util.get_logger()
 
-             logger.setLevel(int(loglevel))
 
-             if not logger.handlers:
 
-                 logger._rudimentary_setup = True
 
-                 logfile = logfile or sys.__stderr__
 
-                 if hasattr(logfile, "write"):
 
-                     handler = logging.StreamHandler(logfile)
 
-                 else:
 
-                     handler = logging.FileHandler(logfile)
 
-                 formatter = logging.Formatter(
 
-                         format or util.DEFAULT_LOGGING_FORMAT)
 
-                 handler.setFormatter(formatter)
 
-                 logger.addHandler(handler)
 
-         self = load(from_parent)
 
-         current_process()._inheriting = False
 
-         exitcode = self._bootstrap()
 
-         exit(exitcode)
 
-     def get_preparation_data(name):
 
-         from multiprocessing.util import _logger, _log_to_stderr
 
-         d = dict(name=name,
 
-                  sys_path=sys.path,
 
-                  sys_argv=sys.argv,
 
-                  log_to_stderr=_log_to_stderr,
 
-                  orig_dir=process.ORIGINAL_DIR,
 
-                  authkey=process.current_process().authkey)
 
-         if _logger is not None:
 
-             d["log_level"] = _logger.getEffectiveLevel()
 
-         main_path = getattr(sys.modules['__main__'], '__file__', None)
 
-         if not main_path and sys.argv[0] not in ('', '-c'):
 
-             main_path = sys.argv[0]
 
-         if main_path is not None:
 
-             if not os.path.isabs(main_path) \
 
-                     and process.ORIGINAL_DIR is not None:
 
-                 main_path = os.path.join(process.ORIGINAL_DIR, main_path)
 
-             d["main_path"] = os.path.normpath(main_path)
 
-         return d
 
-     from _multiprocessing import Connection
 
-     def reduce_connection(conn):
 
-         if not Popen.thread_is_spawning():
 
-             raise RuntimeError("blabla")
 
-         return type(conn), (Popen.duplicate_for_child(conn.fileno()),
 
-                             conn.readable, conn.writable)
 
-     _forking.ForkingPickler.register(Connection, reduce_connection)
 
-     _forking.Popen = Popen
 
- else:
 
-     from multiprocessing.forking import freeze_support
 
 
  |