|
@@ -9,6 +9,8 @@
|
|
"""
|
|
"""
|
|
from __future__ import absolute_import
|
|
from __future__ import absolute_import
|
|
|
|
|
|
|
|
+from kombu import Queue
|
|
|
|
+
|
|
from celery.exceptions import QueueNotFound
|
|
from celery.exceptions import QueueNotFound
|
|
from celery.five import string_t
|
|
from celery.five import string_t
|
|
from celery.utils import lpmerge
|
|
from celery.utils import lpmerge
|
|
@@ -63,13 +65,14 @@ class Router(object):
|
|
queue = route.pop('queue', None)
|
|
queue = route.pop('queue', None)
|
|
|
|
|
|
if queue:
|
|
if queue:
|
|
- try:
|
|
|
|
- Q = self.queues[queue] # noqa
|
|
|
|
- except KeyError:
|
|
|
|
- raise QueueNotFound(
|
|
|
|
- 'Queue {0!r} missing from CELERY_QUEUES'.format(queue))
|
|
|
|
- # needs to be declared by publisher
|
|
|
|
- route['queue'] = Q
|
|
|
|
|
|
+ if isinstance(queue, Queue):
|
|
|
|
+ route['queue'] = queue
|
|
|
|
+ else:
|
|
|
|
+ try:
|
|
|
|
+ route['queue'] = self.queues[queue]
|
|
|
|
+ except KeyError:
|
|
|
|
+ raise QueueNotFound(
|
|
|
|
+ 'Queue {0!r} missing from CELERY_QUEUES'.format(queue))
|
|
return route
|
|
return route
|
|
|
|
|
|
def lookup_route(self, task, args=None, kwargs=None):
|
|
def lookup_route(self, task, args=None, kwargs=None):
|