messaging.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. """
  2. Sending and Receiving Messages
  3. """
  4. import socket
  5. from datetime import datetime, timedelta
  6. from itertools import count
  7. from carrot.connection import BrokerConnection
  8. from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
  9. from celery import conf
  10. from celery import signals
  11. from celery.utils import gen_unique_id, mitemgetter, noop
  12. from celery.utils.functional import wraps
  13. from celery.loaders import load_settings
  14. MSG_OPTIONS = ("mandatory", "priority", "immediate",
  15. "routing_key", "serializer", "delivery_mode")
  16. get_msg_options = mitemgetter(*MSG_OPTIONS)
  17. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  18. default_queue = conf.get_queues()[conf.DEFAULT_QUEUE]
  19. _queues_declared = False
  20. _exchanges_declared = {}
  21. class TaskPublisher(Publisher):
  22. """Publish tasks."""
  23. exchange = default_queue["exchange"]
  24. exchange_type = default_queue["exchange_type"]
  25. routing_key = conf.DEFAULT_ROUTING_KEY
  26. serializer = conf.TASK_SERIALIZER
  27. auto_declare = False
  28. def __init__(self, *args, **kwargs):
  29. super(TaskPublisher, self).__init__(*args, **kwargs)
  30. # Make sure all queues are declared.
  31. global _queues_declared
  32. if not _queues_declared:
  33. consumers = get_consumer_set(self.connection)
  34. consumers.close()
  35. _queues_declared = True
  36. if self.exchange not in _exchanges_declared:
  37. self.declare()
  38. _exchanges_declared[self.exchange] = True
  39. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  40. countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
  41. """Delay task for execution by the celery nodes."""
  42. task_id = task_id or gen_unique_id()
  43. task_args = task_args or []
  44. task_kwargs = task_kwargs or {}
  45. if countdown: # Convert countdown to ETA.
  46. eta = datetime.now() + timedelta(seconds=countdown)
  47. if not isinstance(task_args, (list, tuple)):
  48. raise ValueError("task args must be a list or tuple")
  49. if not isinstance(task_kwargs, dict):
  50. raise ValueError("task kwargs must be a dictionary")
  51. message_data = {
  52. "task": task_name,
  53. "id": task_id,
  54. "args": task_args or [],
  55. "kwargs": task_kwargs or {},
  56. "retries": kwargs.get("retries", 0),
  57. "eta": eta and eta.isoformat(),
  58. }
  59. if taskset_id:
  60. message_data["taskset"] = taskset_id
  61. self.send(message_data, **extract_msg_options(kwargs))
  62. signals.task_sent.send(sender=task_name, **message_data)
  63. return task_id
  64. class ConsumerSet(_ConsumerSet):
  65. """ConsumerSet with an optional decode error callback.
  66. For more information see :class:`carrot.messaging.ConsumerSet`.
  67. .. attribute:: on_decode_error
  68. Callback called if a message had decoding errors.
  69. The callback is called with the signature::
  70. callback(message, exception)
  71. """
  72. on_decode_error = None
  73. def _receive_callback(self, raw_message):
  74. message = self.backend.message_to_python(raw_message)
  75. if self.auto_ack and not message.acknowledged:
  76. message.ack()
  77. try:
  78. decoded = message.decode()
  79. except Exception, exc:
  80. if self.on_decode_error:
  81. return self.on_decode_error(message, exc)
  82. else:
  83. raise
  84. self.receive(decoded, message)
  85. class TaskConsumer(Consumer):
  86. """Consume tasks"""
  87. queue = conf.DEFAULT_QUEUE
  88. exchange = default_queue["exchange"]
  89. routing_key = default_queue["binding_key"]
  90. exchange_type = default_queue["exchange_type"]
  91. class EventPublisher(Publisher):
  92. """Publish events"""
  93. exchange = conf.EVENT_EXCHANGE
  94. exchange_type = conf.EVENT_EXCHANGE_TYPE
  95. routing_key = conf.EVENT_ROUTING_KEY
  96. serializer = conf.EVENT_SERIALIZER
  97. class EventConsumer(Consumer):
  98. """Consume events"""
  99. queue = conf.EVENT_QUEUE
  100. exchange = conf.EVENT_EXCHANGE
  101. exchange_type = conf.EVENT_EXCHANGE_TYPE
  102. routing_key = conf.EVENT_ROUTING_KEY
  103. no_ack = True
  104. class ControlReplyConsumer(Consumer):
  105. exchange = "celerycrq"
  106. exchange_type = "direct"
  107. durable = False
  108. exclusive = False
  109. auto_delete = True
  110. no_ack = True
  111. def __init__(self, connection, ticket, **kwargs):
  112. self.ticket = ticket
  113. queue = "%s.%s" % (self.exchange, ticket)
  114. super(ControlReplyConsumer, self).__init__(connection,
  115. queue=queue,
  116. routing_key=ticket,
  117. **kwargs)
  118. def collect(self, limit=None, timeout=1):
  119. responses = []
  120. def callback(message_data, message):
  121. responses.append(message_data)
  122. self.callbacks = [callback]
  123. self.consume()
  124. for i in limit and range(limit) or count():
  125. try:
  126. self.connection.drain_events(timeout=timeout)
  127. except socket.timeout:
  128. break
  129. return responses
  130. class ControlReplyPublisher(Publisher):
  131. exchange = "celerycrq"
  132. exchange_type = "direct"
  133. delivery_mode = "non-persistent"
  134. class BroadcastPublisher(Publisher):
  135. """Publish broadcast commands"""
  136. ReplyTo = ControlReplyConsumer
  137. exchange = conf.BROADCAST_EXCHANGE
  138. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  139. def send(self, type, arguments, destination=None, reply_ticket=None):
  140. """Send broadcast command."""
  141. arguments["command"] = type
  142. arguments["destination"] = destination
  143. if reply_ticket:
  144. arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
  145. "routing_key": reply_ticket}
  146. super(BroadcastPublisher, self).send({"control": arguments})
  147. class BroadcastConsumer(Consumer):
  148. """Consume broadcast commands"""
  149. queue = conf.BROADCAST_QUEUE
  150. exchange = conf.BROADCAST_EXCHANGE
  151. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  152. no_ack = True
  153. def __init__(self, *args, **kwargs):
  154. hostname = kwargs.pop("hostname", None) or socket.gethostname()
  155. self.queue = "%s_%s" % (self.queue, hostname)
  156. super(BroadcastConsumer, self).__init__(*args, **kwargs)
  157. def establish_connection(hostname=None, userid=None, password=None,
  158. virtual_host=None, port=None, ssl=None, insist=None,
  159. connect_timeout=None, backend_cls=None):
  160. """Establish a connection to the message broker."""
  161. if insist is None:
  162. insist = conf.BROKER_INSIST
  163. if ssl is None:
  164. ssl = conf.BROKER_USE_SSL
  165. if connect_timeout is None:
  166. connect_timeout = conf.BROKER_CONNECTION_TIMEOUT
  167. return BrokerConnection(hostname or conf.BROKER_HOST,
  168. userid or conf.BROKER_USER,
  169. password or conf.BROKER_PASSWORD,
  170. virtual_host or conf.BROKER_VHOST,
  171. port or conf.BROKER_PORT,
  172. backend_cls=backend_cls or conf.BROKER_BACKEND,
  173. insist=insist, ssl=ssl,
  174. connect_timeout=connect_timeout)
  175. def with_connection(fun):
  176. """Decorator for providing default message broker connection for functions
  177. supporting the ``connection`` and ``connect_timeout`` keyword
  178. arguments."""
  179. @wraps(fun)
  180. def _inner(*args, **kwargs):
  181. connection = kwargs.get("connection")
  182. timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
  183. kwargs["connection"] = conn = connection or \
  184. establish_connection(connect_timeout=timeout)
  185. close_connection = not connection and conn.close or noop
  186. try:
  187. return fun(*args, **kwargs)
  188. finally:
  189. close_connection()
  190. return _inner
  191. def get_consumer_set(connection, queues=None, **options):
  192. """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
  193. configuration.
  194. Defaults to the queues in ``CELERY_QUEUES``.
  195. """
  196. queues = queues or conf.get_queues()
  197. cset = ConsumerSet(connection)
  198. for queue_name, queue_options in queues.items():
  199. queue_options = dict(queue_options)
  200. queue_options["routing_key"] = queue_options.pop("binding_key", None)
  201. consumer = Consumer(connection, queue=queue_name,
  202. backend=cset.backend, **queue_options)
  203. cset.consumers.append(consumer)
  204. return cset
  205. @with_connection
  206. def reply(data, exchange, routing_key, connection=None, connect_timeout=None,
  207. **kwargs):
  208. pub = Publisher(connection, exchange=exchange,
  209. routing_key=routing_key, **kwargs)
  210. pub.send(data)