Browse Source

Do not rely on billiard.util being available (Issue #2345)

Ask Solem 10 years ago
parent
commit
77d7eb06e3

+ 5 - 2
celery/app/__init__.py

@@ -63,13 +63,16 @@ def _app_or_default(app=None):
 
 def _app_or_default_trace(app=None):  # pragma: no cover
     from traceback import print_stack
-    from billiard import current_process
+    try:
+        from billiard.process import current_process
+    except ImportError:
+        current_process = None
     if app is None:
         if getattr(_state._tls, 'current_app', None):
             print('-- RETURNING TO CURRENT APP --')  # noqa+
             print_stack()
             return _state._tls.current_app
-        if current_process()._name == 'MainProcess':
+        if not current_process or current_process()._name == 'MainProcess':
             raise Exception('DEFAULT APP')
         print('-- RETURNING TO DEFAULT APP --')      # noqa+
         print_stack()

+ 6 - 2
celery/app/base.py

@@ -17,7 +17,10 @@ from copy import deepcopy
 from operator import attrgetter
 
 from amqp import promise
-from billiard.util import register_after_fork
+try:
+    from billiard.util import register_after_fork
+except ImportError:
+    register_after_fork = None
 from kombu.clocks import LamportClock
 from kombu.common import oid_from
 from kombu.utils import cached_property, uuid
@@ -98,7 +101,8 @@ def _global_after_fork(obj):
 def _ensure_after_fork():
     global _after_fork_registered
     _after_fork_registered = True
-    register_after_fork(_global_after_fork, _global_after_fork)
+    if register_after_fork is not None:
+        register_after_fork(_global_after_fork, _global_after_fork)
 
 
 class Celery(object):

+ 1 - 1
celery/apps/worker.py

@@ -20,7 +20,7 @@ import warnings
 
 from functools import partial
 
-from billiard import current_process
+from billiard.process import current_process
 from kombu.utils.encoding import safe_str
 from kombu.utils.url import maybe_sanitize_url
 

+ 6 - 2
celery/backends/database/session.py

@@ -8,7 +8,10 @@
 """
 from __future__ import absolute_import
 
-from billiard.util import register_after_fork
+try:
+    from billiard.util import register_after_fork
+except ImportError:
+    register_after_fork = None
 
 from sqlalchemy import create_engine
 from sqlalchemy.ext.declarative import declarative_base
@@ -26,7 +29,8 @@ class SessionManager(object):
         self._sessions = {}
         self.forked = False
         self.prepared = False
-        register_after_fork(self, self._after_fork)
+        if register_after_fork is not None:
+            register_after_fork(self, self._after_fork)
 
     def _after_fork(self,):
         self.forked = True

+ 2 - 1
celery/beat.py

@@ -19,7 +19,8 @@ import traceback
 from collections import namedtuple
 from threading import Event, Thread
 
-from billiard import Process, ensure_multiprocessing
+from billiard import ensure_multiprocessing
+from billiard.process import Process
 from billiard.common import reset_signals
 from kombu.utils import cached_property, reprcall
 from kombu.utils.functional import maybe_evaluate

+ 1 - 1
celery/contrib/rdb.py

@@ -43,7 +43,7 @@ import sys
 
 from pdb import Pdb
 
-from billiard import current_process
+from billiard.process import current_process
 
 from celery.five import range
 

+ 6 - 3
celery/platforms.py

@@ -21,7 +21,10 @@ import warnings
 
 from collections import namedtuple
 
-from billiard import current_process
+try:
+    from billiard.process import current_process
+except ImportError:
+    current_process = None
 # fileno used to be in this module
 from kombu.utils import maybe_fileno
 from kombu.utils.encoding import safe_str
@@ -706,8 +709,8 @@ else:
         """
         if hostname:
             progname = '{0}: {1}'.format(progname, hostname)
-        return set_process_title(
-            '{0}:{1}'.format(progname, current_process().name), info=info)
+        name = current_process().name if current_process else 'MainProcess'
+        return set_process_title('{0}:{1}'.format(progname, name), info=info)
 
 
 def get_errno_name(n):

+ 2 - 5
celery/tests/app/test_log.py

@@ -21,7 +21,7 @@ from celery.utils.log import (
     logger_isa,
 )
 from celery.tests.case import (
-    AppCase, Mock, SkipTest,
+    AppCase, Mock, SkipTest, mask_modules,
     get_handlers, override_stdouts, patch, wrap_logger, restore_logging,
 )
 
@@ -209,11 +209,8 @@ class test_default_logger(AppCase):
     def test_setup_logging_subsystem_no_mputil(self):
         from celery.utils import log as logtools
         with restore_logging():
-            mputil, logtools.mputil = logtools.mputil, None
-            try:
+            with mask_modules('billiard.util'):
                 self.app.log.setup_logging_subsystem()
-            finally:
-                logtools.mputil = mputil
 
     def _assertLog(self, logger, logmsg, loglevel=logging.ERROR):
 

+ 1 - 1
celery/tests/bin/test_worker.py

@@ -6,7 +6,7 @@ import sys
 
 from functools import wraps
 
-from billiard import current_process
+from billiard.process import current_process
 from kombu import Exchange, Queue
 
 from celery import platforms

+ 24 - 7
celery/utils/log.py

@@ -16,7 +16,6 @@ import threading
 import traceback
 
 from contextlib import contextmanager
-from billiard import current_process, util as mputil
 from kombu.five import values
 from kombu.log import get_logger as _get_logger, LOG_LEVELS
 from kombu.utils.encoding import safe_str
@@ -253,15 +252,33 @@ class LoggingProxy(object):
 
 
 def get_multiprocessing_logger():
-    return mputil.get_logger() if mputil else None
+    try:
+        from billiard import util
+    except ImportError:
+            pass
+    else:
+        return util.get_logger()
 
 
 def reset_multiprocessing_logger():
-    if mputil and hasattr(mputil, '_logger'):
-        mputil._logger = None
+    try:
+        from billiard import util
+    except ImportError:
+        pass
+    else:
+        if hasattr(util, '_logger'):
+            util._logger = None
+
+
+def current_process():
+    try:
+        from billiard import process
+    except ImportError:
+        pass
+    else:
+        return process.current_process()
 
 
 def current_process_index(base=1):
-    if current_process:
-        index = getattr(current_process(), 'index', None)
-        return index + base if index is not None else index
+    index = getattr(current_process(), 'index', None)
+    return index + base if index is not None else index

+ 1 - 1
celery/worker/state.py

@@ -90,7 +90,7 @@ C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or
 if C_BENCH:  # pragma: no cover
     import atexit
 
-    from billiard import current_process
+    from billiard.process import current_process
     from celery.five import monotonic
     from celery.utils.debug import memdump, sample_mem