worker.py

  1. """Embedded workers for integration tests."""
  2. from __future__ import absolute_import, unicode_literals
  3. import os
  4. import threading
  5. from contextlib import contextmanager
  6. from celery import worker
  7. from celery.result import _set_task_join_will_block, allow_join_result
  8. from celery.utils.dispatch import Signal
  9. from celery.utils.nodenames import anon_nodename
  10. WORKER_LOGLEVEL = os.environ.get('WORKER_LOGLEVEL', 'error')
  11. test_worker_starting = Signal(
  12. name='test_worker_starting',
  13. providing_args={},
  14. )
  15. test_worker_started = Signal(
  16. name='test_worker_started',
  17. providing_args={'worker', 'consumer'},
  18. )
  19. test_worker_stopped = Signal(
  20. name='test_worker_stopped',
  21. providing_args={'worker'},
  22. )
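
# The signals above can be connected to like any other Celery signal, so test
# code can observe the embedded worker's lifecycle.  A minimal sketch (the
# receiver name and the ``started`` list are illustrative, not part of this
# module):
#
#     from celery.contrib.testing.worker import test_worker_started
#
#     started = []
#
#     def record_worker_started(sender=None, worker=None, consumer=None,
#                               **kwargs):
#         # ``sender`` is the app, ``worker`` the TestWorkController instance.
#         started.append(worker.hostname)
#
#     test_worker_started.connect(record_worker_started)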


class TestWorkController(worker.WorkController):
    """Worker that can synchronize on being fully started."""

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        self._on_started = threading.Event()
        super(TestWorkController, self).__init__(*args, **kwargs)

    def on_consumer_ready(self, consumer):
        # type: (celery.worker.consumer.Consumer) -> None
        """Callback called when the Consumer blueprint is fully started."""
        self._on_started.set()
        test_worker_started.send(
            sender=self.app, worker=self, consumer=consumer)

    def ensure_started(self):
        # type: () -> None
        """Wait for worker to be fully up and running.

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.
        """
        self._on_started.wait()


@contextmanager
def start_worker(app,
                 concurrency=1,
                 pool='solo',
                 loglevel=WORKER_LOGLEVEL,
                 logfile=None,
                 perform_ping_check=True,
                 ping_task_timeout=10.0,
                 **kwargs):
    # type: (Celery, int, str, Union[str, int],
    #        str, bool, float, **Any) -> Iterable
    """Start embedded worker.

    Yields:
        celery.app.worker.Worker: worker instance.
    """
    test_worker_starting.send(sender=app)
    with _start_worker_thread(app,
                              concurrency=concurrency,
                              pool=pool,
                              loglevel=loglevel,
                              logfile=logfile,
                              **kwargs) as worker:
        if perform_ping_check:
            from .tasks import ping
            with allow_join_result():
                assert ping.delay().get(timeout=ping_task_timeout) == 'pong'

        yield worker
    test_worker_stopped.send(sender=app, worker=worker)
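
# Typical use of ``start_worker`` from test code, sketched below.  The ``app``
# and the ``add`` task are illustrative and assumed to be defined elsewhere;
# any finalized app works, but ``celery.contrib.testing.tasks`` must be
# imported somewhere so the ``celery.ping`` task required by
# ``_start_worker_thread`` is registered:
#
#     from celery.contrib.testing import tasks  # noqa: registers celery.ping
#     from celery.contrib.testing.worker import start_worker
#
#     def test_add():
#         with start_worker(app, perform_ping_check=True) as w:
#             assert add.delay(2, 2).get(timeout=10) == 4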


@contextmanager
def _start_worker_thread(app,
                         concurrency=1,
                         pool='solo',
                         loglevel=WORKER_LOGLEVEL,
                         logfile=None,
                         WorkController=TestWorkController,
                         **kwargs):
    # type: (Celery, int, str, Union[str, int], str, Any, **Any) -> Iterable
    """Start Celery worker in a thread.

    Yields:
        celery.worker.Worker: worker instance.
    """
    setup_app_for_worker(app, loglevel, logfile)
    assert 'celery.ping' in app.tasks

    # Make sure we can connect to the broker
    with app.connection() as conn:
        conn.default_channel.queue_declare

    worker = WorkController(
        app=app,
        concurrency=concurrency,
        hostname=anon_nodename(),
        pool=pool,
        loglevel=loglevel,
        logfile=logfile,
        # not allowed to override TestWorkController.on_consumer_ready
        ready_callback=None,
        without_heartbeat=True,
        without_mingle=True,
        without_gossip=True,
        **kwargs)

    t = threading.Thread(target=worker.start)
    t.start()
    worker.ensure_started()
    _set_task_join_will_block(False)

    yield worker

    from celery.worker import state
    state.should_terminate = 0
    t.join(10)
    state.should_terminate = None
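
# ``_start_worker_thread`` takes the ``WorkController`` class as a keyword, so
# a subclass of ``TestWorkController`` can be passed to hook extra behaviour
# into startup.  Illustrative sketch (the subclass and its attribute are
# hypothetical):
#
#     class RecordingController(TestWorkController):
#         def on_consumer_ready(self, consumer):
#             self.ready_consumer = consumer
#             super(RecordingController, self).on_consumer_ready(consumer)
#
#     with _start_worker_thread(app, WorkController=RecordingController) as w:
#         assert w.ready_consumer is not None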


@contextmanager
def _start_worker_process(app,
                          concurrency=1,
                          pool='solo',
                          loglevel=WORKER_LOGLEVEL,
                          logfile=None,
                          **kwargs):
    # type: (Celery, int, str, Union[int, str], str, **Any) -> Iterable
    """Start worker in a separate process.

    Yields:
        None: the worker runs in a separate process, so no in-process
            worker instance is available.
    """
    from celery.apps.multi import Cluster, Node

    app.set_current()
    cluster = Cluster([Node('testworker1@%h')])
    cluster.start()
    yield
    cluster.stopwait()


def setup_app_for_worker(app, loglevel, logfile):
    # type: (Celery, Union[str, int], str) -> None
    """Set up the app to be used for starting an embedded worker."""
    app.finalize()
    app.set_current()
    app.set_default()
    type(app.log)._setup = False
    app.log.setup(loglevel=loglevel, logfile=logfile)
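
# End-to-end sketch of wiring this module into a pytest suite.  The fixture
# below is illustrative (Celery's ``celery.contrib.pytest`` plugin ships
# similar ready-made fixtures); it assumes an in-memory broker/backend and
# that importing ``celery.contrib.testing.tasks`` registers ``celery.ping``:
#
#     import pytest
#     from celery import Celery
#     from celery.contrib.testing import tasks  # noqa: registers celery.ping
#     from celery.contrib.testing.worker import start_worker
#
#     @pytest.fixture
#     def embedded_worker():
#         app = Celery('tests', broker='memory://', backend='cache+memory://')
#         with start_worker(app, perform_ping_check=False) as w:
#             yield w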