test_database.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. from __future__ import absolute_import, unicode_literals
  2. from datetime import datetime
  3. from nose import SkipTest
  4. from pickle import loads, dumps
  5. from celery import states
  6. from celery.exceptions import ImproperlyConfigured
  7. from celery.result import AsyncResult
  8. from celery.utils import uuid
  9. from celery.tests.case import (
  10. AppCase,
  11. mask_modules,
  12. skip_if_pypy,
  13. skip_if_jython,
  14. )
  15. try:
  16. import sqlalchemy # noqa
  17. except ImportError:
  18. DatabaseBackend = Task = TaskSet = retry = None # noqa
  19. else:
  20. from celery.backends.database import DatabaseBackend, retry
  21. from celery.backends.database.models import Task, TaskSet
  22. class SomeClass(object):
  23. def __init__(self, data):
  24. self.data = data
  25. class test_DatabaseBackend(AppCase):
  26. @skip_if_pypy
  27. @skip_if_jython
  28. def setup(self):
  29. if DatabaseBackend is None:
  30. raise SkipTest('sqlalchemy not installed')
  31. def test_retry_helper(self):
  32. from celery.backends.database import OperationalError
  33. calls = [0]
  34. @retry
  35. def raises():
  36. calls[0] += 1
  37. raise OperationalError(1, 2, 3)
  38. with self.assertRaises(OperationalError):
  39. raises(max_retries=5)
  40. self.assertEqual(calls[0], 5)
  41. def test_missing_SQLAlchemy_raises_ImproperlyConfigured(self):
  42. with mask_modules('sqlalchemy'):
  43. from celery.backends.database import _sqlalchemy_installed
  44. with self.assertRaises(ImproperlyConfigured):
  45. _sqlalchemy_installed()
  46. def test_missing_dburi_raises_ImproperlyConfigured(self):
  47. conf = self.app.conf
  48. prev, conf.CELERY_RESULT_DBURI = conf.CELERY_RESULT_DBURI, None
  49. try:
  50. with self.assertRaises(ImproperlyConfigured):
  51. DatabaseBackend(app=self.app)
  52. finally:
  53. conf.CELERY_RESULT_DBURI = prev
  54. def test_missing_task_id_is_PENDING(self):
  55. tb = DatabaseBackend(app=self.app)
  56. self.assertEqual(tb.get_status('xxx-does-not-exist'), states.PENDING)
  57. def test_missing_task_meta_is_dict_with_pending(self):
  58. tb = DatabaseBackend(app=self.app)
  59. self.assertDictContainsSubset({
  60. 'status': states.PENDING,
  61. 'task_id': 'xxx-does-not-exist-at-all',
  62. 'result': None,
  63. 'traceback': None
  64. }, tb.get_task_meta('xxx-does-not-exist-at-all'))
  65. def test_mark_as_done(self):
  66. tb = DatabaseBackend(app=self.app)
  67. tid = uuid()
  68. self.assertEqual(tb.get_status(tid), states.PENDING)
  69. self.assertIsNone(tb.get_result(tid))
  70. tb.mark_as_done(tid, 42)
  71. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  72. self.assertEqual(tb.get_result(tid), 42)
  73. def test_is_pickled(self):
  74. tb = DatabaseBackend(app=self.app)
  75. tid2 = uuid()
  76. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  77. tb.mark_as_done(tid2, result)
  78. # is serialized properly.
  79. rindb = tb.get_result(tid2)
  80. self.assertEqual(rindb.get('foo'), 'baz')
  81. self.assertEqual(rindb.get('bar').data, 12345)
  82. def test_mark_as_started(self):
  83. tb = DatabaseBackend(app=self.app)
  84. tid = uuid()
  85. tb.mark_as_started(tid)
  86. self.assertEqual(tb.get_status(tid), states.STARTED)
  87. def test_mark_as_revoked(self):
  88. tb = DatabaseBackend(app=self.app)
  89. tid = uuid()
  90. tb.mark_as_revoked(tid)
  91. self.assertEqual(tb.get_status(tid), states.REVOKED)
  92. def test_mark_as_retry(self):
  93. tb = DatabaseBackend(app=self.app)
  94. tid = uuid()
  95. try:
  96. raise KeyError('foo')
  97. except KeyError as exception:
  98. import traceback
  99. trace = '\n'.join(traceback.format_stack())
  100. tb.mark_as_retry(tid, exception, traceback=trace)
  101. self.assertEqual(tb.get_status(tid), states.RETRY)
  102. self.assertIsInstance(tb.get_result(tid), KeyError)
  103. self.assertEqual(tb.get_traceback(tid), trace)
  104. def test_mark_as_failure(self):
  105. tb = DatabaseBackend(app=self.app)
  106. tid3 = uuid()
  107. try:
  108. raise KeyError('foo')
  109. except KeyError as exception:
  110. import traceback
  111. trace = '\n'.join(traceback.format_stack())
  112. tb.mark_as_failure(tid3, exception, traceback=trace)
  113. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  114. self.assertIsInstance(tb.get_result(tid3), KeyError)
  115. self.assertEqual(tb.get_traceback(tid3), trace)
  116. def test_forget(self):
  117. tb = DatabaseBackend(backend='memory://', app=self.app)
  118. tid = uuid()
  119. tb.mark_as_done(tid, {'foo': 'bar'})
  120. tb.mark_as_done(tid, {'foo': 'bar'})
  121. x = AsyncResult(tid, backend=tb)
  122. x.forget()
  123. self.assertIsNone(x.result)
  124. def test_process_cleanup(self):
  125. tb = DatabaseBackend(app=self.app)
  126. tb.process_cleanup()
  127. def test_reduce(self):
  128. tb = DatabaseBackend(app=self.app)
  129. self.assertTrue(loads(dumps(tb)))
  130. def test_save__restore__delete_group(self):
  131. tb = DatabaseBackend(app=self.app)
  132. tid = uuid()
  133. res = {'something': 'special'}
  134. self.assertEqual(tb.save_group(tid, res), res)
  135. res2 = tb.restore_group(tid)
  136. self.assertEqual(res2, res)
  137. tb.delete_group(tid)
  138. self.assertIsNone(tb.restore_group(tid))
  139. self.assertIsNone(tb.restore_group('xxx-nonexisting-id'))
  140. def test_cleanup(self):
  141. tb = DatabaseBackend(app=self.app)
  142. for i in range(10):
  143. tb.mark_as_done(uuid(), 42)
  144. tb.save_group(uuid(), {'foo': 'bar'})
  145. s = tb.ResultSession()
  146. for t in s.query(Task).all():
  147. t.date_done = datetime.now() - tb.expires * 2
  148. for t in s.query(TaskSet).all():
  149. t.date_done = datetime.now() - tb.expires * 2
  150. s.commit()
  151. s.close()
  152. tb.cleanup()
  153. def test_Task__repr__(self):
  154. self.assertIn('foo', repr(Task('foo')))
  155. def test_TaskSet__repr__(self):
  156. self.assertIn('foo', repr(TaskSet('foo', None)))