messaging.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from carrot.messaging import Publisher, Consumer
  5. from celery import conf
  6. import uuid
  7. try:
  8. import cPickle as pickle
  9. except ImportError:
  10. import pickle
  11. class TaskPublisher(Publisher):
  12. """The AMQP Task Publisher class."""
  13. exchange = conf.AMQP_EXCHANGE
  14. exchange_type = conf.AMQP_EXCHANGE_TYPE
  15. routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
  16. encoder = pickle.dumps
  17. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  18. """Delay task for execution by the celery nodes."""
  19. return self._delay_task(task_name=task_name, task_args=task_args,
  20. task_kwargs=task_kwargs, **kwargs)
  21. def delay_task_in_set(self, taskset_id, task_name, task_args, task_kwargs,
  22. **kwargs):
  23. """Delay a task which part of a task set."""
  24. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  25. task_args=task_args, task_kwargs=task_kwargs,
  26. **kwargs)
  27. def requeue_task(self, task_name, task_id, task_args, task_kwargs,
  28. part_of_set=None, **kwargs):
  29. """Requeue a failed task."""
  30. return self._delay_task(task_name=task_name, part_of_set=part_of_set,
  31. task_id=task_id, task_args=task_args,
  32. task_kwargs=task_kwargs, **kwargs)
  33. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  34. task_args=None, task_kwargs=None, **kwargs):
  35. """INTERNAL"""
  36. eta = kwargs.get("eta")
  37. priority = kwargs.get("priority")
  38. immediate = kwargs.get("immediate")
  39. mandatory = kwargs.get("mandatory")
  40. routing_key = kwargs.get("routing_key")
  41. task_args = task_args or []
  42. task_kwargs = task_kwargs or {}
  43. task_id = task_id or str(uuid.uuid4())
  44. message_data = {
  45. "id": task_id,
  46. "task": task_name,
  47. "args": task_args,
  48. "kwargs": task_kwargs,
  49. "eta": eta,
  50. }
  51. if part_of_set:
  52. message_data["taskset"] = part_of_set
  53. self.send(message_data,
  54. routing_key=routing_key, priority=priority,
  55. immediate=immediate, mandatory=mandatory)
  56. return task_id
  57. class TaskConsumer(Consumer):
  58. """The AMQP Task Consumer class."""
  59. queue = conf.AMQP_CONSUMER_QUEUE
  60. exchange = conf.AMQP_EXCHANGE
  61. routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
  62. exchange_type = conf.AMQP_EXCHANGE_TYPE
  63. decoder = pickle.loads
  64. auto_ack = False
  65. no_ack = False
  66. class StatsPublisher(Publisher):
  67. exchange = "celerygraph"
  68. routing_key = "stats"
  69. encoder = pickle.dumps
  70. class StatsConsumer(Consumer):
  71. queue = "celerygraph"
  72. exchange = "celerygraph"
  73. routing_key = "stats"
  74. exchange_type = "direct"
  75. decoder = pickle.loads
  76. no_ack=True
  77. def receive(self, message_data, message):
  78. pass