test_database.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. from __future__ import absolute_import, unicode_literals
  2. from datetime import datetime
  3. from pickle import dumps, loads
  4. import pytest
  5. from case import Mock, patch, skip
  6. from celery import states, uuid
  7. from celery.exceptions import ImproperlyConfigured
  8. try:
  9. import sqlalchemy # noqa
  10. except ImportError:
  11. DatabaseBackend = Task = TaskSet = retry = None # noqa
  12. SessionManager = session_cleanup = None # noqa
  13. else:
  14. from celery.backends.database import (
  15. DatabaseBackend, retry, session_cleanup,
  16. )
  17. from celery.backends.database import session
  18. from celery.backends.database.session import SessionManager
  19. from celery.backends.database.models import Task, TaskSet
  20. class SomeClass(object):
  21. def __init__(self, data):
  22. self.data = data
  23. @skip.unless_module('sqlalchemy')
  24. class test_session_cleanup:
  25. def test_context(self):
  26. session = Mock(name='session')
  27. with session_cleanup(session):
  28. pass
  29. session.close.assert_called_with()
  30. def test_context_raises(self):
  31. session = Mock(name='session')
  32. with pytest.raises(KeyError):
  33. with session_cleanup(session):
  34. raise KeyError()
  35. session.rollback.assert_called_with()
  36. session.close.assert_called_with()
  37. @skip.unless_module('sqlalchemy')
  38. @skip.if_pypy()
  39. @skip.if_jython()
  40. class test_DatabaseBackend:
  41. def setup(self):
  42. self.uri = 'sqlite:///test.db'
  43. self.app.conf.result_serializer = 'pickle'
  44. def test_retry_helper(self):
  45. from celery.backends.database import DatabaseError
  46. calls = [0]
  47. @retry
  48. def raises():
  49. calls[0] += 1
  50. raise DatabaseError(1, 2, 3)
  51. with pytest.raises(DatabaseError):
  52. raises(max_retries=5)
  53. assert calls[0] == 5
  54. def test_missing_dburi_raises_ImproperlyConfigured(self):
  55. self.app.conf.database_url = None
  56. with pytest.raises(ImproperlyConfigured):
  57. DatabaseBackend(app=self.app)
  58. def test_missing_task_id_is_PENDING(self):
  59. tb = DatabaseBackend(self.uri, app=self.app)
  60. assert tb.get_state('xxx-does-not-exist') == states.PENDING
  61. def test_missing_task_meta_is_dict_with_pending(self):
  62. tb = DatabaseBackend(self.uri, app=self.app)
  63. meta = tb.get_task_meta('xxx-does-not-exist-at-all')
  64. assert meta['status'] == states.PENDING
  65. assert meta['task_id'] == 'xxx-does-not-exist-at-all'
  66. assert meta['result'] is None
  67. assert meta['traceback'] is None
  68. def test_mark_as_done(self):
  69. tb = DatabaseBackend(self.uri, app=self.app)
  70. tid = uuid()
  71. assert tb.get_state(tid) == states.PENDING
  72. assert tb.get_result(tid) is None
  73. tb.mark_as_done(tid, 42)
  74. assert tb.get_state(tid) == states.SUCCESS
  75. assert tb.get_result(tid) == 42
  76. def test_is_pickled(self):
  77. tb = DatabaseBackend(self.uri, app=self.app)
  78. tid2 = uuid()
  79. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  80. tb.mark_as_done(tid2, result)
  81. # is serialized properly.
  82. rindb = tb.get_result(tid2)
  83. assert rindb.get('foo') == 'baz'
  84. assert rindb.get('bar').data == 12345
  85. def test_mark_as_started(self):
  86. tb = DatabaseBackend(self.uri, app=self.app)
  87. tid = uuid()
  88. tb.mark_as_started(tid)
  89. assert tb.get_state(tid) == states.STARTED
  90. def test_mark_as_revoked(self):
  91. tb = DatabaseBackend(self.uri, app=self.app)
  92. tid = uuid()
  93. tb.mark_as_revoked(tid)
  94. assert tb.get_state(tid) == states.REVOKED
  95. def test_mark_as_retry(self):
  96. tb = DatabaseBackend(self.uri, app=self.app)
  97. tid = uuid()
  98. try:
  99. raise KeyError('foo')
  100. except KeyError as exception:
  101. import traceback
  102. trace = '\n'.join(traceback.format_stack())
  103. tb.mark_as_retry(tid, exception, traceback=trace)
  104. assert tb.get_state(tid) == states.RETRY
  105. assert isinstance(tb.get_result(tid), KeyError)
  106. assert tb.get_traceback(tid) == trace
  107. def test_mark_as_failure(self):
  108. tb = DatabaseBackend(self.uri, app=self.app)
  109. tid3 = uuid()
  110. try:
  111. raise KeyError('foo')
  112. except KeyError as exception:
  113. import traceback
  114. trace = '\n'.join(traceback.format_stack())
  115. tb.mark_as_failure(tid3, exception, traceback=trace)
  116. assert tb.get_state(tid3) == states.FAILURE
  117. assert isinstance(tb.get_result(tid3), KeyError)
  118. assert tb.get_traceback(tid3) == trace
  119. def test_forget(self):
  120. tb = DatabaseBackend(self.uri, backend='memory://', app=self.app)
  121. tid = uuid()
  122. tb.mark_as_done(tid, {'foo': 'bar'})
  123. tb.mark_as_done(tid, {'foo': 'bar'})
  124. x = self.app.AsyncResult(tid, backend=tb)
  125. x.forget()
  126. assert x.result is None
  127. def test_process_cleanup(self):
  128. tb = DatabaseBackend(self.uri, app=self.app)
  129. tb.process_cleanup()
  130. @pytest.mark.usefixtures('depends_on_current_app')
  131. def test_reduce(self):
  132. tb = DatabaseBackend(self.uri, app=self.app)
  133. assert loads(dumps(tb))
  134. def test_save__restore__delete_group(self):
  135. tb = DatabaseBackend(self.uri, app=self.app)
  136. tid = uuid()
  137. res = {'something': 'special'}
  138. assert tb.save_group(tid, res) == res
  139. res2 = tb.restore_group(tid)
  140. assert res2 == res
  141. tb.delete_group(tid)
  142. assert tb.restore_group(tid) is None
  143. assert tb.restore_group('xxx-nonexisting-id') is None
  144. def test_cleanup(self):
  145. tb = DatabaseBackend(self.uri, app=self.app)
  146. for i in range(10):
  147. tb.mark_as_done(uuid(), 42)
  148. tb.save_group(uuid(), {'foo': 'bar'})
  149. s = tb.ResultSession()
  150. for t in s.query(Task).all():
  151. t.date_done = datetime.now() - tb.expires * 2
  152. for t in s.query(TaskSet).all():
  153. t.date_done = datetime.now() - tb.expires * 2
  154. s.commit()
  155. s.close()
  156. tb.cleanup()
  157. def test_Task__repr__(self):
  158. assert 'foo' in repr(Task('foo'))
  159. def test_TaskSet__repr__(self):
  160. assert 'foo', repr(TaskSet('foo' in None))
  161. @skip.unless_module('sqlalchemy')
  162. class test_SessionManager:
  163. def test_after_fork(self):
  164. s = SessionManager()
  165. assert not s.forked
  166. s._after_fork()
  167. assert s.forked
  168. @patch('celery.backends.database.session.create_engine')
  169. def test_get_engine_forked(self, create_engine):
  170. s = SessionManager()
  171. s._after_fork()
  172. engine = s.get_engine('dburi', foo=1)
  173. create_engine.assert_called_with('dburi', foo=1)
  174. assert engine is create_engine()
  175. engine2 = s.get_engine('dburi', foo=1)
  176. assert engine2 is engine
  177. @patch('celery.backends.database.session.sessionmaker')
  178. def test_create_session_forked(self, sessionmaker):
  179. s = SessionManager()
  180. s.get_engine = Mock(name='get_engine')
  181. s._after_fork()
  182. engine, session = s.create_session('dburi', short_lived_sessions=True)
  183. sessionmaker.assert_called_with(bind=s.get_engine())
  184. assert session is sessionmaker()
  185. sessionmaker.return_value = Mock(name='new')
  186. engine, session2 = s.create_session('dburi', short_lived_sessions=True)
  187. sessionmaker.assert_called_with(bind=s.get_engine())
  188. assert session2 is not session
  189. sessionmaker.return_value = Mock(name='new2')
  190. engine, session3 = s.create_session(
  191. 'dburi', short_lived_sessions=False)
  192. sessionmaker.assert_called_with(bind=s.get_engine())
  193. assert session3 is session2
  194. def test_coverage_madness(self):
  195. prev, session.register_after_fork = (
  196. session.register_after_fork, None,
  197. )
  198. try:
  199. SessionManager()
  200. finally:
  201. session.register_after_fork = prev