# test_database.py
  1. from __future__ import with_statement
  2. import sys
  3. from datetime import datetime
  4. from nose import SkipTest
  5. from celery import states
  6. from celery.app import app_or_default
  7. from celery.exceptions import ImproperlyConfigured
  8. from celery.result import AsyncResult
  9. from celery.utils import gen_unique_id
  10. from celery.tests.utils import mask_modules
  11. from celery.tests.utils import unittest
  12. try:
  13. import sqlalchemy # noqa
  14. except ImportError:
  15. DatabaseBackend = Task = TaskSet = None
  16. else:
  17. from celery.backends.database import DatabaseBackend
  18. from celery.db.models import Task, TaskSet
  19. class SomeClass(object):
  20. def __init__(self, data):
  21. self.data = data
  22. class test_DatabaseBackend(unittest.TestCase):
  23. def setUp(self):
  24. if sys.platform.startswith("java"):
  25. raise SkipTest("SQLite not available on Jython")
  26. if hasattr(sys, "pypy_version_info"):
  27. raise SkipTest("Known to not pass on PyPy")
  28. if DatabaseBackend is None:
  29. raise SkipTest("sqlalchemy not installed")
  30. def test_missing_SQLAlchemy_raises_ImproperlyConfigured(self):
  31. with mask_modules("sqlalchemy"):
  32. from celery.backends.database import _sqlalchemy_installed
  33. self.assertRaises(ImproperlyConfigured, _sqlalchemy_installed)
  34. def test_pickle_hack_for_sqla_05(self):
  35. import sqlalchemy as sa
  36. from celery.db import session
  37. prev_base = session.ResultModelBase
  38. prev_ver, sa.__version__ = sa.__version__, "0.5.0"
  39. prev_models = sys.modules.pop("celery.db.models", None)
  40. try:
  41. from sqlalchemy.ext.declarative import declarative_base
  42. session.ResultModelBase = declarative_base()
  43. from celery.db.dfd042c7 import PickleType as Type1
  44. from celery.db.models import PickleType as Type2
  45. self.assertIs(Type1, Type2)
  46. finally:
  47. sys.modules["celery.db.models"] = prev_models
  48. sa.__version__ = prev_ver
  49. session.ResultModelBase = prev_base
  50. def test_missing_dburi_raises_ImproperlyConfigured(self):
  51. conf = app_or_default().conf
  52. prev, conf.CELERY_RESULT_DBURI = conf.CELERY_RESULT_DBURI, None
  53. try:
  54. self.assertRaises(ImproperlyConfigured, DatabaseBackend)
  55. finally:
  56. conf.CELERY_RESULT_DBURI = prev
  57. def test_missing_task_id_is_PENDING(self):
  58. tb = DatabaseBackend()
  59. self.assertEqual(tb.get_status("xxx-does-not-exist"), states.PENDING)
  60. def test_missing_task_meta_is_dict_with_pending(self):
  61. tb = DatabaseBackend()
  62. self.assertDictContainsSubset({
  63. 'status': states.PENDING,
  64. 'task_id': "xxx-does-not-exist-at-all",
  65. 'result': None,
  66. 'traceback': None,
  67. }, tb.get_task_meta("xxx-does-not-exist-at-all"))
  68. def test_mark_as_done(self):
  69. tb = DatabaseBackend()
  70. tid = gen_unique_id()
  71. self.assertEqual(tb.get_status(tid), states.PENDING)
  72. self.assertIsNone(tb.get_result(tid))
  73. tb.mark_as_done(tid, 42)
  74. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  75. self.assertEqual(tb.get_result(tid), 42)
  76. def test_is_pickled(self):
  77. tb = DatabaseBackend()
  78. tid2 = gen_unique_id()
  79. result = {"foo": "baz", "bar": SomeClass(12345)}
  80. tb.mark_as_done(tid2, result)
  81. # is serialized properly.
  82. rindb = tb.get_result(tid2)
  83. self.assertEqual(rindb.get("foo"), "baz")
  84. self.assertEqual(rindb.get("bar").data, 12345)
  85. def test_mark_as_started(self):
  86. tb = DatabaseBackend()
  87. tid = gen_unique_id()
  88. tb.mark_as_started(tid)
  89. self.assertEqual(tb.get_status(tid), states.STARTED)
  90. def test_mark_as_revoked(self):
  91. tb = DatabaseBackend()
  92. tid = gen_unique_id()
  93. tb.mark_as_revoked(tid)
  94. self.assertEqual(tb.get_status(tid), states.REVOKED)
  95. def test_mark_as_retry(self):
  96. tb = DatabaseBackend()
  97. tid = gen_unique_id()
  98. try:
  99. raise KeyError("foo")
  100. except KeyError, exception:
  101. import traceback
  102. trace = "\n".join(traceback.format_stack())
  103. tb.mark_as_retry(tid, exception, traceback=trace)
  104. self.assertEqual(tb.get_status(tid), states.RETRY)
  105. self.assertIsInstance(tb.get_result(tid), KeyError)
  106. self.assertEqual(tb.get_traceback(tid), trace)
  107. def test_mark_as_failure(self):
  108. tb = DatabaseBackend()
  109. tid3 = gen_unique_id()
  110. try:
  111. raise KeyError("foo")
  112. except KeyError, exception:
  113. import traceback
  114. trace = "\n".join(traceback.format_stack())
  115. tb.mark_as_failure(tid3, exception, traceback=trace)
  116. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  117. self.assertIsInstance(tb.get_result(tid3), KeyError)
  118. self.assertEqual(tb.get_traceback(tid3), trace)
  119. def test_forget(self):
  120. tb = DatabaseBackend(backend="memory://")
  121. tid = gen_unique_id()
  122. tb.mark_as_done(tid, {"foo": "bar"})
  123. x = AsyncResult(tid)
  124. x.forget()
  125. self.assertIsNone(x.result)
  126. def test_process_cleanup(self):
  127. tb = DatabaseBackend()
  128. tb.process_cleanup()
  129. def test_save__restore__delete_taskset(self):
  130. tb = DatabaseBackend()
  131. tid = gen_unique_id()
  132. res = {u"something": "special"}
  133. self.assertEqual(tb.save_taskset(tid, res), res)
  134. res2 = tb.restore_taskset(tid)
  135. self.assertEqual(res2, res)
  136. tb.delete_taskset(tid)
  137. self.assertIsNone(tb.restore_taskset(tid))
  138. self.assertIsNone(tb.restore_taskset("xxx-nonexisting-id"))
  139. def test_cleanup(self):
  140. tb = DatabaseBackend()
  141. for i in range(10):
  142. tb.mark_as_done(gen_unique_id(), 42)
  143. tb.save_taskset(gen_unique_id(), {"foo": "bar"})
  144. s = tb.ResultSession()
  145. for t in s.query(Task).all():
  146. t.date_done = datetime.now() - tb.expires * 2
  147. for t in s.query(TaskSet).all():
  148. t.date_done = datetime.now() - tb.expires * 2
  149. s.commit()
  150. s.close()
  151. tb.cleanup()
  152. def test_Task__repr__(self):
  153. self.assertIn("foo", repr(Task("foo")))
  154. def test_TaskSet__repr__(self):
  155. self.assertIn("foo", repr(TaskSet("foo", None)))