messaging.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. """
  2. Sending and Receiving Messages
  3. """
  4. import socket
  5. from datetime import datetime, timedelta
  6. from itertools import count
  7. from carrot.connection import BrokerConnection
  8. from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
  9. from celery import conf
  10. from celery import signals
  11. from celery.utils import gen_unique_id, mitemgetter, noop
  12. from celery.utils.functional import wraps
  13. MSG_OPTIONS = ("mandatory", "priority", "immediate",
  14. "routing_key", "serializer", "delivery_mode")
  15. get_msg_options = mitemgetter(*MSG_OPTIONS)
  16. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  17. default_queue = conf.get_queues()[conf.DEFAULT_QUEUE]
  18. _queues_declared = False
  19. _exchanges_declared = set()
  20. class TaskPublisher(Publisher):
  21. """Publish tasks."""
  22. exchange = default_queue["exchange"]
  23. exchange_type = default_queue["exchange_type"]
  24. routing_key = conf.DEFAULT_ROUTING_KEY
  25. serializer = conf.TASK_SERIALIZER
  26. auto_declare = False
  27. def __init__(self, *args, **kwargs):
  28. super(TaskPublisher, self).__init__(*args, **kwargs)
  29. # Make sure all queues are declared.
  30. global _queues_declared
  31. if not _queues_declared:
  32. consumers = get_consumer_set(self.connection)
  33. consumers.close()
  34. _queues_declared = True
  35. self.declare()
  36. def declare(self):
  37. if self.exchange not in _exchanges_declared:
  38. super(TaskPublisher, self).declare()
  39. _exchanges_declared.add(self.exchange)
  40. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  41. countdown=None, eta=None, task_id=None, taskset_id=None,
  42. expires=None, **kwargs):
  43. """Delay task for execution by the celery nodes."""
  44. task_id = task_id or gen_unique_id()
  45. task_args = task_args or []
  46. task_kwargs = task_kwargs or {}
  47. now = None
  48. if countdown: # Convert countdown to ETA.
  49. now = datetime.now()
  50. eta = now + timedelta(seconds=countdown)
  51. if not isinstance(task_args, (list, tuple)):
  52. raise ValueError("task args must be a list or tuple")
  53. if not isinstance(task_kwargs, dict):
  54. raise ValueError("task kwargs must be a dictionary")
  55. if isinstance(expires, int):
  56. now = now or datetime.now()
  57. expires = now + timedelta(seconds=expires)
  58. message_data = {
  59. "task": task_name,
  60. "id": task_id,
  61. "args": task_args or [],
  62. "kwargs": task_kwargs or {},
  63. "retries": kwargs.get("retries", 0),
  64. "eta": eta and eta.isoformat(),
  65. "expires": expires and expires.isoformat(),
  66. }
  67. if taskset_id:
  68. message_data["taskset"] = taskset_id
  69. self.send(message_data, **extract_msg_options(kwargs))
  70. signals.task_sent.send(sender=task_name, **message_data)
  71. return task_id
  72. class ConsumerSet(_ConsumerSet):
  73. """ConsumerSet with an optional decode error callback.
  74. For more information see :class:`carrot.messaging.ConsumerSet`.
  75. .. attribute:: on_decode_error
  76. Callback called if a message had decoding errors.
  77. The callback is called with the signature::
  78. callback(message, exception)
  79. """
  80. on_decode_error = None
  81. def _receive_callback(self, raw_message):
  82. message = self.backend.message_to_python(raw_message)
  83. if self.auto_ack and not message.acknowledged:
  84. message.ack()
  85. try:
  86. decoded = message.decode()
  87. except Exception, exc:
  88. if self.on_decode_error:
  89. return self.on_decode_error(message, exc)
  90. else:
  91. raise
  92. self.receive(decoded, message)
  93. class TaskConsumer(Consumer):
  94. """Consume tasks"""
  95. queue = conf.DEFAULT_QUEUE
  96. exchange = default_queue["exchange"]
  97. routing_key = default_queue["binding_key"]
  98. exchange_type = default_queue["exchange_type"]
  99. class EventPublisher(Publisher):
  100. """Publish events"""
  101. exchange = conf.EVENT_EXCHANGE
  102. exchange_type = conf.EVENT_EXCHANGE_TYPE
  103. routing_key = conf.EVENT_ROUTING_KEY
  104. serializer = conf.EVENT_SERIALIZER
  105. class EventConsumer(Consumer):
  106. """Consume events"""
  107. queue = conf.EVENT_QUEUE
  108. exchange = conf.EVENT_EXCHANGE
  109. exchange_type = conf.EVENT_EXCHANGE_TYPE
  110. routing_key = conf.EVENT_ROUTING_KEY
  111. no_ack = True
  112. class ControlReplyConsumer(Consumer):
  113. exchange = "celerycrq"
  114. exchange_type = "direct"
  115. durable = False
  116. exclusive = False
  117. auto_delete = True
  118. no_ack = True
  119. def __init__(self, connection, ticket, **kwargs):
  120. self.ticket = ticket
  121. queue = "%s.%s" % (self.exchange, ticket)
  122. super(ControlReplyConsumer, self).__init__(connection,
  123. queue=queue,
  124. routing_key=ticket,
  125. **kwargs)
  126. def collect(self, limit=None, timeout=1):
  127. responses = []
  128. def callback(message_data, message):
  129. responses.append(message_data)
  130. self.callbacks = [callback]
  131. self.consume()
  132. for i in limit and range(limit) or count():
  133. try:
  134. self.connection.drain_events(timeout=timeout)
  135. except socket.timeout:
  136. break
  137. return responses
  138. class ControlReplyPublisher(Publisher):
  139. exchange = "celerycrq"
  140. exchange_type = "direct"
  141. delivery_mode = "non-persistent"
  142. durable = False
  143. auto_delete = True
  144. class BroadcastPublisher(Publisher):
  145. """Publish broadcast commands"""
  146. ReplyTo = ControlReplyConsumer
  147. exchange = conf.BROADCAST_EXCHANGE
  148. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  149. def send(self, type, arguments, destination=None, reply_ticket=None):
  150. """Send broadcast command."""
  151. arguments["command"] = type
  152. arguments["destination"] = destination
  153. if reply_ticket:
  154. arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
  155. "routing_key": reply_ticket}
  156. super(BroadcastPublisher, self).send({"control": arguments})
  157. class BroadcastConsumer(Consumer):
  158. """Consume broadcast commands"""
  159. queue = conf.BROADCAST_QUEUE
  160. exchange = conf.BROADCAST_EXCHANGE
  161. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  162. no_ack = True
  163. def __init__(self, *args, **kwargs):
  164. hostname = kwargs.pop("hostname", None) or socket.gethostname()
  165. self.queue = "%s_%s" % (self.queue, hostname)
  166. super(BroadcastConsumer, self).__init__(*args, **kwargs)
  167. def establish_connection(hostname=None, userid=None, password=None,
  168. virtual_host=None, port=None, ssl=None, insist=None,
  169. connect_timeout=None, backend_cls=None):
  170. """Establish a connection to the message broker."""
  171. if insist is None:
  172. insist = conf.BROKER_INSIST
  173. if ssl is None:
  174. ssl = conf.BROKER_USE_SSL
  175. if connect_timeout is None:
  176. connect_timeout = conf.BROKER_CONNECTION_TIMEOUT
  177. return BrokerConnection(hostname or conf.BROKER_HOST,
  178. userid or conf.BROKER_USER,
  179. password or conf.BROKER_PASSWORD,
  180. virtual_host or conf.BROKER_VHOST,
  181. port or conf.BROKER_PORT,
  182. backend_cls=backend_cls or conf.BROKER_BACKEND,
  183. insist=insist, ssl=ssl,
  184. connect_timeout=connect_timeout)
  185. def with_connection(fun):
  186. """Decorator for providing default message broker connection for functions
  187. supporting the ``connection`` and ``connect_timeout`` keyword
  188. arguments."""
  189. @wraps(fun)
  190. def _inner(*args, **kwargs):
  191. connection = kwargs.get("connection")
  192. timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
  193. kwargs["connection"] = conn = connection or \
  194. establish_connection(connect_timeout=timeout)
  195. close_connection = not connection and conn.close or noop
  196. try:
  197. return fun(*args, **kwargs)
  198. finally:
  199. close_connection()
  200. return _inner
  201. def get_consumer_set(connection, queues=None, **options):
  202. """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
  203. configuration.
  204. Defaults to the queues in ``CELERY_QUEUES``.
  205. """
  206. queues = queues or conf.get_queues()
  207. cset = ConsumerSet(connection)
  208. for queue_name, queue_options in queues.items():
  209. queue_options = dict(queue_options)
  210. queue_options["routing_key"] = queue_options.pop("binding_key", None)
  211. consumer = Consumer(connection, queue=queue_name,
  212. backend=cset.backend, **queue_options)
  213. cset.consumers.append(consumer)
  214. return cset