test_strategy.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import pytest
  2. from collections import defaultdict
  3. from contextlib import contextmanager
  4. from case import Mock, patch
  5. from kombu.utils.limits import TokenBucket
  6. from celery.exceptions import InvalidTaskError
  7. from celery.worker import state
  8. from celery.worker.strategy import proto1_to_proto2
  9. from celery.utils.time import rate
  10. class test_proto1_to_proto2:
  11. def setup(self):
  12. self.message = Mock(name='message')
  13. self.body = {
  14. 'args': (1,),
  15. 'kwargs': {'foo': 'baz'},
  16. 'utc': False,
  17. 'taskset': '123',
  18. }
  19. def test_message_without_args(self):
  20. self.body.pop('args')
  21. with pytest.raises(InvalidTaskError):
  22. proto1_to_proto2(self.message, self.body)
  23. def test_message_without_kwargs(self):
  24. self.body.pop('kwargs')
  25. with pytest.raises(InvalidTaskError):
  26. proto1_to_proto2(self.message, self.body)
  27. def test_message_kwargs_not_mapping(self):
  28. self.body['kwargs'] = (2,)
  29. with pytest.raises(InvalidTaskError):
  30. proto1_to_proto2(self.message, self.body)
  31. def test_message_no_taskset_id(self):
  32. self.body.pop('taskset')
  33. assert proto1_to_proto2(self.message, self.body)
  34. def test_message(self):
  35. body, headers, decoded, utc = proto1_to_proto2(self.message, self.body)
  36. assert body == ((1,), {'foo': 'baz'}, {
  37. 'callbacks': None, 'errbacks': None, 'chord': None, 'chain': None,
  38. })
  39. assert headers == dict(self.body, group='123')
  40. assert decoded
  41. assert not utc
  42. class test_default_strategy_proto2:
  43. def setup(self):
  44. @self.app.task(shared=False)
  45. def add(x, y):
  46. return x + y
  47. self.add = add
  48. def get_message_class(self):
  49. return self.TaskMessage
  50. def prepare_message(self, message):
  51. return message
  52. class Context:
  53. def __init__(self, sig, s, reserved, consumer, message):
  54. self.sig = sig
  55. self.s = s
  56. self.reserved = reserved
  57. self.consumer = consumer
  58. self.message = message
  59. def __call__(self, callbacks=[], **kwargs):
  60. return self.s(
  61. self.message,
  62. (self.message.payload
  63. if not self.message.headers.get('id') else None),
  64. self.message.ack, self.message.reject, callbacks, **kwargs
  65. )
  66. def was_reserved(self):
  67. return self.reserved.called
  68. def was_rate_limited(self):
  69. assert not self.was_reserved()
  70. return self.consumer._limit_task.called
  71. def was_scheduled(self):
  72. assert not self.was_reserved()
  73. assert not self.was_rate_limited()
  74. return self.consumer.timer.call_at.called
  75. def event_sent(self):
  76. return self.consumer.event_dispatcher.send.call_args
  77. def get_request(self):
  78. if self.was_reserved():
  79. return self.reserved.call_args[0][0]
  80. if self.was_rate_limited():
  81. return self.consumer._limit_task.call_args[0][0]
  82. if self.was_scheduled():
  83. return self.consumer.timer.call_at.call_args[0][0]
  84. raise ValueError('request not handled')
  85. @contextmanager
  86. def _context(self, sig,
  87. rate_limits=True, events=True, utc=True, limit=None):
  88. assert sig.type.Strategy
  89. reserved = Mock()
  90. consumer = Mock()
  91. consumer.task_buckets = defaultdict(lambda: None)
  92. if limit:
  93. bucket = TokenBucket(rate(limit), capacity=1)
  94. consumer.task_buckets[sig.task] = bucket
  95. consumer.controller.state.revoked = set()
  96. consumer.disable_rate_limits = not rate_limits
  97. consumer.event_dispatcher.enabled = events
  98. s = sig.type.start_strategy(self.app, consumer, task_reserved=reserved)
  99. assert s
  100. message = self.task_message_from_sig(
  101. self.app, sig, utc=utc, TaskMessage=self.get_message_class(),
  102. )
  103. message = self.prepare_message(message)
  104. yield self.Context(sig, s, reserved, consumer, message)
  105. def test_when_logging_disabled(self):
  106. with patch('celery.worker.strategy.logger') as logger:
  107. logger.isEnabledFor.return_value = False
  108. with self._context(self.add.s(2, 2)) as C:
  109. C()
  110. logger.info.assert_not_called()
  111. def test_task_strategy(self):
  112. with self._context(self.add.s(2, 2)) as C:
  113. C()
  114. assert C.was_reserved()
  115. req = C.get_request()
  116. C.consumer.on_task_request.assert_called_with(req)
  117. assert C.event_sent()
  118. def test_callbacks(self):
  119. with self._context(self.add.s(2, 2)) as C:
  120. callbacks = [Mock(name='cb1'), Mock(name='cb2')]
  121. C(callbacks=callbacks)
  122. req = C.get_request()
  123. for callback in callbacks:
  124. callback.assert_called_with(req)
  125. def test_when_events_disabled(self):
  126. with self._context(self.add.s(2, 2), events=False) as C:
  127. C()
  128. assert C.was_reserved()
  129. assert not C.event_sent()
  130. def test_eta_task(self):
  131. with self._context(self.add.s(2, 2).set(countdown=10)) as C:
  132. C()
  133. assert C.was_scheduled()
  134. C.consumer.qos.increment_eventually.assert_called_with()
  135. def test_eta_task_utc_disabled(self):
  136. with self._context(self.add.s(2, 2).set(countdown=10), utc=False) as C:
  137. C()
  138. assert C.was_scheduled()
  139. C.consumer.qos.increment_eventually.assert_called_with()
  140. def test_when_rate_limited(self):
  141. task = self.add.s(2, 2)
  142. with self._context(task, rate_limits=True, limit='1/m') as C:
  143. C()
  144. assert C.was_rate_limited()
  145. def test_when_rate_limited__limits_disabled(self):
  146. task = self.add.s(2, 2)
  147. with self._context(task, rate_limits=False, limit='1/m') as C:
  148. C()
  149. assert C.was_reserved()
  150. def test_when_revoked(self):
  151. task = self.add.s(2, 2)
  152. task.freeze()
  153. try:
  154. with self._context(task) as C:
  155. C.consumer.controller.state.revoked.add(task.id)
  156. state.revoked.add(task.id)
  157. C()
  158. with pytest.raises(ValueError):
  159. C.get_request()
  160. finally:
  161. state.revoked.discard(task.id)
  162. class test_default_strategy_proto1(test_default_strategy_proto2):
  163. def get_message_class(self):
  164. return self.TaskMessage1
  165. class test_default_strategy_proto1__no_utc(test_default_strategy_proto2):
  166. def get_message_class(self):
  167. return self.TaskMessage1
  168. def prepare_message(self, message):
  169. message.payload['utc'] = False
  170. return message