Browse Source

Importing .concurrency.eventlet|gevent no longer applies monkey patches

Ask Solem 11 years ago
parent
commit
6354343554

+ 70 - 3
celery/__init__.py

@@ -38,12 +38,12 @@ if os.environ.get('C_IMPDEBUG'):  # pragma: no cover
         return real_import(name, locals, globals, fromlist, level)
     builtins.__import__ = debug_import
 
+# This is never executed, but tricks static analyzers (PyDev, PyCharm,
+# pylint, etc.) into knowing the types of these symbols, and what
+# they contain.
 STATICA_HACK = True
 globals()['kcah_acitats'[::-1].upper()] = False
 if STATICA_HACK:  # pragma: no cover
-    # This is never executed, but tricks static analyzers (PyDev, PyCharm,
-    # pylint, etc.) into knowing the types of these symbols, and what
-    # they contain.
     from celery.app import shared_task                   # noqa
     from celery.app.base import Celery                   # noqa
     from celery.app.utils import bugreport               # noqa
@@ -54,6 +54,72 @@ if STATICA_HACK:  # pragma: no cover
     )
     from celery.utils import uuid                        # noqa
 
+# Eventlet/gevent patching must happen before importing
+# anything else, so these tools must be at top-level.
+
+
+def _find_option_with_arg(argv, short_opts=None, long_opts=None):
+    """Search argv for option specifying its short and longopt
+    alternatives.
+
+    Returns the value of the option if found.
+
+    """
+    for i, arg in enumerate(argv):
+        if arg.startswith('-'):
+            if long_opts and arg.startswith('--'):
+                name, _, val = arg.partition('=')
+                if name in long_opts:
+                    return val
+            if short_opts and arg in short_opts:
+                return argv[i + 1]
+    raise KeyError('|'.join(short_opts or [] + long_opts or []))
+
+
+def _patch_eventlet():
+    import eventlet
+    import eventlet.debug
+    eventlet.monkey_patch()
+    EVENTLET_DBLOCK = int(os.environ.get('EVENTLET_NOBLOCK', 0))
+    if EVENTLET_DBLOCK:
+        eventlet.debug.hub_blocking_detection(EVENTLET_DBLOCK)
+
+
+def _patch_gevent():
+    from gevent import monkey, version_info
+    monkey.patch_all()
+    if version_info[0] == 0:  # pragma: no cover
+        # Signals aren't working in gevent versions <1.0,
+        # and are not monkey patched by patch_all()
+        from gevent import signal as _gevent_signal
+        _signal = __import__('signal')
+        _signal.signal = _gevent_signal
+
+
+def maybe_patch_concurrency(argv=sys.argv,
+                            short_opts=['-P'], long_opts=['--pool'],
+                            patches={'eventlet': _patch_eventlet,
+                                     'gevent': _patch_gevent}):
+    """With short and long opt alternatives that specify the command line
+    option to set the pool, this makes sure that anything that needs
+    to be patched is completed as early as possible.
+    (e.g. eventlet/gevent monkey patches)."""
+    try:
+        pool = _find_option_with_arg(argv, short_opts, long_opts)
+    except KeyError:
+        pass
+    else:
+        try:
+            patcher = patches[pool]
+        except KeyError:
+            pass
+        else:
+            print('PATCHING CONCURRENCY USING %r' % (patcher, ))
+            patcher()
+        # set up eventlet/gevent environments ASAP.
+        from celery import concurrency
+        concurrency.get_implementation(pool)
+
 # Lazy loading
 from .five import recreate_module
 
@@ -73,6 +139,7 @@ old_module, new_module = recreate_module(  # pragma: no cover
     __author__=__author__, __contact__=__contact__,
     __homepage__=__homepage__, __docformat__=__docformat__,
     VERSION=VERSION, SERIES=SERIES, VERSION_BANNER=VERSION_BANNER,
+    maybe_patch_concurrency=maybe_patch_concurrency,
 )
 
 

+ 2 - 5
celery/__main__.py

@@ -4,6 +4,8 @@ import sys
 
 from os.path import basename
 
+from . import maybe_patch_concurrency
+
 __all__ = ['main']
 
 DEPRECATED_FMT = """
@@ -21,11 +23,6 @@ def _warn_deprecated(new):
     )
 
 
-def maybe_patch_concurrency():
-    from celery.platforms import maybe_patch_concurrency
-    maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
-
-
 def main():
     maybe_patch_concurrency()
     from celery.bin.celery import main

+ 4 - 8
celery/bin/base.py

@@ -79,13 +79,10 @@ from optparse import OptionParser, IndentedHelpFormatter, make_option as Option
 from pprint import pformat
 from types import ModuleType
 
-import celery
+from celery import VERSION_BANNER, Celery, maybe_patch_concurrency
 from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
 from celery.five import items, string, string_t, values
-from celery.platforms import (
-    EX_FAILURE, EX_OK, EX_USAGE,
-    maybe_patch_concurrency,
-)
+from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE
 from celery.utils import term
 from celery.utils import text
 from celery.utils.imports import symbol_by_name, import_from_cwd
@@ -181,7 +178,7 @@ class Command(object):
     args = ''
 
     #: Application version.
-    version = celery.VERSION_BANNER
+    version = VERSION_BANNER
 
     #: If false the parser will raise an exception if positional
     #: args are provided.
@@ -429,7 +426,7 @@ class Command(object):
             if self.enable_config_from_cmdline:
                 argv = self.process_cmdline_config(argv)
         else:
-            self.app = celery.Celery()
+            self.app = Celery()
         return argv
 
     def find_app(self, app):
@@ -448,7 +445,6 @@ class Command(object):
                                              app.replace(':', '')))
                     except ImportError:
                         pass
-                from celery.app.base import Celery
                 for suspect in values(vars(sym)):
                     if isinstance(suspect, Celery):
                         return suspect

+ 0 - 11
celery/concurrency/eventlet.py

@@ -13,8 +13,6 @@ import sys
 
 __all__ = ['TaskPool']
 
-EVENTLET_NOPATCH = os.environ.get('EVENTLET_NOPATCH', False)
-EVENTLET_DBLOCK = int(os.environ.get('EVENTLET_NOBLOCK', 0))
 W_RACE = """\
 Celery module with %s imported before eventlet patched\
 """
@@ -30,15 +28,6 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
             warnings.warn(RuntimeWarning(W_RACE % side))
 
 
-PATCHED = [0]
-if not EVENTLET_NOPATCH and not PATCHED[0]:  # pragma: no cover
-    PATCHED[0] += 1
-    import eventlet
-    import eventlet.debug
-    eventlet.monkey_patch()
-    if EVENTLET_DBLOCK:
-        eventlet.debug.hub_blocking_detection(EVENTLET_DBLOCK)
-
 from time import time
 
 from celery import signals

+ 0 - 14
celery/concurrency/gevent.py

@@ -8,20 +8,6 @@
 """
 from __future__ import absolute_import
 
-import os
-
-PATCHED = [0]
-if not os.environ.get('GEVENT_NOPATCH') and not PATCHED[0]:
-    PATCHED[0] += 1
-    from gevent import monkey, version_info
-    monkey.patch_all()
-    if version_info[0] == 0:  # pragma: no cover
-        # Signals aren't working in gevent versions <1.0,
-        # and are not monkey patched by patch_all()
-        from gevent import signal as _gevent_signal
-        _signal = __import__('signal')
-        _signal.signal = _gevent_signal
-
 try:
     from gevent import Timeout
 except ImportError:  # pragma: no cover

+ 0 - 33
celery/platforms.py

@@ -75,39 +75,6 @@ def pyimplementation():
         return 'CPython'
 
 
-def _find_option_with_arg(argv, short_opts=None, long_opts=None):
-    """Search argv for option specifying its short and longopt
-    alternatives.
-
-    Returns the value of the option if found.
-
-    """
-    for i, arg in enumerate(argv):
-        if arg.startswith('-'):
-            if long_opts and arg.startswith('--'):
-                name, _, val = arg.partition('=')
-                if name in long_opts:
-                    return val
-            if short_opts and arg in short_opts:
-                return argv[i + 1]
-    raise KeyError('|'.join(short_opts or [] + long_opts or []))
-
-
-def maybe_patch_concurrency(argv, short_opts=None, long_opts=None):
-    """With short and long opt alternatives that specify the command line
-    option to set the pool, this makes sure that anything that needs
-    to be patched is completed as early as possible.
-    (e.g. eventlet/gevent monkey patches)."""
-    try:
-        pool = _find_option_with_arg(argv, short_opts, long_opts)
-    except KeyError:
-        pass
-    else:
-        # set up eventlet/gevent environments ASAP.
-        from celery import concurrency
-        concurrency.get_implementation(pool)
-
-
 class LockFailed(Exception):
     """Raised if a pidlock can't be acquired."""
 

+ 6 - 10
celery/tests/__init__.py

@@ -14,18 +14,14 @@ except NameError:
     class WindowsError(Exception):
         pass
 
-os.environ.update(
-    #: warn if config module not found
-    C_WNOCONF='yes',
-    EVENTLET_NOPATCH='yes',
-    GEVENT_NOPATCH='yes',
-    KOMBU_DISABLE_LIMIT_PROTECTION='yes',
-    # virtual.QoS will not do sanity assertions when this is set.
-    KOMBU_UNITTEST='yes',
-)
-
 
 def setup():
+    os.environ.update(
+        # warn if config module not found
+        C_WNOCONF='yes',
+        KOMBU_DISABLE_LIMIT_PROTECTION='yes',
+    )
+
     if os.environ.get('COVER_ALL_MODULES') or '--with-coverage3' in sys.argv:
         from warnings import catch_warnings
         with catch_warnings(record=True):

+ 1 - 1
celery/tests/bin/test_celery.py

@@ -41,7 +41,7 @@ class test__main__(AppCase):
             self.assertIn('YADDA YADDA', stdout.getvalue())
 
     def test_maybe_patch_concurrency(self):
-        with patch('celery.platforms.maybe_patch_concurrency') as _mpc:
+        with patch('celery.maybe_patch_concurrency') as _mpc:
             __main__.maybe_patch_concurrency()
             _mpc.assert_called_with(sys.argv, ['-P'], ['--pool'])
 

+ 4 - 13
celery/tests/concurrency/test_eventlet.py

@@ -41,19 +41,10 @@ class EventletCase(AppCase):
 class test_aaa_eventlet_patch(EventletCase):
 
     def test_aaa_is_patched(self):
-        raise SkipTest('side effects')
-        monkey_patched = []
-        prev_monkey_patch = self.eventlet.monkey_patch
-        self.eventlet.monkey_patch = lambda: monkey_patched.append(True)
-        prev_eventlet = sys.modules.pop('celery.concurrency.eventlet', None)
-        os.environ.pop('EVENTLET_NOPATCH')
-        try:
-            import celery.concurrency.eventlet  # noqa
-            self.assertTrue(monkey_patched)
-        finally:
-            sys.modules['celery.concurrency.eventlet'] = prev_eventlet
-            os.environ['EVENTLET_NOPATCH'] = 'yes'
-            self.eventlet.monkey_patch = prev_monkey_patch
+        with patch('eventlet.monkey_patch', create=True) as monkey_patch:
+            from celery import maybe_patch_concurrency
+            maybe_patch_concurrency(['x', '-P', 'eventlet'])
+            monkey_patch.assert_called_with()
 
 
 eventlet_modules = (

+ 7 - 16
celery/tests/concurrency/test_gevent.py

@@ -14,7 +14,7 @@ from celery.concurrency.gevent import (
 )
 
 from celery.tests.case import (
-    AppCase, mock_module, patch_many, skip_if_pypy,
+    AppCase, mock_module, patch, patch_many, skip_if_pypy,
 )
 
 gevent_modules = (
@@ -41,21 +41,12 @@ class test_gevent_patch(GeventCase):
 
     def test_is_patched(self):
         with mock_module(*gevent_modules):
-            monkey_patched = []
-            import gevent
-            from gevent import monkey
-            gevent.version_info = (1, 0, 0)
-            prev_monkey_patch = monkey.patch_all
-            monkey.patch_all = lambda: monkey_patched.append(True)
-            prev_gevent = sys.modules.pop('celery.concurrency.gevent', None)
-            os.environ.pop('GEVENT_NOPATCH')
-            try:
-                import celery.concurrency.gevent  # noqa
-                self.assertTrue(monkey_patched)
-            finally:
-                sys.modules['celery.concurrency.gevent'] = prev_gevent
-                os.environ['GEVENT_NOPATCH'] = 'yes'
-                monkey.patch_all = prev_monkey_patch
+            with patch('gevent.monkey.patch_all', create=True) as patch_all:
+                import gevent
+                gevent.version_info = (1, 0, 0)
+                from celery import maybe_patch_concurrency
+                maybe_patch_concurrency(['x', '-P', 'gevent'])
+                self.assertTrue(patch_all.called)
 
 
 class test_Schedule(AppCase):

+ 2 - 3
celery/worker/control.py

@@ -250,9 +250,8 @@ def hello(state, from_node, revoked=None, **kwargs):
     if from_node != state.hostname:
         logger.info('sync with %s', from_node)
         worker_state.revoked.update(revoked)
-        return  {'revoked': worker_state.revoked._data,
-                 'clock': state.app.clock.forward()}
-
+        return {'revoked': worker_state.revoked._data,
+                'clock': state.app.clock.forward()}
 
 
 @Panel.register

+ 0 - 15
docs/conf.py

@@ -3,11 +3,6 @@
 import sys
 import os
 
-# eventlet/gevent should not monkey patch anything.
-os.environ["GEVENT_NOPATCH"] = "yes"
-os.environ["EVENTLET_NOPATCH"] = "yes"
-os.environ["CELERY_LOADER"] = "default"
-
 this = os.path.dirname(os.path.abspath(__file__))
 
 # If your extensions are in another directory, add it here. If the directory
@@ -17,16 +12,6 @@ sys.path.append(os.path.join(os.pardir, "tests"))
 sys.path.append(os.path.join(this, "_ext"))
 import celery
 
-
-# use app loader
-from celery import Celery
-app = Celery(set_as_current=True)
-app.conf.update(BROKER_URL="memory://",
-                CELERY_RESULT_BACKEND="cache",
-                CELERY_CACHE_BACKEND="memory",
-                CELERYD_HIJACK_ROOT_LOGGER=False,
-                CELERYD_LOG_COLOR=False)
-
 # General configuration
 # ---------------------