test_database.py 8.2 KB


  1. from __future__ import absolute_import, unicode_literals
  2. from datetime import datetime
  3. from pickle import loads, dumps
  4. from celery import states
  5. from celery.exceptions import ImproperlyConfigured
  6. from celery.utils import uuid
  7. from celery.tests.case import (
  8. AppCase,
  9. Mock,
  10. depends_on_current_app,
  11. patch,
  12. skip_if_pypy,
  13. skip_if_jython,
  14. skip_unless_module,
  15. )
  16. try:
  17. import sqlalchemy # noqa
  18. except ImportError:
  19. DatabaseBackend = Task = TaskSet = retry = None # noqa
  20. SessionManager = session_cleanup = None # noqa
  21. else:
  22. from celery.backends.database import (
  23. DatabaseBackend, retry, session_cleanup,
  24. )
  25. from celery.backends.database import session
  26. from celery.backends.database.session import SessionManager
  27. from celery.backends.database.models import Task, TaskSet
  28. class SomeClass(object):
  29. def __init__(self, data):
  30. self.data = data
  31. @skip_unless_module('sqlalchemy')
  32. class test_session_cleanup(AppCase):
  33. def test_context(self):
  34. session = Mock(name='session')
  35. with session_cleanup(session):
  36. pass
  37. session.close.assert_called_with()
  38. def test_context_raises(self):
  39. session = Mock(name='session')
  40. with self.assertRaises(KeyError):
  41. with session_cleanup(session):
  42. raise KeyError()
  43. session.rollback.assert_called_with()
  44. session.close.assert_called_with()
  45. @skip_unless_module('sqlalchemy')
  46. @skip_if_pypy()
  47. @skip_if_jython()
  48. class test_DatabaseBackend(AppCase):
  49. def setup(self):
  50. self.uri = 'sqlite:///test.db'
  51. self.app.conf.result_serializer = 'pickle'
  52. def test_retry_helper(self):
  53. from celery.backends.database import DatabaseError
  54. calls = [0]
  55. @retry
  56. def raises():
  57. calls[0] += 1
  58. raise DatabaseError(1, 2, 3)
  59. with self.assertRaises(DatabaseError):
  60. raises(max_retries=5)
  61. self.assertEqual(calls[0], 5)
  62. def test_missing_dburi_raises_ImproperlyConfigured(self):
  63. self.app.conf.sqlalchemy_dburi = None
  64. with self.assertRaises(ImproperlyConfigured):
  65. DatabaseBackend(app=self.app)
  66. def test_missing_task_id_is_PENDING(self):
  67. tb = DatabaseBackend(self.uri, app=self.app)
  68. self.assertEqual(tb.get_state('xxx-does-not-exist'), states.PENDING)
  69. def test_missing_task_meta_is_dict_with_pending(self):
  70. tb = DatabaseBackend(self.uri, app=self.app)
  71. self.assertDictContainsSubset({
  72. 'status': states.PENDING,
  73. 'task_id': 'xxx-does-not-exist-at-all',
  74. 'result': None,
  75. 'traceback': None
  76. }, tb.get_task_meta('xxx-does-not-exist-at-all'))
  77. def test_mark_as_done(self):
  78. tb = DatabaseBackend(self.uri, app=self.app)
  79. tid = uuid()
  80. self.assertEqual(tb.get_state(tid), states.PENDING)
  81. self.assertIsNone(tb.get_result(tid))
  82. tb.mark_as_done(tid, 42)
  83. self.assertEqual(tb.get_state(tid), states.SUCCESS)
  84. self.assertEqual(tb.get_result(tid), 42)
  85. def test_is_pickled(self):
  86. tb = DatabaseBackend(self.uri, app=self.app)
  87. tid2 = uuid()
  88. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  89. tb.mark_as_done(tid2, result)
  90. # is serialized properly.
  91. rindb = tb.get_result(tid2)
  92. self.assertEqual(rindb.get('foo'), 'baz')
  93. self.assertEqual(rindb.get('bar').data, 12345)
  94. def test_mark_as_started(self):
  95. tb = DatabaseBackend(self.uri, app=self.app)
  96. tid = uuid()
  97. tb.mark_as_started(tid)
  98. self.assertEqual(tb.get_state(tid), states.STARTED)
  99. def test_mark_as_revoked(self):
  100. tb = DatabaseBackend(self.uri, app=self.app)
  101. tid = uuid()
  102. tb.mark_as_revoked(tid)
  103. self.assertEqual(tb.get_state(tid), states.REVOKED)
  104. def test_mark_as_retry(self):
  105. tb = DatabaseBackend(self.uri, app=self.app)
  106. tid = uuid()
  107. try:
  108. raise KeyError('foo')
  109. except KeyError as exception:
  110. import traceback
  111. trace = '\n'.join(traceback.format_stack())
  112. tb.mark_as_retry(tid, exception, traceback=trace)
  113. self.assertEqual(tb.get_state(tid), states.RETRY)
  114. self.assertIsInstance(tb.get_result(tid), KeyError)
  115. self.assertEqual(tb.get_traceback(tid), trace)
  116. def test_mark_as_failure(self):
  117. tb = DatabaseBackend(self.uri, app=self.app)
  118. tid3 = uuid()
  119. try:
  120. raise KeyError('foo')
  121. except KeyError as exception:
  122. import traceback
  123. trace = '\n'.join(traceback.format_stack())
  124. tb.mark_as_failure(tid3, exception, traceback=trace)
  125. self.assertEqual(tb.get_state(tid3), states.FAILURE)
  126. self.assertIsInstance(tb.get_result(tid3), KeyError)
  127. self.assertEqual(tb.get_traceback(tid3), trace)
  128. def test_forget(self):
  129. tb = DatabaseBackend(self.uri, backend='memory://', app=self.app)
  130. tid = uuid()
  131. tb.mark_as_done(tid, {'foo': 'bar'})
  132. tb.mark_as_done(tid, {'foo': 'bar'})
  133. x = self.app.AsyncResult(tid, backend=tb)
  134. x.forget()
  135. self.assertIsNone(x.result)
  136. def test_process_cleanup(self):
  137. tb = DatabaseBackend(self.uri, app=self.app)
  138. tb.process_cleanup()
  139. @depends_on_current_app
  140. def test_reduce(self):
  141. tb = DatabaseBackend(self.uri, app=self.app)
  142. self.assertTrue(loads(dumps(tb)))
  143. def test_save__restore__delete_group(self):
  144. tb = DatabaseBackend(self.uri, app=self.app)
  145. tid = uuid()
  146. res = {'something': 'special'}
  147. self.assertEqual(tb.save_group(tid, res), res)
  148. res2 = tb.restore_group(tid)
  149. self.assertEqual(res2, res)
  150. tb.delete_group(tid)
  151. self.assertIsNone(tb.restore_group(tid))
  152. self.assertIsNone(tb.restore_group('xxx-nonexisting-id'))
  153. def test_cleanup(self):
  154. tb = DatabaseBackend(self.uri, app=self.app)
  155. for i in range(10):
  156. tb.mark_as_done(uuid(), 42)
  157. tb.save_group(uuid(), {'foo': 'bar'})
  158. s = tb.ResultSession()
  159. for t in s.query(Task).all():
  160. t.date_done = datetime.now() - tb.expires * 2
  161. for t in s.query(TaskSet).all():
  162. t.date_done = datetime.now() - tb.expires * 2
  163. s.commit()
  164. s.close()
  165. tb.cleanup()
  166. def test_Task__repr__(self):
  167. self.assertIn('foo', repr(Task('foo')))
  168. def test_TaskSet__repr__(self):
  169. self.assertIn('foo', repr(TaskSet('foo', None)))
  170. @skip_unless_module('sqlalchemy')
  171. class test_SessionManager(AppCase):
  172. def test_after_fork(self):
  173. s = SessionManager()
  174. self.assertFalse(s.forked)
  175. s._after_fork()
  176. self.assertTrue(s.forked)
  177. @patch('celery.backends.database.session.create_engine')
  178. def test_get_engine_forked(self, create_engine):
  179. s = SessionManager()
  180. s._after_fork()
  181. engine = s.get_engine('dburi', foo=1)
  182. create_engine.assert_called_with('dburi', foo=1)
  183. self.assertIs(engine, create_engine())
  184. engine2 = s.get_engine('dburi', foo=1)
  185. self.assertIs(engine2, engine)
  186. @patch('celery.backends.database.session.sessionmaker')
  187. def test_create_session_forked(self, sessionmaker):
  188. s = SessionManager()
  189. s.get_engine = Mock(name='get_engine')
  190. s._after_fork()
  191. engine, session = s.create_session('dburi', short_lived_sessions=True)
  192. sessionmaker.assert_called_with(bind=s.get_engine())
  193. self.assertIs(session, sessionmaker())
  194. sessionmaker.return_value = Mock(name='new')
  195. engine, session2 = s.create_session('dburi', short_lived_sessions=True)
  196. sessionmaker.assert_called_with(bind=s.get_engine())
  197. self.assertIsNot(session2, session)
  198. sessionmaker.return_value = Mock(name='new2')
  199. engine, session3 = s.create_session(
  200. 'dburi', short_lived_sessions=False)
  201. sessionmaker.assert_called_with(bind=s.get_engine())
  202. self.assertIs(session3, session2)
  203. def test_coverage_madness(self):
  204. prev, session.register_after_fork = (
  205. session.register_after_fork, None,
  206. )
  207. try:
  208. SessionManager()
  209. finally:
  210. session.register_after_fork = prev