test_database.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import sys
  4. from datetime import datetime
  5. from nose import SkipTest
  6. from celery import states
  7. from celery.app import app_or_default
  8. from celery.exceptions import ImproperlyConfigured
  9. from celery.result import AsyncResult
  10. from celery.utils import uuid
  11. from celery.tests.utils import Case, mask_modules
  12. try:
  13. import sqlalchemy # noqa
  14. except ImportError:
  15. DatabaseBackend = Task = TaskSet = None
  16. else:
  17. from celery.backends.database import DatabaseBackend
  18. from celery.db.models import Task, TaskSet
  19. class SomeClass(object):
  20. def __init__(self, data):
  21. self.data = data
  22. class test_DatabaseBackend(Case):
  23. def setUp(self):
  24. if sys.platform.startswith("java"):
  25. raise SkipTest("SQLite not available on Jython")
  26. if hasattr(sys, "pypy_version_info"):
  27. raise SkipTest("Known to not pass on PyPy")
  28. if DatabaseBackend is None:
  29. raise SkipTest("sqlalchemy not installed")
  30. def test_missing_SQLAlchemy_raises_ImproperlyConfigured(self):
  31. with mask_modules("sqlalchemy"):
  32. from celery.backends.database import _sqlalchemy_installed
  33. with self.assertRaises(ImproperlyConfigured):
  34. _sqlalchemy_installed()
  35. def test_pickle_hack_for_sqla_05(self):
  36. import sqlalchemy as sa
  37. from celery.db import session
  38. prev_base = session.ResultModelBase
  39. prev_ver, sa.__version__ = sa.__version__, "0.5.0"
  40. prev_models = sys.modules.pop("celery.db.models", None)
  41. try:
  42. from sqlalchemy.ext.declarative import declarative_base
  43. session.ResultModelBase = declarative_base()
  44. from celery.db.dfd042c7 import PickleType as Type1
  45. from celery.db.models import PickleType as Type2
  46. self.assertIs(Type1, Type2)
  47. finally:
  48. sys.modules["celery.db.models"] = prev_models
  49. sa.__version__ = prev_ver
  50. session.ResultModelBase = prev_base
  51. def test_missing_dburi_raises_ImproperlyConfigured(self):
  52. conf = app_or_default().conf
  53. prev, conf.CELERY_RESULT_DBURI = conf.CELERY_RESULT_DBURI, None
  54. try:
  55. with self.assertRaises(ImproperlyConfigured):
  56. DatabaseBackend()
  57. finally:
  58. conf.CELERY_RESULT_DBURI = prev
  59. def test_missing_task_id_is_PENDING(self):
  60. tb = DatabaseBackend()
  61. self.assertEqual(tb.get_status("xxx-does-not-exist"), states.PENDING)
  62. def test_missing_task_meta_is_dict_with_pending(self):
  63. tb = DatabaseBackend()
  64. self.assertDictContainsSubset({
  65. 'status': states.PENDING,
  66. 'task_id': "xxx-does-not-exist-at-all",
  67. 'result': None,
  68. 'traceback': None,
  69. }, tb.get_task_meta("xxx-does-not-exist-at-all"))
  70. def test_mark_as_done(self):
  71. tb = DatabaseBackend()
  72. tid = uuid()
  73. self.assertEqual(tb.get_status(tid), states.PENDING)
  74. self.assertIsNone(tb.get_result(tid))
  75. tb.mark_as_done(tid, 42)
  76. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  77. self.assertEqual(tb.get_result(tid), 42)
  78. def test_is_pickled(self):
  79. tb = DatabaseBackend()
  80. tid2 = uuid()
  81. result = {"foo": "baz", "bar": SomeClass(12345)}
  82. tb.mark_as_done(tid2, result)
  83. # is serialized properly.
  84. rindb = tb.get_result(tid2)
  85. self.assertEqual(rindb.get("foo"), "baz")
  86. self.assertEqual(rindb.get("bar").data, 12345)
  87. def test_mark_as_started(self):
  88. tb = DatabaseBackend()
  89. tid = uuid()
  90. tb.mark_as_started(tid)
  91. self.assertEqual(tb.get_status(tid), states.STARTED)
  92. def test_mark_as_revoked(self):
  93. tb = DatabaseBackend()
  94. tid = uuid()
  95. tb.mark_as_revoked(tid)
  96. self.assertEqual(tb.get_status(tid), states.REVOKED)
  97. def test_mark_as_retry(self):
  98. tb = DatabaseBackend()
  99. tid = uuid()
  100. try:
  101. raise KeyError("foo")
  102. except KeyError, exception:
  103. import traceback
  104. trace = "\n".join(traceback.format_stack())
  105. tb.mark_as_retry(tid, exception, traceback=trace)
  106. self.assertEqual(tb.get_status(tid), states.RETRY)
  107. self.assertIsInstance(tb.get_result(tid), KeyError)
  108. self.assertEqual(tb.get_traceback(tid), trace)
  109. def test_mark_as_failure(self):
  110. tb = DatabaseBackend()
  111. tid3 = uuid()
  112. try:
  113. raise KeyError("foo")
  114. except KeyError, exception:
  115. import traceback
  116. trace = "\n".join(traceback.format_stack())
  117. tb.mark_as_failure(tid3, exception, traceback=trace)
  118. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  119. self.assertIsInstance(tb.get_result(tid3), KeyError)
  120. self.assertEqual(tb.get_traceback(tid3), trace)
  121. def test_forget(self):
  122. tb = DatabaseBackend(backend="memory://")
  123. tid = uuid()
  124. tb.mark_as_done(tid, {"foo": "bar"})
  125. x = AsyncResult(tid)
  126. x.forget()
  127. self.assertIsNone(x.result)
  128. def test_process_cleanup(self):
  129. tb = DatabaseBackend()
  130. tb.process_cleanup()
  131. def test_save__restore__delete_taskset(self):
  132. tb = DatabaseBackend()
  133. tid = uuid()
  134. res = {u"something": "special"}
  135. self.assertEqual(tb.save_taskset(tid, res), res)
  136. res2 = tb.restore_taskset(tid)
  137. self.assertEqual(res2, res)
  138. tb.delete_taskset(tid)
  139. self.assertIsNone(tb.restore_taskset(tid))
  140. self.assertIsNone(tb.restore_taskset("xxx-nonexisting-id"))
  141. def test_cleanup(self):
  142. tb = DatabaseBackend()
  143. for i in range(10):
  144. tb.mark_as_done(uuid(), 42)
  145. tb.save_taskset(uuid(), {"foo": "bar"})
  146. s = tb.ResultSession()
  147. for t in s.query(Task).all():
  148. t.date_done = datetime.now() - tb.expires * 2
  149. for t in s.query(TaskSet).all():
  150. t.date_done = datetime.now() - tb.expires * 2
  151. s.commit()
  152. s.close()
  153. tb.cleanup()
  154. def test_Task__repr__(self):
  155. self.assertIn("foo", repr(Task("foo")))
  156. def test_TaskSet__repr__(self):
  157. self.assertIn("foo", repr(TaskSet("foo", None)))