# test_amqp.py (8.7 KB)
  1. from __future__ import absolute_import, unicode_literals
  2. import pickle
  3. from contextlib import contextmanager
  4. from datetime import timedelta
  5. from pickle import dumps, loads
  6. import pytest
  7. from billiard.einfo import ExceptionInfo
  8. from case import Mock, mock
  9. from celery import states, uuid
  10. from celery.backends.amqp import AMQPBackend
  11. from celery.five import Empty, Queue, range
  12. from celery.result import AsyncResult
  13. class SomeClass(object):
  14. def __init__(self, data):
  15. self.data = data
  16. class test_AMQPBackend:
  17. def setup(self):
  18. self.app.conf.result_cache_max = 100
  19. def create_backend(self, **opts):
  20. opts = dict({'serializer': 'pickle', 'persistent': True}, **opts)
  21. return AMQPBackend(self.app, **opts)
  22. def test_destination_for(self):
  23. b = self.create_backend()
  24. request = Mock()
  25. assert b.destination_for('id', request) == (
  26. b.rkey('id'), request.correlation_id,
  27. )
  28. def test_store_result__no_routing_key(self):
  29. b = self.create_backend()
  30. b.destination_for = Mock()
  31. b.destination_for.return_value = None, None
  32. b.store_result('id', None, states.SUCCESS)
  33. def test_mark_as_done(self):
  34. tb1 = self.create_backend(max_cached_results=1)
  35. tb2 = self.create_backend(max_cached_results=1)
  36. tid = uuid()
  37. tb1.mark_as_done(tid, 42)
  38. assert tb2.get_state(tid) == states.SUCCESS
  39. assert tb2.get_result(tid) == 42
  40. assert tb2._cache.get(tid)
  41. assert tb2.get_result(tid), 42
  42. @pytest.mark.usefixtures('depends_on_current_app')
  43. def test_pickleable(self):
  44. assert loads(dumps(self.create_backend()))
  45. def test_revive(self):
  46. tb = self.create_backend()
  47. tb.revive(None)
  48. def test_is_pickled(self):
  49. tb1 = self.create_backend()
  50. tb2 = self.create_backend()
  51. tid2 = uuid()
  52. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  53. tb1.mark_as_done(tid2, result)
  54. # is serialized properly.
  55. rindb = tb2.get_result(tid2)
  56. assert rindb.get('foo') == 'baz'
  57. assert rindb.get('bar').data == 12345
  58. def test_mark_as_failure(self):
  59. tb1 = self.create_backend()
  60. tb2 = self.create_backend()
  61. tid3 = uuid()
  62. try:
  63. raise KeyError('foo')
  64. except KeyError as exception:
  65. einfo = ExceptionInfo()
  66. tb1.mark_as_failure(tid3, exception, traceback=einfo.traceback)
  67. assert tb2.get_state(tid3) == states.FAILURE
  68. assert isinstance(tb2.get_result(tid3), KeyError)
  69. assert tb2.get_traceback(tid3) == einfo.traceback
  70. def test_repair_uuid(self):
  71. from celery.backends.amqp import repair_uuid
  72. for i in range(10):
  73. tid = uuid()
  74. assert repair_uuid(tid.replace('-', '')) == tid
  75. def test_expires_is_int(self):
  76. b = self.create_backend(expires=48)
  77. q = b._create_binding('x1y2z3')
  78. assert q.expires == 48
  79. def test_expires_is_float(self):
  80. b = self.create_backend(expires=48.3)
  81. q = b._create_binding('x1y2z3')
  82. assert q.expires == 48.3
  83. def test_expires_is_timedelta(self):
  84. b = self.create_backend(expires=timedelta(minutes=1))
  85. q = b._create_binding('x1y2z3')
  86. assert q.expires == 60
  87. @mock.sleepdeprived()
  88. def test_store_result_retries(self):
  89. iterations = [0]
  90. stop_raising_at = [5]
  91. def publish(*args, **kwargs):
  92. if iterations[0] > stop_raising_at[0]:
  93. return
  94. iterations[0] += 1
  95. raise KeyError('foo')
  96. backend = AMQPBackend(self.app)
  97. from celery.app.amqp import Producer
  98. prod, Producer.publish = Producer.publish, publish
  99. try:
  100. with pytest.raises(KeyError):
  101. backend.retry_policy['max_retries'] = None
  102. backend.store_result('foo', 'bar', 'STARTED')
  103. with pytest.raises(KeyError):
  104. backend.retry_policy['max_retries'] = 10
  105. backend.store_result('foo', 'bar', 'STARTED')
  106. finally:
  107. Producer.publish = prod
  108. def test_poll_no_messages(self):
  109. b = self.create_backend()
  110. assert b.get_task_meta(uuid())['status'] == states.PENDING
  111. @contextmanager
  112. def _result_context(self):
  113. results = Queue()
  114. class Message(object):
  115. acked = 0
  116. requeued = 0
  117. def __init__(self, **merge):
  118. self.payload = dict({'status': states.STARTED,
  119. 'result': None}, **merge)
  120. self.properties = {'correlation_id': merge.get('task_id')}
  121. self.body = pickle.dumps(self.payload)
  122. self.content_type = 'application/x-python-serialize'
  123. self.content_encoding = 'binary'
  124. def ack(self, *args, **kwargs):
  125. self.acked += 1
  126. def requeue(self, *args, **kwargs):
  127. self.requeued += 1
  128. class MockBinding(object):
  129. def __init__(self, *args, **kwargs):
  130. self.channel = Mock()
  131. def __call__(self, *args, **kwargs):
  132. return self
  133. def declare(self):
  134. pass
  135. def get(self, no_ack=False, accept=None):
  136. try:
  137. m = results.get(block=False)
  138. if m:
  139. m.accept = accept
  140. return m
  141. except Empty:
  142. pass
  143. def is_bound(self):
  144. return True
  145. class MockBackend(AMQPBackend):
  146. Queue = MockBinding
  147. backend = MockBackend(self.app, max_cached_results=100)
  148. backend._republish = Mock()
  149. yield results, backend, Message
  150. def test_backlog_limit_exceeded(self):
  151. with self._result_context() as (results, backend, Message):
  152. for i in range(1001):
  153. results.put(Message(task_id='id', status=states.RECEIVED))
  154. with pytest.raises(backend.BacklogLimitExceeded):
  155. backend.get_task_meta('id')
  156. def test_poll_result(self):
  157. with self._result_context() as (results, backend, Message):
  158. tid = uuid()
  159. # FFWD's to the latest state.
  160. state_messages = [
  161. Message(task_id=tid, status=states.RECEIVED, seq=1),
  162. Message(task_id=tid, status=states.STARTED, seq=2),
  163. Message(task_id=tid, status=states.FAILURE, seq=3),
  164. ]
  165. for state_message in state_messages:
  166. results.put(state_message)
  167. r1 = backend.get_task_meta(tid)
  168. # FFWDs to the last state.
  169. assert r1['status'] == states.FAILURE
  170. assert r1['seq'] == 3
  171. # Caches last known state.
  172. tid = uuid()
  173. results.put(Message(task_id=tid))
  174. backend.get_task_meta(tid)
  175. assert tid, backend._cache in 'Caches last known state'
  176. assert state_messages[-1].requeued
  177. # Returns cache if no new states.
  178. results.queue.clear()
  179. assert not results.qsize()
  180. backend._cache[tid] = 'hello'
  181. # returns cache if no new states.
  182. assert backend.get_task_meta(tid) == 'hello'
  183. def test_drain_events_decodes_exceptions_in_meta(self):
  184. tid = uuid()
  185. b = self.create_backend(serializer='json')
  186. b.store_result(tid, RuntimeError('aap'), states.FAILURE)
  187. result = AsyncResult(tid, backend=b)
  188. with pytest.raises(Exception) as excinfo:
  189. result.get()
  190. assert excinfo.value.__class__.__name__ == 'RuntimeError'
  191. assert str(excinfo.value) == 'aap'
  192. def test_no_expires(self):
  193. b = self.create_backend(expires=None)
  194. app = self.app
  195. app.conf.result_expires = None
  196. b = self.create_backend(expires=None)
  197. q = b._create_binding('foo')
  198. assert q.expires is None
  199. def test_process_cleanup(self):
  200. self.create_backend().process_cleanup()
  201. def test_reload_task_result(self):
  202. with pytest.raises(NotImplementedError):
  203. self.create_backend().reload_task_result('x')
  204. def test_reload_group_result(self):
  205. with pytest.raises(NotImplementedError):
  206. self.create_backend().reload_group_result('x')
  207. def test_save_group(self):
  208. with pytest.raises(NotImplementedError):
  209. self.create_backend().save_group('x', 'x')
  210. def test_restore_group(self):
  211. with pytest.raises(NotImplementedError):
  212. self.create_backend().restore_group('x')
  213. def test_delete_group(self):
  214. with pytest.raises(NotImplementedError):
  215. self.create_backend().delete_group('x')