messaging.py 10 KB


  1. """
  2. Sending and Receiving Messages
  3. """
  4. import socket
  5. import warnings
  6. from datetime import datetime, timedelta
  7. from itertools import count
  8. from carrot.connection import BrokerConnection
  9. from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
  10. from celery import conf
  11. from celery import signals
  12. from celery.utils import gen_unique_id, mitemgetter, noop
  13. from celery.utils.functional import wraps
  14. MSG_OPTIONS = ("mandatory", "priority", "immediate",
  15. "routing_key", "serializer", "delivery_mode")
  16. get_msg_options = mitemgetter(*MSG_OPTIONS)
  17. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  18. default_queue = conf.get_queues()[conf.DEFAULT_QUEUE]
  19. _queues_declared = False
  20. _exchanges_declared = set()
  21. class TaskPublisher(Publisher):
  22. """Publish tasks."""
  23. exchange = default_queue["exchange"]
  24. exchange_type = default_queue["exchange_type"]
  25. routing_key = conf.DEFAULT_ROUTING_KEY
  26. serializer = conf.TASK_SERIALIZER
  27. auto_declare = False
  28. def __init__(self, *args, **kwargs):
  29. super(TaskPublisher, self).__init__(*args, **kwargs)
  30. # Make sure all queues are declared.
  31. global _queues_declared
  32. if not _queues_declared:
  33. consumers = get_consumer_set(self.connection)
  34. consumers.close()
  35. _queues_declared = True
  36. self.declare()
  37. def declare(self):
  38. if self.exchange and self.exchange not in _exchanges_declared:
  39. super(TaskPublisher, self).declare()
  40. _exchanges_declared.add(self.exchange)
  41. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  42. countdown=None, eta=None, task_id=None, taskset_id=None,
  43. exchange=None, exchange_type=None, expires=None, **kwargs):
  44. """Delay task for execution by the celery nodes."""
  45. task_id = task_id or gen_unique_id()
  46. task_args = task_args or []
  47. task_kwargs = task_kwargs or {}
  48. now = None
  49. if countdown: # convert countdown to ETA.
  50. now = datetime.now()
  51. eta = now + timedelta(seconds=countdown)
  52. if not isinstance(task_args, (list, tuple)):
  53. raise ValueError("task args must be a list or tuple")
  54. if not isinstance(task_kwargs, dict):
  55. raise ValueError("task kwargs must be a dictionary")
  56. if isinstance(expires, int):
  57. now = now or datetime.now()
  58. expires = now + timedelta(seconds=expires)
  59. message_data = {
  60. "task": task_name,
  61. "id": task_id,
  62. "args": task_args or [],
  63. "kwargs": task_kwargs or {},
  64. "retries": kwargs.get("retries", 0),
  65. "eta": eta and eta.isoformat(),
  66. "expires": expires and expires.isoformat(),
  67. }
  68. if taskset_id:
  69. message_data["taskset"] = taskset_id
  70. # custom exchange passed, need to declare it
  71. if exchange and exchange not in _exchanges_declared:
  72. exchange_type = exchange_type or self.exchange_type
  73. self.backend.exchange_declare(exchange=exchange,
  74. type=exchange_type,
  75. durable=self.durable,
  76. auto_delete=self.auto_delete)
  77. self.send(message_data, exchange=exchange,
  78. **extract_msg_options(kwargs))
  79. signals.task_sent.send(sender=task_name, **message_data)
  80. return task_id
  81. class ConsumerSet(_ConsumerSet):
  82. """ConsumerSet with an optional decode error callback.
  83. For more information see :class:`carrot.messaging.ConsumerSet`.
  84. .. attribute:: on_decode_error
  85. Callback called if a message had decoding errors.
  86. The callback is called with the signature::
  87. callback(message, exception)
  88. """
  89. on_decode_error = None
  90. def _receive_callback(self, raw_message):
  91. message = self.backend.message_to_python(raw_message)
  92. if self.auto_ack and not message.acknowledged:
  93. message.ack()
  94. try:
  95. decoded = message.decode()
  96. except Exception, exc:
  97. if self.on_decode_error:
  98. return self.on_decode_error(message, exc)
  99. else:
  100. raise
  101. self.receive(decoded, message)
  102. class TaskConsumer(Consumer):
  103. """Consume tasks"""
  104. queue = conf.DEFAULT_QUEUE
  105. exchange = default_queue["exchange"]
  106. routing_key = default_queue["binding_key"]
  107. exchange_type = default_queue["exchange_type"]
  108. class EventPublisher(Publisher):
  109. """Publish events"""
  110. exchange = conf.EVENT_EXCHANGE
  111. exchange_type = conf.EVENT_EXCHANGE_TYPE
  112. routing_key = conf.EVENT_ROUTING_KEY
  113. serializer = conf.EVENT_SERIALIZER
  114. auto_delete = not conf.EVENT_PERSISTENT
  115. delivery_mode = conf.EVENT_PERSISTENT and 2 or 1
  116. durable = conf.EVENT_PERSISTENT
  117. class EventConsumer(Consumer):
  118. """Consume events"""
  119. queue = conf.EVENT_QUEUE
  120. exchange = conf.EVENT_EXCHANGE
  121. exchange_type = conf.EVENT_EXCHANGE_TYPE
  122. routing_key = conf.EVENT_ROUTING_KEY
  123. auto_delete = not conf.EVENT_PERSISTENT
  124. durable = conf.EVENT_PERSISTENT
  125. no_ack = True
  126. class ControlReplyConsumer(Consumer):
  127. exchange = "celerycrq"
  128. exchange_type = "direct"
  129. durable = False
  130. exclusive = False
  131. auto_delete = True
  132. no_ack = True
  133. def __init__(self, connection, ticket, **kwargs):
  134. self.ticket = ticket
  135. queue = "%s.%s" % (self.exchange, ticket)
  136. super(ControlReplyConsumer, self).__init__(connection,
  137. queue=queue,
  138. routing_key=ticket,
  139. **kwargs)
  140. def collect(self, limit=None, timeout=1, callback=None):
  141. responses = []
  142. def on_message(message_data, message):
  143. if callback:
  144. callback(message_data)
  145. responses.append(message_data)
  146. self.callbacks = [on_message]
  147. self.consume()
  148. for i in limit and range(limit) or count():
  149. try:
  150. self.connection.drain_events(timeout=timeout)
  151. except socket.timeout:
  152. break
  153. return responses
  154. class ControlReplyPublisher(Publisher):
  155. exchange = "celerycrq"
  156. exchange_type = "direct"
  157. delivery_mode = "non-persistent"
  158. durable = False
  159. auto_delete = True
  160. class BroadcastPublisher(Publisher):
  161. """Publish broadcast commands"""
  162. ReplyTo = ControlReplyConsumer
  163. exchange = conf.BROADCAST_EXCHANGE
  164. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  165. def send(self, type, arguments, destination=None, reply_ticket=None):
  166. """Send broadcast command."""
  167. arguments["command"] = type
  168. arguments["destination"] = destination
  169. if reply_ticket:
  170. arguments["reply_to"] = {"exchange": self.ReplyTo.exchange,
  171. "routing_key": reply_ticket}
  172. super(BroadcastPublisher, self).send({"control": arguments})
  173. class BroadcastConsumer(Consumer):
  174. """Consume broadcast commands"""
  175. queue = conf.BROADCAST_QUEUE
  176. exchange = conf.BROADCAST_EXCHANGE
  177. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  178. no_ack = True
  179. def __init__(self, *args, **kwargs):
  180. self.hostname = kwargs.pop("hostname", None) or socket.gethostname()
  181. self.queue = "%s_%s" % (self.queue, self.hostname)
  182. super(BroadcastConsumer, self).__init__(*args, **kwargs)
  183. def verify_exclusive(self):
  184. # XXX Kombu material
  185. channel = getattr(self.backend, "channel")
  186. if channel and hasattr(channel, "queue_declare"):
  187. try:
  188. _, _, consumers = channel.queue_declare(self.queue,
  189. passive=True)
  190. except ValueError:
  191. pass
  192. else:
  193. if consumers:
  194. warnings.warn(UserWarning(
  195. "A node named %s is already using this process "
  196. "mailbox. Maybe you should specify a custom name "
  197. "for this node with the -n argument?" % self.hostname))
  198. def consume(self, *args, **kwargs):
  199. self.verify_exclusive()
  200. return super(BroadcastConsumer, self).consume(*args, **kwargs)
  201. def establish_connection(hostname=None, userid=None, password=None,
  202. virtual_host=None, port=None, ssl=None, insist=None,
  203. connect_timeout=None, backend_cls=None, defaults=conf):
  204. """Establish a connection to the message broker."""
  205. if insist is None:
  206. insist = defaults.BROKER_INSIST
  207. if ssl is None:
  208. ssl = defaults.BROKER_USE_SSL
  209. if connect_timeout is None:
  210. connect_timeout = defaults.BROKER_CONNECTION_TIMEOUT
  211. return BrokerConnection(hostname or defaults.BROKER_HOST,
  212. userid or defaults.BROKER_USER,
  213. password or defaults.BROKER_PASSWORD,
  214. virtual_host or defaults.BROKER_VHOST,
  215. port or defaults.BROKER_PORT,
  216. backend_cls=backend_cls or defaults.BROKER_BACKEND,
  217. insist=insist, ssl=ssl,
  218. connect_timeout=connect_timeout)
  219. def with_connection(fun):
  220. """Decorator for providing default message broker connection for functions
  221. supporting the ``connection`` and ``connect_timeout`` keyword
  222. arguments."""
  223. @wraps(fun)
  224. def _inner(*args, **kwargs):
  225. connection = kwargs.get("connection")
  226. timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
  227. kwargs["connection"] = conn = connection or \
  228. establish_connection(connect_timeout=timeout)
  229. close_connection = not connection and conn.close or noop
  230. try:
  231. return fun(*args, **kwargs)
  232. finally:
  233. close_connection()
  234. return _inner
  235. def get_consumer_set(connection, queues=None, **options):
  236. """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
  237. configuration.
  238. Defaults to the queues in :const:`CELERY_QUEUES`.
  239. """
  240. queues = queues or conf.get_queues()
  241. cset = ConsumerSet(connection)
  242. for queue_name, queue_options in queues.items():
  243. queue_options = dict(queue_options)
  244. queue_options["routing_key"] = queue_options.pop("binding_key", None)
  245. consumer = Consumer(connection, queue=queue_name,
  246. backend=cset.backend, **queue_options)
  247. cset.consumers.append(consumer)
  248. return cset