Kaynağa Gözat

Routers: Load router classes lazily.

This is done because if the module the router class originates in
is loading the django/celery environment, then loading it in celery.conf will
cause a circular dependency.
Ask Solem 15 yıl önce
ebeveyn
işleme
4bd65cc317

+ 2 - 2
celery/routes.py

@@ -1,5 +1,5 @@
 from celery.exceptions import QueueNotFound
-from celery.utils import instantiate, firstmethod
+from celery.utils import instantiate, firstmethod, mpromise
 
 _first_route = firstmethod("route_for_task")
 
@@ -77,7 +77,7 @@ def prepare(routes):
         if isinstance(route, dict):
             return MapRoute(route)
         if isinstance(route, basestring):
-            return instantiate(route)
+            return mpromise(instantiate, route)
         return route
 
     if not hasattr(routes, "__iter__"):

+ 75 - 0
celery/tests/test_utils.py

@@ -1,12 +1,19 @@
+import pickle
 import sys
 import socket
 import unittest2 as unittest
 
 from celery import utils
+from celery.utils import promise, mpromise, maybe_promise
 
 from celery.tests.utils import sleepdeprived, execute_context
 from celery.tests.utils import mask_modules
 
+
+def double(x):
+    return x * 2
+
+
 class test_chunks(unittest.TestCase):
 
     def test_chunks(self):
@@ -79,6 +86,22 @@ class test_utils(unittest.TestCase):
     def test_firstmethod_AttributeError(self):
         self.assertIsNone(utils.firstmethod("foo")([object()]))
 
+    def test_firstmethod_promises(self):
+
+        class A(object):
+
+            def __init__(self, value=None):
+                self.value = value
+
+            def m(self):
+                return self.value
+
+        self.assertEqual("four", utils.firstmethod("m")([
+            A(), A(), A(), A("four"), A("five")]))
+        self.assertEqual("four", utils.firstmethod("m")([
+            A(), A(), A(), promise(lambda: A("four")), A("five")]))
+
+
     def test_first(self):
         iterations = [0]
 
@@ -140,3 +163,55 @@ class test_retry_over_time(unittest.TestCase):
 
         self.assertRaises(socket.error, utils.retry_over_time,
                         _fun, (socket.error, ), args=[32, 32], max_retries=1)
+
+
+class test_promise(unittest.TestCase):
+
+    def test__str__(self):
+        self.assertEqual(str(promise(lambda: "the quick brown fox")),
+                "the quick brown fox")
+
+    def test__repr__(self):
+        self.assertEqual(repr(promise(lambda: "fi fa fo")),
+                "'fi fa fo'")
+
+    def test_evaluate(self):
+        self.assertEqual(promise(lambda: 2 + 2)(), 4)
+        self.assertEqual(promise(lambda x: x * 4, 2), 8)
+        self.assertEqual(promise(lambda x: x * 8, 2)(), 16)
+
+    def test_cmp(self):
+        self.assertEqual(promise(lambda: 10), promise(lambda: 10))
+        self.assertNotEqual(promise(lambda: 10), promise(lambda: 20))
+
+    def test__reduce__(self):
+        x = promise(double, 4)
+        y = pickle.loads(pickle.dumps(x))
+        self.assertEqual(x(), y())
+
+    def test__deepcopy__(self):
+        from copy import deepcopy
+        x = promise(double, 4)
+        y = deepcopy(x)
+        self.assertEqual(x._fun, y._fun)
+        self.assertEqual(x._args, y._args)
+        self.assertEqual(x(), y())
+
+
+class test_mpromise(unittest.TestCase):
+
+    def test_is_memoized(self):
+
+        it = iter(xrange(20, 30))
+        p = mpromise(it.next)
+        self.assertEqual(p(), 20)
+        self.assertTrue(p.evaluated)
+        self.assertEqual(p(), 20)
+        self.assertEqual(repr(p), "20")
+
+
+class test_maybe_promise(unittest.TestCase):
+
+    def test_evaluates(self):
+        self.assertEqual(maybe_promise(promise(lambda: 10)), 10)
+        self.assertEqual(maybe_promise(20), 20)

+ 1 - 1
celery/tests/test_worker_controllers.py

@@ -5,7 +5,7 @@ from Queue import Queue
 from celery.utils import gen_unique_id
 from celery.worker.controllers import Mediator
 from celery.worker.controllers import BackgroundThread, ScheduleController
-from celery.worker.revoke import revoked as revoked_tasks
+from celery.worker.state import revoked as revoked_tasks
 
 
 class MockTask(object):

+ 78 - 2
celery/utils/__init__.py

@@ -18,6 +18,76 @@ from celery.utils.timeutils import timedelta_seconds # was here before
 from celery.utils.functional import curry
 
 
+class promise(object):
+    """A promise.
+
+    Evaluated when called or if the :meth:`evaluate` method is called.
+    The function is evaluated on every access, so the value is not
+    memoized (see :class:`mpromise`).
+
+    Overloaded operations that will evaluate the promise:
+        :meth:`__str__`, :meth:`__repr__`, :meth:`__cmp__`.
+
+    """
+
+    def __init__(self, fun, *args, **kwargs):
+        self._fun = fun
+        self._args = args
+        self._kwargs = kwargs
+
+    def __call__(self):
+        return self.evaluate()
+
+    def evaluate(self):
+        return self._fun(*self._args, **self._kwargs)
+
+    def __str__(self):
+        return str(self())
+
+    def __repr__(self):
+        return repr(self())
+
+    def __cmp__(self, rhs):
+        if isinstance(rhs, self.__class__):
+            return -cmp(rhs, self())
+        return cmp(self(), rhs)
+
+    def __deepcopy__(self, memo):
+        memo[id(self)] = self
+        return self
+
+    def __reduce__(self):
+        return (self.__class__, (self._fun, ), {"_args": self._args,
+                                                "_kwargs": self._kwargs})
+
+
+class mpromise(promise):
+    """Memoized promise.
+
+    The function is only evaluated once, every subsequent access
+    will return the same value.
+
+    .. attribute:: evaluated
+
+        Set to to :const:`True` after the promise has been evaluated.
+
+    """
+    evaluated = False
+    _value = None
+
+    def evaluate(self):
+        if not self.evaluated:
+            self._value = super(mpromise, self).evaluate()
+            self.evaluated = True
+        return self._value
+
+
+def maybe_promise(value):
+    """Evaluates if the value is a promise."""
+    if isinstance(value, promise):
+        return value.evaluate()
+    return value
+
 def noop(*args, **kwargs):
     """No operation.
 
@@ -48,12 +118,16 @@ def first(predicate, iterable):
 
 def firstmethod(method):
     """Returns a functions that with a list of instances,
-    finds the first instance that returns a value for the given method."""
+    finds the first instance that returns a value for the given method.
+
+    The list can also contain promises (:class:`promise`.)
+
+    """
 
     def _matcher(seq, *args, **kwargs):
         for cls in seq:
             try:
-                answer = getattr(cls, method)(*args, **kwargs)
+                answer = getattr(maybe_promise(cls), method)(*args, **kwargs)
                 if answer is not None:
                     return answer
             except AttributeError:
@@ -267,3 +341,5 @@ def instantiate(name, *args, **kwargs):
 
     """
     return get_cls_by_name(name)(*args, **kwargs)
+
+