listener.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. import socket
  2. import warnings
  3. from datetime import datetime
  4. from dateutil.parser import parse as parse_iso8601
  5. from celery import conf
  6. from celery import signals
  7. from celery.utils import retry_over_time
  8. from celery.worker.job import TaskWrapper
  9. from celery.worker.revoke import revoked
  10. from celery.worker.control import ControlDispatch
  11. from celery.worker.heartbeat import Heart
  12. from celery.events import EventDispatcher
  13. from celery.messaging import establish_connection, AMQPConnectionException
  14. from celery.messaging import get_consumer_set, BroadcastConsumer
  15. from celery.exceptions import NotRegistered
  16. from celery.datastructures import SharedCounter
  17. RUN = 0x0
  18. CLOSE = 0x1
  19. class CarrotListener(object):
  20. """Listen for messages received from the broker and
  21. move them the the ready queue for task processing.
  22. :param ready_queue: See :attr:`ready_queue`.
  23. :param eta_schedule: See :attr:`eta_schedule`.
  24. .. attribute:: ready_queue
  25. The queue that holds tasks ready for processing immediately.
  26. .. attribute:: eta_schedule
  27. Scheduler for paused tasks. Reasons for being paused include
  28. a countdown/eta or that it's waiting for retry.
  29. .. attribute:: logger
  30. The logger used.
  31. """
  32. def __init__(self, ready_queue, eta_schedule, logger,
  33. send_events=False, initial_prefetch_count=2):
  34. self.connection = None
  35. self.task_consumer = None
  36. self.ready_queue = ready_queue
  37. self.eta_schedule = eta_schedule
  38. self.send_events = send_events
  39. self.logger = logger
  40. self.hostname = socket.gethostname()
  41. self.control_dispatch = ControlDispatch(logger=logger,
  42. hostname=self.hostname)
  43. self.prefetch_count = SharedCounter(initial_prefetch_count)
  44. self.event_dispatcher = None
  45. self.heart = None
  46. self._state = None
  47. def start(self):
  48. """Start the consumer.
  49. If the connection is lost, it tries to re-establish the connection
  50. over time and restart consuming messages.
  51. """
  52. signals.worker_ready.send(sender=self)
  53. while 1:
  54. self.reset_connection()
  55. try:
  56. self.consume_messages()
  57. except (socket.error, AMQPConnectionException, IOError):
  58. self.logger.error("CarrotListener: Connection to broker lost."
  59. + " Trying to re-establish connection...")
  60. def consume_messages(self):
  61. """Consume messages forever (or until an exception is raised)."""
  62. task_consumer = self.task_consumer
  63. self.logger.debug("CarrotListener: Starting message consumer...")
  64. wait_for_message = self._detect_wait_method()(limit=None).next
  65. self.logger.debug("CarrotListener: Ready to accept tasks!")
  66. prev_pcount = None
  67. while 1:
  68. pcount = int(self.prefetch_count) # Convert SharedCounter to int
  69. if not prev_pcount or pcount != prev_pcount:
  70. task_consumer.qos(prefetch_count=pcount)
  71. prev_pcount = pcount
  72. wait_for_message()
  73. def on_task(self, task, eta=None):
  74. """Handle received task.
  75. If the task has an ``eta`` we enter it into the ETA schedule,
  76. otherwise we move it the ready queue for immediate processing.
  77. """
  78. if task.task_id in revoked:
  79. self.logger.warn("Got revoked task from broker: %s[%s]" % (
  80. task.task_name, task.task_id))
  81. return task.on_ack()
  82. self.event_dispatcher.send("task-received", uuid=task.task_id,
  83. name=task.task_name, args=task.args, kwargs=task.kwargs,
  84. retries=task.retries, eta=eta)
  85. if eta:
  86. if not isinstance(eta, datetime):
  87. eta = parse_iso8601(eta)
  88. self.prefetch_count.increment()
  89. self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
  90. task.task_name, task.task_id, eta))
  91. self.eta_schedule.enter(task, eta=eta,
  92. callback=self.prefetch_count.decrement)
  93. else:
  94. self.logger.info("Got task from broker: %s[%s]" % (
  95. task.task_name, task.task_id))
  96. self.ready_queue.put(task)
  97. def receive_message(self, message_data, message):
  98. """The callback called when a new message is received. """
  99. # Handle task
  100. if message_data.get("task"):
  101. try:
  102. task = TaskWrapper.from_message(message, message_data,
  103. logger=self.logger,
  104. eventer=self.event_dispatcher)
  105. except NotRegistered, exc:
  106. self.logger.error("Unknown task ignored: %s" % (exc))
  107. message.ack()
  108. else:
  109. self.on_task(task, eta=message_data.get("eta"))
  110. return
  111. # Handle control command
  112. control = message_data.get("control")
  113. if control:
  114. self.control_dispatch.dispatch_from_message(control)
  115. return
  116. warnings.warn(RuntimeWarning(
  117. "Received and deleted unknown message. Wrong destination?!? \
  118. the message was: %s" % message_data))
  119. message.ack()
  120. def close_connection(self):
  121. if not self._state == RUN:
  122. return
  123. self._state = CLOSE
  124. if self.heart:
  125. self.logger.debug("Heart: Going into cardiac arrest...")
  126. self.heart = self.heart.stop()
  127. self.logger.debug("TaskConsumer: Shutting down...")
  128. self.task_consumer = self.task_consumer and self.task_consumer.close()
  129. if self.event_dispatcher:
  130. self.logger.debug("EventDispatcher: Shutting down...")
  131. self.event_dispatcher = self.event_dispatcher.close()
  132. self.logger.debug("CarrotListener: "
  133. "Closing connection to broker...")
  134. self.connection = self.connection and self.connection.close()
  135. def reset_connection(self):
  136. self.logger.debug(
  137. "CarrotListener: Re-establishing connection to the broker...")
  138. self.close_connection()
  139. self.connection = self._open_connection()
  140. self.logger.debug("CarrotListener: Connection Established.")
  141. self.task_consumer = get_consumer_set(connection=self.connection)
  142. self.broadcast_consumer = BroadcastConsumer(self.connection)
  143. self.task_consumer.register_callback(self.receive_message)
  144. self.event_dispatcher = EventDispatcher(self.connection,
  145. enabled=self.send_events)
  146. self.heart = Heart(self.event_dispatcher)
  147. self.heart.start()
  148. self._state = RUN
  149. def _mainloop(self, **kwargs):
  150. while 1:
  151. yield self.connection.connection.drain_events()
  152. def _detect_wait_method(self):
  153. if hasattr(self.connection.connection, "drain_events"):
  154. self.broadcast_consumer.register_callback(self.receive_message)
  155. self.task_consumer.iterconsume()
  156. self.broadcast_consumer.iterconsume()
  157. return self._mainloop
  158. else:
  159. self.task_consumer.add_consumer(self.broadcast_consumer)
  160. return self.task_consumer.iterconsume
  161. def _open_connection(self):
  162. """Retries connecting to the AMQP broker over time.
  163. See :func:`celery.utils.retry_over_time`.
  164. """
  165. def _connection_error_handler(exc, interval):
  166. """Callback handler for connection errors."""
  167. self.logger.error("AMQP Listener: Connection Error: %s. " % exc
  168. + "Trying again in %d seconds..." % interval)
  169. def _establish_connection():
  170. """Establish a connection to the AMQP broker."""
  171. conn = establish_connection()
  172. conn.connection # Connection is established lazily, so connect.
  173. return conn
  174. if not conf.BROKER_CONNECTION_RETRY:
  175. return _establish_connection()
  176. conn = retry_over_time(_establish_connection, (socket.error, IOError),
  177. errback=_connection_error_handler,
  178. max_retries=conf.BROKER_CONNECTION_MAX_RETRIES)
  179. return conn
  180. def stop(self):
  181. self.close_connection()