Browse Source

Use complete stack implemenation for eventlet/gevent closes #706

Ask Solem 13 years ago
parent
commit
40d2302e53
4 changed files with 125 additions and 18 deletions
  1. 3 3
      celery/bin/base.py
  2. 13 1
      celery/bin/celeryd.py
  3. 90 0
      celery/local.py
  4. 19 14
      celery/utils/threads.py

+ 3 - 3
celery/bin/base.py

@@ -70,7 +70,7 @@ from collections import defaultdict
 from optparse import OptionParser, make_option as Option
 from types import ModuleType
 
-from celery import Celery, __version__
+import celery
 from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
 from celery.platforms import EX_FAILURE, EX_USAGE
 from celery.utils.imports import symbol_by_name, import_from_cwd
@@ -100,7 +100,7 @@ class Command(object):
     args = ''
 
     #: Application version.
-    version = __version__
+    version = celery.__version__
 
     #: If false the parser will raise an exception if positional
     #: args are provided.
@@ -309,7 +309,7 @@ class Command(object):
         return options
 
     def _get_default_app(self, *args, **kwargs):
-        return Celery(*args, **kwargs)
+        return celery.Celery(*args, **kwargs)
 
 
 def daemon_options(default_pidfile=None, default_logfile=None):

+ 13 - 1
celery/bin/celeryd.py

@@ -117,6 +117,7 @@ import sys
 
 from billiard import freeze_support
 
+from celery import concurrency
 from celery.bin.base import Command, Option
 from celery.utils.log import LOG_LEVELS, mlevel
 
@@ -127,11 +128,22 @@ class WorkerCommand(Command):
     enable_config_from_cmdline = True
     supports_args = False
 
+    def execute_from_commandline(self, argv=None):
+        if argv is None:
+            argv = list(sys.argv)
+        try:
+            pool = argv[argv.index('-P') + 1]
+        except ValueError:
+            pass
+        else:
+            # set up eventlet/gevent environments ASAP.
+            concurrency.get_implementation(pool)
+        return super(WorkerCommand, self).execute_from_commandline(argv)
+
     def run(self, *args, **kwargs):
         kwargs.pop("app", None)
         # Pools like eventlet/gevent needs to patch libs as early
         # as possible.
-        from celery import concurrency
         kwargs["pool_cls"] = concurrency.get_implementation(
                     kwargs.get("pool_cls") or self.app.conf.CELERYD_POOL)
         if self.app.IS_WINDOWS and kwargs.get("beat"):

+ 90 - 0
celery/local.py

@@ -280,6 +280,96 @@ class Local(object):
             raise AttributeError(name)
 
 
+class LocalStack(object):
+    """This class works similar to a :class:`Local` but keeps a stack
+    of objects instead.  This is best explained with an example::
+
+        >>> ls = LocalStack()
+        >>> ls.push(42)
+        >>> ls.top
+        42
+        >>> ls.push(23)
+        >>> ls.top
+        23
+        >>> ls.pop()
+        23
+        >>> ls.top
+        42
+
+    They can be force released by using a :class:`LocalManager` or with
+    the :func:`release_local` function but the correct way is to pop the
+    item from the stack after using.  When the stack is empty it will
+    no longer be bound to the current context (and as such released).
+
+    By calling the stack without arguments it returns a proxy that resolves to
+    the topmost item on the stack.
+
+    """
+
+    def __init__(self):
+        self._local = Local()
+
+    def __release_local__(self):
+        self._local.__release_local__()
+
+    def _get__ident_func__(self):
+        return self._local.__ident_func__
+
+    def _set__ident_func__(self, value):
+        object.__setattr__(self._local, '__ident_func__', value)
+    __ident_func__ = property(_get__ident_func__, _set__ident_func__)
+    del _get__ident_func__, _set__ident_func__
+
+    def __call__(self):
+        def _lookup():
+            rv = self.top
+            if rv is None:
+                raise RuntimeError('object unbound')
+            return rv
+        return Proxy(_lookup)
+
+    def push(self, obj):
+        """Pushes a new item to the stack"""
+        rv = getattr(self._local, 'stack', None)
+        if rv is None:
+            self._local.stack = rv = []
+        rv.append(obj)
+        return rv
+
+    def pop(self):
+        """Removes the topmost item from the stack, will return the
+        old value or `None` if the stack was already empty.
+        """
+        stack = getattr(self._local, 'stack', None)
+        if stack is None:
+            return None
+        elif len(stack) == 1:
+            release_local(self._local)
+            return stack[-1]
+        else:
+            return stack.pop()
+
+    @property
+    def stack(self):
+        """get_current_worker_task uses this to find
+        the original task that was executed by the worker."""
+        stack = getattr(self._local, 'stack', None)
+        if stack is not None:
+            return stack
+        return []
+
+    @property
+    def top(self):
+        """The topmost item on the stack.  If the stack is empty,
+        `None` is returned.
+        """
+        try:
+            return self._local.stack[-1]
+        except (AttributeError, IndexError):
+            return None
+
+
+
 class LocalManager(object):
     """Local objects cannot manage themselves. For that you need a local
     manager.  You can pass a local manager multiple locals or add them later

+ 19 - 14
celery/utils/threads.py

@@ -5,6 +5,8 @@ import sys
 import threading
 import traceback
 
+from kombu.syn import detect_environment
+
 _Thread = threading.Thread
 _Event = threading._Event
 
@@ -82,17 +84,20 @@ class bgThread(Thread):
         if self.is_alive():
             self.join(1e100)
 
-
-class LocalStack(threading.local):
-
-    def __init__(self):
-        self.stack = []
-        self.push = self.stack.append
-        self.pop = self.stack.pop
-
-    @property
-    def top(self):
-        try:
-            return self.stack[-1]
-        except (AttributeError, IndexError):
-            return None
+if detect_environment() == "default":
+    class LocalStack(threading.local):
+
+        def __init__(self):
+            self.stack = []
+            self.push = self.stack.append
+            self.pop = self.stack.pop
+
+        @property
+        def top(self):
+            try:
+                return self.stack[-1]
+            except (AttributeError, IndexError):
+                return None
+else:
+    # See #706
+    from celery.local import LocalStack