|
@@ -6,12 +6,20 @@ Sending and Receiving Messages
|
|
|
from carrot.messaging import Publisher, Consumer
|
|
|
from celery import conf
|
|
|
from celery.utils import gen_unique_id
|
|
|
+from celery.utils import mitemgetter
|
|
|
|
|
|
try:
|
|
|
import cPickle as pickle
|
|
|
except ImportError:
|
|
|
import pickle
|
|
|
|
|
|
+MSG_OPTIONS = ("mandatory", "priority",
|
|
|
+ "immediate", "routing_key")
|
|
|
+
|
|
|
+get_msg_options = mitemgetter(MSG_OPTIONS)
|
|
|
+
|
|
|
+extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
|
|
|
+
|
|
|
|
|
|
class TaskPublisher(Publisher):
|
|
|
"""The AMQP Task Publisher class."""
|
|
@@ -33,37 +41,28 @@ class TaskPublisher(Publisher):
|
|
|
task_args=task_args, task_kwargs=task_kwargs,
|
|
|
**kwargs)
|
|
|
|
|
|
- def requeue_task(self, task_name, task_id, task_args, task_kwargs,
|
|
|
- part_of_set=None, **kwargs):
|
|
|
- """Requeue a failed task."""
|
|
|
- return self._delay_task(task_name=task_name, part_of_set=part_of_set,
|
|
|
- task_id=task_id, task_args=task_args,
|
|
|
- task_kwargs=task_kwargs, **kwargs)
|
|
|
+ def retry_task(self, task_name, task_id, delivery_info, **kwargs):
|
|
|
+ kwargs["routing_key"] = delivery_info.get("routing_key")
|
|
|
+ self._delay_task(task_name, task_id, **kwargs)
|
|
|
|
|
|
def _delay_task(self, task_name, task_id=None, part_of_set=None,
|
|
|
task_args=None, task_kwargs=None, **kwargs):
|
|
|
"""INTERNAL"""
|
|
|
- eta = kwargs.get("eta")
|
|
|
- priority = kwargs.get("priority")
|
|
|
- immediate = kwargs.get("immediate")
|
|
|
- mandatory = kwargs.get("mandatory")
|
|
|
- routing_key = kwargs.get("routing_key")
|
|
|
-
|
|
|
- task_args = task_args or []
|
|
|
- task_kwargs = task_kwargs or {}
|
|
|
+
|
|
|
task_id = task_id or gen_unique_id()
|
|
|
+
|
|
|
message_data = {
|
|
|
- "id": task_id,
|
|
|
"task": task_name,
|
|
|
- "args": task_args,
|
|
|
- "kwargs": task_kwargs,
|
|
|
- "eta": eta,
|
|
|
+ "id": task_id,
|
|
|
+ "args": task_args or [],
|
|
|
+ "kwargs": task_kwargs or {},
|
|
|
+ "retries": kwargs.get("retries", 0),
|
|
|
+ "eta": kwargs.get("eta"),
|
|
|
}
|
|
|
if part_of_set:
|
|
|
message_data["taskset"] = part_of_set
|
|
|
- self.send(message_data,
|
|
|
- routing_key=routing_key, priority=priority,
|
|
|
- immediate=immediate, mandatory=mandatory)
|
|
|
+
|
|
|
+ self.send(message_data, **extract_msg_options(kwargs))
|
|
|
return task_id
|
|
|
|
|
|
|