messaging.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from carrot.messaging import Publisher, Consumer, ConsumerSet
  5. from celery import conf
  6. from celery.utils import gen_unique_id
  7. from celery.utils import mitemgetter
  8. from celery.serialization import pickle
  9. MSG_OPTIONS = ("mandatory", "priority",
  10. "immediate", "routing_key")
  11. get_msg_options = mitemgetter(*MSG_OPTIONS)
  12. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  13. class TaskPublisher(Publisher):
  14. """The AMQP Task Publisher class."""
  15. exchange = conf.AMQP_EXCHANGE
  16. exchange_type = conf.AMQP_EXCHANGE_TYPE
  17. routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
  18. serializer = "pickle"
  19. encoder = pickle.dumps
  20. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  21. """Delay task for execution by the celery nodes."""
  22. return self._delay_task(task_name=task_name, task_args=task_args,
  23. task_kwargs=task_kwargs, **kwargs)
  24. def delay_task_in_set(self, taskset_id, task_name, task_args, task_kwargs,
  25. **kwargs):
  26. """Delay a task which part of a task set."""
  27. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  28. task_args=task_args, task_kwargs=task_kwargs,
  29. **kwargs)
  30. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  31. task_args=None, task_kwargs=None, **kwargs):
  32. """INTERNAL"""
  33. task_id = task_id or gen_unique_id()
  34. message_data = {
  35. "task": task_name,
  36. "id": task_id,
  37. "args": task_args or [],
  38. "kwargs": task_kwargs or {},
  39. "retries": kwargs.get("retries", 0),
  40. "eta": kwargs.get("eta"),
  41. }
  42. if part_of_set:
  43. message_data["taskset"] = part_of_set
  44. self.send(message_data, **extract_msg_options(kwargs))
  45. return task_id
  46. def get_consumer_set(connection, queues=conf.AMQP_CONSUMER_QUEUES, **options):
  47. return ConsumerSet(connection, from_dict=queues, **options)
  48. class TaskConsumer(Consumer):
  49. """The AMQP Task Consumer class."""
  50. queue = conf.AMQP_CONSUMER_QUEUE
  51. exchange = conf.AMQP_EXCHANGE
  52. routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
  53. exchange_type = conf.AMQP_EXCHANGE_TYPE
  54. decoder = pickle.loads
  55. auto_ack = False
  56. no_ack = False
  57. class StatsPublisher(Publisher):
  58. exchange = "celerygraph"
  59. routing_key = "stats"
  60. encoder = pickle.dumps
  61. class StatsConsumer(Consumer):
  62. queue = "celerygraph"
  63. exchange = "celerygraph"
  64. routing_key = "stats"
  65. exchange_type = "direct"
  66. decoder = pickle.loads
  67. no_ack=True