listener.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. import socket
  2. from datetime import datetime
  3. from dateutil.parser import parse as parse_iso8601
  4. from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
  5. from celery import conf
  6. from celery.utils import retry_over_time
  7. from celery.worker.job import TaskWrapper
  8. from celery.worker.revoke import revoked
  9. from celery.worker.heartbeat import Heart
  10. from celery.events import EventDispatcher
  11. from celery.messaging import get_consumer_set, BroadcastConsumer
  12. from celery.exceptions import NotRegistered
  13. from celery.datastructures import SharedCounter
  14. RUN = 0x0
  15. CLOSE = 0x1
  16. class CarrotListener(object):
  17. """Listen for messages received from the broker and
  18. move them the the ready queue for task processing.
  19. :param ready_queue: See :attr:`ready_queue`.
  20. :param eta_scheduler: See :attr:`eta_scheduler`.
  21. .. attribute:: ready_queue
  22. The queue that holds tasks ready for processing immediately.
  23. .. attribute:: eta_scheduler
  24. Scheduler for paused tasks. Reasons for being paused include
  25. a countdown/eta or that it's waiting for retry.
  26. .. attribute:: logger
  27. The logger used.
  28. """
  29. def __init__(self, ready_queue, eta_scheduler, logger,
  30. send_events=False, initial_prefetch_count=2):
  31. self.amqp_connection = None
  32. self.task_consumer = None
  33. self.ready_queue = ready_queue
  34. self.eta_scheduler = eta_scheduler
  35. self.send_events = send_events
  36. self.logger = logger
  37. self.prefetch_count = SharedCounter(initial_prefetch_count)
  38. self.event_dispatcher = None
  39. self.heart = None
  40. self._state = None
  41. def start(self):
  42. """Start the consumer.
  43. If the connection is lost, it tries to re-establish the connection
  44. over time and restart consuming messages.
  45. """
  46. while 1:
  47. self.reset_connection()
  48. try:
  49. self.consume_messages()
  50. except (socket.error, AMQPConnectionException, IOError):
  51. self.logger.error("CarrotListener: Connection to broker lost."
  52. + " Trying to re-establish connection...")
  53. def consume_messages(self):
  54. """Consume messages forever (or until an exception is raised)."""
  55. task_consumer = self.task_consumer
  56. self.logger.debug("CarrotListener: Starting message consumer...")
  57. wait_for_message = task_consumer.iterconsume(limit=None).next
  58. self.logger.debug("CarrotListener: Ready to accept tasks!")
  59. prev_pcount = None
  60. while 1:
  61. pcount = int(self.prefetch_count) # Convert SharedCounter to int
  62. if not prev_pcount or pcount != prev_pcount:
  63. task_consumer.qos(prefetch_count=pcount)
  64. prev_pcount = pcount
  65. wait_for_message()
  66. def on_control_command(self, command):
  67. if command["command"] == "revoke":
  68. revoke_uuid = command["task_id"]
  69. revoked.add(revoke_uuid)
  70. self.logger.warn("Task %s marked as revoked." % revoke_uuid)
  71. return
  72. def on_task(self, task, eta=None):
  73. """Handle received task.
  74. If the task has an ``eta`` we enter it into the ETA schedule,
  75. otherwise we move it the ready queue for immediate processing.
  76. """
  77. if task.task_id in revoked:
  78. self.logger.warn("Got revoked task from broker: %s[%s]" % (
  79. task.task_name, task.task_id))
  80. return task.on_ack()
  81. self.event_dispatcher.send("task-received", uuid=task.task_id,
  82. name=task.task_name, args=task.args, kwargs=task.kwargs,
  83. retries=task.retries, eta=eta)
  84. if eta:
  85. if not isinstance(eta, datetime):
  86. eta = parse_iso8601(eta)
  87. self.prefetch_count.increment()
  88. self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
  89. task.task_name, task.task_id, eta))
  90. self.eta_scheduler.enter(task, eta=eta,
  91. callback=self.prefetch_count.decrement)
  92. else:
  93. self.logger.info("Got task from broker: %s[%s]" % (
  94. task.task_name, task.task_id))
  95. self.ready_queue.put(task)
  96. def receive_message(self, message_data, message):
  97. """The callback called when a new message is received. """
  98. # Handle task
  99. if message_data.get("task"):
  100. try:
  101. task = TaskWrapper.from_message(message, message_data,
  102. logger=self.logger,
  103. eventer=self.event_dispatcher)
  104. except NotRegistered, exc:
  105. self.logger.error("Unknown task ignored: %s" % (exc))
  106. else:
  107. self.on_task(task, eta=message_data.get("eta"))
  108. return
  109. # Handle control command
  110. control = message_data.get("control")
  111. if control:
  112. self.on_control_command(control)
  113. return
  114. def close_connection(self):
  115. if not self._state == RUN:
  116. return
  117. self._state = CLOSE
  118. self.logger.debug("Heart: Going into cardiac arrest...")
  119. self.heart = self.heart and self.heart.stop()
  120. self.logger.debug("TaskConsumer: Shutting down...")
  121. self.task_consumer = self.task_consumer and self.task_consumer.close()
  122. self.logger.debug("EventDispatcher: Shutting down...")
  123. self.event_dispatcher = self.event_dispatcher and \
  124. self.event_dispatcher.close()
  125. self.logger.debug(
  126. "CarrotListener: Closing connection to broker...")
  127. self.amqp_connection = self.amqp_connection and \
  128. self.amqp_connection.close()
  129. def reset_connection(self):
  130. self.logger.debug(
  131. "CarrotListener: Re-establishing connection to the broker...")
  132. self.close_connection()
  133. self.amqp_connection = self._open_connection()
  134. self.logger.debug("CarrotListener: Connection Established.")
  135. self.task_consumer = get_consumer_set(connection=self.amqp_connection)
  136. self.broadcast_consumer = BroadcastConsumer(self.amqp_connection)
  137. self.task_consumer.add_consumer(self.broadcast_consumer)
  138. self.task_consumer.register_callback(self.receive_message)
  139. self.event_dispatcher = EventDispatcher(self.amqp_connection,
  140. enabled=self.send_events)
  141. self.heart = Heart(self.event_dispatcher)
  142. self.heart.start()
  143. self._state = RUN
  144. def _open_connection(self):
  145. """Retries connecting to the AMQP broker over time.
  146. See :func:`celery.utils.retry_over_time`.
  147. """
  148. def _connection_error_handler(exc, interval):
  149. """Callback handler for connection errors."""
  150. self.logger.error("AMQP Listener: Connection Error: %s. " % exc
  151. + "Trying again in %d seconds..." % interval)
  152. def _establish_connection():
  153. """Establish a connection to the AMQP broker."""
  154. conn = DjangoBrokerConnection()
  155. connected = conn.connection # Connection is established lazily.
  156. return conn
  157. if not conf.AMQP_CONNECTION_RETRY:
  158. return _establish_connection()
  159. conn = retry_over_time(_establish_connection, (socket.error, IOError),
  160. errback=_connection_error_handler,
  161. max_retries=conf.AMQP_CONNECTION_MAX_RETRIES)
  162. return conn
  163. def stop(self):
  164. self.close_connection()