messaging.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. """
  2. Sending and Receiving Messages
  3. """
  4. import socket
  5. from datetime import datetime, timedelta
  6. from itertools import count
  7. from carrot.connection import BrokerConnection
  8. from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
  9. from celery import conf
  10. from celery import signals
  11. from celery.utils import gen_unique_id, mitemgetter, noop
  12. from celery.utils.functional import wraps
  13. MSG_OPTIONS = ("mandatory", "priority", "immediate",
  14. "routing_key", "serializer", "delivery_mode")
  15. get_msg_options = mitemgetter(*MSG_OPTIONS)
  16. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  17. default_queue = conf.get_queues()[conf.DEFAULT_QUEUE]
  18. _queues_declared = False
  19. _exchanges_declared = set()
  20. class TaskPublisher(Publisher):
  21. """Publish tasks."""
  22. exchange = default_queue["exchange"]
  23. exchange_type = default_queue["exchange_type"]
  24. routing_key = conf.DEFAULT_ROUTING_KEY
  25. serializer = conf.TASK_SERIALIZER
  26. auto_declare = False
  27. def __init__(self, *args, **kwargs):
  28. super(TaskPublisher, self).__init__(*args, **kwargs)
  29. # Make sure all queues are declared.
  30. global _queues_declared
  31. if not _queues_declared:
  32. consumers = get_consumer_set(self.connection)
  33. consumers.close()
  34. _queues_declared = True
  35. self.declare()
  36. def declare(self):
  37. if self.exchange not in _exchanges_declared:
  38. super(TaskPublisher, self).declare()
  39. _exchanges_declared.add(self.exchange)
  40. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  41. countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
  42. """Delay task for execution by the celery nodes."""
  43. task_id = task_id or gen_unique_id()
  44. task_args = task_args or []
  45. task_kwargs = task_kwargs or {}
  46. if countdown: # Convert countdown to ETA.
  47. eta = datetime.now() + timedelta(seconds=countdown)
  48. if not isinstance(task_args, (list, tuple)):
  49. raise ValueError("task args must be a list or tuple")
  50. if not isinstance(task_kwargs, dict):
  51. raise ValueError("task kwargs must be a dictionary")
  52. message_data = {
  53. "task": task_name,
  54. "id": task_id,
  55. "args": task_args or [],
  56. "kwargs": task_kwargs or {},
  57. "retries": kwargs.get("retries", 0),
  58. "eta": eta and eta.isoformat(),
  59. }
  60. if taskset_id:
  61. message_data["taskset"] = taskset_id
  62. self.send(message_data, **extract_msg_options(kwargs))
  63. signals.task_sent.send(sender=task_name, **message_data)
  64. return task_id
  65. class ConsumerSet(_ConsumerSet):
  66. """ConsumerSet with an optional decode error callback.
  67. For more information see :class:`carrot.messaging.ConsumerSet`.
  68. .. attribute:: on_decode_error
  69. Callback called if a message had decoding errors.
  70. The callback is called with the signature::
  71. callback(message, exception)
  72. """
  73. on_decode_error = None
  74. def _receive_callback(self, raw_message):
  75. message = self.backend.message_to_python(raw_message)
  76. if self.auto_ack and not message.acknowledged:
  77. message.ack()
  78. try:
  79. decoded = message.decode()
  80. except Exception, exc:
  81. if self.on_decode_error:
  82. return self.on_decode_error(message, exc)
  83. else:
  84. raise
  85. self.receive(decoded, message)
  86. class TaskConsumer(Consumer):
  87. """Consume tasks"""
  88. queue = conf.DEFAULT_QUEUE
  89. exchange = default_queue["exchange"]
  90. routing_key = default_queue["binding_key"]
  91. exchange_type = default_queue["exchange_type"]
  92. class EventPublisher(Publisher):
  93. """Publish events"""
  94. exchange = conf.EVENT_EXCHANGE
  95. exchange_type = conf.EVENT_EXCHANGE_TYPE
  96. routing_key = conf.EVENT_ROUTING_KEY
  97. serializer = conf.EVENT_SERIALIZER
  98. class EventConsumer(Consumer):
  99. """Consume events"""
  100. queue = conf.EVENT_QUEUE
  101. exchange = conf.EVENT_EXCHANGE
  102. exchange_type = conf.EVENT_EXCHANGE_TYPE
  103. routing_key = conf.EVENT_ROUTING_KEY
  104. no_ack = True
  105. class ControlReplyConsumer(Consumer):
  106. exchange = "celerycrq"
  107. exchange_type = "direct"
  108. durable = False
  109. exclusive = False
  110. auto_delete = True
  111. no_ack = True
  112. def __init__(self, connection, ticket, **kwargs):
  113. self.ticket = ticket
  114. queue = "%s.%s" % (self.exchange, ticket)
  115. super(ControlReplyConsumer, self).__init__(connection,
  116. queue=queue,
  117. routing_key=ticket,
  118. **kwargs)
  119. def collect(self, limit=None, timeout=1):
  120. responses = []
  121. def callback(message_data, message):
  122. responses.append(message_data)
  123. self.callbacks = [callback]
  124. self.consume()
  125. for i in limit and range(limit) or count():
  126. try:
  127. self.connection.drain_events(timeout=timeout)
  128. except socket.timeout:
  129. break
  130. return responses
  131. class ControlReplyPublisher(Publisher):
  132. exchange = "celerycrq"
  133. exchange_type = "direct"
  134. delivery_mode = "non-persistent"
  135. durable = False
  136. auto_delete = True
  137. class BroadcastPublisher(Publisher):
  138. """Publish broadcast commands"""
  139. ReplyTo = ControlReplyConsumer
  140. exchange = conf.BROADCAST_EXCHANGE
  141. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  142. def send(self, type, arguments, destination=None, reply_ticket=None):
  143. """Send broadcast command."""
  144. arguments["command"] = type
  145. arguments["destination"] = destination
  146. if reply_ticket:
  147. arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
  148. "routing_key": reply_ticket}
  149. super(BroadcastPublisher, self).send({"control": arguments})
  150. class BroadcastConsumer(Consumer):
  151. """Consume broadcast commands"""
  152. queue = conf.BROADCAST_QUEUE
  153. exchange = conf.BROADCAST_EXCHANGE
  154. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  155. no_ack = True
  156. def __init__(self, *args, **kwargs):
  157. hostname = kwargs.pop("hostname", None) or socket.gethostname()
  158. self.queue = "%s_%s" % (self.queue, hostname)
  159. super(BroadcastConsumer, self).__init__(*args, **kwargs)
  160. def establish_connection(hostname=None, userid=None, password=None,
  161. virtual_host=None, port=None, ssl=None, insist=None,
  162. connect_timeout=None, backend_cls=None):
  163. """Establish a connection to the message broker."""
  164. if insist is None:
  165. insist = conf.BROKER_INSIST
  166. if ssl is None:
  167. ssl = conf.BROKER_USE_SSL
  168. if connect_timeout is None:
  169. connect_timeout = conf.BROKER_CONNECTION_TIMEOUT
  170. return BrokerConnection(hostname or conf.BROKER_HOST,
  171. userid or conf.BROKER_USER,
  172. password or conf.BROKER_PASSWORD,
  173. virtual_host or conf.BROKER_VHOST,
  174. port or conf.BROKER_PORT,
  175. backend_cls=backend_cls or conf.BROKER_BACKEND,
  176. insist=insist, ssl=ssl,
  177. connect_timeout=connect_timeout)
  178. def with_connection(fun):
  179. """Decorator for providing default message broker connection for functions
  180. supporting the ``connection`` and ``connect_timeout`` keyword
  181. arguments."""
  182. @wraps(fun)
  183. def _inner(*args, **kwargs):
  184. connection = kwargs.get("connection")
  185. timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
  186. kwargs["connection"] = conn = connection or \
  187. establish_connection(connect_timeout=timeout)
  188. close_connection = not connection and conn.close or noop
  189. try:
  190. return fun(*args, **kwargs)
  191. finally:
  192. close_connection()
  193. return _inner
  194. def get_consumer_set(connection, queues=None, **options):
  195. """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
  196. configuration.
  197. Defaults to the queues in ``CELERY_QUEUES``.
  198. """
  199. queues = queues or conf.get_queues()
  200. cset = ConsumerSet(connection)
  201. for queue_name, queue_options in queues.items():
  202. queue_options = dict(queue_options)
  203. queue_options["routing_key"] = queue_options.pop("binding_key", None)
  204. consumer = Consumer(connection, queue=queue_name,
  205. backend=cset.backend, **queue_options)
  206. cset.consumers.append(consumer)
  207. return cset
  208. @with_connection
  209. def reply(data, exchange, routing_key, connection=None, connect_timeout=None,
  210. **kwargs):
  211. pub = ControlReplyPublisher(connection, exchange=exchange,
  212. routing_key=routing_key, **kwargs)
  213. pub.send(data)