|
@@ -49,13 +49,13 @@ class TaskPublisher(Publisher):
|
|
|
self.declare()
|
|
|
|
|
|
def declare(self):
|
|
|
- if self.exchange not in _exchanges_declared:
|
|
|
+ if self.exchange and self.exchange not in _exchanges_declared:
|
|
|
super(TaskPublisher, self).declare()
|
|
|
_exchanges_declared.add(self.exchange)
|
|
|
|
|
|
def delay_task(self, task_name, task_args=None, task_kwargs=None,
|
|
|
countdown=None, eta=None, task_id=None, taskset_id=None,
|
|
|
- expires=None, **kwargs):
|
|
|
+ exchange=None, exchange_type=None, expires=None, **kwargs):
|
|
|
"""Delay task for execution by the celery nodes."""
|
|
|
|
|
|
task_id = task_id or gen_unique_id()
|
|
@@ -88,7 +88,16 @@ class TaskPublisher(Publisher):
|
|
|
if taskset_id:
|
|
|
message_data["taskset"] = taskset_id
|
|
|
|
|
|
- self.send(message_data, **extract_msg_options(kwargs))
|
|
|
+ # custom exchange passed, need to declare it
|
|
|
+ if exchange and exchange not in _exchanges_declared:
|
|
|
+ exchange_type = exchange_type or self.exchange_type
|
|
|
+ self.backend.exchange_declare(exchange=exchange,
|
|
|
+ type=exchange_type,
|
|
|
+ durable=self.durable,
|
|
|
+ auto_delete=self.auto_delete)
|
|
|
+ self.send(message_data, exchange=exchange,
|
|
|
+ **extract_msg_options(kwargs))
|
|
|
+
|
|
|
signals.task_sent.send(sender=task_name, **message_data)
|
|
|
|
|
|
return task_id
|