messaging.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from carrot.messaging import Publisher, Consumer
  5. from celery import conf
  6. import uuid
  7. try:
  8. import cPickle as pickle
  9. except ImportError:
  10. import pickle
  11. class TaskPublisher(Publisher):
  12. """The AMQP Task Publisher class."""
  13. exchange = conf.AMQP_EXCHANGE
  14. exchange_type = conf.AMQP_EXCHANGE_TYPE
  15. routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
  16. serializer = "pickle"
  17. encoder = pickle.dumps
  18. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  19. """Delay task for execution by the celery nodes."""
  20. return self._delay_task(task_name=task_name, task_args=task_args,
  21. task_kwargs=task_kwargs, **kwargs)
  22. def delay_task_in_set(self, taskset_id, task_name, task_args, task_kwargs,
  23. **kwargs):
  24. """Delay a task which part of a task set."""
  25. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  26. task_args=task_args, task_kwargs=task_kwargs,
  27. **kwargs)
  28. def requeue_task(self, task_name, task_id, task_args, task_kwargs,
  29. part_of_set=None, **kwargs):
  30. """Requeue a failed task."""
  31. return self._delay_task(task_name=task_name, part_of_set=part_of_set,
  32. task_id=task_id, task_args=task_args,
  33. task_kwargs=task_kwargs, **kwargs)
  34. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  35. task_args=None, task_kwargs=None, **kwargs):
  36. """INTERNAL"""
  37. eta = kwargs.get("eta")
  38. priority = kwargs.get("priority")
  39. immediate = kwargs.get("immediate")
  40. mandatory = kwargs.get("mandatory")
  41. routing_key = kwargs.get("routing_key")
  42. task_args = task_args or []
  43. task_kwargs = task_kwargs or {}
  44. task_id = task_id or str(uuid.uuid4())
  45. message_data = {
  46. "id": task_id,
  47. "task": task_name,
  48. "args": task_args,
  49. "kwargs": task_kwargs,
  50. "eta": eta,
  51. }
  52. if part_of_set:
  53. message_data["taskset"] = part_of_set
  54. self.send(message_data,
  55. routing_key=routing_key, priority=priority,
  56. immediate=immediate, mandatory=mandatory)
  57. return task_id
  58. class TaskConsumer(Consumer):
  59. """The AMQP Task Consumer class."""
  60. queue = conf.AMQP_CONSUMER_QUEUE
  61. exchange = conf.AMQP_EXCHANGE
  62. routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
  63. exchange_type = conf.AMQP_EXCHANGE_TYPE
  64. decoder = pickle.loads
  65. auto_ack = False
  66. no_ack = False
  67. class StatsPublisher(Publisher):
  68. exchange = "celerygraph"
  69. routing_key = "stats"
  70. encoder = pickle.dumps
  71. class StatsConsumer(Consumer):
  72. queue = "celerygraph"
  73. exchange = "celerygraph"
  74. routing_key = "stats"
  75. exchange_type = "direct"
  76. decoder = pickle.loads
  77. no_ack=True
  78. def receive(self, message_data, message):
  79. pass