messaging.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. """
  2. Sending and Receiving Messages
  3. """
  4. import socket
  5. from datetime import datetime, timedelta
  6. from itertools import count
  7. from carrot.connection import DjangoBrokerConnection
  8. from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
  9. from billiard.utils.functional import wraps
  10. from celery import conf
  11. from celery import signals
  12. from celery.utils import gen_unique_id, mitemgetter, noop
  13. from celery.routes import lookup_route, expand_destination
  14. from celery.loaders import load_settings
  15. MSG_OPTIONS = ("mandatory", "priority",
  16. "immediate", "routing_key",
  17. "serializer", "delivery_mode")
  18. get_msg_options = mitemgetter(*MSG_OPTIONS)
  19. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  20. default_queue = conf.get_routing_table()[conf.DEFAULT_QUEUE]
  21. _queues_declared = False
  22. _exchanges_declared = {}
  23. class TaskPublisher(Publisher):
  24. """Publish tasks."""
  25. exchange = default_queue["exchange"]
  26. exchange_type = default_queue["exchange_type"]
  27. routing_key = conf.DEFAULT_ROUTING_KEY
  28. serializer = conf.TASK_SERIALIZER
  29. auto_declare = False
  30. def __init__(self, *args, **kwargs):
  31. super(TaskPublisher, self).__init__(*args, **kwargs)
  32. # Make sure all queues are declared.
  33. global _queues_declared
  34. if not _queues_declared:
  35. consumers = get_consumer_set(self.connection)
  36. consumers.close()
  37. _queues_declared = True
  38. if self.exchange not in _exchanges_declared:
  39. self.declare()
  40. _exchanges_declared[self.exchange] = True
  41. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  42. countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
  43. """Delay task for execution by the celery nodes."""
  44. task_id = task_id or gen_unique_id()
  45. if countdown: # Convert countdown to ETA.
  46. eta = datetime.now() + timedelta(seconds=countdown)
  47. task_args = task_args or []
  48. task_kwargs = task_kwargs or {}
  49. if not isinstance(task_args, (list, tuple)):
  50. raise ValueError("task args must be a list or tuple")
  51. if not isinstance(task_kwargs, dict):
  52. raise ValueError("task kwargs must be a dictionary")
  53. message_data = {
  54. "task": task_name,
  55. "id": task_id,
  56. "args": task_args or [],
  57. "kwargs": task_kwargs or {},
  58. "retries": kwargs.get("retries", 0),
  59. "eta": eta and eta.isoformat(),
  60. }
  61. if taskset_id:
  62. message_data["taskset"] = taskset_id
  63. route = {}
  64. if conf.ROUTES:
  65. route = lookup_route(conf.ROUTES, task_name, task_id,
  66. task_args, task_kwargs)
  67. if route:
  68. dest = expand_destination(route, conf.get_routing_table())
  69. msg_options = dict(extract_msg_options(kwargs), **dest)
  70. else:
  71. msg_options = extract_msg_options(kwargs)
  72. self.send(message_data, **msg_options)
  73. signals.task_sent.send(sender=task_name, **message_data)
  74. return task_id
  75. class ConsumerSet(_ConsumerSet):
  76. """ConsumerSet with an optional decode error callback.
  77. For more information see :class:`carrot.messaging.ConsumerSet`.
  78. .. attribute:: on_decode_error
  79. Callback called if a message had decoding errors.
  80. The callback is called with the signature::
  81. callback(message, exception)
  82. """
  83. on_decode_error = None
  84. def _receive_callback(self, raw_message):
  85. message = self.backend.message_to_python(raw_message)
  86. if self.auto_ack and not message.acknowledged:
  87. message.ack()
  88. try:
  89. decoded = message.decode()
  90. except Exception, exc:
  91. if self.on_decode_error:
  92. return self.on_decode_error(message, exc)
  93. else:
  94. raise
  95. self.receive(decoded, message)
  96. class TaskConsumer(Consumer):
  97. """Consume tasks"""
  98. queue = conf.DEFAULT_QUEUE
  99. exchange = default_queue["exchange"]
  100. routing_key = default_queue["binding_key"]
  101. exchange_type = default_queue["exchange_type"]
  102. class EventPublisher(Publisher):
  103. """Publish events"""
  104. exchange = conf.EVENT_EXCHANGE
  105. exchange_type = conf.EVENT_EXCHANGE_TYPE
  106. routing_key = conf.EVENT_ROUTING_KEY
  107. serializer = conf.EVENT_SERIALIZER
  108. class EventConsumer(Consumer):
  109. """Consume events"""
  110. queue = conf.EVENT_QUEUE
  111. exchange = conf.EVENT_EXCHANGE
  112. exchange_type = conf.EVENT_EXCHANGE_TYPE
  113. routing_key = conf.EVENT_ROUTING_KEY
  114. no_ack = True
  115. class ControlReplyConsumer(Consumer):
  116. exchange = "celerycrq"
  117. exchange_type = "direct"
  118. durable = False
  119. exclusive = False
  120. auto_delete = True
  121. no_ack = True
  122. def __init__(self, connection, ticket, **kwargs):
  123. self.ticket = ticket
  124. queue = "%s.%s" % (self.exchange, ticket)
  125. super(ControlReplyConsumer, self).__init__(connection,
  126. queue=queue,
  127. routing_key=ticket,
  128. **kwargs)
  129. def collect(self, limit=None, timeout=1):
  130. responses = []
  131. def callback(message_data, message):
  132. responses.append(message_data)
  133. self.callbacks = [callback]
  134. self.consume()
  135. for i in limit and range(limit) or count():
  136. try:
  137. self.connection.drain_events(timeout=timeout)
  138. except socket.timeout:
  139. break
  140. return responses
  141. class ControlReplyPublisher(Publisher):
  142. exchange = "celerycrq"
  143. exchange_type = "direct"
  144. delivery_mode = "non-persistent"
  145. class BroadcastPublisher(Publisher):
  146. """Publish broadcast commands"""
  147. ReplyTo = ControlReplyConsumer
  148. exchange = conf.BROADCAST_EXCHANGE
  149. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  150. def send(self, type, arguments, destination=None, reply_ticket=None):
  151. """Send broadcast command."""
  152. arguments["command"] = type
  153. arguments["destination"] = destination
  154. if reply_ticket:
  155. arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
  156. "routing_key": reply_ticket}
  157. super(BroadcastPublisher, self).send({"control": arguments})
  158. class BroadcastConsumer(Consumer):
  159. """Consume broadcast commands"""
  160. queue = conf.BROADCAST_QUEUE
  161. exchange = conf.BROADCAST_EXCHANGE
  162. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  163. no_ack = True
  164. def __init__(self, *args, **kwargs):
  165. hostname = kwargs.pop("hostname", None) or socket.gethostname()
  166. self.queue = "%s_%s" % (self.queue, hostname)
  167. super(BroadcastConsumer, self).__init__(*args, **kwargs)
  168. def establish_connection(connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  169. """Establish a connection to the message broker."""
  170. return DjangoBrokerConnection(connect_timeout=connect_timeout,
  171. settings=load_settings())
  172. def with_connection(fun):
  173. """Decorator for providing default message broker connection for functions
  174. supporting the ``connection`` and ``connect_timeout`` keyword
  175. arguments."""
  176. @wraps(fun)
  177. def _inner(*args, **kwargs):
  178. connection = kwargs.get("connection")
  179. timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
  180. kwargs["connection"] = conn = connection or \
  181. establish_connection(connect_timeout=timeout)
  182. close_connection = not connection and conn.close or noop
  183. try:
  184. return fun(*args, **kwargs)
  185. finally:
  186. close_connection()
  187. return _inner
  188. def get_consumer_set(connection, queues=None, **options):
  189. """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
  190. configuration.
  191. Defaults to the queues in ``CELERY_QUEUES``.
  192. """
  193. queues = queues or conf.get_routing_table()
  194. cset = ConsumerSet(connection)
  195. for queue_name, queue_options in queues.items():
  196. queue_options = dict(queue_options)
  197. queue_options["routing_key"] = queue_options.pop("binding_key", None)
  198. consumer = Consumer(connection, queue=queue_name,
  199. backend=cset.backend, **queue_options)
  200. cset.consumers.append(consumer)
  201. return cset
  202. @with_connection
  203. def reply(data, exchange, routing_key, connection=None, connect_timeout=None,
  204. **kwargs):
  205. pub = Publisher(connection, exchange=exchange,
  206. routing_key=routing_key, **kwargs)
  207. pub.send(data)