|
@@ -182,8 +182,16 @@ class TaskProducer(Producer):
|
|
|
queue=None, now=None, retries=0, chord=None, callbacks=None,
|
|
|
errbacks=None, mandatory=None, priority=None, immediate=None,
|
|
|
routing_key=None, serializer=None, delivery_mode=None,
|
|
|
- compression=None, **kwargs):
|
|
|
+ compression=None, declare=None, **kwargs):
|
|
|
"""Send task message."""
|
|
|
+
|
|
|
+ declare = declare or []
|
|
|
+ if queue is not None:
|
|
|
+ if isinstance(queue, basestring):
|
|
|
+ queue = self.queues[queue]
|
|
|
+ exchange = exchange or queue.exchange.name
|
|
|
+ routing_key = routing_key or queue.routing_key
|
|
|
+
|
|
|
# merge default and custom policy
|
|
|
retry = self.retry if retry is None else retry
|
|
|
_rp = (dict(self.retry_policy, **retry_policy) if retry_policy
|
|
@@ -225,7 +233,7 @@ class TaskProducer(Producer):
|
|
|
serializer=serializer or self.serializer,
|
|
|
compression=compression or self.compression,
|
|
|
retry=retry, retry_policy=_rp, delivery_mode=delivery_mode,
|
|
|
- priority=priority, declare=[self.queues[queue]] if queue else [],
|
|
|
+ priority=priority, declare=declare)
|
|
|
**kwargs)
|
|
|
|
|
|
signals.task_sent.send(sender=task_name, **body)
|