listener.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import socket
  2. import warnings
  3. from datetime import datetime
  4. from dateutil.parser import parse as parse_iso8601
  5. from carrot.connection import AMQPConnectionException
  6. from celery import conf
  7. from celery import signals
  8. from celery.utils import noop, retry_over_time
  9. from celery.worker.job import TaskWrapper, InvalidTaskError
  10. from celery.worker.revoke import revoked
  11. from celery.worker.control import ControlDispatch
  12. from celery.worker.heartbeat import Heart
  13. from celery.events import EventDispatcher
  14. from celery.messaging import establish_connection
  15. from celery.messaging import get_consumer_set, BroadcastConsumer
  16. from celery.exceptions import NotRegistered
  17. from celery.datastructures import SharedCounter
  18. RUN = 0x0
  19. CLOSE = 0x1
  20. class CarrotListener(object):
  21. """Listen for messages received from the broker and
  22. move them the the ready queue for task processing.
  23. :param ready_queue: See :attr:`ready_queue`.
  24. :param eta_schedule: See :attr:`eta_schedule`.
  25. .. attribute:: ready_queue
  26. The queue that holds tasks ready for processing immediately.
  27. .. attribute:: eta_schedule
  28. Scheduler for paused tasks. Reasons for being paused include
  29. a countdown/eta or that it's waiting for retry.
  30. .. attribute:: logger
  31. The logger used.
  32. """
  33. def __init__(self, ready_queue, eta_schedule, logger,
  34. init_callback=noop, send_events=False, hostname=None,
  35. initial_prefetch_count=2):
  36. self.connection = None
  37. self.task_consumer = None
  38. self.ready_queue = ready_queue
  39. self.eta_schedule = eta_schedule
  40. self.send_events = send_events
  41. self.init_callback = init_callback
  42. self.logger = logger
  43. self.hostname = hostname or socket.gethostname()
  44. self.control_dispatch = ControlDispatch(logger=logger,
  45. hostname=self.hostname)
  46. self.prefetch_count = SharedCounter(initial_prefetch_count)
  47. self.event_dispatcher = None
  48. self.heart = None
  49. self._state = None
  50. def start(self):
  51. """Start the consumer.
  52. If the connection is lost, it tries to re-establish the connection
  53. and restarts consuming messages.
  54. """
  55. self.init_callback(self)
  56. while 1:
  57. self.reset_connection()
  58. try:
  59. self.consume_messages()
  60. except (socket.error, AMQPConnectionException, IOError):
  61. self.logger.error("CarrotListener: Connection to broker lost."
  62. + " Trying to re-establish connection...")
  63. def consume_messages(self):
  64. """Consume messages forever (or until an exception is raised)."""
  65. task_consumer = self.task_consumer
  66. self.logger.debug("CarrotListener: Starting message consumer...")
  67. wait_for_message = self._detect_wait_method()(limit=None).next
  68. self.logger.debug("CarrotListener: Ready to accept tasks!")
  69. prev_pcount = None
  70. while 1:
  71. pcount = int(self.prefetch_count) # SharedCounter() -> int()
  72. if not prev_pcount or pcount != prev_pcount:
  73. task_consumer.qos(prefetch_count=pcount)
  74. prev_pcount = pcount
  75. wait_for_message()
  76. def on_task(self, task, eta=None):
  77. """Handle received task.
  78. If the task has an ``eta`` we enter it into the ETA schedule,
  79. otherwise we move it the ready queue for immediate processing.
  80. """
  81. if task.task_id in revoked:
  82. self.logger.warn("Got revoked task from broker: %s[%s]" % (
  83. task.task_name, task.task_id))
  84. return task.on_ack()
  85. self.event_dispatcher.send("task-received", uuid=task.task_id,
  86. name=task.task_name, args=task.args, kwargs=task.kwargs,
  87. retries=task.retries, eta=eta)
  88. if eta:
  89. if not isinstance(eta, datetime):
  90. eta = parse_iso8601(eta)
  91. self.prefetch_count.increment()
  92. self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
  93. task.task_name, task.task_id, eta))
  94. self.eta_schedule.enter(task, eta=eta,
  95. callback=self.prefetch_count.decrement)
  96. else:
  97. self.logger.info("Got task from broker: %s[%s]" % (
  98. task.task_name, task.task_id))
  99. self.ready_queue.put(task)
  100. def receive_message(self, message_data, message):
  101. """The callback called when a new message is received. """
  102. # Handle task
  103. if message_data.get("task"):
  104. try:
  105. task = TaskWrapper.from_message(message, message_data,
  106. logger=self.logger,
  107. eventer=self.event_dispatcher)
  108. except NotRegistered, exc:
  109. self.logger.error("Unknown task ignored: %s: %s" % (
  110. str(exc), message_data))
  111. message.ack()
  112. except InvalidTaskError, exc:
  113. self.logger.error("Invalid task ignored: %s: %s" % (
  114. str(exc), message_data))
  115. message.ack()
  116. else:
  117. self.on_task(task, eta=message_data.get("eta"))
  118. return
  119. # Handle control command
  120. control = message_data.get("control")
  121. if control:
  122. self.control_dispatch.dispatch_from_message(control)
  123. return
  124. warnings.warn(RuntimeWarning(
  125. "Received and deleted unknown message. Wrong destination?!? \
  126. the message was: %s" % message_data))
  127. message.ack()
  128. def close_connection(self):
  129. if not self._state == RUN:
  130. return
  131. self._state = CLOSE
  132. if self.heart:
  133. self.logger.debug("Heart: Going into cardiac arrest...")
  134. self.heart = self.heart.stop()
  135. self.logger.debug("TaskConsumer: Shutting down...")
  136. self.task_consumer = self.task_consumer and self.task_consumer.close()
  137. if self.event_dispatcher:
  138. self.logger.debug("EventDispatcher: Shutting down...")
  139. self.event_dispatcher = self.event_dispatcher.close()
  140. self.logger.debug("CarrotListener: "
  141. "Closing connection to broker...")
  142. self.connection = self.connection and self.connection.close()
  143. def reset_connection(self):
  144. self.logger.debug(
  145. "CarrotListener: Re-establishing connection to the broker...")
  146. self.close_connection()
  147. self.connection = self._open_connection()
  148. self.logger.debug("CarrotListener: Connection Established.")
  149. self.task_consumer = get_consumer_set(connection=self.connection)
  150. self.broadcast_consumer = BroadcastConsumer(self.connection,
  151. hostname=self.hostname)
  152. self.task_consumer.register_callback(self.receive_message)
  153. self.event_dispatcher = EventDispatcher(self.connection,
  154. enabled=self.send_events)
  155. self.heart = Heart(self.event_dispatcher)
  156. self.heart.start()
  157. self._state = RUN
  158. def _mainloop(self, **kwargs):
  159. while 1:
  160. yield self.connection.connection.drain_events()
  161. def _detect_wait_method(self):
  162. if hasattr(self.connection.connection, "drain_events"):
  163. self.broadcast_consumer.register_callback(self.receive_message)
  164. self.task_consumer.iterconsume()
  165. self.broadcast_consumer.iterconsume()
  166. return self._mainloop
  167. else:
  168. self.task_consumer.add_consumer(self.broadcast_consumer)
  169. return self.task_consumer.iterconsume
  170. def _open_connection(self):
  171. """Retries connecting to the AMQP broker over time.
  172. See :func:`celery.utils.retry_over_time`.
  173. """
  174. def _connection_error_handler(exc, interval):
  175. """Callback handler for connection errors."""
  176. self.logger.error("AMQP Listener: Connection Error: %s. " % exc
  177. + "Trying again in %d seconds..." % interval)
  178. def _establish_connection():
  179. """Establish a connection to the AMQP broker."""
  180. conn = establish_connection()
  181. conn.connect() # Connection is established lazily, so connect.
  182. return conn
  183. if not conf.BROKER_CONNECTION_RETRY:
  184. return _establish_connection()
  185. conn = retry_over_time(_establish_connection, (socket.error, IOError),
  186. errback=_connection_error_handler,
  187. max_retries=conf.BROKER_CONNECTION_MAX_RETRIES)
  188. return conn
  189. def stop(self):
  190. self.close_connection()