listener.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. from __future__ import generators
  2. import socket
  3. import warnings
  4. from datetime import datetime
  5. from dateutil.parser import parse as parse_iso8601
  6. from carrot.connection import AMQPConnectionException
  7. from celery import conf
  8. from celery.utils import noop, retry_over_time
  9. from celery.worker.job import TaskWrapper, InvalidTaskError
  10. from celery.worker.revoke import revoked
  11. from celery.worker.control import ControlDispatch
  12. from celery.worker.heartbeat import Heart
  13. from celery.events import EventDispatcher
  14. from celery.messaging import establish_connection
  15. from celery.messaging import get_consumer_set, BroadcastConsumer
  16. from celery.exceptions import NotRegistered
  17. from celery.datastructures import SharedCounter
  18. RUN = 0x0
  19. CLOSE = 0x1
  20. class CarrotListener(object):
  21. """Listen for messages received from the broker and
  22. move them the the ready queue for task processing.
  23. :param ready_queue: See :attr:`ready_queue`.
  24. :param eta_schedule: See :attr:`eta_schedule`.
  25. .. attribute:: ready_queue
  26. The queue that holds tasks ready for processing immediately.
  27. .. attribute:: eta_schedule
  28. Scheduler for paused tasks. Reasons for being paused include
  29. a countdown/eta or that it's waiting for retry.
  30. .. attribute:: logger
  31. The logger used.
  32. """
  33. def __init__(self, ready_queue, eta_schedule, logger,
  34. init_callback=noop, send_events=False, hostname=None,
  35. initial_prefetch_count=2):
  36. self.connection = None
  37. self.task_consumer = None
  38. self.ready_queue = ready_queue
  39. self.eta_schedule = eta_schedule
  40. self.send_events = send_events
  41. self.init_callback = init_callback
  42. self.logger = logger
  43. self.hostname = hostname or socket.gethostname()
  44. self.initial_prefetch_count = initial_prefetch_count
  45. self.control_dispatch = ControlDispatch(logger=logger,
  46. hostname=self.hostname,
  47. listener=self)
  48. self.prefetch_count = None
  49. self.prev_pcount = None
  50. self.event_dispatcher = None
  51. self.heart = None
  52. self._state = None
  53. def start(self):
  54. """Start the consumer.
  55. If the connection is lost, it tries to re-establish the connection
  56. and restarts consuming messages.
  57. """
  58. self.init_callback(self)
  59. while 1:
  60. self.reset_connection()
  61. try:
  62. self.consume_messages()
  63. except (socket.error, AMQPConnectionException, IOError):
  64. self.logger.error("CarrotListener: Connection to broker lost."
  65. + " Trying to re-establish connection...")
  66. def consume_messages(self):
  67. """Consume messages forever (or until an exception is raised)."""
  68. task_consumer = self.task_consumer
  69. self.logger.debug("CarrotListener: Starting message consumer...")
  70. wait_for_message = self._detect_wait_method()(limit=None).next
  71. self.logger.debug("CarrotListener: Ready to accept tasks!")
  72. while 1:
  73. pcount = int(self.prefetch_count) # SharedCounter() -> int()
  74. if not self.prev_pcount or pcount != self.prev_pcount:
  75. self.logger.debug("basic.qos: prefetch_count->%s" % pcount)
  76. task_consumer.qos(prefetch_count=pcount)
  77. self.prev_pcount = pcount
  78. wait_for_message()
  79. def on_task(self, task, eta=None):
  80. """Handle received task.
  81. If the task has an ``eta`` we enter it into the ETA schedule,
  82. otherwise we move it the ready queue for immediate processing.
  83. """
  84. if task.task_id in revoked:
  85. self.logger.warn("Got revoked task from broker: %s[%s]" % (
  86. task.task_name, task.task_id))
  87. return task.on_ack()
  88. self.event_dispatcher.send("task-received", uuid=task.task_id,
  89. name=task.task_name, args=repr(task.args),
  90. kwargs=repr(task.kwargs), retries=task.retries, eta=eta)
  91. if eta:
  92. if not isinstance(eta, datetime):
  93. eta = parse_iso8601(eta)
  94. self.prefetch_count.increment()
  95. self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
  96. task.task_name, task.task_id, eta))
  97. self.eta_schedule.enter(task, eta=eta,
  98. callback=self.prefetch_count.decrement)
  99. else:
  100. self.logger.info("Got task from broker: %s[%s]" % (
  101. task.task_name, task.task_id))
  102. self.ready_queue.put(task)
  103. def receive_message(self, message_data, message):
  104. """The callback called when a new message is received. """
  105. # Handle task
  106. if message_data.get("task"):
  107. try:
  108. task = TaskWrapper.from_message(message, message_data,
  109. logger=self.logger,
  110. eventer=self.event_dispatcher)
  111. except NotRegistered, exc:
  112. self.logger.error("Unknown task ignored: %s: %s" % (
  113. str(exc), message_data))
  114. message.ack()
  115. except InvalidTaskError, exc:
  116. self.logger.error("Invalid task ignored: %s: %s" % (
  117. str(exc), message_data))
  118. message.ack()
  119. else:
  120. self.on_task(task, eta=message_data.get("eta"))
  121. return
  122. # Handle control command
  123. control = message_data.get("control")
  124. if control:
  125. self.control_dispatch.dispatch_from_message(control)
  126. return
  127. warnings.warn(RuntimeWarning(
  128. "Received and deleted unknown message. Wrong destination?!? \
  129. the message was: %s" % message_data))
  130. message.ack()
  131. def close_connection(self):
  132. self.logger.debug("CarrotListener: "
  133. "Closing consumer channel...")
  134. self.task_consumer = self.task_consumer and self.task_consumer.close()
  135. self.logger.debug("CarrotListener: "
  136. "Closing connection to broker...")
  137. self.connection = self.connection and self.connection.close()
  138. def stop_consumers(self, close=True):
  139. if not self._state == RUN:
  140. return
  141. self._state = CLOSE
  142. if self.heart:
  143. self.logger.debug("Heart: Going into cardiac arrest...")
  144. self.heart = self.heart.stop()
  145. self.logger.debug("TaskConsumer: Cancelling consumers...")
  146. if self.task_consumer:
  147. self.task_consumer.cancel()
  148. if self.event_dispatcher:
  149. self.logger.debug("EventDispatcher: Shutting down...")
  150. self.event_dispatcher = self.event_dispatcher.close()
  151. if close:
  152. self.close_connection()
  153. def on_decode_error(self, message, exc):
  154. """Callback called if the message had decoding errors.
  155. :param message: The message with errors.
  156. :param exc: The original exception instance.
  157. """
  158. self.logger.critical("Message decoding error: %s "
  159. "(type:%s encoding:%s raw:'%s')" % (
  160. exc, message.content_type,
  161. message.content_encoding, message.body))
  162. message.ack()
  163. def reset_connection(self):
  164. self.logger.debug(
  165. "CarrotListener: Re-establishing connection to the broker...")
  166. self.stop_consumers()
  167. # Clear internal queues.
  168. self.ready_queue.clear()
  169. self.eta_schedule.clear()
  170. # Reset prefetch window.
  171. self.prefetch_count = SharedCounter(self.initial_prefetch_count)
  172. self.prev_pcount = None
  173. self.connection = self._open_connection()
  174. self.logger.debug("CarrotListener: Connection Established.")
  175. self.task_consumer = get_consumer_set(connection=self.connection)
  176. self.task_consumer.on_decode_error = self.on_decode_error
  177. self.broadcast_consumer = BroadcastConsumer(self.connection,
  178. hostname=self.hostname)
  179. self.task_consumer.register_callback(self.receive_message)
  180. self.event_dispatcher = EventDispatcher(self.connection,
  181. enabled=self.send_events)
  182. self.heart = Heart(self.event_dispatcher)
  183. self.heart.start()
  184. self._state = RUN
  185. def _mainloop(self, **kwargs):
  186. while 1:
  187. yield self.connection.drain_events()
  188. def _detect_wait_method(self):
  189. if hasattr(self.connection.connection, "drain_events"):
  190. self.broadcast_consumer.register_callback(self.receive_message)
  191. self.task_consumer.iterconsume()
  192. self.broadcast_consumer.iterconsume()
  193. return self._mainloop
  194. else:
  195. self.task_consumer.add_consumer(self.broadcast_consumer)
  196. return self.task_consumer.iterconsume
  197. def _open_connection(self):
  198. """Retries connecting to the AMQP broker over time.
  199. See :func:`celery.utils.retry_over_time`.
  200. """
  201. def _connection_error_handler(exc, interval):
  202. """Callback handler for connection errors."""
  203. self.logger.error("CarrotListener: Connection Error: %s. " % exc
  204. + "Trying again in %d seconds..." % interval)
  205. def _establish_connection():
  206. """Establish a connection to the broker."""
  207. conn = establish_connection()
  208. conn.connect() # Connection is established lazily, so connect.
  209. return conn
  210. if not conf.BROKER_CONNECTION_RETRY:
  211. return _establish_connection()
  212. conn = retry_over_time(_establish_connection, (socket.error, IOError),
  213. errback=_connection_error_handler,
  214. max_retries=conf.BROKER_CONNECTION_MAX_RETRIES)
  215. return conn
  216. def stop(self):
  217. self.logger.debug("CarrotListener: Stopping consumers...")
  218. self.stop_consumers(close=False)