messaging.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from carrot.connection import DjangoBrokerConnection
  5. from carrot.messaging import Publisher, Consumer, ConsumerSet
  6. from celery import conf
  7. from celery import signals
  8. from celery.utils import gen_unique_id
  9. from celery.utils import mitemgetter
  10. MSG_OPTIONS = ("mandatory", "priority",
  11. "immediate", "routing_key",
  12. "serializer")
  13. get_msg_options = mitemgetter(*MSG_OPTIONS)
  14. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  15. class TaskPublisher(Publisher):
  16. """The AMQP Task Publisher class."""
  17. exchange = conf.AMQP_EXCHANGE
  18. exchange_type = conf.AMQP_EXCHANGE_TYPE
  19. routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
  20. serializer = conf.TASK_SERIALIZER
  21. def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
  22. """Delay task for execution by the celery nodes."""
  23. return self._delay_task(task_name=task_name, task_args=task_args,
  24. task_kwargs=task_kwargs, **kwargs)
  25. def _delay_task(self, task_name, task_id=None, taskset_id=None,
  26. task_args=None, task_kwargs=None, **kwargs):
  27. """INTERNAL"""
  28. task_id = task_id or gen_unique_id()
  29. eta = kwargs.get("eta")
  30. eta = eta and eta.isoformat()
  31. message_data = {
  32. "task": task_name,
  33. "id": task_id,
  34. "args": task_args or [],
  35. "kwargs": task_kwargs or {},
  36. "retries": kwargs.get("retries", 0),
  37. "eta": eta,
  38. }
  39. if taskset_id:
  40. message_data["taskset"] = taskset_id
  41. self.send(message_data, **extract_msg_options(kwargs))
  42. signals.task_sent.send(sender=task_name, **message_data)
  43. return task_id
  44. def get_consumer_set(connection, queues=conf.AMQP_CONSUMER_QUEUES, **options):
  45. return ConsumerSet(connection, from_dict=queues, **options)
  46. class TaskConsumer(Consumer):
  47. """The AMQP Task Consumer class."""
  48. queue = conf.AMQP_CONSUMER_QUEUE
  49. exchange = conf.AMQP_EXCHANGE
  50. routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
  51. exchange_type = conf.AMQP_EXCHANGE_TYPE
  52. auto_ack = False
  53. no_ack = False
  54. class EventPublisher(Publisher):
  55. exchange = "celeryevent"
  56. routing_key = "event"
  57. class EventConsumer(Consumer):
  58. queue = "celeryevent"
  59. exchange = "celeryevent"
  60. routing_key = "event"
  61. exchange_type = "direct"
  62. no_ack = True
  63. class BroadcastPublisher(Publisher):
  64. exchange = "celeryctl"
  65. exchange_type = "fanout"
  66. routing_key = ""
  67. def revoke(self, task_id):
  68. self.send("revoke", dict(task_id=task_id))
  69. def send(self, type, data):
  70. data["command"] = type
  71. super(BroadcastPublisher, self).send({"control": data})
  72. class BroadcastConsumer(Consumer):
  73. queue = "celeryctl"
  74. exchange = "celeryctl"
  75. routing_key = ""
  76. exchange_type = "fanout"
  77. no_ack = True
  78. def establish_connection(connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
  79. return DjangoBrokerConnection(connect_timeout=connect_timeout)
  80. def with_connection(fun, connection=None,
  81. connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
  82. conn = connection or establish_connection()
  83. close_connection = not connection and conn.close or noop
  84. try:
  85. return fun(conn)
  86. finally:
  87. close_connection()
  88. def get_connection_info():
  89. broker_connection = DjangoBrokerConnection()
  90. carrot_backend = broker_connection.backend_cls
  91. if carrot_backend and not isinstance(carrot_backend, str):
  92. carrot_backend = carrot_backend.__name__
  93. port = broker_connection.port or \
  94. broker_connection.get_backend_cls().default_port
  95. port = port and ":%s" % port or ""
  96. vhost = broker_connection.virtual_host
  97. if not vhost.startswith("/"):
  98. vhost = "/" + vhost
  99. return "%(carrot_backend)s://%(userid)s@%(host)s%(port)s%(vhost)s" % {
  100. "carrot_backend": carrot_backend,
  101. "userid": broker_connection.userid,
  102. "host": broker_connection.hostname,
  103. "port": port,
  104. "vhost": vhost}