loops.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. """
  2. celery.worker.loop
  3. ~~~~~~~~~~~~~~~~~~
  4. The consumer's highly-optimized inner loop.
  5. """
  6. from __future__ import absolute_import
  7. import socket
  8. from time import sleep
  9. from celery.bootsteps import CLOSE
  10. from celery.exceptions import SystemTerminate, WorkerLostError
  11. from celery.five import Empty
  12. from celery.utils.log import get_logger
  13. from . import state
  14. __all__ = ['asynloop', 'synloop']  # public API: the two event-loop entry points
  15. logger = get_logger(__name__)  # module-level logger
  16. error = logger.error  # bound once; used by asynloop's cleanup error path
  17. def asynloop(obj, connection, consumer, blueprint, hub, qos,
  18. heartbeat, clock, hbrate=2.0,
  19. sleep=sleep, min=min, Empty=Empty):
  20. """Non-blocking event loop consuming messages until connection is lost,
  21. or shutdown is requested."""
  22. update_qos = qos.update
  23. readers, writers = hub.readers, hub.writers
  24. hbtick = connection.heartbeat_check
  25. errors = connection.connection_errors
  26. hub_add, hub_remove = hub.add, hub.remove
  27. on_task_received = obj.create_task_handler()
  28. if heartbeat and connection.supports_heartbeats:
  29. hub.call_repeatedly(heartbeat / hbrate, hbtick, hbrate)
  30. consumer.callbacks = [on_task_received]
  31. consumer.consume()
  32. obj.on_ready()
  33. obj.controller.register_with_event_loop(hub)
  34. obj.register_with_event_loop(hub)
  35. # did_start_ok will verify that pool processes were able to start,
  36. # but this will only work the first time we start, as
  37. # maxtasksperchild will mess up metrics.
  38. if not obj.restart_count and not obj.pool.did_start_ok():
  39. raise WorkerLostError('Could not start worker processes')
  40. loop = hub._loop(propagate=errors)
  41. try:
  42. while blueprint.state != CLOSE and obj.connection:
  43. # shutdown if signal handlers told us to.
  44. if state.should_stop:
  45. raise SystemExit()
  46. elif state.should_terminate:
  47. raise SystemTerminate()
  48. # We only update QoS when there is no more messages to read.
  49. # This groups together qos calls, and makes sure that remote
  50. # control commands will be prioritized over task messages.
  51. if qos.prev != qos.value:
  52. update_qos()
  53. next(loop, None)
  54. finally:
  55. try:
  56. hub.close()
  57. except Exception as exc:
  58. error(
  59. 'Error cleaning up after event loop: %r', exc, exc_info=1,
  60. )
  61. def synloop(obj, connection, consumer, blueprint, hub, qos,
  62. heartbeat, clock, hbrate=2.0, **kwargs):
  63. """Fallback blocking event loop for transports that doesn't support AIO."""
  64. on_task_received = obj.create_task_handler()
  65. consumer.register_callback(on_task_received)
  66. consumer.consume()
  67. obj.on_ready()
  68. while blueprint.state != CLOSE and obj.connection:
  69. state.maybe_shutdown()
  70. if qos.prev != qos.value:
  71. qos.update()
  72. try:
  73. connection.drain_events(timeout=2.0)
  74. except socket.timeout:
  75. pass
  76. except socket.error:
  77. if blueprint.state != CLOSE:
  78. raise