test_base.py 21 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import sys
  3. import types
  4. from contextlib import contextmanager
  5. from celery.exceptions import ChordError, TimeoutError
  6. from celery.five import items, bytes_if_py2, range
  7. from celery.utils import serialization
  8. from celery.utils.serialization import subclass_exception
  9. from celery.utils.serialization import find_pickleable_exception as fnpe
  10. from celery.utils.serialization import UnpickleableExceptionWrapper
  11. from celery.utils.serialization import get_pickleable_exception as gpe
  12. from celery import states
  13. from celery import group, uuid
  14. from celery.backends.base import (
  15. BaseBackend,
  16. KeyValueStoreBackend,
  17. DisabledBackend,
  18. _nulldict,
  19. )
  20. from celery.result import result_from_tuple
  21. from celery.utils.functional import pass1
  22. from celery.tests.case import ANY, AppCase, Case, Mock, call, patch, skip
  23. class wrapobject(object):
  24. def __init__(self, *args, **kwargs):
  25. self.args = args
  26. if sys.version_info[0] == 3 or getattr(sys, 'pypy_version_info', None):
  27. Oldstyle = None
  28. else:
  29. Oldstyle = types.ClassType(bytes_if_py2('Oldstyle'), (), {})
  30. Unpickleable = subclass_exception(
  31. bytes_if_py2('Unpickleable'), KeyError, 'foo.module',
  32. )
  33. Impossible = subclass_exception(
  34. bytes_if_py2('Impossible'), object, 'foo.module',
  35. )
  36. Lookalike = subclass_exception(
  37. bytes_if_py2('Lookalike'), wrapobject, 'foo.module',
  38. )
  39. class test_nulldict(Case):
  40. def test_nulldict(self):
  41. x = _nulldict()
  42. x['foo'] = 1
  43. x.update(foo=1, bar=2)
  44. x.setdefault('foo', 3)
  45. class test_serialization(AppCase):
  46. def test_create_exception_cls(self):
  47. self.assertTrue(serialization.create_exception_cls('FooError', 'm'))
  48. self.assertTrue(serialization.create_exception_cls('FooError', 'm',
  49. KeyError))
  50. class test_BaseBackend_interface(AppCase):
  51. def setup(self):
  52. self.b = BaseBackend(self.app)
  53. def test__forget(self):
  54. with self.assertRaises(NotImplementedError):
  55. self.b._forget('SOMExx-N0Nex1stant-IDxx-')
  56. def test_forget(self):
  57. with self.assertRaises(NotImplementedError):
  58. self.b.forget('SOMExx-N0nex1stant-IDxx-')
  59. def test_on_chord_part_return(self):
  60. self.b.on_chord_part_return(None, None, None)
  61. def test_apply_chord(self, unlock='celery.chord_unlock'):
  62. self.app.tasks[unlock] = Mock()
  63. self.b.apply_chord(
  64. group(app=self.app), (), 'dakj221', None,
  65. result=[self.app.AsyncResult(x) for x in [1, 2, 3]],
  66. )
  67. self.assertTrue(self.app.tasks[unlock].apply_async.call_count)
  68. class test_exception_pickle(AppCase):
  69. @skip.if_python3(reason='does not support old style classes')
  70. @skip.if_pypy()
  71. def test_oldstyle(self):
  72. self.assertTrue(fnpe(Oldstyle()))
  73. def test_BaseException(self):
  74. self.assertIsNone(fnpe(Exception()))
  75. def test_get_pickleable_exception(self):
  76. exc = Exception('foo')
  77. self.assertEqual(gpe(exc), exc)
  78. def test_unpickleable(self):
  79. self.assertIsInstance(fnpe(Unpickleable()), KeyError)
  80. self.assertIsNone(fnpe(Impossible()))
  81. class test_prepare_exception(AppCase):
  82. def setup(self):
  83. self.b = BaseBackend(self.app)
  84. def test_unpickleable(self):
  85. self.b.serializer = 'pickle'
  86. x = self.b.prepare_exception(Unpickleable(1, 2, 'foo'))
  87. self.assertIsInstance(x, KeyError)
  88. y = self.b.exception_to_python(x)
  89. self.assertIsInstance(y, KeyError)
  90. def test_impossible(self):
  91. self.b.serializer = 'pickle'
  92. x = self.b.prepare_exception(Impossible())
  93. self.assertIsInstance(x, UnpickleableExceptionWrapper)
  94. self.assertTrue(str(x))
  95. y = self.b.exception_to_python(x)
  96. self.assertEqual(y.__class__.__name__, 'Impossible')
  97. if sys.version_info < (2, 5):
  98. self.assertTrue(y.__class__.__module__)
  99. else:
  100. self.assertEqual(y.__class__.__module__, 'foo.module')
  101. def test_regular(self):
  102. self.b.serializer = 'pickle'
  103. x = self.b.prepare_exception(KeyError('baz'))
  104. self.assertIsInstance(x, KeyError)
  105. y = self.b.exception_to_python(x)
  106. self.assertIsInstance(y, KeyError)
  107. class KVBackend(KeyValueStoreBackend):
  108. mget_returns_dict = False
  109. def __init__(self, app, *args, **kwargs):
  110. self.db = {}
  111. super(KVBackend, self).__init__(app)
  112. def get(self, key):
  113. return self.db.get(key)
  114. def set(self, key, value):
  115. self.db[key] = value
  116. def mget(self, keys):
  117. if self.mget_returns_dict:
  118. return {key: self.get(key) for key in keys}
  119. else:
  120. return [self.get(k) for k in keys]
  121. def delete(self, key):
  122. self.db.pop(key, None)
  123. class DictBackend(BaseBackend):
  124. def __init__(self, *args, **kwargs):
  125. BaseBackend.__init__(self, *args, **kwargs)
  126. self._data = {'can-delete': {'result': 'foo'}}
  127. def _restore_group(self, group_id):
  128. if group_id == 'exists':
  129. return {'result': 'group'}
  130. def _get_task_meta_for(self, task_id):
  131. if task_id == 'task-exists':
  132. return {'result': 'task'}
  133. def _delete_group(self, group_id):
  134. self._data.pop(group_id, None)
  135. class test_BaseBackend_dict(AppCase):
  136. def setup(self):
  137. self.b = DictBackend(app=self.app)
  138. def test_delete_group(self):
  139. self.b.delete_group('can-delete')
  140. self.assertNotIn('can-delete', self.b._data)
  141. def test_prepare_exception_json(self):
  142. x = DictBackend(self.app, serializer='json')
  143. e = x.prepare_exception(KeyError('foo'))
  144. self.assertIn('exc_type', e)
  145. e = x.exception_to_python(e)
  146. self.assertEqual(e.__class__.__name__, 'KeyError')
  147. self.assertEqual(str(e).strip('u'), "'foo'")
  148. def test_save_group(self):
  149. b = BaseBackend(self.app)
  150. b._save_group = Mock()
  151. b.save_group('foofoo', 'xxx')
  152. b._save_group.assert_called_with('foofoo', 'xxx')
  153. def test_add_to_chord_interface(self):
  154. b = BaseBackend(self.app)
  155. with self.assertRaises(NotImplementedError):
  156. b.add_to_chord('group_id', 'sig')
  157. def test_forget_interface(self):
  158. b = BaseBackend(self.app)
  159. with self.assertRaises(NotImplementedError):
  160. b.forget('foo')
  161. def test_restore_group(self):
  162. self.assertIsNone(self.b.restore_group('missing'))
  163. self.assertIsNone(self.b.restore_group('missing'))
  164. self.assertEqual(self.b.restore_group('exists'), 'group')
  165. self.assertEqual(self.b.restore_group('exists'), 'group')
  166. self.assertEqual(self.b.restore_group('exists', cache=False), 'group')
  167. def test_reload_group_result(self):
  168. self.b._cache = {}
  169. self.b.reload_group_result('exists')
  170. self.b._cache['exists'] = {'result': 'group'}
  171. def test_reload_task_result(self):
  172. self.b._cache = {}
  173. self.b.reload_task_result('task-exists')
  174. self.b._cache['task-exists'] = {'result': 'task'}
  175. def test_fail_from_current_stack(self):
  176. self.b.mark_as_failure = Mock()
  177. try:
  178. raise KeyError('foo')
  179. except KeyError as exc:
  180. self.b.fail_from_current_stack('task_id')
  181. self.b.mark_as_failure.assert_called()
  182. args = self.b.mark_as_failure.call_args[0]
  183. self.assertEqual(args[0], 'task_id')
  184. self.assertIs(args[1], exc)
  185. self.assertTrue(args[2])
  186. def test_prepare_value_serializes_group_result(self):
  187. self.b.serializer = 'json'
  188. g = self.app.GroupResult('group_id', [self.app.AsyncResult('foo')])
  189. v = self.b.prepare_value(g)
  190. self.assertIsInstance(v, (list, tuple))
  191. self.assertEqual(result_from_tuple(v, app=self.app), g)
  192. v2 = self.b.prepare_value(g[0])
  193. self.assertIsInstance(v2, (list, tuple))
  194. self.assertEqual(result_from_tuple(v2, app=self.app), g[0])
  195. self.b.serializer = 'pickle'
  196. self.assertIsInstance(self.b.prepare_value(g), self.app.GroupResult)
  197. def test_is_cached(self):
  198. b = BaseBackend(app=self.app, max_cached_results=1)
  199. b._cache['foo'] = 1
  200. self.assertTrue(b.is_cached('foo'))
  201. self.assertFalse(b.is_cached('false'))
  202. def test_mark_as_done__chord(self):
  203. b = BaseBackend(app=self.app)
  204. b._store_result = Mock()
  205. request = Mock(name='request')
  206. b.on_chord_part_return = Mock()
  207. b.mark_as_done('id', 10, request=request)
  208. b.on_chord_part_return.assert_called_with(request, states.SUCCESS, 10)
  209. def test_mark_as_failure__chord(self):
  210. b = BaseBackend(app=self.app)
  211. b._store_result = Mock()
  212. request = Mock(name='request')
  213. request.errbacks = []
  214. b.on_chord_part_return = Mock()
  215. exc = KeyError()
  216. b.mark_as_failure('id', exc, request=request)
  217. b.on_chord_part_return.assert_called_with(request, states.FAILURE, exc)
  218. def test_mark_as_revoked__chord(self):
  219. b = BaseBackend(app=self.app)
  220. b._store_result = Mock()
  221. request = Mock(name='request')
  222. request.errbacks = []
  223. b.on_chord_part_return = Mock()
  224. b.mark_as_revoked('id', 'revoked', request=request)
  225. b.on_chord_part_return.assert_called_with(request, states.REVOKED, ANY)
  226. def test_chord_error_from_stack_raises(self):
  227. b = BaseBackend(app=self.app)
  228. exc = KeyError()
  229. callback = Mock(name='callback')
  230. callback.options = {'link_error': []}
  231. task = self.app.tasks[callback.task] = Mock()
  232. b.fail_from_current_stack = Mock()
  233. group = self.patch('celery.group')
  234. group.side_effect = exc
  235. b.chord_error_from_stack(callback, exc=ValueError())
  236. task.backend.fail_from_current_stack.assert_called_with(
  237. callback.id, exc=exc)
  238. def test_exception_to_python_when_None(self):
  239. b = BaseBackend(app=self.app)
  240. self.assertIsNone(b.exception_to_python(None))
  241. def test_wait_for__on_interval(self):
  242. self.patch('time.sleep')
  243. b = BaseBackend(app=self.app)
  244. b._get_task_meta_for = Mock()
  245. b._get_task_meta_for.return_value = {'status': states.PENDING}
  246. callback = Mock(name='callback')
  247. with self.assertRaises(TimeoutError):
  248. b.wait_for(task_id='1', on_interval=callback, timeout=1)
  249. callback.assert_called_with()
  250. b._get_task_meta_for.return_value = {'status': states.SUCCESS}
  251. b.wait_for(task_id='1', timeout=None)
  252. def test_get_children(self):
  253. b = BaseBackend(app=self.app)
  254. b._get_task_meta_for = Mock()
  255. b._get_task_meta_for.return_value = {}
  256. self.assertIsNone(b.get_children('id'))
  257. b._get_task_meta_for.return_value = {'children': 3}
  258. self.assertEqual(b.get_children('id'), 3)
  259. class test_KeyValueStoreBackend(AppCase):
  260. def setup(self):
  261. self.b = KVBackend(app=self.app)
  262. def test_on_chord_part_return(self):
  263. assert not self.b.implements_incr
  264. self.b.on_chord_part_return(None, None, None)
  265. def test_get_store_delete_result(self):
  266. tid = uuid()
  267. self.b.mark_as_done(tid, 'Hello world')
  268. self.assertEqual(self.b.get_result(tid), 'Hello world')
  269. self.assertEqual(self.b.get_state(tid), states.SUCCESS)
  270. self.b.forget(tid)
  271. self.assertEqual(self.b.get_state(tid), states.PENDING)
  272. def test_strip_prefix(self):
  273. x = self.b.get_key_for_task('x1b34')
  274. self.assertEqual(self.b._strip_prefix(x), 'x1b34')
  275. self.assertEqual(self.b._strip_prefix('x1b34'), 'x1b34')
  276. def test_get_many(self):
  277. for is_dict in True, False:
  278. self.b.mget_returns_dict = is_dict
  279. ids = {uuid(): i for i in range(10)}
  280. for id, i in items(ids):
  281. self.b.mark_as_done(id, i)
  282. it = self.b.get_many(list(ids))
  283. for i, (got_id, got_state) in enumerate(it):
  284. self.assertEqual(got_state['result'], ids[got_id])
  285. self.assertEqual(i, 9)
  286. self.assertTrue(list(self.b.get_many(list(ids))))
  287. self.b._cache.clear()
  288. callback = Mock(name='callback')
  289. it = self.b.get_many(list(ids), on_message=callback)
  290. for i, (got_id, got_state) in enumerate(it):
  291. self.assertEqual(got_state['result'], ids[got_id])
  292. self.assertEqual(i, 9)
  293. self.assertTrue(list(self.b.get_many(list(ids))))
  294. callback.assert_has_calls([
  295. call(ANY) for id in ids
  296. ])
  297. def test_get_many_times_out(self):
  298. tasks = [uuid() for _ in range(4)]
  299. self.b._cache[tasks[1]] = {'status': 'PENDING'}
  300. with self.assertRaises(self.b.TimeoutError):
  301. list(self.b.get_many(tasks, timeout=0.01, interval=0.01))
  302. def test_chord_part_return_no_gid(self):
  303. self.b.implements_incr = True
  304. task = Mock()
  305. state = 'SUCCESS'
  306. result = 10
  307. task.request.group = None
  308. self.b.get_key_for_chord = Mock()
  309. self.b.get_key_for_chord.side_effect = AssertionError(
  310. 'should not get here',
  311. )
  312. self.assertIsNone(
  313. self.b.on_chord_part_return(task.request, state, result),
  314. )
  315. @patch('celery.backends.base.GroupResult')
  316. @patch('celery.backends.base.maybe_signature')
  317. def test_chord_part_return_restore_raises(self, maybe_signature,
  318. GroupResult):
  319. self.b.implements_incr = True
  320. GroupResult.restore.side_effect = KeyError()
  321. self.b.chord_error_from_stack = Mock()
  322. callback = Mock(name='callback')
  323. request = Mock(name='request')
  324. request.group = 'gid'
  325. maybe_signature.return_value = callback
  326. self.b.on_chord_part_return(request, states.SUCCESS, 10)
  327. self.b.chord_error_from_stack.assert_called_with(
  328. callback, ANY,
  329. )
  330. @patch('celery.backends.base.GroupResult')
  331. @patch('celery.backends.base.maybe_signature')
  332. def test_chord_part_return_restore_empty(self, maybe_signature,
  333. GroupResult):
  334. self.b.implements_incr = True
  335. GroupResult.restore.return_value = None
  336. self.b.chord_error_from_stack = Mock()
  337. callback = Mock(name='callback')
  338. request = Mock(name='request')
  339. request.group = 'gid'
  340. maybe_signature.return_value = callback
  341. self.b.on_chord_part_return(request, states.SUCCESS, 10)
  342. self.b.chord_error_from_stack.assert_called_with(
  343. callback, ANY,
  344. )
  345. def test_filter_ready(self):
  346. self.b.decode_result = Mock()
  347. self.b.decode_result.side_effect = pass1
  348. self.assertEqual(
  349. len(list(self.b._filter_ready([
  350. (1, {'status': states.RETRY}),
  351. (2, {'status': states.FAILURE}),
  352. (3, {'status': states.SUCCESS}),
  353. ]))),
  354. 2,
  355. )
  356. @contextmanager
  357. def _chord_part_context(self, b):
  358. @self.app.task(shared=False)
  359. def callback(result):
  360. pass
  361. b.implements_incr = True
  362. b.client = Mock()
  363. with patch('celery.backends.base.GroupResult') as GR:
  364. deps = GR.restore.return_value = Mock(name='DEPS')
  365. deps.__len__ = Mock()
  366. deps.__len__.return_value = 10
  367. b.incr = Mock()
  368. b.incr.return_value = 10
  369. b.expire = Mock()
  370. task = Mock()
  371. task.request.group = 'grid'
  372. cb = task.request.chord = callback.s()
  373. task.request.chord.freeze()
  374. callback.backend = b
  375. callback.backend.fail_from_current_stack = Mock()
  376. yield task, deps, cb
  377. def test_chord_part_return_propagate_set(self):
  378. with self._chord_part_context(self.b) as (task, deps, _):
  379. self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
  380. self.b.expire.assert_not_called()
  381. deps.delete.assert_called_with()
  382. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  383. def test_chord_part_return_propagate_default(self):
  384. with self._chord_part_context(self.b) as (task, deps, _):
  385. self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
  386. self.b.expire.assert_not_called()
  387. deps.delete.assert_called_with()
  388. deps.join_native.assert_called_with(propagate=True, timeout=3.0)
  389. def test_chord_part_return_join_raises_internal(self):
  390. with self._chord_part_context(self.b) as (task, deps, callback):
  391. deps._failed_join_report = lambda: iter([])
  392. deps.join_native.side_effect = KeyError('foo')
  393. self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
  394. self.b.fail_from_current_stack.assert_called()
  395. args = self.b.fail_from_current_stack.call_args
  396. exc = args[1]['exc']
  397. self.assertIsInstance(exc, ChordError)
  398. self.assertIn('foo', str(exc))
  399. def test_chord_part_return_join_raises_task(self):
  400. b = KVBackend(serializer='pickle', app=self.app)
  401. with self._chord_part_context(b) as (task, deps, callback):
  402. deps._failed_join_report = lambda: iter([
  403. self.app.AsyncResult('culprit'),
  404. ])
  405. deps.join_native.side_effect = KeyError('foo')
  406. b.on_chord_part_return(task.request, 'SUCCESS', 10)
  407. b.fail_from_current_stack.assert_called()
  408. args = b.fail_from_current_stack.call_args
  409. exc = args[1]['exc']
  410. self.assertIsInstance(exc, ChordError)
  411. self.assertIn('Dependency culprit raised', str(exc))
  412. def test_restore_group_from_json(self):
  413. b = KVBackend(serializer='json', app=self.app)
  414. g = self.app.GroupResult(
  415. 'group_id',
  416. [self.app.AsyncResult('a'), self.app.AsyncResult('b')],
  417. )
  418. b._save_group(g.id, g)
  419. g2 = b._restore_group(g.id)['result']
  420. self.assertEqual(g2, g)
  421. def test_restore_group_from_pickle(self):
  422. b = KVBackend(serializer='pickle', app=self.app)
  423. g = self.app.GroupResult(
  424. 'group_id',
  425. [self.app.AsyncResult('a'), self.app.AsyncResult('b')],
  426. )
  427. b._save_group(g.id, g)
  428. g2 = b._restore_group(g.id)['result']
  429. self.assertEqual(g2, g)
  430. def test_chord_apply_fallback(self):
  431. self.b.implements_incr = False
  432. self.b.fallback_chord_unlock = Mock()
  433. self.b.apply_chord(
  434. group(app=self.app), (), 'group_id', 'body',
  435. result='result', foo=1,
  436. )
  437. self.b.fallback_chord_unlock.assert_called_with(
  438. 'group_id', 'body', result='result', foo=1,
  439. )
  440. def test_get_missing_meta(self):
  441. self.assertIsNone(self.b.get_result('xxx-missing'))
  442. self.assertEqual(self.b.get_state('xxx-missing'), states.PENDING)
  443. def test_save_restore_delete_group(self):
  444. tid = uuid()
  445. tsr = self.app.GroupResult(
  446. tid, [self.app.AsyncResult(uuid()) for _ in range(10)],
  447. )
  448. self.b.save_group(tid, tsr)
  449. self.b.restore_group(tid)
  450. self.assertEqual(self.b.restore_group(tid), tsr)
  451. self.b.delete_group(tid)
  452. self.assertIsNone(self.b.restore_group(tid))
  453. def test_restore_missing_group(self):
  454. self.assertIsNone(self.b.restore_group('xxx-nonexistant'))
  455. class test_KeyValueStoreBackend_interface(AppCase):
  456. def test_get(self):
  457. with self.assertRaises(NotImplementedError):
  458. KeyValueStoreBackend(self.app).get('a')
  459. def test_set(self):
  460. with self.assertRaises(NotImplementedError):
  461. KeyValueStoreBackend(self.app).set('a', 1)
  462. def test_incr(self):
  463. with self.assertRaises(NotImplementedError):
  464. KeyValueStoreBackend(self.app).incr('a')
  465. def test_cleanup(self):
  466. self.assertFalse(KeyValueStoreBackend(self.app).cleanup())
  467. def test_delete(self):
  468. with self.assertRaises(NotImplementedError):
  469. KeyValueStoreBackend(self.app).delete('a')
  470. def test_mget(self):
  471. with self.assertRaises(NotImplementedError):
  472. KeyValueStoreBackend(self.app).mget(['a'])
  473. def test_forget(self):
  474. with self.assertRaises(NotImplementedError):
  475. KeyValueStoreBackend(self.app).forget('a')
  476. class test_DisabledBackend(AppCase):
  477. def test_store_result(self):
  478. DisabledBackend(self.app).store_result()
  479. def test_is_disabled(self):
  480. with self.assertRaises(NotImplementedError):
  481. DisabledBackend(self.app).get_state('foo')
  482. def test_as_uri(self):
  483. self.assertEqual(DisabledBackend(self.app).as_uri(), 'disabled://')
  484. class test_as_uri(AppCase):
  485. def setup(self):
  486. self.b = BaseBackend(
  487. app=self.app,
  488. url='sch://uuuu:pwpw@hostname.dom'
  489. )
  490. def test_as_uri_include_password(self):
  491. self.assertEqual(self.b.as_uri(True), self.b.url)
  492. def test_as_uri_exclude_password(self):
  493. self.assertEqual(self.b.as_uri(), 'sch://uuuu:**@hostname.dom/')