# messaging.py

"""
Sending and Receiving Messages
"""
  4. from carrot.connection import DjangoBrokerConnection
  5. from carrot.messaging import Publisher, Consumer, ConsumerSet
  6. from celery import conf
  7. from celery import signals
  8. from celery.utils import gen_unique_id
  9. from celery.utils import mitemgetter
  10. MSG_OPTIONS = ("mandatory", "priority",
  11. "immediate", "routing_key",
  12. "serializer")
  13. get_msg_options = mitemgetter(*MSG_OPTIONS)
  14. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  15. class TaskPublisher(Publisher):
  16. """The AMQP Task Publisher class."""
  17. exchange = conf.AMQP_EXCHANGE
  18. exchange_type = conf.AMQP_EXCHANGE_TYPE
  19. routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
  20. serializer = conf.TASK_SERIALIZER
  21. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  22. """Delay task for execution by the celery nodes."""
  23. return self._delay_task(task_name=task_name, task_args=task_args,
  24. task_kwargs=task_kwargs, **kwargs)
  25. def delay_task_in_set(self, taskset_id, task_name, task_args, task_kwargs,
  26. **kwargs):
  27. """Delay a task which part of a task set."""
  28. return self._delay_task(task_name=task_name, part_of_set=taskset_id,
  29. task_args=task_args, task_kwargs=task_kwargs,
  30. **kwargs)
  31. def _delay_task(self, task_name, task_id=None, part_of_set=None,
  32. task_args=None, task_kwargs=None, **kwargs):
  33. """INTERNAL"""
  34. task_id = task_id or gen_unique_id()
  35. eta = kwargs.get("eta")
  36. eta = eta and eta.isoformat()
  37. message_data = {
  38. "task": task_name,
  39. "id": task_id,
  40. "args": task_args or [],
  41. "kwargs": task_kwargs or {},
  42. "retries": kwargs.get("retries", 0),
  43. "eta": eta,
  44. }
  45. if part_of_set:
  46. message_data["taskset"] = part_of_set
  47. self.send(message_data, **extract_msg_options(kwargs))
  48. signals.task_sent.send(sender=task_name, **message_data)
  49. return task_id
  50. def get_consumer_set(connection, queues=conf.AMQP_CONSUMER_QUEUES, **options):
  51. return ConsumerSet(connection, from_dict=queues, **options)
  52. class TaskConsumer(Consumer):
  53. """The AMQP Task Consumer class."""
  54. queue = conf.AMQP_CONSUMER_QUEUE
  55. exchange = conf.AMQP_EXCHANGE
  56. routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
  57. exchange_type = conf.AMQP_EXCHANGE_TYPE
  58. auto_ack = False
  59. no_ack = False
  60. class StatsPublisher(Publisher):
  61. exchange = "celerygraph"
  62. routing_key = "stats"
  63. class StatsConsumer(Consumer):
  64. queue = "celerygraph"
  65. exchange = "celerygraph"
  66. routing_key = "stats"
  67. exchange_type = "direct"
  68. no_ack = True
  69. class EventPublisher(Publisher):
  70. exchange = "celeryevent"
  71. routing_key = "event"
  72. class EventConsumer(Consumer):
  73. queue = "celeryevent"
  74. exchange = "celeryevent"
  75. routing_key = "event"
  76. exchange_type = "direct"
  77. no_ack = True
  78. def get_connection_info():
  79. broker_connection = DjangoBrokerConnection()
  80. carrot_backend = broker_connection.backend_cls
  81. if carrot_backend and not isinstance(carrot_backend, str):
  82. carrot_backend = carrot_backend.__name__
  83. port = broker_connection.port or \
  84. broker_connection.get_backend_cls().default_port
  85. port = port and ":%s" % port or ""
  86. vhost = broker_connection.virtual_host
  87. if not vhost.startswith("/"):
  88. vhost = "/" + vhost
  89. return "%(carrot_backend)s://%(userid)s@%(host)s%(port)s%(vhost)s" % {
  90. "carrot_backend": carrot_backend,
  91. "userid": broker_connection.userid,
  92. "host": broker_connection.hostname,
  93. "port": port,
  94. "vhost": vhost}