test_amqp.py 9.1 KB


  1. from __future__ import absolute_import
  2. import pickle
  3. from contextlib import contextmanager
  4. from datetime import timedelta
  5. from pickle import dumps, loads
  6. from billiard.einfo import ExceptionInfo
  7. from celery import states
  8. from celery.backends.amqp import AMQPBackend
  9. from celery.five import Empty, Queue, range
  10. from celery.result import AsyncResult
  11. from celery.utils import uuid
  12. from celery.tests.case import (
  13. AppCase, Mock, depends_on_current_app, sleepdeprived,
  14. )
  15. class SomeClass(object):
  16. def __init__(self, data):
  17. self.data = data
  18. class test_AMQPBackend(AppCase):
  19. def setup(self):
  20. self.app.conf.result_cache_max = 100
  21. def create_backend(self, **opts):
  22. opts = dict(dict(serializer='pickle', persistent=True), **opts)
  23. return AMQPBackend(self.app, **opts)
  24. def test_destination_for(self):
  25. b = self.create_backend()
  26. request = Mock()
  27. self.assertTupleEqual(
  28. b.destination_for('id', request),
  29. (b.rkey('id'), request.correlation_id),
  30. )
  31. def test_store_result__no_routing_key(self):
  32. b = self.create_backend()
  33. b.destination_for = Mock()
  34. b.destination_for.return_value = None, None
  35. b.store_result('id', None, states.SUCCESS)
  36. def test_mark_as_done(self):
  37. tb1 = self.create_backend(max_cached_results=1)
  38. tb2 = self.create_backend(max_cached_results=1)
  39. tid = uuid()
  40. tb1.mark_as_done(tid, 42)
  41. self.assertEqual(tb2.get_state(tid), states.SUCCESS)
  42. self.assertEqual(tb2.get_result(tid), 42)
  43. self.assertTrue(tb2._cache.get(tid))
  44. self.assertTrue(tb2.get_result(tid), 42)
  45. @depends_on_current_app
  46. def test_pickleable(self):
  47. self.assertTrue(loads(dumps(self.create_backend())))
  48. def test_revive(self):
  49. tb = self.create_backend()
  50. tb.revive(None)
  51. def test_is_pickled(self):
  52. tb1 = self.create_backend()
  53. tb2 = self.create_backend()
  54. tid2 = uuid()
  55. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  56. tb1.mark_as_done(tid2, result)
  57. # is serialized properly.
  58. rindb = tb2.get_result(tid2)
  59. self.assertEqual(rindb.get('foo'), 'baz')
  60. self.assertEqual(rindb.get('bar').data, 12345)
  61. def test_mark_as_failure(self):
  62. tb1 = self.create_backend()
  63. tb2 = self.create_backend()
  64. tid3 = uuid()
  65. try:
  66. raise KeyError('foo')
  67. except KeyError as exception:
  68. einfo = ExceptionInfo()
  69. tb1.mark_as_failure(tid3, exception, traceback=einfo.traceback)
  70. self.assertEqual(tb2.get_state(tid3), states.FAILURE)
  71. self.assertIsInstance(tb2.get_result(tid3), KeyError)
  72. self.assertEqual(tb2.get_traceback(tid3), einfo.traceback)
  73. def test_repair_uuid(self):
  74. from celery.backends.amqp import repair_uuid
  75. for i in range(10):
  76. tid = uuid()
  77. self.assertEqual(repair_uuid(tid.replace('-', '')), tid)
  78. def test_expires_is_int(self):
  79. b = self.create_backend(expires=48)
  80. self.assertEqual(b.queue_arguments.get('x-expires'), 48 * 1000.0)
  81. def test_expires_is_float(self):
  82. b = self.create_backend(expires=48.3)
  83. self.assertEqual(b.queue_arguments.get('x-expires'), 48.3 * 1000.0)
  84. def test_expires_is_timedelta(self):
  85. b = self.create_backend(expires=timedelta(minutes=1))
  86. self.assertEqual(b.queue_arguments.get('x-expires'), 60 * 1000.0)
  87. @sleepdeprived()
  88. def test_store_result_retries(self):
  89. iterations = [0]
  90. stop_raising_at = [5]
  91. def publish(*args, **kwargs):
  92. if iterations[0] > stop_raising_at[0]:
  93. return
  94. iterations[0] += 1
  95. raise KeyError('foo')
  96. backend = AMQPBackend(self.app)
  97. from celery.app.amqp import Producer
  98. prod, Producer.publish = Producer.publish, publish
  99. try:
  100. with self.assertRaises(KeyError):
  101. backend.retry_policy['max_retries'] = None
  102. backend.store_result('foo', 'bar', 'STARTED')
  103. with self.assertRaises(KeyError):
  104. backend.retry_policy['max_retries'] = 10
  105. backend.store_result('foo', 'bar', 'STARTED')
  106. finally:
  107. Producer.publish = prod
  108. def assertState(self, retval, state):
  109. self.assertEqual(retval['status'], state)
  110. def test_poll_no_messages(self):
  111. b = self.create_backend()
  112. self.assertState(b.get_task_meta(uuid()), states.PENDING)
  113. @contextmanager
  114. def _result_context(self):
  115. results = Queue()
  116. class Message(object):
  117. acked = 0
  118. requeued = 0
  119. def __init__(self, **merge):
  120. self.payload = dict({'status': states.STARTED,
  121. 'result': None}, **merge)
  122. self.properties = {'correlation_id': merge.get('task_id')}
  123. self.body = pickle.dumps(self.payload)
  124. self.content_type = 'application/x-python-serialize'
  125. self.content_encoding = 'binary'
  126. def ack(self, *args, **kwargs):
  127. self.acked += 1
  128. def requeue(self, *args, **kwargs):
  129. self.requeued += 1
  130. class MockBinding(object):
  131. def __init__(self, *args, **kwargs):
  132. self.channel = Mock()
  133. def __call__(self, *args, **kwargs):
  134. return self
  135. def declare(self):
  136. pass
  137. def get(self, no_ack=False, accept=None):
  138. try:
  139. m = results.get(block=False)
  140. if m:
  141. m.accept = accept
  142. return m
  143. except Empty:
  144. pass
  145. def is_bound(self):
  146. return True
  147. class MockBackend(AMQPBackend):
  148. Queue = MockBinding
  149. backend = MockBackend(self.app, max_cached_results=100)
  150. backend._republish = Mock()
  151. yield results, backend, Message
  152. def test_backlog_limit_exceeded(self):
  153. with self._result_context() as (results, backend, Message):
  154. for i in range(1001):
  155. results.put(Message(task_id='id', status=states.RECEIVED))
  156. with self.assertRaises(backend.BacklogLimitExceeded):
  157. backend.get_task_meta('id')
  158. def test_poll_result(self):
  159. with self._result_context() as (results, backend, Message):
  160. tid = uuid()
  161. # FFWD's to the latest state.
  162. state_messages = [
  163. Message(task_id=tid, status=states.RECEIVED, seq=1),
  164. Message(task_id=tid, status=states.STARTED, seq=2),
  165. Message(task_id=tid, status=states.FAILURE, seq=3),
  166. ]
  167. for state_message in state_messages:
  168. results.put(state_message)
  169. r1 = backend.get_task_meta(tid)
  170. self.assertDictContainsSubset(
  171. {'status': states.FAILURE, 'seq': 3}, r1,
  172. 'FFWDs to the last state',
  173. )
  174. # Caches last known state.
  175. tid = uuid()
  176. results.put(Message(task_id=tid))
  177. backend.get_task_meta(tid)
  178. self.assertIn(tid, backend._cache, 'Caches last known state')
  179. self.assertTrue(state_messages[-1].requeued)
  180. # Returns cache if no new states.
  181. results.queue.clear()
  182. assert not results.qsize()
  183. backend._cache[tid] = 'hello'
  184. self.assertEqual(
  185. backend.get_task_meta(tid), 'hello',
  186. 'Returns cache if no new states',
  187. )
  188. def test_drain_events_decodes_exceptions_in_meta(self):
  189. tid = uuid()
  190. b = self.create_backend(serializer='json')
  191. b.store_result(tid, RuntimeError('aap'), states.FAILURE)
  192. result = AsyncResult(tid, backend=b)
  193. with self.assertRaises(Exception) as cm:
  194. result.get()
  195. self.assertEqual(cm.exception.__class__.__name__, 'RuntimeError')
  196. self.assertEqual(str(cm.exception), 'aap')
  197. def test_no_expires(self):
  198. b = self.create_backend(expires=None)
  199. app = self.app
  200. app.conf.result_expires = None
  201. b = self.create_backend(expires=None)
  202. with self.assertRaises(KeyError):
  203. b.queue_arguments['x-expires']
  204. def test_process_cleanup(self):
  205. self.create_backend().process_cleanup()
  206. def test_reload_task_result(self):
  207. with self.assertRaises(NotImplementedError):
  208. self.create_backend().reload_task_result('x')
  209. def test_reload_group_result(self):
  210. with self.assertRaises(NotImplementedError):
  211. self.create_backend().reload_group_result('x')
  212. def test_save_group(self):
  213. with self.assertRaises(NotImplementedError):
  214. self.create_backend().save_group('x', 'x')
  215. def test_restore_group(self):
  216. with self.assertRaises(NotImplementedError):
  217. self.create_backend().restore_group('x')
  218. def test_delete_group(self):
  219. with self.assertRaises(NotImplementedError):
  220. self.create_backend().delete_group('x')