messaging.py 10 KB


  1. """
  2. Sending and Receiving Messages
  3. """
  4. import socket
  5. import warnings
  6. from datetime import datetime, timedelta
  7. from itertools import count
  8. from carrot.connection import BrokerConnection
  9. from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
  10. from celery import conf
  11. from celery import signals
  12. from celery.utils import gen_unique_id, mitemgetter, noop
  13. from celery.utils.functional import wraps
  14. MSG_OPTIONS = ("mandatory", "priority", "immediate",
  15. "routing_key", "serializer", "delivery_mode")
  16. get_msg_options = mitemgetter(*MSG_OPTIONS)
  17. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  18. default_queue = conf.get_queues()[conf.DEFAULT_QUEUE]
  19. _queues_declared = False
  20. _exchanges_declared = set()
  21. class TaskPublisher(Publisher):
  22. """Publish tasks."""
  23. exchange = default_queue["exchange"]
  24. exchange_type = default_queue["exchange_type"]
  25. routing_key = conf.DEFAULT_ROUTING_KEY
  26. serializer = conf.TASK_SERIALIZER
  27. auto_declare = False
  28. def __init__(self, *args, **kwargs):
  29. super(TaskPublisher, self).__init__(*args, **kwargs)
  30. # Make sure all queues are declared.
  31. global _queues_declared
  32. if not _queues_declared:
  33. consumers = get_consumer_set(self.connection)
  34. consumers.close()
  35. _queues_declared = True
  36. self.declare()
  37. def declare(self):
  38. if self.exchange and self.exchange not in _exchanges_declared:
  39. super(TaskPublisher, self).declare()
  40. _exchanges_declared.add(self.exchange)
  41. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  42. countdown=None, eta=None, task_id=None, taskset_id=None,
  43. exchange=None, exchange_type=None, expires=None, **kwargs):
  44. """Delay task for execution by the celery nodes."""
  45. task_id = task_id or gen_unique_id()
  46. task_args = task_args or []
  47. task_kwargs = task_kwargs or {}
  48. now = None
  49. if countdown: # convert countdown to ETA.
  50. now = datetime.now()
  51. eta = now + timedelta(seconds=countdown)
  52. if not isinstance(task_args, (list, tuple)):
  53. raise ValueError("task args must be a list or tuple")
  54. if not isinstance(task_kwargs, dict):
  55. raise ValueError("task kwargs must be a dictionary")
  56. if isinstance(expires, int):
  57. now = now or datetime.now()
  58. expires = now + timedelta(seconds=expires)
  59. message_data = {
  60. "task": task_name,
  61. "id": task_id,
  62. "args": task_args or [],
  63. "kwargs": task_kwargs or {},
  64. "retries": kwargs.get("retries", 0),
  65. "eta": eta and eta.isoformat(),
  66. "expires": expires and expires.isoformat(),
  67. }
  68. if taskset_id:
  69. message_data["taskset"] = taskset_id
  70. # custom exchange passed, need to declare it
  71. if exchange and exchange not in _exchanges_declared:
  72. exchange_type = exchange_type or self.exchange_type
  73. self.backend.exchange_declare(exchange=exchange,
  74. type=exchange_type,
  75. durable=self.durable,
  76. auto_delete=self.auto_delete)
  77. self.send(message_data, exchange=exchange,
  78. **extract_msg_options(kwargs))
  79. signals.task_sent.send(sender=task_name, **message_data)
  80. return task_id
  81. class ConsumerSet(_ConsumerSet):
  82. """ConsumerSet with an optional decode error callback.
  83. For more information see :class:`carrot.messaging.ConsumerSet`.
  84. .. attribute:: on_decode_error
  85. Callback called if a message had decoding errors.
  86. The callback is called with the signature::
  87. callback(message, exception)
  88. """
  89. on_decode_error = None
  90. def _receive_callback(self, raw_message):
  91. message = self.backend.message_to_python(raw_message)
  92. if self.auto_ack and not message.acknowledged:
  93. message.ack()
  94. try:
  95. decoded = message.decode()
  96. except Exception, exc:
  97. if self.on_decode_error:
  98. return self.on_decode_error(message, exc)
  99. else:
  100. raise
  101. self.receive(decoded, message)
  102. class TaskConsumer(Consumer):
  103. """Consume tasks"""
  104. queue = conf.DEFAULT_QUEUE
  105. exchange = default_queue["exchange"]
  106. routing_key = default_queue["binding_key"]
  107. exchange_type = default_queue["exchange_type"]
  108. class EventPublisher(Publisher):
  109. """Publish events"""
  110. exchange = conf.EVENT_EXCHANGE
  111. exchange_type = conf.EVENT_EXCHANGE_TYPE
  112. routing_key = conf.EVENT_ROUTING_KEY
  113. serializer = conf.EVENT_SERIALIZER
  114. class EventConsumer(Consumer):
  115. """Consume events"""
  116. queue = conf.EVENT_QUEUE
  117. exchange = conf.EVENT_EXCHANGE
  118. exchange_type = conf.EVENT_EXCHANGE_TYPE
  119. routing_key = conf.EVENT_ROUTING_KEY
  120. no_ack = True
  121. class ControlReplyConsumer(Consumer):
  122. exchange = "celerycrq"
  123. exchange_type = "direct"
  124. durable = False
  125. exclusive = False
  126. auto_delete = True
  127. no_ack = True
  128. def __init__(self, connection, ticket, **kwargs):
  129. self.ticket = ticket
  130. queue = "%s.%s" % (self.exchange, ticket)
  131. super(ControlReplyConsumer, self).__init__(connection,
  132. queue=queue,
  133. routing_key=ticket,
  134. **kwargs)
  135. def collect(self, limit=None, timeout=1, callback=None):
  136. responses = []
  137. def on_message(message_data, message):
  138. if callback:
  139. callback(message_data)
  140. responses.append(message_data)
  141. self.callbacks = [on_message]
  142. self.consume()
  143. for i in limit and range(limit) or count():
  144. try:
  145. self.connection.drain_events(timeout=timeout)
  146. except socket.timeout:
  147. break
  148. return responses
  149. class ControlReplyPublisher(Publisher):
  150. exchange = "celerycrq"
  151. exchange_type = "direct"
  152. delivery_mode = "non-persistent"
  153. durable = False
  154. auto_delete = True
  155. class BroadcastPublisher(Publisher):
  156. """Publish broadcast commands"""
  157. ReplyTo = ControlReplyConsumer
  158. exchange = conf.BROADCAST_EXCHANGE
  159. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  160. def send(self, type, arguments, destination=None, reply_ticket=None):
  161. """Send broadcast command."""
  162. arguments["command"] = type
  163. arguments["destination"] = destination
  164. if reply_ticket:
  165. arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
  166. "routing_key": reply_ticket}
  167. super(BroadcastPublisher, self).send({"control": arguments})
  168. class BroadcastConsumer(Consumer):
  169. """Consume broadcast commands"""
  170. queue = conf.BROADCAST_QUEUE
  171. exchange = conf.BROADCAST_EXCHANGE
  172. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  173. no_ack = True
  174. def __init__(self, *args, **kwargs):
  175. self.hostname = kwargs.pop("hostname", None) or socket.gethostname()
  176. self.queue = "%s_%s" % (self.queue, self.hostname)
  177. super(BroadcastConsumer, self).__init__(*args, **kwargs)
  178. def verify_exclusive(self):
  179. # XXX Kombu material
  180. channel = getattr(self.backend, "channel")
  181. if channel and hasattr(channel, "queue_declare"):
  182. try:
  183. _, _, consumers = channel.queue_declare(self.queue,
  184. passive=True)
  185. except ValueError:
  186. pass
  187. else:
  188. if consumers:
  189. warnings.warn(UserWarning(
  190. "A node named %s is already using this process "
  191. "mailbox. Maybe you should specify a custom name "
  192. "for this node with the -n argument?" % self.hostname))
  193. def consume(self, *args, **kwargs):
  194. self.verify_exclusive()
  195. return super(BroadcastConsumer, self).consume(*args, **kwargs)
  196. def establish_connection(hostname=None, userid=None, password=None,
  197. virtual_host=None, port=None, ssl=None, insist=None,
  198. connect_timeout=None, backend_cls=None, defaults=conf):
  199. """Establish a connection to the message broker."""
  200. if insist is None:
  201. insist = defaults.BROKER_INSIST
  202. if ssl is None:
  203. ssl = defaults.BROKER_USE_SSL
  204. if connect_timeout is None:
  205. connect_timeout = defaults.BROKER_CONNECTION_TIMEOUT
  206. return BrokerConnection(hostname or defaults.BROKER_HOST,
  207. userid or defaults.BROKER_USER,
  208. password or defaults.BROKER_PASSWORD,
  209. virtual_host or defaults.BROKER_VHOST,
  210. port or defaults.BROKER_PORT,
  211. backend_cls=backend_cls or defaults.BROKER_BACKEND,
  212. insist=insist, ssl=ssl,
  213. connect_timeout=connect_timeout)
  214. def with_connection(fun):
  215. """Decorator for providing default message broker connection for functions
  216. supporting the ``connection`` and ``connect_timeout`` keyword
  217. arguments."""
  218. @wraps(fun)
  219. def _inner(*args, **kwargs):
  220. connection = kwargs.get("connection")
  221. timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
  222. kwargs["connection"] = conn = connection or \
  223. establish_connection(connect_timeout=timeout)
  224. close_connection = not connection and conn.close or noop
  225. try:
  226. return fun(*args, **kwargs)
  227. finally:
  228. close_connection()
  229. return _inner
  230. def get_consumer_set(connection, queues=None, **options):
  231. """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
  232. configuration.
  233. Defaults to the queues in ``CELERY_QUEUES``.
  234. """
  235. queues = queues or conf.get_queues()
  236. cset = ConsumerSet(connection)
  237. for queue_name, queue_options in queues.items():
  238. queue_options = dict(queue_options)
  239. queue_options["routing_key"] = queue_options.pop("binding_key", None)
  240. consumer = Consumer(connection, queue=queue_name,
  241. backend=cset.backend, **queue_options)
  242. cset.consumers.append(consumer)
  243. return cset