messaging.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. """
  2. Sending and Receiving Messages
  3. """
  4. import socket
  5. from datetime import datetime, timedelta
  6. from carrot.connection import DjangoBrokerConnection
  7. from carrot.messaging import Publisher, Consumer, ConsumerSet
  8. from billiard.utils.functional import wraps
  9. from celery import conf
  10. from celery import signals
  11. from celery.utils import gen_unique_id, mitemgetter, noop
  12. MSG_OPTIONS = ("mandatory", "priority",
  13. "immediate", "routing_key",
  14. "serializer")
  15. get_msg_options = mitemgetter(*MSG_OPTIONS)
  16. extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
  17. default_queue = conf.routing_table[conf.DEFAULT_QUEUE]
  18. _queues_declared = False
  19. class TaskPublisher(Publisher):
  20. """Publish tasks."""
  21. exchange = default_queue["exchange"]
  22. exchange_type = default_queue["exchange_type"]
  23. routing_key = conf.DEFAULT_ROUTING_KEY
  24. serializer = conf.TASK_SERIALIZER
  25. def __init__(self, *args, **kwargs):
  26. super(TaskPublisher, self).__init__(*args, **kwargs)
  27. # Make sure all queues are declared.
  28. global _queues_declared
  29. if not _queues_declared:
  30. consumers = get_consumer_set(self.connection)
  31. consumers.close()
  32. _queues_declared = True
  33. def delay_task(self, task_name, task_args=None, task_kwargs=None,
  34. countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
  35. """Delay task for execution by the celery nodes."""
  36. task_id = task_id or gen_unique_id()
  37. if countdown: # Convert countdown to ETA.
  38. eta = datetime.now() + timedelta(seconds=countdown)
  39. message_data = {
  40. "task": task_name,
  41. "id": task_id,
  42. "args": task_args or [],
  43. "kwargs": task_kwargs or {},
  44. "retries": kwargs.get("retries", 0),
  45. "eta": eta and eta.isoformat(),
  46. }
  47. if taskset_id:
  48. message_data["taskset"] = taskset_id
  49. self.send(message_data, **extract_msg_options(kwargs))
  50. signals.task_sent.send(sender=task_name, **message_data)
  51. return task_id
  52. class TaskConsumer(Consumer):
  53. """Consume tasks"""
  54. queue = conf.DEFAULT_QUEUE
  55. exchange = default_queue["exchange"]
  56. routing_key = default_queue["binding_key"]
  57. exchange_type = default_queue["exchange_type"]
  58. class EventPublisher(Publisher):
  59. """Publish events"""
  60. exchange = conf.EVENT_EXCHANGE
  61. exchange_type = conf.EVENT_EXCHANGE_TYPE
  62. routing_key = conf.EVENT_ROUTING_KEY
  63. class EventConsumer(Consumer):
  64. """Consume events"""
  65. queue = conf.EVENT_QUEUE
  66. exchange = conf.EVENT_EXCHANGE
  67. exchange_type = conf.EVENT_EXCHANGE_TYPE
  68. routing_key = conf.EVENT_ROUTING_KEY
  69. no_ack = True
  70. class BroadcastPublisher(Publisher):
  71. """Publish broadcast commands"""
  72. exchange = conf.BROADCAST_EXCHANGE
  73. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  74. def send(self, type, arguments, destination=None):
  75. """Send broadcast command."""
  76. arguments["command"] = type
  77. arguments["destination"] = destination
  78. super(BroadcastPublisher, self).send({"control": arguments})
  79. class BroadcastConsumer(Consumer):
  80. """Consume broadcast commands"""
  81. queue = conf.BROADCAST_QUEUE
  82. exchange = conf.BROADCAST_EXCHANGE
  83. exchange_type = conf.BROADCAST_EXCHANGE_TYPE
  84. no_ack = True
  85. def __init__(self, *args, **kwargs):
  86. hostname = kwargs.pop("hostname", None) or socket.gethostname()
  87. self.queue = "%s_%s" % (self.queue, hostname)
  88. super(BroadcastConsumer, self).__init__(*args, **kwargs)
  89. def establish_connection(connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  90. """Establish a connection to the message broker."""
  91. return DjangoBrokerConnection(connect_timeout=connect_timeout)
  92. def with_connection(fun):
  93. """Decorator for providing default message broker connection for functions
  94. supporting the ``connection`` and ``connect_timeout`` keyword
  95. arguments."""
  96. @wraps(fun)
  97. def _inner(*args, **kwargs):
  98. connection = kwargs.get("connection")
  99. timeout = kwargs.get("connect_timeout", conf.BROKER_CONNECTION_TIMEOUT)
  100. kwargs["connection"] = conn = connection or \
  101. establish_connection(connect_timeout=timeout)
  102. close_connection = not connection and conn.close or noop
  103. try:
  104. return fun(*args, **kwargs)
  105. finally:
  106. close_connection()
  107. return _inner
  108. def get_consumer_set(connection, queues=None, **options):
  109. """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
  110. configuration.
  111. Defaults to the queues in ``CELERY_QUEUES``.
  112. """
  113. queues = queues or conf.routing_table
  114. cset = ConsumerSet(connection)
  115. for queue_name, queue_options in queues.items():
  116. queue_options = dict(queue_options)
  117. queue_options["routing_key"] = queue_options.pop("binding_key", None)
  118. consumer = Consumer(connection, queue=queue_name,
  119. backend=cset.backend, **queue_options)
  120. cset.consumers.append(consumer)
  121. return cset