pidbox.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import socket
  2. import warnings
  3. from itertools import count
  4. from carrot.messaging import Consumer, Publisher
  5. from celery.app import app_or_default
  6. class ControlReplyConsumer(Consumer):
  7. exchange = "celerycrq"
  8. exchange_type = "direct"
  9. durable = False
  10. exclusive = False
  11. auto_delete = True
  12. no_ack = True
  13. def __init__(self, connection, ticket, **kwargs):
  14. self.ticket = ticket
  15. queue = "%s.%s" % (self.exchange, ticket)
  16. super(ControlReplyConsumer, self).__init__(connection,
  17. queue=queue,
  18. routing_key=ticket,
  19. **kwargs)
  20. def collect(self, limit=None, timeout=1, callback=None):
  21. responses = []
  22. def on_message(message_data, message):
  23. if callback:
  24. callback(message_data)
  25. responses.append(message_data)
  26. self.callbacks = [on_message]
  27. self.consume()
  28. for i in limit and range(limit) or count():
  29. try:
  30. self.connection.drain_events(timeout=timeout)
  31. except socket.timeout:
  32. break
  33. return responses
  34. class ControlReplyPublisher(Publisher):
  35. exchange = "celerycrq"
  36. exchange_type = "direct"
  37. delivery_mode = "non-persistent"
  38. durable = False
  39. auto_delete = True
  40. class BroadcastPublisher(Publisher):
  41. """Publish broadcast commands"""
  42. ReplyTo = ControlReplyConsumer
  43. def __init__(self, *args, **kwargs):
  44. app = self.app = app_or_default(kwargs.get("app"))
  45. kwargs["exchange"] = kwargs.get("exchange") or \
  46. app.conf.CELERY_BROADCAST_EXCHANGE
  47. kwargs["exchange_type"] = kwargs.get("exchange_type") or \
  48. app.conf.CELERY_BROADCAST_EXCHANGE_TYPE
  49. super(BroadcastPublisher, self).__init__(*args, **kwargs)
  50. def send(self, type, arguments, destination=None, reply_ticket=None):
  51. """Send broadcast command."""
  52. arguments["command"] = type
  53. arguments["destination"] = destination
  54. reply_to = self.ReplyTo(self.connection, None, app=self.app,
  55. auto_declare=False)
  56. if reply_ticket:
  57. arguments["reply_to"] = {"exchange": reply_to.exchange,
  58. "routing_key": reply_ticket}
  59. super(BroadcastPublisher, self).send({"control": arguments})
  60. class BroadcastConsumer(Consumer):
  61. """Consume broadcast commands"""
  62. no_ack = True
  63. def __init__(self, *args, **kwargs):
  64. self.app = app = app_or_default(kwargs.get("app"))
  65. kwargs["queue"] = kwargs.get("queue") or \
  66. app.conf.CELERY_BROADCAST_QUEUE
  67. kwargs["exchange"] = kwargs.get("exchange") or \
  68. app.conf.CELERY_BROADCAST_EXCHANGE
  69. kwargs["exchange_type"] = kwargs.get("exchange_type") or \
  70. app.conf.CELERY_BROADCAST_EXCHANGE_TYPE
  71. self.hostname = kwargs.pop("hostname", None) or socket.gethostname()
  72. self.queue = "%s_%s" % (self.queue, self.hostname)
  73. super(BroadcastConsumer, self).__init__(*args, **kwargs)
  74. def verify_exclusive(self):
  75. # XXX Kombu material
  76. channel = getattr(self.backend, "channel")
  77. if channel and hasattr(channel, "queue_declare"):
  78. try:
  79. _, _, consumers = channel.queue_declare(self.queue,
  80. passive=True)
  81. except ValueError:
  82. pass
  83. else:
  84. if consumers:
  85. warnings.warn(UserWarning(
  86. "A node named %s is already using this process "
  87. "mailbox. Maybe you should specify a custom name "
  88. "for this node with the -n argument?" % self.hostname))
  89. def consume(self, *args, **kwargs):
  90. self.verify_exclusive()
  91. return super(BroadcastConsumer, self).consume(*args, **kwargs)