worker.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. """Embedded workers for integration tests."""
  2. import os
  3. import threading
  4. from contextlib import contextmanager
  5. from celery import worker
  6. from celery.result import allow_join_result, _set_task_join_will_block
  7. from celery.utils.dispatch import Signal
  8. from celery.utils.nodenames import anon_nodename
  9. test_worker_starting = Signal(providing_args=[])
  10. test_worker_started = Signal(providing_args=['worker', 'consumer'])
  11. test_worker_stopped = Signal(providing_args=['worker'])
  12. WORKER_LOGLEVEL = os.environ.get('WORKER_LOGLEVEL', 'error')
  13. class TestWorkController(worker.WorkController):
  14. """Worker that can synchronize on being fully started."""
  15. def __init__(self, *args, **kwargs):
  16. # type: (*Any, **Any) -> None
  17. self._on_started = threading.Event()
  18. super(TestWorkController, self).__init__(*args, **kwargs)
  19. def on_consumer_ready(self, consumer):
  20. # type: (celery.worker.consumer.Consumer) -> None
  21. """Callback called when the Consumer blueprint is fully started."""
  22. self._on_started.set()
  23. test_worker_started.send(
  24. sender=self.app, worker=self, consumer=consumer)
  25. def ensure_started(self):
  26. # type: () -> None
  27. """Wait for worker to be fully up and running.
  28. Warning:
  29. Worker must be started within a thread for this to work,
  30. or it will block forever.
  31. """
  32. self._on_started.wait()
  33. @contextmanager
  34. def start_worker(app,
  35. concurrency=1,
  36. pool='solo',
  37. loglevel=WORKER_LOGLEVEL,
  38. logfile=None,
  39. perform_ping_check=True,
  40. ping_task_timeout=10.0,
  41. **kwargs):
  42. # type: (Celery, int, str, Union[str, int],
  43. # str, bool, float, **Any) -> # Iterable
  44. """Start embedded worker.
  45. Yields:
  46. celery.app.worker.Worker: worker instance.
  47. """
  48. test_worker_starting.send(sender=app)
  49. with _start_worker_thread(app,
  50. concurrency=concurrency,
  51. pool=pool,
  52. loglevel=loglevel,
  53. logfile=logfile,
  54. **kwargs) as worker:
  55. if perform_ping_check:
  56. from .tasks import ping
  57. with allow_join_result():
  58. assert ping.delay().get(timeout=ping_task_timeout) == 'pong'
  59. yield worker
  60. test_worker_stopped.send(sender=app, worker=worker)
  61. @contextmanager
  62. def _start_worker_thread(app,
  63. concurrency=1,
  64. pool='solo',
  65. loglevel=WORKER_LOGLEVEL,
  66. logfile=None,
  67. WorkController=TestWorkController,
  68. **kwargs):
  69. # type: (Celery, int, str, Union[str, int], str, Any, **Any) -> Iterable
  70. """Start Celery worker in a thread.
  71. Yields:
  72. celery.worker.Worker: worker instance.
  73. """
  74. setup_app_for_worker(app, loglevel, logfile)
  75. assert 'celery.ping' in app.tasks
  76. # Make sure we can connect to the broker
  77. with app.connection() as conn:
  78. conn.default_channel.queue_declare
  79. worker = WorkController(
  80. app=app,
  81. concurrency=concurrency,
  82. hostname=anon_nodename(),
  83. pool=pool,
  84. loglevel=loglevel,
  85. logfile=logfile,
  86. # not allowed to override TestWorkController.on_consumer_ready
  87. ready_callback=None,
  88. without_heartbeat=True,
  89. without_mingle=True,
  90. without_gossip=True,
  91. **kwargs)
  92. t = threading.Thread(target=worker.start)
  93. t.start()
  94. worker.ensure_started()
  95. _set_task_join_will_block(False)
  96. yield worker
  97. from celery.worker import state
  98. state.should_terminate = 0
  99. t.join(10)
  100. state.should_terminate = None
  101. @contextmanager
  102. def _start_worker_process(app,
  103. concurrency=1,
  104. pool='solo',
  105. loglevel=WORKER_LOGLEVEL,
  106. logfile=None,
  107. **kwargs):
  108. # type (Celery, int, str, Union[int, str], str, **Any) -> Iterable
  109. """Start worker in separate process.
  110. Yields:
  111. celery.app.worker.Worker: worker instance.
  112. """
  113. from celery.apps.multi import Cluster, Node
  114. app.set_current()
  115. cluster = Cluster([Node('testworker1@%h')])
  116. cluster.start()
  117. yield
  118. cluster.stopwait()
  119. def setup_app_for_worker(app, loglevel, logfile):
  120. # type: (Celery, Union[str, int], str) -> None
  121. """Setup the app to be used for starting an embedded worker."""
  122. app.finalize()
  123. app.set_current()
  124. app.set_default()
  125. type(app.log)._setup = False
  126. app.log.setup(loglevel=loglevel, logfile=logfile)