pidbox.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import socket
  2. import warnings
  3. from itertools import count
  4. from kombu.entity import Exchange, Queue
  5. from kombu.messaging import Consumer, Producer
  6. from celery.app import app_or_default
  7. from celery.utils import gen_unique_id
  class Mailbox(object):
      """Mailbox for broadcasting control commands and collecting replies.

      Commands are published to a fanout exchange named
      ``"<namespace>.pidbox"``; replies are routed through a direct exchange
      named ``"reply.<namespace>.pidbox"`` using a per-request ticket as the
      routing key.

      Every method taking ``channel`` uses the given channel when it is
      truthy; otherwise it opens one from ``self.connection`` and closes it
      again before returning (except :meth:`Node`, whose consumer keeps it).
      """

      def __init__(self, namespace, connection):
          """Store *namespace* and *connection* and define both exchanges."""
          self.namespace = namespace
          self.connection = connection
          # Commands: fanout, non-durable, auto-deleted, transient messages.
          self.exchange = Exchange("%s.pidbox" % (self.namespace, ),
                                   type="fanout",
                                   durable=False,
                                   auto_delete=True,
                                   delivery_mode="transient")
          # Replies: direct exchange, routed by reply ticket.
          self.reply_exchange = Exchange(
              "reply.%s.pidbox" % (self.namespace, ),
              type="direct",
              durable=False,
              auto_delete=True,
              delivery_mode="transient")

      def publish_reply(self, reply, exchange, routing_key, channel=None):
          """Publish *reply* to the direct exchange named *exchange*.

          :param reply: message body to send.
          :param exchange: name of the reply exchange.
          :param routing_key: routing key used for the reply.
          :param channel: optional channel to reuse.
          """
          chan = channel or self.connection.channel()
          try:
              # ``type=`` is the keyword used for Exchange everywhere else
              # in this class (the original passed ``exchange_type=`` here).
              exchange = Exchange(exchange, type="direct",
                                  delivery_mode="transient",
                                  durable=False,
                                  auto_delete=True)
              producer = Producer(chan, exchange=exchange)
              producer.publish(reply, routing_key=routing_key)
          finally:
              # Only close a channel this method opened itself.
              if not channel:
                  chan.close()

      def get_reply_queue(self, ticket):
          """Return the reply queue for *ticket*, bound to the reply exchange.

          The queue is named ``"<ticket>.<reply exchange name>"`` and uses
          *ticket* as its routing key.
          """
          return Queue("%s.%s" % (ticket, self.reply_exchange.name),
                       exchange=self.reply_exchange,
                       routing_key=ticket,
                       durable=False,
                       auto_delete=True)

      def get_queue(self, hostname):
          """Return the command queue ``"<hostname>.<namespace>.pidbox"``."""
          return Queue("%s.%s.pidbox" % (hostname, self.namespace),
                       exchange=self.exchange)

      def collect_reply(self, ticket, limit=None, timeout=1,
                        callback=None, channel=None):
          """Consume replies for *ticket* and return them as a list.

          :param ticket: reply ticket identifying the reply queue.
          :param limit: maximum number of ``drain_events`` calls; when falsy
              the loop continues until a ``socket.timeout`` occurs.
          :param timeout: timeout passed to each ``drain_events`` call.
          :param callback: optional callable invoked with each reply body.
          :param channel: optional channel to reuse.
          :returns: list of the reply bodies received.
          """
          chan = channel or self.connection.channel()
          responses = []

          def on_message(message_data, message):
              if callback:
                  callback(message_data)
              responses.append(message_data)

          try:
              queue = self.get_reply_queue(ticket)
              # Must use ``chan``: ``channel`` is None when the caller did
              # not supply one.
              consumer = Consumer(chan, [queue], no_ack=True)
              consumer.register_callback(on_message)
              consumer.consume()
              attempts = range(limit) if limit else count()
              for _ in attempts:
                  try:
                      self.connection.drain_events(timeout=timeout)
                  except socket.timeout:
                      break
              return responses
          finally:
              if not channel:
                  chan.close()

      def publish(self, type, arguments, destination=None, reply_ticket=None,
                  channel=None):
          """Publish a control command to the broadcast exchange.

          The message body is ``{"control": arguments}`` where *arguments*
          is extended with ``"command"``, ``"destination"`` and, when
          *reply_ticket* is given, ``"reply_to"``.

          :param type: command name (parameter name kept for compatibility
              even though it shadows the builtin).
          :param arguments: dict of command arguments; it is copied, so the
              caller's dict is not modified.
          :param destination: value stored under ``"destination"``.
          :param reply_ticket: routing key for replies, if any are wanted.
          :param channel: optional channel to reuse.
          """
          # Work on a copy so the caller's dict is left untouched.
          arguments = dict(arguments or {})
          arguments["command"] = type
          arguments["destination"] = destination
          if reply_ticket:
              arguments["reply_to"] = {"exchange": self.reply_exchange.name,
                                       "routing_key": reply_ticket}
          chan = channel or self.connection.channel()
          try:
              producer = Producer(chan, exchange=self.exchange)
              producer.publish({"control": arguments})
          finally:
              if not channel:
                  chan.close()

      def Node(self, hostname, channel=None):
          """Return a no-ack consumer for the command queue of *hostname*.

          When no *channel* is given a new one is opened and handed to the
          consumer; this method does not close it.
          """
          return Consumer(channel or self.connection.channel(),
                          [self.get_queue(hostname)],
                          no_ack=True)

      def call(self, destination, command, kwargs=None, timeout=None,
               callback=None, channel=None):
          """Send *command* to *destination* and return the replies.

          ``timeout`` defaults to ``None`` and is passed through unchanged
          to the reply collection.
          """
          return self._broadcast(command, kwargs, destination,
                                 reply=True, timeout=timeout,
                                 callback=callback,
                                 channel=channel)

      def cast(self, destination, command, kwargs=None):
          """Send *command* to *destination* without waiting for replies."""
          return self._broadcast(command, kwargs, destination, reply=False)

      def abcast(self, command, kwargs=None):
          """Send *command* with no destination set, without replies."""
          return self._broadcast(command, kwargs, reply=False)

      def multi_call(self, command, kwargs=None, timeout=1,
                     limit=None, callback=None, channel=None):
          """Send *command* with no destination set and return the replies."""
          return self._broadcast(command, kwargs, reply=True,
                                 timeout=timeout, limit=limit,
                                 callback=callback,
                                 channel=channel)

      def _broadcast(self, command, arguments=None, destination=None,
                     reply=False, timeout=1, limit=None, callback=None,
                     channel=None):
          """Publish *command* and optionally collect the replies.

          :param command: command name.
          :param arguments: dict of command arguments (``None`` means none).
          :param destination: ``None`` or a list/tuple.
          :param reply: when true, a reply ticket is generated, the reply
              queue is declared before publishing, and replies are collected.
          :param timeout: passed to :meth:`collect_reply`.
          :param limit: passed to :meth:`collect_reply`; defaults to the
              number of destinations when a destination is given.
          :param callback: passed to :meth:`collect_reply`.
          :param channel: optional channel to reuse.
          :returns: list of replies when *reply* is true, otherwise ``None``.
          :raises ValueError: if *destination* is neither ``None`` nor a
              list/tuple.
          """
          # Validate first so bad input fails before any work is done.
          if destination is not None and \
                  not isinstance(destination, (list, tuple)):
              raise ValueError("destination must be a list/tuple not %s" % (
                  type(destination), ))

          arguments = arguments or {}
          reply_ticket = gen_unique_id() if reply else None

          # Set reply limit to number of destinations (if specified).
          if limit is None and destination:
              limit = len(destination)

          chan = channel or self.connection.channel()
          try:
              if reply_ticket:
                  # Declare the reply queue before publishing the command.
                  self.get_reply_queue(reply_ticket)(chan).declare()

              self.publish(command, arguments, destination=destination,
                           reply_ticket=reply_ticket,
                           channel=chan)

              if reply_ticket:
                  return self.collect_reply(reply_ticket, limit=limit,
                                            timeout=timeout,
                                            callback=callback,
                                            channel=chan)
          finally:
              if not channel:
                  chan.close()
  def mailbox(connection):
      """Return a :class:`Mailbox` for *connection* in the "celeryd" namespace."""
      namespace = "celeryd"
      return Mailbox(namespace, connection)