Kaynağa Gözat

Merge branch 'master' into release23-maint

Ask Solem 13 yıl önce
ebeveyn
işleme
4afdb39ce3
7 değiştirilmiş dosya ile 27 ekleme ve 221 silme
  1. 1 1
      AUTHORS
  2. 10 2
      celery/backends/base.py
  3. 11 3
      celery/datastructures.py
  4. 1 213
      celery/utils/compat.py
  5. 1 0
      requirements/py25.txt
  6. 1 0
      requirements/py26.txt
  7. 2 2
      setup.py

+ 1 - 1
AUTHORS

@@ -77,4 +77,4 @@ Ordered by date of first contribution:
   Mauro Rocco <fireantology@gmail.com>
   Matthew J Morrison <mattj.morrison@gmail.com>
   Daniel Watkins <daniel@daniel-watkins.co.uk>
-  rnoel <rnoel@ltutech.com>
+  Remy Noel <mocramis@gmail.com>

+ 10 - 2
celery/backends/base.py

@@ -219,7 +219,11 @@ class BaseDictBackend(BaseBackend):
 
     def get_task_meta(self, task_id, cache=True):
         if cache and task_id in self._cache:
-            return self._cache[task_id]
+            try:
+              return self._cache[task_id]
+            except KeyError:
+              #The Backend has been emptied in the meantime
+              pass
 
         meta = self._get_task_meta_for(task_id)
         if cache and meta.get("status") == states.SUCCESS:
@@ -235,7 +239,11 @@ class BaseDictBackend(BaseBackend):
 
     def get_taskset_meta(self, taskset_id, cache=True):
         if cache and taskset_id in self._cache:
-            return self._cache[taskset_id]
+            try:
+              return self._cache[taskset_id]
+            except KeyError:
+              #The Backend has been emptied in the meantime
+              pass
 
         meta = self._restore_taskset(taskset_id)
         if cache and meta is not None:

+ 11 - 3
celery/datastructures.py

@@ -9,12 +9,14 @@ Custom data structures.
 
 """
 from __future__ import absolute_import
+from __future__ import with_statement
 
 import time
 import traceback
 
 from itertools import chain
 from Queue import Empty
+from threading import RLock
 
 from celery.utils.compat import OrderedDict
 
@@ -299,11 +301,17 @@ class LocalCache(OrderedDict):
     def __init__(self, limit=None):
         super(LocalCache, self).__init__()
         self.limit = limit
+        self.lock = RLock()
 
     def __setitem__(self, key, value):
-        while len(self) >= self.limit:
-            self.popitem(last=False)
-        super(LocalCache, self).__setitem__(key, value)
+        with self.lock:
+            while len(self) >= self.limit:
+                self.popitem(last=False)
+            super(LocalCache, self).__setitem__(key, value)
+
+    def pop(self, key, *args):
+        with self.lock:
+            self.pop(key, *args)
 
 
 class TokenBucket(object):

+ 1 - 213
celery/utils/compat.py

@@ -18,222 +18,10 @@ except ImportError:
         from io import StringIO         # noqa
 
 ############## collections.OrderedDict ######################################
-
-import weakref
-try:
-    from collections import MutableMapping
-except ImportError:
-    from UserDict import DictMixin as MutableMapping  # noqa
-from itertools import imap as _imap
-from operator import eq as _eq
-
-
-class _Link(object):
-    """Doubly linked list."""
-    # next can't be lowercase because 2to3 thinks it's a generator
-    # and renames it to __next__.
-    __slots__ = 'PREV', 'NEXT', 'key', '__weakref__'
-
-
-class CompatOrderedDict(dict, MutableMapping):
-    """Dictionary that remembers insertion order"""
-    # An inherited dict maps keys to values.
-    # The inherited dict provides __getitem__, __len__, __contains__, and get.
-    # The remaining methods are order-aware.
-    # Big-O running times for all methods are the same as for regular
-    # dictionaries.
-
-    # The internal self.__map dictionary maps keys to links in a doubly
-    # linked list.
-    # The circular doubly linked list starts and ends with a sentinel element.
-    # The sentinel element never gets deleted (this simplifies the algorithm).
-    # The prev/next links are weakref proxies (to prevent circular
-    # references).
-    # Individual links are kept alive by the hard reference in self.__map.
-    # Those hard references disappear when a key is deleted from
-    # an OrderedDict.
-
-    __marker = object()
-
-    def __init__(self, *args, **kwds):
-        """Initialize an ordered dictionary.
-
-        Signature is the same as for regular dictionaries, but keyword
-        arguments are not recommended because their insertion order is
-        arbitrary.
-
-        """
-        if len(args) > 1:
-            raise TypeError("expected at most 1 arguments, got %d" % (
-                                len(args)))
-        try:
-            self._root
-        except AttributeError:
-            # sentinel node for the doubly linked list
-            self._root = root = _Link()
-            root.PREV = root.NEXT = root
-            self.__map = {}
-        self.update(*args, **kwds)
-
-    def clear(self):
-        "od.clear() -> None.  Remove all items from od."
-        root = self._root
-        root.PREV = root.NEXT = root
-        self.__map.clear()
-        dict.clear(self)
-
-    def __setitem__(self, key, value):
-        "od.__setitem__(i, y) <==> od[i]=y"
-        # Setting a new item creates a new link which goes at the end of the
-        # linked list, and the inherited dictionary is updated with the new
-        # key/value pair.
-        if key not in self:
-            self.__map[key] = link = _Link()
-            root = self._root
-            last = root.PREV
-            link.PREV, link.NEXT, link.key = last, root, key
-            last.NEXT = root.PREV = weakref.proxy(link)
-        dict.__setitem__(self, key, value)
-
-    def __delitem__(self, key):
-        """od.__delitem__(y) <==> del od[y]"""
-        # Deleting an existing item uses self.__map to find the
-        # link which is then removed by updating the links in the
-        # predecessor and successor nodes.
-        dict.__delitem__(self, key)
-        link = self.__map.pop(key)
-        link.PREV.NEXT = link.NEXT
-        link.NEXT.PREV = link.PREV
-
-    def __iter__(self):
-        """od.__iter__() <==> iter(od)"""
-        # Traverse the linked list in order.
-        root = self._root
-        curr = root.NEXT
-        while curr is not root:
-            yield curr.key
-            curr = curr.NEXT
-
-    def __reversed__(self):
-        """od.__reversed__() <==> reversed(od)"""
-        # Traverse the linked list in reverse order.
-        root = self._root
-        curr = root.PREV
-        while curr is not root:
-            yield curr.key
-            curr = curr.PREV
-
-    def __reduce__(self):
-        """Return state information for pickling"""
-        items = [[k, self[k]] for k in self]
-        tmp = self.__map, self._root
-        del(self.__map, self._root)
-        inst_dict = vars(self).copy()
-        self.__map, self._root = tmp
-        if inst_dict:
-            return (self.__class__, (items,), inst_dict)
-        return self.__class__, (items,)
-
-    def setdefault(self, key, default=None):
-        try:
-            return self[key]
-        except KeyError:
-            self[key] = default
-        return default
-
-    def update(self, other=(), **kwds):
-        if isinstance(other, dict):
-            for key in other:
-                self[key] = other[key]
-        elif hasattr(other, "keys"):
-            for key in other.keys():
-                self[key] = other[key]
-        else:
-            for key, value in other:
-                self[key] = value
-        for key, value in kwds.items():
-            self[key] = value
-
-    def pop(self, key, default=__marker):
-        try:
-            value = self[key]
-        except KeyError:
-            if default is self.__marker:
-                raise
-            return default
-        else:
-            del self[key]
-            return value
-
-    def values(self):
-        return [self[key] for key in self]
-
-    def items(self):
-        return [(key, self[key]) for key in self]
-
-    def itervalues(self):
-        for key in self:
-            yield self[key]
-
-    def iteritems(self):
-        for key in self:
-            yield (key, self[key])
-
-    def iterkeys(self):
-        return iter(self)
-
-    def keys(self):
-        return list(self)
-
-    def popitem(self, last=True):
-        """od.popitem() -> (k, v)
-
-        Return and remove a (key, value) pair.
-        Pairs are returned in LIFO order if last is true or FIFO
-        order if false.
-
-        """
-        if not self:
-            raise KeyError('dictionary is empty')
-        key = (last and reversed(self) or iter(self)).next()
-        value = self.pop(key)
-        return key, value
-
-    def __repr__(self):
-        "od.__repr__() <==> repr(od)"
-        if not self:
-            return '%s()' % (self.__class__.__name__,)
-        return '%s(%r)' % (self.__class__.__name__, self.items())
-
-    def copy(self):
-        "od.copy() -> a shallow copy of od"
-        return self.__class__(self)
-
-    @classmethod
-    def fromkeys(cls, iterable, value=None):
-        """OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S
-        and values equal to v (which defaults to None)."""
-        d = cls()
-        for key in iterable:
-            d[key] = value
-        return d
-
-    def __eq__(self, other):
-        """od.__eq__(y) <==> od==y.  Comparison to another OD is
-        order-sensitive while comparison to a regular mapping
-        is order-insensitive."""
-        if isinstance(other, OrderedDict):
-            return len(self) == len(other) and \
-                   all(_imap(_eq, self.iteritems(), other.iteritems()))
-        return dict.__eq__(self, other)
-
-    def __ne__(self, other):
-        return not (self == other)
-
 try:
     from collections import OrderedDict
 except ImportError:
-    OrderedDict = CompatOrderedDict  # noqa
+    from ordereddict import OrderedDict
 
 ############## logging.LoggerAdapter ########################################
 import logging

+ 1 - 0
requirements/py25.txt

@@ -1,2 +1,3 @@
 multiprocessing==2.6.2.1
 importlib
+ordereddict

+ 1 - 0
requirements/py26.txt

@@ -1 +1,2 @@
 importlib
+ordereddict

+ 2 - 2
setup.py

@@ -55,10 +55,10 @@ install_requires.extend([
 py_version = sys.version_info
 is_jython = sys.platform.startswith("java")
 is_pypy = hasattr(sys, "pypy_version_info")
+if sys.version_info < (2, 7):
+    install_requires.append("ordereddict") # Replacement for the ordered dict
 if sys.version_info < (2, 6) and not (is_jython or is_pypy):
     install_requires.append("multiprocessing")
-if sys.version_info < (2, 5):
-    install_requires.append("uuid")
 
 if is_jython:
     install_requires.append("threadpool")