messaging.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from carrot.messaging import Publisher, Consumer
  5. from celery import conf
  6. import uuid
  7. try:
  8. import cPickle as pickle
  9. except ImportError:
  10. import pickle
  11. class TaskPublisher(Publisher):
  12. """The AMQP Task Publisher class."""
  13. exchange = conf.AMQP_EXCHANGE
  14. routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
  15. encoder = pickle.dumps
  16. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  17. """Delay task for execution by the celery nodes."""
  18. return self._delay_task(task_name=task_name, task_args=task_args,
  19. task_kwargs=task_kwargs, **kwargs)
  20. def delay_task_in_set(self, task_name, taskset_id, task_args,
  21. task_kwargs, **kwargs):
  22. """Delay a task which part of a task set."""
  23. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  24. task_args=task_args, task_kwargs=task_kwargs,
  25. **kwargs)
  26. def requeue_task(self, task_name, task_id, task_args, task_kwargs,
  27. part_of_set=None, **kwargs):
  28. """Requeue a failed task."""
  29. return self._delay_task(task_name=task_name, part_of_set=part_of_set,
  30. task_id=task_id, task_args=task_args,
  31. task_kwargs=task_kwargs, **kwargs)
  32. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  33. task_args=None, task_kwargs=None, **kwargs):
  34. """INTERNAL"""
  35. priority = kwargs.get("priority")
  36. immediate = kwargs.get("immediate")
  37. mandatory = kwargs.get("mandatory")
  38. routing_key = kwargs.get("routing_key")
  39. task_args = task_args or []
  40. task_kwargs = task_kwargs or {}
  41. task_id = task_id or str(uuid.uuid4())
  42. message_data = {
  43. "id": task_id,
  44. "task": task_name,
  45. "args": task_args,
  46. "kwargs": task_kwargs,
  47. }
  48. if part_of_set:
  49. message_data["taskset"] = part_of_set
  50. self.send(message_data,
  51. routing_key=routing_key, priority=priority,
  52. immediate=immediate, mandatory=mandatory)
  53. return task_id
  54. class TaskConsumer(Consumer):
  55. """The AMQP Task Consumer class."""
  56. queue = conf.AMQP_CONSUMER_QUEUE
  57. exchange = conf.AMQP_EXCHANGE
  58. routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
  59. exchange_type = conf.AMQP_EXCHANGE_TYPE
  60. auto_ack = True
  61. decoder = pickle.loads