messaging.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. """
  2. Sending and Receiving Messages
  3. """
  4. import socket
  5. from datetime import datetime, timedelta
  6. from itertools import count
  7. from carrot.connection import BrokerConnection
  8. from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
  9. from celery import conf
  10. from celery import signals
  11. from celery.utils import gen_unique_id, mitemgetter, noop
  12. from celery.utils.functional import wraps
  13. MSG_OPTIONS = ("mandatory", "priority", "immediate",
  14. "routing_key", "serializer", "delivery_mode")
  15. get_msg_options = mitemgetter(*MSG_OPTIONS)
  16. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  17. default_queue = conf.get_queues()[conf.DEFAULT_QUEUE]
  18. _queues_declared = False
  19. _exchanges_declared = {}
  20. class TaskPublisher(Publisher):
  21. """Publish tasks."""
  22. exchange = default_queue["exchange"]
  23. exchange_type = default_queue["exchange_type"]
  24. routing_key = conf.DEFAULT_ROUTING_KEY
  25. serializer = conf.TASK_SERIALIZER
  26. auto_declare = False
  27. def __init__(self, *args, **kwargs):
  28. super(TaskPublisher, self).__init__(*args, **kwargs)
  29. # Make sure all queues are declared.
  30. global _queues_declared
  31. if not _queues_declared:
  32. consumers = get_consumer_set(self.connection)
  33. consumers.close()
  34. _queues_declared = True
  35. if self.exchange not in _exchanges_declared:
  36. self.declare()
  37. _exchanges_declared[self.exchange] = True
  38. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  39. countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
  40. """Delay task for execution by the celery nodes."""
  41. task_id = task_id or gen_unique_id()
  42. task_args = task_args or []
  43. task_kwargs = task_kwargs or {}
  44. if countdown: # Convert countdown to ETA.
  45. eta = datetime.now() + timedelta(seconds=countdown)
  46. if not isinstance(task_args, (list, tuple)):
  47. raise ValueError("task args must be a list or tuple")
  48. if not isinstance(task_kwargs, dict):
  49. raise ValueError("task kwargs must be a dictionary")
  50. message_data = {
  51. "task": task_name,
  52. "id": task_id,
  53. "args": task_args or [],
  54. "kwargs": task_kwargs or {},
  55. "retries": kwargs.get("retries", 0),
  56. "eta": eta and eta.isoformat(),
  57. }
  58. if taskset_id:
  59. message_data["taskset"] = taskset_id
  60. self.send(message_data, **extract_msg_options(kwargs))
  61. signals.task_sent.send(sender=task_name, **message_data)
  62. return task_id
  63. class ConsumerSet(_ConsumerSet):
  64. """ConsumerSet with an optional decode error callback.
  65. For more information see :class:`carrot.messaging.ConsumerSet`.
  66. .. attribute:: on_decode_error
  67. Callback called if a message had decoding errors.
  68. The callback is called with the signature::
  69. callback(message, exception)
  70. """
  71. on_decode_error = None
  72. def _receive_callback(self, raw_message):
  73. message = self.backend.message_to_python(raw_message)
  74. if self.auto_ack and not message.acknowledged:
  75. message.ack()
  76. try:
  77. decoded = message.decode()
  78. except Exception, exc:
  79. if self.on_decode_error:
  80. return self.on_decode_error(message, exc)
  81. else:
  82. raise
  83. self.receive(decoded, message)
  84. class TaskConsumer(Consumer):
  85. """Consume tasks"""
  86. queue = conf.DEFAULT_QUEUE
  87. exchange = default_queue["exchange"]
  88. routing_key = default_queue["binding_key"]
  89. exchange_type = default_queue["exchange_type"]
  90. class EventPublisher(Publisher):
  91. """Publish events"""
  92. exchange = conf.EVENT_EXCHANGE
  93. exchange_type = conf.EVENT_EXCHANGE_TYPE
  94. routing_key = conf.EVENT_ROUTING_KEY
  95. serializer = conf.EVENT_SERIALIZER
  96. class EventConsumer(Consumer):
  97. """Consume events"""
  98. queue = conf.EVENT_QUEUE
  99. exchange = conf.EVENT_EXCHANGE
  100. exchange_type = conf.EVENT_EXCHANGE_TYPE
  101. routing_key = conf.EVENT_ROUTING_KEY
  102. no_ack = True
  103. class ControlReplyConsumer(Consumer):
  104. exchange = "celerycrq"
  105. exchange_type = "direct"
  106. durable = False
  107. exclusive = False
  108. auto_delete = True
  109. no_ack = True
  110. def __init__(self, connection, ticket, **kwargs):
  111. self.ticket = ticket
  112. queue = "%s.%s" % (self.exchange, ticket)
  113. super(ControlReplyConsumer, self).__init__(connection,
  114. queue=queue,
  115. routing_key=ticket,
  116. **kwargs)
  117. def collect(self, limit=None, timeout=1):
  118. responses = []
  119. def callback(message_data, message):
  120. responses.append(message_data)
  121. self.callbacks = [callback]
  122. self.consume()
  123. for i in limit and range(limit) or count():
  124. try:
  125. self.connection.drain_events(timeout=timeout)
  126. except socket.timeout:
  127. break
  128. return responses
  129. class ControlReplyPublisher(Publisher):
  130. exchange = "celerycrq"
  131. exchange_type = "direct"
  132. delivery_mode = "non-persistent"
  133. class BroadcastPublisher(Publisher):
  134. """Publish broadcast commands"""
  135. ReplyTo = ControlReplyConsumer
  136. exchange = conf.BROADCAST_EXCHANGE
  137. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  138. def send(self, type, arguments, destination=None, reply_ticket=None):
  139. """Send broadcast command."""
  140. arguments["command"] = type
  141. arguments["destination"] = destination
  142. if reply_ticket:
  143. arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
  144. "routing_key": reply_ticket}
  145. super(BroadcastPublisher, self).send({"control": arguments})
  146. class BroadcastConsumer(Consumer):
  147. """Consume broadcast commands"""
  148. queue = conf.BROADCAST_QUEUE
  149. exchange = conf.BROADCAST_EXCHANGE
  150. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  151. no_ack = True
  152. def __init__(self, *args, **kwargs):
  153. hostname = kwargs.pop("hostname", None) or socket.gethostname()
  154. self.queue = "%s_%s" % (self.queue, hostname)
  155. super(BroadcastConsumer, self).__init__(*args, **kwargs)
  156. def establish_connection(hostname=None, userid=None, password=None,
  157. virtual_host=None, port=None, ssl=None, insist=None,
  158. connect_timeout=None, backend_cls=None):
  159. """Establish a connection to the message broker."""
  160. if insist is None:
  161. insist = conf.BROKER_INSIST
  162. if ssl is None:
  163. ssl = conf.BROKER_USE_SSL
  164. if connect_timeout is None:
  165. connect_timeout = conf.BROKER_CONNECTION_TIMEOUT
  166. return BrokerConnection(hostname or conf.BROKER_HOST,
  167. userid or conf.BROKER_USER,
  168. password or conf.BROKER_PASSWORD,
  169. virtual_host or conf.BROKER_VHOST,
  170. port or conf.BROKER_PORT,
  171. backend_cls=backend_cls or conf.BROKER_BACKEND,
  172. insist=insist, ssl=ssl,
  173. connect_timeout=connect_timeout)
  174. def with_connection(fun):
  175. """Decorator for providing default message broker connection for functions
  176. supporting the ``connection`` and ``connect_timeout`` keyword
  177. arguments."""
  178. @wraps(fun)
  179. def _inner(*args, **kwargs):
  180. connection = kwargs.get("connection")
  181. timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
  182. kwargs["connection"] = conn = connection or \
  183. establish_connection(connect_timeout=timeout)
  184. close_connection = not connection and conn.close or noop
  185. try:
  186. return fun(*args, **kwargs)
  187. finally:
  188. close_connection()
  189. return _inner
  190. def get_consumer_set(connection, queues=None, **options):
  191. """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
  192. configuration.
  193. Defaults to the queues in ``CELERY_QUEUES``.
  194. """
  195. queues = queues or conf.get_queues()
  196. cset = ConsumerSet(connection)
  197. for queue_name, queue_options in queues.items():
  198. queue_options = dict(queue_options)
  199. queue_options["routing_key"] = queue_options.pop("binding_key", None)
  200. consumer = Consumer(connection, queue=queue_name,
  201. backend=cset.backend, **queue_options)
  202. cset.consumers.append(consumer)
  203. return cset
  204. @with_connection
  205. def reply(data, exchange, routing_key, connection=None, connect_timeout=None,
  206. **kwargs):
  207. pub = Publisher(connection, exchange=exchange,
  208. routing_key=routing_key, **kwargs)
  209. pub.send(data)