|
@@ -20,25 +20,26 @@ class NoProcessConsumer(Consumer):
|
|
|
class TaskPublisher(Publisher):
|
|
|
"""The AMQP Task Publisher class."""
|
|
|
exchange = conf.AMQP_EXCHANGE
|
|
|
- routing_key = conf.AMQP_ROUTING_KEY
|
|
|
+ routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
|
|
|
|
|
|
def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
|
|
|
"""Delay task for execution by the celery nodes."""
|
|
|
- return self._delay_task(task_name=task_name, args=task_args,
|
|
|
- kwargs=task_kwargs, **kwargs)
|
|
|
+ return self._delay_task(task_name=task_name, task_args=task_args,
|
|
|
+ task_kwargs=task_kwargs, **kwargs)
|
|
|
|
|
|
def delay_task_in_set(self, task_name, taskset_id, task_args,
|
|
|
task_kwargs, **kwargs):
|
|
|
"""Delay a task which part of a task set."""
|
|
|
return self._delay_task(task_name=task_name, part_of_set=taskset_id,
|
|
|
- args=task_args, kwargs=task_kwargs, **kwargs)
|
|
|
+ task_args=task_args, task_kwargs=task_kwargs,
|
|
|
+ **kwargs)
|
|
|
|
|
|
def requeue_task(self, task_name, task_id, task_args, task_kwargs,
|
|
|
part_of_set=None, **kwargs):
|
|
|
"""Requeue a failed task."""
|
|
|
return self._delay_task(task_name=task_name, part_of_set=part_of_set,
|
|
|
- task_id=task_id, args=task_args,
|
|
|
- kwargs=task_kwargs, **kwargs)
|
|
|
+ task_id=task_id, task_args=task_args,
|
|
|
+ task_kwargs=task_kwargs, **kwargs)
|
|
|
|
|
|
def _delay_task(self, task_name, task_id=None, part_of_set=None,
|
|
|
task_args=None, task_kwargs=None, **kwargs):
|
|
@@ -58,7 +59,9 @@ class TaskPublisher(Publisher):
|
|
|
}
|
|
|
if part_of_set:
|
|
|
message_data["taskset"] = part_of_set
|
|
|
- self.send(message_data)
|
|
|
+ self.send(message_data,
|
|
|
+ routing_key=routing_key, priority=priority,
|
|
|
+ immediate=immediate, mandatory=mandatory)
|
|
|
return task_id
|
|
|
|
|
|
|
|
@@ -66,5 +69,5 @@ class TaskConsumer(NoProcessConsumer):
|
|
|
"""The AMQP Task Consumer class."""
|
|
|
queue = conf.AMQP_CONSUMER_QUEUE
|
|
|
exchange = conf.AMQP_EXCHANGE
|
|
|
- routing_key = conf.AMQP_ROUTING_KEY
|
|
|
+ routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
|
|
|
exchange_type = conf.AMQP_EXCHANGE_TYPE
|