Преглед на файлове

Use celery.__main__ to really patch eventlet as *early* as possible.

Ask Solem преди 12 години
родител
ревизия
41d90c9f8a
променени са 4 файла, в които са добавени 255 реда и са изтрити 23 реда
  1. 13 0
      celery/__main__.py
  2. 3 6
      celery/bin/celery.py
  3. 231 15
      celery/utils/threads.py
  4. 8 2
      setup.py

+ 13 - 0
celery/__main__.py

@@ -0,0 +1,13 @@
+from __future__ import absolute_import
+
+import sys
+
+
+def main():
+    from celery.platforms import maybe_patch_concurrency
+    maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
+    from celery.bin.celery import main
+    main()
+
+if __name__ == '__main__':
+    main()

+ 3 - 6
celery/bin/celery.py

@@ -8,12 +8,9 @@ The :program:`celery` umbrella command.
 """
 from __future__ import absolute_import, print_function
 
-import sys
-from celery.platforms import maybe_patch_concurrency
-maybe_patch_concurrency(sys.argv, ['-P'], ['--pool'])
-
 import anyjson
 import warnings
+import sys
 
 from future_builtins import map
 
@@ -932,7 +929,7 @@ def determine_exit_status(ret):
     return EX_OK if ret else EX_FAILURE
 
 
-def main():
+def main(argv=None):
     # Fix for setuptools generated scripts, so that it will
     # work with multiprocessing fork emulation.
     # (see multiprocessing.forking.get_preparation_data())
@@ -943,7 +940,7 @@ def main():
         cmd.maybe_patch_concurrency()
         from billiard import freeze_support
         freeze_support()
-        cmd.execute_from_commandline()
+        cmd.execute_from_commandline(argv)
     except KeyboardInterrupt:
         pass
 

+ 231 - 15
celery/utils/threads.py

@@ -15,6 +15,8 @@ import traceback
 
 from kombu.syn import detect_environment
 
+from celery.local import Proxy
+
 USE_PURE_LOCALS = os.environ.get('USE_PURE_LOCALS')
 
 
@@ -70,20 +72,234 @@ class bgThread(threading.Thread):
         if self.is_alive():
             self.join(1e100)
 
+try:
+    from greenlet import getcurrent as get_ident
+except ImportError:  # pragma: no cover
+    try:
+        from thread import get_ident  # noqa
+    except ImportError:  # pragma: no cover
+        try:
+            from dummy_thread import get_ident  # noqa
+        except ImportError:  # pragma: no cover
+            from _thread import get_ident  # noqa
+
+
+def release_local(local):
+    """Releases the contents of the local for the current context.
+    This makes it possible to use locals without a manager.
+
+    Example::
+
+        >>> loc = Local()
+        >>> loc.foo = 42
+        >>> release_local(loc)
+        >>> hasattr(loc, 'foo')
+        False
+
+    With this function one can release :class:`Local` objects as well
+    as :class:`StackLocal` objects.  However it is not possible to
+    release data held by proxies that way, one always has to retain
+    a reference to the underlying local object in order to be able
+    to release it.
+
+    .. versionadded:: 0.6.1
+    """
+    local.__release_local__()
+
+
+class Local(object):
+    __slots__ = ('__storage__', '__ident_func__')
+
+    def __init__(self):
+        object.__setattr__(self, '__storage__', {})
+        object.__setattr__(self, '__ident_func__', get_ident)
+
+    def __iter__(self):
+        return iter(self.__storage__.items())
+
+    def __call__(self, proxy):
+        """Create a proxy for a name."""
+        return Proxy(self, proxy)
+
+    def __release_local__(self):
+        self.__storage__.pop(self.__ident_func__(), None)
+
+    def __getattr__(self, name):
+        try:
+            return self.__storage__[self.__ident_func__()][name]
+        except KeyError:
+            raise AttributeError(name)
+
+    def __setattr__(self, name, value):
+        ident = self.__ident_func__()
+        storage = self.__storage__
+        try:
+            storage[ident][name] = value
+        except KeyError:
+            storage[ident] = {name: value}
+
+    def __delattr__(self, name):
+        try:
+            del self.__storage__[self.__ident_func__()][name]
+        except KeyError:
+            raise AttributeError(name)
+
+
+class _LocalStack(object):
+    """This class works similar to a :class:`Local` but keeps a stack
+    of objects instead.  This is best explained with an example::
+
+        >>> ls = LocalStack()
+        >>> ls.push(42)
+        >>> ls.top
+        42
+        >>> ls.push(23)
+        >>> ls.top
+        23
+        >>> ls.pop()
+        23
+        >>> ls.top
+        42
+
+    They can be force released by using a :class:`LocalManager` or with
+    the :func:`release_local` function but the correct way is to pop the
+    item from the stack after using.  When the stack is empty it will
+    no longer be bound to the current context (and as such released).
+
+    By calling the stack without arguments it returns a proxy that
+    resolves to the topmost item on the stack.
+
+    """
+
+    def __init__(self):
+        self._local = Local()
+
+    def __release_local__(self):
+        self._local.__release_local__()
+
+    def _get__ident_func__(self):
+        return self._local.__ident_func__
+
+    def _set__ident_func__(self, value):
+        object.__setattr__(self._local, '__ident_func__', value)
+    __ident_func__ = property(_get__ident_func__, _set__ident_func__)
+    del _get__ident_func__, _set__ident_func__
+
+    def __call__(self):
+        def _lookup():
+            rv = self.top
+            if rv is None:
+                raise RuntimeError('object unbound')
+            return rv
+        return Proxy(_lookup)
+
+    def push(self, obj):
+        """Pushes a new item to the stack"""
+        rv = getattr(self._local, 'stack', None)
+        if rv is None:
+            self._local.stack = rv = []
+        rv.append(obj)
+        return rv
+
+    def pop(self):
+        """Removes the topmost item from the stack, will return the
+        old value or `None` if the stack was already empty.
+        """
+        stack = getattr(self._local, 'stack', None)
+        if stack is None:
+            return None
+        elif len(stack) == 1:
+            release_local(self._local)
+            return stack[-1]
+        else:
+            return stack.pop()
+
+    @property
+    def stack(self):
+        """get_current_worker_task uses this to find
+        the original task that was executed by the worker."""
+        stack = getattr(self._local, 'stack', None)
+        if stack is not None:
+            return stack
+        return []
+
+    @property
+    def top(self):
+        """The topmost item on the stack.  If the stack is empty,
+        `None` is returned.
+        """
+        try:
+            return self._local.stack[-1]
+        except (AttributeError, IndexError):
+            return None
+
+
+class LocalManager(object):
+    """Local objects cannot manage themselves. For that you need a local
+    manager.  You can pass a local manager multiple locals or add them
+    later by appending them to `manager.locals`.  Everytime the manager
+    cleans up it, will clean up all the data left in the locals for this
+    context.
+
+    The `ident_func` parameter can be added to override the default ident
+    function for the wrapped locals.
+
+    """
+
+    def __init__(self, locals=None, ident_func=None):
+        if locals is None:
+            self.locals = []
+        elif isinstance(locals, Local):
+            self.locals = [locals]
+        else:
+            self.locals = list(locals)
+        if ident_func is not None:
+            self.ident_func = ident_func
+            for local in self.locals:
+                object.__setattr__(local, '__ident_func__', ident_func)
+        else:
+            self.ident_func = get_ident
+
+    def get_ident(self):
+        """Return the context identifier the local objects use internally
+        for this context.  You cannot override this method to change the
+        behavior but use it to link other context local objects (such as
+        SQLAlchemy's scoped sessions) to the Werkzeug locals."""
+        return self.ident_func()
+
+    def cleanup(self):
+        """Manually clean up the data in the locals for this context.
+
+        Call this at the end of the request or use `make_middleware()`.
+
+        """
+        for local in self.locals:
+            release_local(local)
+
+    def __repr__(self):
+        return '<{0} storages: {1}>'.format(
+            self.__class__.__name__, len(self.locals))
+
+
+class _FastLocalStack(threading.local):
+
+    def __init__(self):
+        self.stack = []
+        self.push = self.stack.append
+        self.pop = self.stack.pop
+
+    @property
+    def top(self):
+        try:
+            return self.stack[-1]
+        except (AttributeError, IndexError):
+            return None
+
 if detect_environment() == 'default' and not USE_PURE_LOCALS:
-    class LocalStack(threading.local):
-
-        def __init__(self):
-            self.stack = []
-            self.push = self.stack.append
-            self.pop = self.stack.pop
-
-        @property
-        def top(self):
-            try:
-                return self.stack[-1]
-            except (AttributeError, IndexError):
-                return None
+    LocalStack = _FastLocalStack
 else:
-    # See #706
-    from celery.local import LocalStack  # noqa
+    # - See #706
+    # since each thread has its own greenlet we can just use those as
+    # identifiers for the context.  If greenlets are not available we
+    # fall back to the  current thread ident.
+    LocalStack = _LocalStack  # noqa

+ 8 - 2
setup.py

@@ -15,6 +15,8 @@ import os
 import sys
 import codecs
 
+CELERY_COMPAT_PROGRAMS = os.environ.get('CELERY_COMPAT_PROGRAMS')
+
 if sys.version_info < (2, 6):
     raise Exception('Celery 3.1 requires Python 2.6 or higher.')
 
@@ -175,14 +177,18 @@ else:
 # -*- Entry Points -*- #
 
 console_scripts = entrypoints['console_scripts'] = [
-        'celery = celery.bin.celery:main',
+    'celery = celery.__main__:main',
+]
+
+if CELERY_COMPAT_PROGRAMS:
+    console_scripts.extend([
         'celeryd = celery.bin.celeryd:main',
         'celerybeat = celery.bin.celerybeat:main',
         'camqadm = celery.bin.camqadm:main',
         'celeryev = celery.bin.celeryev:main',
         'celeryctl = celery.bin.celeryctl:main',
         'celeryd-multi = celery.bin.celeryd_multi:main',
-]
+    ])
 
 # bundles: Only relevant for Celery developers.
 entrypoints['bundle.bundles'] = ['celery = celery.contrib.bundles:bundles']