messaging.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from carrot.messaging import Publisher, Consumer
  5. from celery import conf
  6. from celery.utils import gen_unique_id
  7. from celery.utils import mitemgetter
  8. from celery.serialization import pickle
  9. MSG_OPTIONS = ("mandatory", "priority",
  10. "immediate", "routing_key")
  11. get_msg_options = mitemgetter(*MSG_OPTIONS)
  12. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  13. class TaskPublisher(Publisher):
  14. """The AMQP Task Publisher class."""
  15. exchange = conf.AMQP_EXCHANGE
  16. exchange_type = conf.AMQP_EXCHANGE_TYPE
  17. routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
  18. serializer = "pickle"
  19. encoder = pickle.dumps
  20. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  21. """Delay task for execution by the celery nodes."""
  22. return self._delay_task(task_name=task_name, task_args=task_args,
  23. task_kwargs=task_kwargs, **kwargs)
  24. def delay_task_in_set(self, taskset_id, task_name, task_args, task_kwargs,
  25. **kwargs):
  26. """Delay a task which part of a task set."""
  27. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  28. task_args=task_args, task_kwargs=task_kwargs,
  29. **kwargs)
  30. def retry_task(self, task_name, task_id, delivery_info, **kwargs):
  31. kwargs["routing_key"] = delivery_info.get("routing_key")
  32. kwargs["retries"] = kwargs.get("retries", 0) + 1
  33. self._delay_task(task_name, task_id, **kwargs)
  34. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  35. task_args=None, task_kwargs=None, **kwargs):
  36. """INTERNAL"""
  37. task_id = task_id or gen_unique_id()
  38. message_data = {
  39. "task": task_name,
  40. "id": task_id,
  41. "args": task_args or [],
  42. "kwargs": task_kwargs or {},
  43. "retries": kwargs.get("retries", 0),
  44. "eta": kwargs.get("eta"),
  45. }
  46. if part_of_set:
  47. message_data["taskset"] = part_of_set
  48. self.send(message_data, **extract_msg_options(kwargs))
  49. return task_id
  50. class TaskConsumer(Consumer):
  51. """The AMQP Task Consumer class."""
  52. queue = conf.AMQP_CONSUMER_QUEUE
  53. exchange = conf.AMQP_EXCHANGE
  54. routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
  55. exchange_type = conf.AMQP_EXCHANGE_TYPE
  56. decoder = pickle.loads
  57. auto_ack = False
  58. no_ack = False
  59. class StatsPublisher(Publisher):
  60. exchange = "celerygraph"
  61. routing_key = "stats"
  62. encoder = pickle.dumps
  63. class StatsConsumer(Consumer):
  64. queue = "celerygraph"
  65. exchange = "celerygraph"
  66. routing_key = "stats"
  67. exchange_type = "direct"
  68. decoder = pickle.loads
  69. no_ack=True