Selaa lähdekoodia

Don't import celery.utils package before concurrency setup

Ask Solem 12 vuotta sitten
vanhempi
commit
76c4769de7

+ 2 - 2
celery/__init__.py

@@ -22,6 +22,8 @@ __all__ = [
 ]
 VERSION_BANNER = '%s (%s)' % (__version__, SERIES)
 
+# -eof meta-
+
 # This is for static analyzers
 Celery = object
 bugreport = lambda *a, **kw: None
@@ -38,8 +40,6 @@ xmap = lambda *a, **kw: None
 xstarmap = lambda *a, **kw: None
 uuid = lambda: None
 
-# -eof meta-
-
 # Lazy loading
 from .__compat__ import recreate_module
 

+ 1 - 1
celery/concurrency/__init__.py

@@ -8,7 +8,7 @@
 """
 from __future__ import absolute_import
 
-from celery.utils.imports import symbol_by_name
+from celery.local import symbol_by_name
 
 ALIASES = {
     'processes': 'celery.concurrency.processes:TaskPool',

+ 9 - 2
celery/concurrency/eventlet.py

@@ -9,11 +9,18 @@
 from __future__ import absolute_import
 
 import os
-if not os.environ.get('EVENTLET_NOPATCH'):
+
+EVENTLET_NOPATCH = int(os.environ.get('EVENTLET_NOPATCH', 0))
+EVENTLET_DBLOCK = int(os.environ.get('EVENTLET_NOBLOCK', 0))
+
+PATCHED = [0]
+if not EVENTLET_NOPATCH and not PATCHED[0]:
+    PATCHED[0] += 1
     import eventlet
     import eventlet.debug
     eventlet.monkey_patch()
-    eventlet.debug.hub_prevent_multiple_readers(False)
+    eventlet.debug.hub_prevent_multiple_readers(True)
+    eventlet.debug.hub_blocking_detection(EVENTLET_DBLOCK)
 
 from time import time
 

+ 4 - 1
celery/concurrency/gevent.py

@@ -9,7 +9,10 @@
 from __future__ import absolute_import
 
 import os
-if not os.environ.get('GEVENT_NOPATCH'):
+
+PATCHED = [0]
+if not os.environ.get('GEVENT_NOPATCH') and not PATCHED[0]:
+    PATCHED[0] += 1
     from gevent import monkey
     monkey.patch_all()
 

+ 60 - 221
celery/local.py

@@ -12,27 +12,74 @@
 """
 from __future__ import absolute_import
 
-# since each thread has its own greenlet we can just use those as identifiers
-# for the context.  If greenlets are not available we fall back to the
-# current thread ident.
-try:
-    from greenlet import getcurrent as get_ident
-except ImportError:  # pragma: no cover
+import importlib
+import sys
+
+
+def symbol_by_name(name, aliases={}, imp=None, package=None,
+        sep='.', default=None, **kwargs):
+    """Get symbol by qualified name.
+
+    The name should be the full dot-separated path to the class::
+
+        modulename.ClassName
+
+    Example::
+
+        celery.concurrency.processes.TaskPool
+                                    ^- class name
+
+    or using ':' to separate module and symbol::
+
+        celery.concurrency.processes:TaskPool
+
+    If `aliases` is provided, a dict containing short name/long name
+    mappings, the name is looked up in the aliases first.
+
+    Examples:
+
+        >>> symbol_by_name('celery.concurrency.processes.TaskPool')
+        <class 'celery.concurrency.processes.TaskPool'>
+
+        >>> symbol_by_name('default', {
+        ...     'default': 'celery.concurrency.processes.TaskPool'})
+        <class 'celery.concurrency.processes.TaskPool'>
+
+        # Does not try to look up non-string names.
+        >>> from celery.concurrency.processes import TaskPool
+        >>> symbol_by_name(TaskPool) is TaskPool
+        True
+
+    """
+    if imp is None:
+        imp = importlib.import_module
+
+    if not isinstance(name, basestring):
+        return name                                 # already a class
+
+    name = aliases.get(name) or name
+    sep = ':' if ':' in name else sep
+    module_name, _, cls_name = name.rpartition(sep)
+    if not module_name:
+        cls_name, module_name = None, package if package else cls_name
     try:
-        from thread import get_ident  # noqa
-    except ImportError:  # pragma: no cover
         try:
-            from dummy_thread import get_ident  # noqa
-        except ImportError:  # pragma: no cover
-            from _thread import get_ident  # noqa
+            module = imp(module_name, package=package, **kwargs)
+        except ValueError, exc:
+            raise ValueError, ValueError(
+                    "Couldn't import %r: %s" % (name, exc)), sys.exc_info()[2]
+        return getattr(module, cls_name) if cls_name else module
+    except (ImportError, AttributeError):
+        if default is None:
+            raise
+    return default
 
 
 def try_import(module, default=None):
     """Try to import and return module, or return
     None if the module does not exist."""
-    from importlib import import_module
     try:
-        return import_module(module)
+        return importlib.import_module(module)
     except ImportError:
         return default
 
@@ -221,211 +268,3 @@ def maybe_evaluate(obj):
         return obj.__maybe_evaluate__()
     except AttributeError:
         return obj
-
-
-def release_local(local):
-    """Releases the contents of the local for the current context.
-    This makes it possible to use locals without a manager.
-
-    Example::
-
-        >>> loc = Local()
-        >>> loc.foo = 42
-        >>> release_local(loc)
-        >>> hasattr(loc, 'foo')
-        False
-
-    With this function one can release :class:`Local` objects as well
-    as :class:`StackLocal` objects.  However it is not possible to
-    release data held by proxies that way, one always has to retain
-    a reference to the underlying local object in order to be able
-    to release it.
-
-    .. versionadded:: 0.6.1
-    """
-    local.__release_local__()
-
-
-class Local(object):
-    __slots__ = ('__storage__', '__ident_func__')
-
-    def __init__(self):
-        object.__setattr__(self, '__storage__', {})
-        object.__setattr__(self, '__ident_func__', get_ident)
-
-    def __iter__(self):
-        return iter(self.__storage__.items())
-
-    def __call__(self, proxy):
-        """Create a proxy for a name."""
-        return Proxy(self, proxy)
-
-    def __release_local__(self):
-        self.__storage__.pop(self.__ident_func__(), None)
-
-    def __getattr__(self, name):
-        try:
-            return self.__storage__[self.__ident_func__()][name]
-        except KeyError:
-            raise AttributeError(name)
-
-    def __setattr__(self, name, value):
-        ident = self.__ident_func__()
-        storage = self.__storage__
-        try:
-            storage[ident][name] = value
-        except KeyError:
-            storage[ident] = {name: value}
-
-    def __delattr__(self, name):
-        try:
-            del self.__storage__[self.__ident_func__()][name]
-        except KeyError:
-            raise AttributeError(name)
-
-
-class LocalStack(object):
-    """This class works similar to a :class:`Local` but keeps a stack
-    of objects instead.  This is best explained with an example::
-
-        >>> ls = LocalStack()
-        >>> ls.push(42)
-        >>> ls.top
-        42
-        >>> ls.push(23)
-        >>> ls.top
-        23
-        >>> ls.pop()
-        23
-        >>> ls.top
-        42
-
-    They can be force released by using a :class:`LocalManager` or with
-    the :func:`release_local` function but the correct way is to pop the
-    item from the stack after using.  When the stack is empty it will
-    no longer be bound to the current context (and as such released).
-
-    By calling the stack without arguments it returns a proxy that resolves to
-    the topmost item on the stack.
-
-    """
-
-    def __init__(self):
-        self._local = Local()
-
-    def __release_local__(self):
-        self._local.__release_local__()
-
-    def _get__ident_func__(self):
-        return self._local.__ident_func__
-
-    def _set__ident_func__(self, value):
-        object.__setattr__(self._local, '__ident_func__', value)
-    __ident_func__ = property(_get__ident_func__, _set__ident_func__)
-    del _get__ident_func__, _set__ident_func__
-
-    def __call__(self):
-        def _lookup():
-            rv = self.top
-            if rv is None:
-                raise RuntimeError('object unbound')
-            return rv
-        return Proxy(_lookup)
-
-    def push(self, obj):
-        """Pushes a new item to the stack"""
-        rv = getattr(self._local, 'stack', None)
-        if rv is None:
-            self._local.stack = rv = []
-        rv.append(obj)
-        return rv
-
-    def pop(self):
-        """Removes the topmost item from the stack, will return the
-        old value or `None` if the stack was already empty.
-        """
-        stack = getattr(self._local, 'stack', None)
-        if stack is None:
-            return None
-        elif len(stack) == 1:
-            release_local(self._local)
-            return stack[-1]
-        else:
-            return stack.pop()
-
-    @property
-    def stack(self):
-        """get_current_worker_task uses this to find
-        the original task that was executed by the worker."""
-        stack = getattr(self._local, 'stack', None)
-        if stack is not None:
-            return stack
-        return []
-
-    @property
-    def top(self):
-        """The topmost item on the stack.  If the stack is empty,
-        `None` is returned.
-        """
-        try:
-            return self._local.stack[-1]
-        except (AttributeError, IndexError):
-            return None
-
-
-class LocalManager(object):
-    """Local objects cannot manage themselves. For that you need a local
-    manager.  You can pass a local manager multiple locals or add them later
-    by appending them to `manager.locals`.  Everytime the manager cleans up
-    it, will clean up all the data left in the locals for this context.
-
-    The `ident_func` parameter can be added to override the default ident
-    function for the wrapped locals.
-
-    .. versionchanged:: 0.6.1
-       Instead of a manager the :func:`release_local` function can be used
-       as well.
-
-    .. versionchanged:: 0.7
-       `ident_func` was added.
-    """
-
-    def __init__(self, locals=None, ident_func=None):
-        if locals is None:
-            self.locals = []
-        elif isinstance(locals, Local):
-            self.locals = [locals]
-        else:
-            self.locals = list(locals)
-        if ident_func is not None:
-            self.ident_func = ident_func
-            for local in self.locals:
-                object.__setattr__(local, '__ident_func__', ident_func)
-        else:
-            self.ident_func = get_ident
-
-    def get_ident(self):
-        """Return the context identifier the local objects use internally for
-        this context.  You cannot override this method to change the behavior
-        but use it to link other context local objects (such as SQLAlchemy's
-        scoped sessions) to the Werkzeug locals.
-
-        .. versionchanged:: 0.7
-           You can pass a different ident function to the local manager that
-           will then be propagated to all the locals passed to the
-           constructor.
-        """
-        return self.ident_func()
-
-    def cleanup(self):
-        """Manually clean up the data in the locals for this context.  Call
-        this at the end of the request or use `make_middleware()`.
-        """
-        for local in self.locals:
-            release_local(local)
-
-    def __repr__(self):
-        return '<%s storages: %d>' % (
-            self.__class__.__name__,
-            len(self.locals)
-        )

+ 5 - 59
celery/utils/imports.py

@@ -16,6 +16,11 @@ import sys
 
 from contextlib import contextmanager
 
+# symbol_by_name was moved to local because it's used
+# early in the import stage, where celery.utils loads
+# too much (e.g. for eventlet patching)
+from celery.local import symbol_by_name
+
 from .compat import reload
 
 
@@ -37,65 +42,6 @@ else:
         return '%s.%s' % (obj.__module__, obj.__name__)
 
 
-def symbol_by_name(name, aliases={}, imp=None, package=None,
-        sep='.', default=None, **kwargs):
-    """Get symbol by qualified name.
-
-    The name should be the full dot-separated path to the class::
-
-        modulename.ClassName
-
-    Example::
-
-        celery.concurrency.processes.TaskPool
-                                    ^- class name
-
-    or using ':' to separate module and symbol::
-
-        celery.concurrency.processes:TaskPool
-
-    If `aliases` is provided, a dict containing short name/long name
-    mappings, the name is looked up in the aliases first.
-
-    Examples:
-
-        >>> symbol_by_name('celery.concurrency.processes.TaskPool')
-        <class 'celery.concurrency.processes.TaskPool'>
-
-        >>> symbol_by_name('default', {
-        ...     'default': 'celery.concurrency.processes.TaskPool'})
-        <class 'celery.concurrency.processes.TaskPool'>
-
-        # Does not try to look up non-string names.
-        >>> from celery.concurrency.processes import TaskPool
-        >>> symbol_by_name(TaskPool) is TaskPool
-        True
-
-    """
-    if imp is None:
-        imp = importlib.import_module
-
-    if not isinstance(name, basestring):
-        return name                                 # already a class
-
-    name = aliases.get(name) or name
-    sep = ':' if ':' in name else sep
-    module_name, _, cls_name = name.rpartition(sep)
-    if not module_name:
-        cls_name, module_name = None, package if package else cls_name
-    try:
-        try:
-            module = imp(module_name, package=package, **kwargs)
-        except ValueError, exc:
-            raise ValueError, ValueError(
-                    "Couldn't import %r: %s" % (name, exc)), sys.exc_info()[2]
-        return getattr(module, cls_name) if cls_name else module
-    except (ImportError, AttributeError):
-        if default is None:
-            raise
-    return default
-
-
 def instantiate(name, *args, **kwargs):
     """Instantiate class by name.