loops.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. """
  2. celery.worker.loop
  3. ~~~~~~~~~~~~~~~~~~
  4. The consumer's highly-optimized inner loop.
  5. """
  6. from __future__ import absolute_import
  7. import socket
  8. from celery.bootsteps import RUN
  9. from celery.exceptions import WorkerShutdown, WorkerTerminate, WorkerLostError
  10. from celery.utils.log import get_logger
  11. from . import state
  12. __all__ = ['asynloop', 'synloop']  # public API: the two loop entry points defined in this module
  13. logger = get_logger(__name__)  # logger named after this module
  14. error = logger.error  # bound once at import; asynloop's cleanup path logs through this alias
  15. def asynloop(obj, connection, consumer, blueprint, hub, qos,
  16. heartbeat, clock, hbrate=2.0, RUN=RUN):
  17. """Non-blocking event loop consuming messages until connection is lost,
  18. or shutdown is requested."""
  19. update_qos = qos.update
  20. readers, writers = hub.readers, hub.writers
  21. hbtick = connection.heartbeat_check
  22. errors = connection.connection_errors
  23. heartbeat = connection.get_heartbeat_interval() # negotiated
  24. hub_add, hub_remove = hub.add, hub.remove
  25. on_task_received = obj.create_task_handler()
  26. if heartbeat and connection.supports_heartbeats:
  27. hub.call_repeatedly(heartbeat / hbrate, hbtick, hbrate)
  28. consumer.on_message = on_task_received
  29. consumer.consume()
  30. obj.on_ready()
  31. obj.controller.register_with_event_loop(hub)
  32. obj.register_with_event_loop(hub)
  33. # did_start_ok will verify that pool processes were able to start,
  34. # but this will only work the first time we start, as
  35. # maxtasksperchild will mess up metrics.
  36. if not obj.restart_count and not obj.pool.did_start_ok():
  37. raise WorkerLostError('Could not start worker processes')
  38. # FIXME: Use loop.run_forever
  39. # Tried and works, but no time to test properly before release.
  40. hub.propagate_errors = errors
  41. loop = hub.create_loop()
  42. try:
  43. while blueprint.state == RUN and obj.connection:
  44. # shutdown if signal handlers told us to.
  45. if state.should_stop:
  46. raise WorkerShutdown()
  47. elif state.should_terminate:
  48. raise WorkerTerminate()
  49. # We only update QoS when there is no more messages to read.
  50. # This groups together qos calls, and makes sure that remote
  51. # control commands will be prioritized over task messages.
  52. if qos.prev != qos.value:
  53. update_qos()
  54. try:
  55. next(loop)
  56. except StopIteration:
  57. loop = hub.create_loop()
  58. finally:
  59. try:
  60. hub.reset()
  61. except Exception as exc:
  62. error(
  63. 'Error cleaning up after event loop: %r', exc, exc_info=1,
  64. )
  65. def synloop(obj, connection, consumer, blueprint, hub, qos,
  66. heartbeat, clock, hbrate=2.0, **kwargs):
  67. """Fallback blocking event loop for transports that doesn't support AIO."""
  68. on_task_received = obj.create_task_handler()
  69. consumer.on_message = on_task_received
  70. consumer.consume()
  71. obj.on_ready()
  72. while blueprint.state == RUN and obj.connection:
  73. state.maybe_shutdown()
  74. if qos.prev != qos.value:
  75. qos.update()
  76. try:
  77. connection.drain_events(timeout=2.0)
  78. except socket.timeout:
  79. pass
  80. except socket.error:
  81. if blueprint.state == RUN:
  82. raise