|
@@ -22,6 +22,8 @@ get_msg_options = mitemgetter(*MSG_OPTIONS)
|
|
|
extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
|
|
|
default_queue = conf.routing_table[conf.DEFAULT_QUEUE]
|
|
|
|
|
|
+_queues_declared = False
|
|
|
+
|
|
|
|
|
|
class TaskPublisher(Publisher):
|
|
|
"""Publish tasks."""
|
|
@@ -30,6 +32,16 @@ class TaskPublisher(Publisher):
|
|
|
routing_key = conf.DEFAULT_ROUTING_KEY
|
|
|
serializer = conf.TASK_SERIALIZER
|
|
|
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ super(TaskPublisher, self).__init__(*args, **kwargs)
|
|
|
+
|
|
|
+ # Make sure all queues are declared.
|
|
|
+ global _queues_declared
|
|
|
+ if not _queues_declared:
|
|
|
+ consumers = get_consumer_set(self.connection)
|
|
|
+ consumers.close()
|
|
|
+ _queues_declared = True
|
|
|
+
|
|
|
def delay_task(self, task_name, task_args=None, task_kwargs=None,
|
|
|
task_id=None, taskset_id=None, **kwargs):
|
|
|
"""Delay task for execution by the celery nodes."""
|