|
@@ -1,7 +1,7 @@
|
|
|
from celery.tests.utils import unittest
|
|
|
|
|
|
from celery import routes
|
|
|
-from celery.app import app_or_default
|
|
|
+from celery import current_app
|
|
|
from celery.utils import maybe_promise
|
|
|
from celery.utils.functional import wraps
|
|
|
from celery.exceptions import QueueNotFound
|
|
@@ -18,7 +18,7 @@ def with_queues(**queues):
|
|
|
def patch_fun(fun):
|
|
|
@wraps(fun)
|
|
|
def __inner(*args, **kwargs):
|
|
|
- app = app_or_default()
|
|
|
+ app = current_app
|
|
|
prev_queues = app.conf.CELERY_QUEUES
|
|
|
app.conf.CELERY_QUEUES = queues
|
|
|
try:
|
|
@@ -35,13 +35,16 @@ a_queue = {"exchange": "fooexchange",
|
|
|
b_queue = {"exchange": "barexchange",
|
|
|
"exchange_type": "topic",
|
|
|
"binding_key": "b.b.#"}
|
|
|
+d_queue = {"exchange": current_app.conf.CELERY_DEFAULT_EXCHANGE,
|
|
|
+ "exchange_type": current_app.conf.CELERY_DEFAULT_EXCHANGE_TYPE,
|
|
|
+ "routing_key": current_app.conf.CELERY_DEFAULT_ROUTING_KEY}
|
|
|
|
|
|
|
|
|
class test_MapRoute(unittest.TestCase):
|
|
|
|
|
|
@with_queues(foo=a_queue, bar=b_queue)
|
|
|
def test_route_for_task_expanded_route(self):
|
|
|
- expand = E(app_or_default().conf.CELERY_QUEUES)
|
|
|
+ expand = E(current_app.conf.CELERY_QUEUES)
|
|
|
route = routes.MapRoute({"celery.ping": {"queue": "foo"}})
|
|
|
self.assertDictContainsSubset(a_queue,
|
|
|
expand(route.route_for_task("celery.ping")))
|
|
@@ -49,14 +52,14 @@ class test_MapRoute(unittest.TestCase):
|
|
|
|
|
|
@with_queues(foo=a_queue, bar=b_queue)
|
|
|
def test_route_for_task(self):
|
|
|
- expand = E(app_or_default().conf.CELERY_QUEUES)
|
|
|
+ expand = E(current_app.conf.CELERY_QUEUES)
|
|
|
route = routes.MapRoute({"celery.ping": b_queue})
|
|
|
self.assertDictContainsSubset(b_queue,
|
|
|
expand(route.route_for_task("celery.ping")))
|
|
|
self.assertIsNone(route.route_for_task("celery.awesome"))
|
|
|
|
|
|
def test_expand_route_not_found(self):
|
|
|
- expand = E(app_or_default().conf.CELERY_QUEUES)
|
|
|
+ expand = E(current_app.conf.CELERY_QUEUES)
|
|
|
route = routes.MapRoute({"a": {"queue": "x"}})
|
|
|
self.assertRaises(QueueNotFound, expand, route.route_for_task("a"))
|
|
|
|
|
@@ -71,7 +74,7 @@ class test_lookup_route(unittest.TestCase):
|
|
|
def test_lookup_takes_first(self):
|
|
|
R = routes.prepare(({"celery.ping": {"queue": "bar"}},
|
|
|
{"celery.ping": {"queue": "foo"}}))
|
|
|
- router = routes.Router(R, app_or_default().conf.CELERY_QUEUES)
|
|
|
+ router = routes.Router(R, current_app.conf.CELERY_QUEUES)
|
|
|
self.assertDictContainsSubset(b_queue,
|
|
|
router.route({}, "celery.ping",
|
|
|
args=[1, 2], kwargs={}))
|
|
@@ -79,7 +82,7 @@ class test_lookup_route(unittest.TestCase):
|
|
|
@with_queues()
|
|
|
def test_expands_queue_in_options(self):
|
|
|
R = routes.prepare(())
|
|
|
- router = routes.Router(R, app_or_default().conf.CELERY_QUEUES,
|
|
|
+ router = routes.Router(R, current_app.conf.CELERY_QUEUES,
|
|
|
create_missing=True)
|
|
|
# apply_async forwards all arguments, even exchange=None etc,
|
|
|
# so need to make sure it's merged correctly.
|
|
@@ -97,19 +100,21 @@ class test_lookup_route(unittest.TestCase):
|
|
|
|
|
|
@with_queues(foo=a_queue, bar=b_queue)
|
|
|
def test_expand_destaintion_string(self):
|
|
|
- x = routes.Router({}, app_or_default().conf.CELERY_QUEUES)
|
|
|
+ x = routes.Router({}, current_app.conf.CELERY_QUEUES)
|
|
|
dest = x.expand_destination("foo")
|
|
|
self.assertEqual(dest["exchange"], "fooexchange")
|
|
|
|
|
|
- @with_queues(foo=a_queue, bar=b_queue)
|
|
|
+ @with_queues(foo=a_queue, bar=b_queue, **{
|
|
|
+ current_app.conf.CELERY_DEFAULT_QUEUE: d_queue})
|
|
|
def test_lookup_paths_traversed(self):
|
|
|
R = routes.prepare(({"celery.xaza": {"queue": "bar"}},
|
|
|
{"celery.ping": {"queue": "foo"}}))
|
|
|
- router = routes.Router(R, app_or_default().conf.CELERY_QUEUES)
|
|
|
+ router = routes.Router(R, current_app.conf.CELERY_QUEUES)
|
|
|
self.assertDictContainsSubset(a_queue,
|
|
|
router.route({}, "celery.ping",
|
|
|
args=[1, 2], kwargs={}))
|
|
|
- self.assertEqual(router.route({}, "celery.poza"), {})
|
|
|
+ self.assertEqual(router.route({}, "celery.poza"),
|
|
|
+ dict(d_queue, queue=current_app.conf.CELERY_DEFAULT_QUEUE))
|
|
|
|
|
|
|
|
|
class test_prepare(unittest.TestCase):
|