test_strategy.py 8.4 KB


  1. from __future__ import absolute_import, unicode_literals
  2. from collections import defaultdict
  3. from contextlib import contextmanager
  4. import pytest
  5. from case import Mock, patch
  6. from kombu.utils.limits import TokenBucket
  7. from celery import Task
  8. from celery.exceptions import InvalidTaskError
  9. from celery.utils.time import rate
  10. from celery.worker import state
  11. from celery.worker.request import Request
  12. from celery.worker.strategy import default as default_strategy
  13. from celery.worker.strategy import proto1_to_proto2
  14. class test_proto1_to_proto2:
  15. def setup(self):
  16. self.message = Mock(name='message')
  17. self.body = {
  18. 'args': (1,),
  19. 'kwargs': {'foo': 'baz'},
  20. 'utc': False,
  21. 'taskset': '123',
  22. }
  23. def test_message_without_args(self):
  24. self.body.pop('args')
  25. body, _, _, _ = proto1_to_proto2(self.message, self.body)
  26. assert body[:2] == ((), {'foo': 'baz'})
  27. def test_message_without_kwargs(self):
  28. self.body.pop('kwargs')
  29. body, _, _, _ = proto1_to_proto2(self.message, self.body)
  30. assert body[:2] == ((1,), {})
  31. def test_message_kwargs_not_mapping(self):
  32. self.body['kwargs'] = (2,)
  33. with pytest.raises(InvalidTaskError):
  34. proto1_to_proto2(self.message, self.body)
  35. def test_message_no_taskset_id(self):
  36. self.body.pop('taskset')
  37. assert proto1_to_proto2(self.message, self.body)
  38. def test_message(self):
  39. body, headers, decoded, utc = proto1_to_proto2(self.message, self.body)
  40. assert body == ((1,), {'foo': 'baz'}, {
  41. 'callbacks': None, 'errbacks': None, 'chord': None, 'chain': None,
  42. })
  43. assert headers == dict(self.body, group='123')
  44. assert decoded
  45. assert not utc
  46. class test_default_strategy_proto2:
  47. def setup(self):
  48. @self.app.task(shared=False)
  49. def add(x, y):
  50. return x + y
  51. self.add = add
  52. def get_message_class(self):
  53. return self.TaskMessage
  54. def prepare_message(self, message):
  55. return message
  56. class Context(object):
  57. def __init__(self, sig, s, reserved, consumer, message):
  58. self.sig = sig
  59. self.s = s
  60. self.reserved = reserved
  61. self.consumer = consumer
  62. self.message = message
  63. def __call__(self, callbacks=[], **kwargs):
  64. return self.s(
  65. self.message,
  66. (self.message.payload
  67. if not self.message.headers.get('id') else None),
  68. self.message.ack, self.message.reject, callbacks, **kwargs
  69. )
  70. def was_reserved(self):
  71. return self.reserved.called
  72. def was_rate_limited(self):
  73. assert not self.was_reserved()
  74. return self.consumer._limit_task.called
  75. def was_limited_with_eta(self):
  76. assert not self.was_reserved()
  77. called = self.consumer.timer.call_at.called
  78. if called:
  79. assert self.consumer.timer.call_at.call_args[0][1] == \
  80. self.consumer._limit_post_eta
  81. return called
  82. def was_scheduled(self):
  83. assert not self.was_reserved()
  84. assert not self.was_rate_limited()
  85. return self.consumer.timer.call_at.called
  86. def event_sent(self):
  87. return self.consumer.event_dispatcher.send.call_args
  88. def get_request(self):
  89. if self.was_reserved():
  90. return self.reserved.call_args[0][0]
  91. if self.was_rate_limited():
  92. return self.consumer._limit_task.call_args[0][0]
  93. if self.was_scheduled():
  94. return self.consumer.timer.call_at.call_args[0][0]
  95. raise ValueError('request not handled')
  96. @contextmanager
  97. def _context(self, sig,
  98. rate_limits=True, events=True, utc=True, limit=None):
  99. assert sig.type.Strategy
  100. assert sig.type.Request
  101. reserved = Mock()
  102. consumer = Mock()
  103. consumer.task_buckets = defaultdict(lambda: None)
  104. if limit:
  105. bucket = TokenBucket(rate(limit), capacity=1)
  106. consumer.task_buckets[sig.task] = bucket
  107. consumer.controller.state.revoked = set()
  108. consumer.disable_rate_limits = not rate_limits
  109. consumer.event_dispatcher.enabled = events
  110. s = sig.type.start_strategy(self.app, consumer, task_reserved=reserved)
  111. assert s
  112. message = self.task_message_from_sig(
  113. self.app, sig, utc=utc, TaskMessage=self.get_message_class(),
  114. )
  115. message = self.prepare_message(message)
  116. yield self.Context(sig, s, reserved, consumer, message)
  117. def test_when_logging_disabled(self):
  118. with patch('celery.worker.strategy.logger') as logger:
  119. logger.isEnabledFor.return_value = False
  120. with self._context(self.add.s(2, 2)) as C:
  121. C()
  122. logger.info.assert_not_called()
  123. def test_task_strategy(self):
  124. with self._context(self.add.s(2, 2)) as C:
  125. C()
  126. assert C.was_reserved()
  127. req = C.get_request()
  128. C.consumer.on_task_request.assert_called_with(req)
  129. assert C.event_sent()
  130. def test_callbacks(self):
  131. with self._context(self.add.s(2, 2)) as C:
  132. callbacks = [Mock(name='cb1'), Mock(name='cb2')]
  133. C(callbacks=callbacks)
  134. req = C.get_request()
  135. for callback in callbacks:
  136. callback.assert_called_with(req)
  137. def test_when_events_disabled(self):
  138. with self._context(self.add.s(2, 2), events=False) as C:
  139. C()
  140. assert C.was_reserved()
  141. assert not C.event_sent()
  142. def test_eta_task(self):
  143. with self._context(self.add.s(2, 2).set(countdown=10)) as C:
  144. C()
  145. assert C.was_scheduled()
  146. C.consumer.qos.increment_eventually.assert_called_with()
  147. def test_eta_task_utc_disabled(self):
  148. with self._context(self.add.s(2, 2).set(countdown=10), utc=False) as C:
  149. C()
  150. assert C.was_scheduled()
  151. C.consumer.qos.increment_eventually.assert_called_with()
  152. def test_when_rate_limited(self):
  153. task = self.add.s(2, 2)
  154. with self._context(task, rate_limits=True, limit='1/m') as C:
  155. C()
  156. assert C.was_rate_limited()
  157. def test_when_rate_limited_with_eta(self):
  158. task = self.add.s(2, 2).set(countdown=10)
  159. with self._context(task, rate_limits=True, limit='1/m') as C:
  160. C()
  161. assert C.was_limited_with_eta()
  162. C.consumer.qos.increment_eventually.assert_called_with()
  163. def test_when_rate_limited__limits_disabled(self):
  164. task = self.add.s(2, 2)
  165. with self._context(task, rate_limits=False, limit='1/m') as C:
  166. C()
  167. assert C.was_reserved()
  168. def test_when_revoked(self):
  169. task = self.add.s(2, 2)
  170. task.freeze()
  171. try:
  172. with self._context(task) as C:
  173. C.consumer.controller.state.revoked.add(task.id)
  174. state.revoked.add(task.id)
  175. C()
  176. with pytest.raises(ValueError):
  177. C.get_request()
  178. finally:
  179. state.revoked.discard(task.id)
  180. class test_default_strategy_proto1(test_default_strategy_proto2):
  181. def get_message_class(self):
  182. return self.TaskMessage1
  183. class test_default_strategy_proto1__no_utc(test_default_strategy_proto2):
  184. def get_message_class(self):
  185. return self.TaskMessage1
  186. def prepare_message(self, message):
  187. message.payload['utc'] = False
  188. return message
  189. class test_custom_request_for_default_strategy(test_default_strategy_proto2):
  190. def test_custom_request_gets_instantiated(self):
  191. _MyRequest = Mock(name='MyRequest')
  192. class MyRequest(Request):
  193. def __init__(self, *args, **kwargs):
  194. Request.__init__(self, *args, **kwargs)
  195. _MyRequest()
  196. class MyTask(Task):
  197. Request = MyRequest
  198. @self.app.task(base=MyTask)
  199. def failed():
  200. raise AssertionError
  201. sig = failed.s()
  202. with self._context(sig) as C:
  203. task_message_handler = default_strategy(
  204. failed,
  205. self.app,
  206. C.consumer
  207. )
  208. task_message_handler(C.message, None, None, None, None)
  209. _MyRequest.assert_called()