loops.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. """
  2. celery.worker.loops
  3. ~~~~~~~~~~~~~~~~~~~
  4. The consumers highly-optimized inner loop.
  5. """
  6. from __future__ import absolute_import, unicode_literals
  7. import errno
  8. import socket
  9. from celery.bootsteps import RUN
  10. from celery.exceptions import WorkerShutdown, WorkerTerminate, WorkerLostError
  11. from celery.utils.log import get_logger
  12. from . import state
  13. __all__ = ['asynloop', 'synloop']
  14. logger = get_logger(__name__)
  15. error = logger.error
  16. def _quick_drain(connection, timeout=0.1):
  17. try:
  18. connection.drain_events(timeout=timeout)
  19. except Exception as exc:
  20. exc_errno = getattr(exc, 'errno', None)
  21. if exc_errno is not None and exc_errno != errno.EAGAIN:
  22. raise
  23. def asynloop(obj, connection, consumer, blueprint, hub, qos,
  24. heartbeat, clock, hbrate=2.0, RUN=RUN):
  25. """Non-blocking event loop consuming messages until connection is lost,
  26. or shutdown is requested."""
  27. update_qos = qos.update
  28. hbtick = connection.heartbeat_check
  29. errors = connection.connection_errors
  30. heartbeat = connection.get_heartbeat_interval() # negotiated
  31. on_task_received = obj.create_task_handler()
  32. if heartbeat and connection.supports_heartbeats:
  33. hub.call_repeatedly(heartbeat / hbrate, hbtick, hbrate)
  34. consumer.on_message = on_task_received
  35. consumer.consume()
  36. obj.on_ready()
  37. obj.controller.register_with_event_loop(hub)
  38. obj.register_with_event_loop(hub)
  39. # did_start_ok will verify that pool processes were able to start,
  40. # but this will only work the first time we start, as
  41. # maxtasksperchild will mess up metrics.
  42. if not obj.restart_count and not obj.pool.did_start_ok():
  43. raise WorkerLostError('Could not start worker processes')
  44. # consumer.consume() may have prefetched up to our
  45. # limit - drain an event so we are in a clean state
  46. # prior to starting our event loop.
  47. if connection.transport.driver_type == 'amqp':
  48. hub.call_soon(_quick_drain, connection)
  49. # FIXME: Use loop.run_forever
  50. # Tried and works, but no time to test properly before release.
  51. hub.propagate_errors = errors
  52. loop = hub.create_loop()
  53. try:
  54. while blueprint.state == RUN and obj.connection:
  55. # shutdown if signal handlers told us to.
  56. should_stop, should_terminate = (
  57. state.should_stop, state.should_terminate,
  58. )
  59. # False == EX_OK, so must use is not False
  60. if should_stop is not None and should_stop is not False:
  61. raise WorkerShutdown(should_stop)
  62. elif should_terminate is not None and should_stop is not False:
  63. raise WorkerTerminate(should_terminate)
  64. # We only update QoS when there is no more messages to read.
  65. # This groups together qos calls, and makes sure that remote
  66. # control commands will be prioritized over task messages.
  67. if qos.prev != qos.value:
  68. update_qos()
  69. try:
  70. next(loop)
  71. except StopIteration:
  72. loop = hub.create_loop()
  73. finally:
  74. try:
  75. hub.reset()
  76. except Exception as exc:
  77. error(
  78. 'Error cleaning up after event loop: %r', exc, exc_info=1,
  79. )
  80. def synloop(obj, connection, consumer, blueprint, hub, qos,
  81. heartbeat, clock, hbrate=2.0, **kwargs):
  82. """Fallback blocking event loop for transports that doesn't support AIO."""
  83. on_task_received = obj.create_task_handler()
  84. perform_pending_operations = obj.perform_pending_operations
  85. consumer.on_message = on_task_received
  86. consumer.consume()
  87. obj.on_ready()
  88. while blueprint.state == RUN and obj.connection:
  89. state.maybe_shutdown()
  90. if qos.prev != qos.value:
  91. qos.update()
  92. try:
  93. perform_pending_operations()
  94. connection.drain_events(timeout=2.0)
  95. except socket.timeout:
  96. pass
  97. except socket.error:
  98. if blueprint.state == RUN:
  99. raise