test_database.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import sys
  4. from datetime import datetime
  5. from nose import SkipTest
  6. from celery import states
  7. from celery.app import app_or_default
  8. from celery.exceptions import ImproperlyConfigured
  9. from celery.result import AsyncResult
  10. from celery.utils import uuid
  11. from celery.tests.utils import mask_modules
  12. from celery.tests.utils import unittest
  13. try:
  14. import sqlalchemy # noqa
  15. except ImportError:
  16. DatabaseBackend = Task = TaskSet = None
  17. else:
  18. from celery.backends.database import DatabaseBackend
  19. from celery.db.models import Task, TaskSet
  20. class SomeClass(object):
  21. def __init__(self, data):
  22. self.data = data
  23. class test_DatabaseBackend(unittest.TestCase):
  24. def setUp(self):
  25. if sys.platform.startswith("java"):
  26. raise SkipTest("SQLite not available on Jython")
  27. if hasattr(sys, "pypy_version_info"):
  28. raise SkipTest("Known to not pass on PyPy")
  29. if DatabaseBackend is None:
  30. raise SkipTest("sqlalchemy not installed")
  31. def test_missing_SQLAlchemy_raises_ImproperlyConfigured(self):
  32. with mask_modules("sqlalchemy"):
  33. from celery.backends.database import _sqlalchemy_installed
  34. with self.assertRaises(ImproperlyConfigured):
  35. _sqlalchemy_installed()
  36. def test_pickle_hack_for_sqla_05(self):
  37. import sqlalchemy as sa
  38. from celery.db import session
  39. prev_base = session.ResultModelBase
  40. prev_ver, sa.__version__ = sa.__version__, "0.5.0"
  41. prev_models = sys.modules.pop("celery.db.models", None)
  42. try:
  43. from sqlalchemy.ext.declarative import declarative_base
  44. session.ResultModelBase = declarative_base()
  45. from celery.db.dfd042c7 import PickleType as Type1
  46. from celery.db.models import PickleType as Type2
  47. self.assertIs(Type1, Type2)
  48. finally:
  49. sys.modules["celery.db.models"] = prev_models
  50. sa.__version__ = prev_ver
  51. session.ResultModelBase = prev_base
  52. def test_missing_dburi_raises_ImproperlyConfigured(self):
  53. conf = app_or_default().conf
  54. prev, conf.CELERY_RESULT_DBURI = conf.CELERY_RESULT_DBURI, None
  55. try:
  56. with self.assertRaises(ImproperlyConfigured):
  57. DatabaseBackend()
  58. finally:
  59. conf.CELERY_RESULT_DBURI = prev
  60. def test_missing_task_id_is_PENDING(self):
  61. tb = DatabaseBackend()
  62. self.assertEqual(tb.get_status("xxx-does-not-exist"), states.PENDING)
  63. def test_missing_task_meta_is_dict_with_pending(self):
  64. tb = DatabaseBackend()
  65. self.assertDictContainsSubset({
  66. 'status': states.PENDING,
  67. 'task_id': "xxx-does-not-exist-at-all",
  68. 'result': None,
  69. 'traceback': None,
  70. }, tb.get_task_meta("xxx-does-not-exist-at-all"))
  71. def test_mark_as_done(self):
  72. tb = DatabaseBackend()
  73. tid = uuid()
  74. self.assertEqual(tb.get_status(tid), states.PENDING)
  75. self.assertIsNone(tb.get_result(tid))
  76. tb.mark_as_done(tid, 42)
  77. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  78. self.assertEqual(tb.get_result(tid), 42)
  79. def test_is_pickled(self):
  80. tb = DatabaseBackend()
  81. tid2 = uuid()
  82. result = {"foo": "baz", "bar": SomeClass(12345)}
  83. tb.mark_as_done(tid2, result)
  84. # is serialized properly.
  85. rindb = tb.get_result(tid2)
  86. self.assertEqual(rindb.get("foo"), "baz")
  87. self.assertEqual(rindb.get("bar").data, 12345)
  88. def test_mark_as_started(self):
  89. tb = DatabaseBackend()
  90. tid = uuid()
  91. tb.mark_as_started(tid)
  92. self.assertEqual(tb.get_status(tid), states.STARTED)
  93. def test_mark_as_revoked(self):
  94. tb = DatabaseBackend()
  95. tid = uuid()
  96. tb.mark_as_revoked(tid)
  97. self.assertEqual(tb.get_status(tid), states.REVOKED)
  98. def test_mark_as_retry(self):
  99. tb = DatabaseBackend()
  100. tid = uuid()
  101. try:
  102. raise KeyError("foo")
  103. except KeyError, exception:
  104. import traceback
  105. trace = "\n".join(traceback.format_stack())
  106. tb.mark_as_retry(tid, exception, traceback=trace)
  107. self.assertEqual(tb.get_status(tid), states.RETRY)
  108. self.assertIsInstance(tb.get_result(tid), KeyError)
  109. self.assertEqual(tb.get_traceback(tid), trace)
  110. def test_mark_as_failure(self):
  111. tb = DatabaseBackend()
  112. tid3 = uuid()
  113. try:
  114. raise KeyError("foo")
  115. except KeyError, exception:
  116. import traceback
  117. trace = "\n".join(traceback.format_stack())
  118. tb.mark_as_failure(tid3, exception, traceback=trace)
  119. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  120. self.assertIsInstance(tb.get_result(tid3), KeyError)
  121. self.assertEqual(tb.get_traceback(tid3), trace)
  122. def test_forget(self):
  123. tb = DatabaseBackend(backend="memory://")
  124. tid = uuid()
  125. tb.mark_as_done(tid, {"foo": "bar"})
  126. x = AsyncResult(tid)
  127. x.forget()
  128. self.assertIsNone(x.result)
  129. def test_process_cleanup(self):
  130. tb = DatabaseBackend()
  131. tb.process_cleanup()
  132. def test_save__restore__delete_taskset(self):
  133. tb = DatabaseBackend()
  134. tid = uuid()
  135. res = {u"something": "special"}
  136. self.assertEqual(tb.save_taskset(tid, res), res)
  137. res2 = tb.restore_taskset(tid)
  138. self.assertEqual(res2, res)
  139. tb.delete_taskset(tid)
  140. self.assertIsNone(tb.restore_taskset(tid))
  141. self.assertIsNone(tb.restore_taskset("xxx-nonexisting-id"))
  142. def test_cleanup(self):
  143. tb = DatabaseBackend()
  144. for i in range(10):
  145. tb.mark_as_done(uuid(), 42)
  146. tb.save_taskset(uuid(), {"foo": "bar"})
  147. s = tb.ResultSession()
  148. for t in s.query(Task).all():
  149. t.date_done = datetime.now() - tb.expires * 2
  150. for t in s.query(TaskSet).all():
  151. t.date_done = datetime.now() - tb.expires * 2
  152. s.commit()
  153. s.close()
  154. tb.cleanup()
  155. def test_Task__repr__(self):
  156. self.assertIn("foo", repr(Task("foo")))
  157. def test_TaskSet__repr__(self):
  158. self.assertIn("foo", repr(TaskSet("foo", None)))