loops.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
"""
celery.worker.loop
~~~~~~~~~~~~~~~~~~
The consumer's highly-optimized inner loop.
"""
  6. from __future__ import absolute_import
  7. import socket
  8. from time import sleep
  9. from Queue import Empty
  10. from kombu.utils.eventio import READ, WRITE, ERR
  11. from celery.bootsteps import CLOSE
  12. from celery.exceptions import InvalidTaskError, SystemTerminate
  13. from . import state
#: Default heartbeat rate.  ``asynloop`` uses it as the default for its
#: ``hbrate`` argument: when heartbeats are enabled it schedules
#: ``connection.heartbeat_check`` at an interval of
#: ``heartbeat * 1000.0 / hbrate`` and passes ``hbrate`` as the argument,
#: i.e. the check is called ``rate`` times per heartbeat period.
AMQHEARTBEAT_RATE = 2.0
  16. def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
  17. heartbeat, handle_unknown_message, handle_unknown_task,
  18. handle_invalid_task, sleep=sleep, min=min, Empty=Empty,
  19. hbrate=AMQHEARTBEAT_RATE):
  20. """Non-blocking eventloop consuming messages until connection is lost,
  21. or shutdown is requested."""
  22. with hub as hub:
  23. update_qos = qos.update
  24. update_readers = hub.update_readers
  25. readers, writers = hub.readers, hub.writers
  26. poll = hub.poller.poll
  27. fire_timers = hub.fire_timers
  28. scheduled = hub.timer._queue
  29. hbtick = connection.heartbeat_check
  30. on_poll_start = connection.transport.on_poll_start
  31. on_poll_empty = connection.transport.on_poll_empty
  32. drain_nowait = connection.drain_nowait
  33. on_task_callbacks = hub.on_task
  34. keep_draining = connection.transport.nb_keep_draining
  35. if heartbeat and connection.supports_heartbeats:
  36. hub.timer.apply_interval(
  37. heartbeat * 1000.0 / hbrate, hbtick, (hbrate, ))
  38. def on_task_received(body, message):
  39. if on_task_callbacks:
  40. [callback() for callback in on_task_callbacks]
  41. try:
  42. name = body['task']
  43. except (KeyError, TypeError):
  44. return handle_unknown_message(body, message)
  45. try:
  46. strategies[name](message, body, message.ack_log_error)
  47. except KeyError as exc:
  48. handle_unknown_task(body, message, exc)
  49. except InvalidTaskError as exc:
  50. handle_invalid_task(body, message, exc)
  51. consumer.callbacks = [on_task_received]
  52. consumer.consume()
  53. obj.on_ready()
  54. while ns.state != CLOSE and obj.connection:
  55. # shutdown if signal handlers told us to.
  56. if state.should_stop:
  57. raise SystemExit()
  58. elif state.should_terminate:
  59. raise SystemTerminate()
  60. # fire any ready timers, this also returns
  61. # the number of seconds until we need to fire timers again.
  62. poll_timeout = fire_timers() if scheduled else 1
  63. # We only update QoS when there is no more messages to read.
  64. # This groups together qos calls, and makes sure that remote
  65. # control commands will be prioritized over task messages.
  66. if qos.prev != qos.value:
  67. update_qos()
  68. update_readers(on_poll_start())
  69. if readers or writers:
  70. connection.more_to_read = True
  71. while connection.more_to_read:
  72. try:
  73. events = poll(poll_timeout)
  74. except ValueError: # Issue 882
  75. return
  76. if not events:
  77. on_poll_empty()
  78. for fileno, event in events or ():
  79. try:
  80. if event & READ:
  81. readers[fileno](fileno, event)
  82. if event & WRITE:
  83. writers[fileno](fileno, event)
  84. if event & ERR:
  85. for handlermap in readers, writers:
  86. try:
  87. handlermap[fileno](fileno, event)
  88. except KeyError:
  89. pass
  90. except (KeyError, Empty):
  91. continue
  92. except socket.error:
  93. if ns.state != CLOSE: # pragma: no cover
  94. raise
  95. if keep_draining:
  96. drain_nowait()
  97. poll_timeout = 0
  98. else:
  99. connection.more_to_read = False
  100. else:
  101. # no sockets yet, startup is probably not done.
  102. sleep(min(poll_timeout, 0.1))
  103. def synloop(obj, connection, consumer, strategies, ns, hub, qos,
  104. heartbeat, handle_unknown_message, handle_unknown_task,
  105. handle_invalid_task, **kwargs):
  106. """Fallback blocking eventloop for transports that doesn't support AIO."""
  107. def on_task_received(body, message):
  108. try:
  109. name = body['task']
  110. except (KeyError, TypeError):
  111. return handle_unknown_message(body, message)
  112. try:
  113. strategies[name](message, body, message.ack_log_error)
  114. except KeyError as exc:
  115. handle_unknown_task(body, message, exc)
  116. except InvalidTaskError as exc:
  117. handle_invalid_task(body, message, exc)
  118. consumer.register_callback(on_task_received)
  119. consumer.consume()
  120. obj.on_ready()
  121. while ns.state != CLOSE and obj.connection:
  122. state.maybe_shutdown()
  123. if qos.prev != qos.value: # pragma: no cover
  124. qos.update()
  125. try:
  126. connection.drain_events(timeout=2.0)
  127. except socket.timeout:
  128. pass
  129. except socket.error:
  130. if ns.state != CLOSE: # pragma: no cover
  131. raise