# test_database.py (6.3 KB)
#
# Source-viewer line-number gutter (1-205) omitted.
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import sys
  4. from datetime import datetime
  5. from nose import SkipTest
  6. from pickle import loads, dumps
  7. from celery import states
  8. from celery.app import app_or_default
  9. from celery.exceptions import ImproperlyConfigured
  10. from celery.result import AsyncResult
  11. from celery.utils import uuid
  12. from celery.tests.utils import (
  13. Case,
  14. mask_modules,
  15. skip_if_pypy,
  16. skip_if_jython,
  17. )
  18. try:
  19. import sqlalchemy # noqa
  20. except ImportError:
  21. DatabaseBackend = Task = TaskSet = None
  22. else:
  23. from celery.backends.database import DatabaseBackend
  24. from celery.backends.database.models import Task, TaskSet
  25. class SomeClass(object):
  26. def __init__(self, data):
  27. self.data = data
  28. class test_DatabaseBackend(Case):
  29. @skip_if_pypy
  30. @skip_if_jython
  31. def setUp(self):
  32. if DatabaseBackend is None:
  33. raise SkipTest('sqlalchemy not installed')
  34. def test_missing_SQLAlchemy_raises_ImproperlyConfigured(self):
  35. with mask_modules('sqlalchemy'):
  36. from celery.backends.database import _sqlalchemy_installed
  37. with self.assertRaises(ImproperlyConfigured):
  38. _sqlalchemy_installed()
  39. def test_pickle_hack_for_sqla_05(self):
  40. import sqlalchemy as sa
  41. from celery.backends.database import session
  42. prev_base = session.ResultModelBase
  43. prev_ver, sa.__version__ = sa.__version__, '0.5.0'
  44. prev_models = sys.modules.pop('celery.backends.database.models', None)
  45. try:
  46. from sqlalchemy.ext.declarative import declarative_base
  47. session.ResultModelBase = declarative_base()
  48. from celery.backends.database.dfd042c7 import PickleType as Type1
  49. from celery.backends.database.models import PickleType as Type2
  50. self.assertIs(Type1, Type2)
  51. finally:
  52. sys.modules['celery.backends.database.models'] = prev_models
  53. sa.__version__ = prev_ver
  54. session.ResultModelBase = prev_base
  55. def test_missing_dburi_raises_ImproperlyConfigured(self):
  56. conf = app_or_default().conf
  57. prev, conf.CELERY_RESULT_DBURI = conf.CELERY_RESULT_DBURI, None
  58. try:
  59. with self.assertRaises(ImproperlyConfigured):
  60. DatabaseBackend()
  61. finally:
  62. conf.CELERY_RESULT_DBURI = prev
  63. def test_missing_task_id_is_PENDING(self):
  64. tb = DatabaseBackend()
  65. self.assertEqual(tb.get_status('xxx-does-not-exist'), states.PENDING)
  66. def test_missing_task_meta_is_dict_with_pending(self):
  67. tb = DatabaseBackend()
  68. self.assertDictContainsSubset({
  69. 'status': states.PENDING,
  70. 'task_id': 'xxx-does-not-exist-at-all',
  71. 'result': None,
  72. 'traceback': None,
  73. }, tb.get_task_meta('xxx-does-not-exist-at-all'))
  74. def test_mark_as_done(self):
  75. tb = DatabaseBackend()
  76. tid = uuid()
  77. self.assertEqual(tb.get_status(tid), states.PENDING)
  78. self.assertIsNone(tb.get_result(tid))
  79. tb.mark_as_done(tid, 42)
  80. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  81. self.assertEqual(tb.get_result(tid), 42)
  82. def test_is_pickled(self):
  83. tb = DatabaseBackend()
  84. tid2 = uuid()
  85. result = {'foo': 'baz', 'bar': SomeClass(12345)}
  86. tb.mark_as_done(tid2, result)
  87. # is serialized properly.
  88. rindb = tb.get_result(tid2)
  89. self.assertEqual(rindb.get('foo'), 'baz')
  90. self.assertEqual(rindb.get('bar').data, 12345)
  91. def test_mark_as_started(self):
  92. tb = DatabaseBackend()
  93. tid = uuid()
  94. tb.mark_as_started(tid)
  95. self.assertEqual(tb.get_status(tid), states.STARTED)
  96. def test_mark_as_revoked(self):
  97. tb = DatabaseBackend()
  98. tid = uuid()
  99. tb.mark_as_revoked(tid)
  100. self.assertEqual(tb.get_status(tid), states.REVOKED)
  101. def test_mark_as_retry(self):
  102. tb = DatabaseBackend()
  103. tid = uuid()
  104. try:
  105. raise KeyError('foo')
  106. except KeyError, exception:
  107. import traceback
  108. trace = '\n'.join(traceback.format_stack())
  109. tb.mark_as_retry(tid, exception, traceback=trace)
  110. self.assertEqual(tb.get_status(tid), states.RETRY)
  111. self.assertIsInstance(tb.get_result(tid), KeyError)
  112. self.assertEqual(tb.get_traceback(tid), trace)
  113. def test_mark_as_failure(self):
  114. tb = DatabaseBackend()
  115. tid3 = uuid()
  116. try:
  117. raise KeyError('foo')
  118. except KeyError, exception:
  119. import traceback
  120. trace = '\n'.join(traceback.format_stack())
  121. tb.mark_as_failure(tid3, exception, traceback=trace)
  122. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  123. self.assertIsInstance(tb.get_result(tid3), KeyError)
  124. self.assertEqual(tb.get_traceback(tid3), trace)
  125. def test_forget(self):
  126. tb = DatabaseBackend(backend='memory://')
  127. tid = uuid()
  128. tb.mark_as_done(tid, {'foo': 'bar'})
  129. tb.mark_as_done(tid, {'foo': 'bar'})
  130. x = AsyncResult(tid, backend=tb)
  131. x.forget()
  132. self.assertIsNone(x.result)
  133. def test_process_cleanup(self):
  134. tb = DatabaseBackend()
  135. tb.process_cleanup()
  136. def test_reduce(self):
  137. tb = DatabaseBackend()
  138. self.assertTrue(loads(dumps(tb)))
  139. def test_save__restore__delete_group(self):
  140. tb = DatabaseBackend()
  141. tid = uuid()
  142. res = {u'something': 'special'}
  143. self.assertEqual(tb.save_group(tid, res), res)
  144. res2 = tb.restore_group(tid)
  145. self.assertEqual(res2, res)
  146. tb.delete_group(tid)
  147. self.assertIsNone(tb.restore_group(tid))
  148. self.assertIsNone(tb.restore_group('xxx-nonexisting-id'))
  149. def test_cleanup(self):
  150. tb = DatabaseBackend()
  151. for i in range(10):
  152. tb.mark_as_done(uuid(), 42)
  153. tb.save_group(uuid(), {'foo': 'bar'})
  154. s = tb.ResultSession()
  155. for t in s.query(Task).all():
  156. t.date_done = datetime.now() - tb.expires * 2
  157. for t in s.query(TaskSet).all():
  158. t.date_done = datetime.now() - tb.expires * 2
  159. s.commit()
  160. s.close()
  161. tb.cleanup()
  162. def test_Task__repr__(self):
  163. self.assertIn('foo', repr(Task('foo')))
  164. def test_TaskSet__repr__(self):
  165. self.assertIn('foo', repr(TaskSet('foo', None)))