test_database.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import unittest2 as unittest
  2. from datetime import datetime
  3. from celery.exceptions import ImproperlyConfigured
  4. from celery import states
  5. from celery.app import default_app
  6. from celery.db.models import Task, TaskSet
  7. from celery.utils import gen_unique_id
  8. from celery.backends.database import DatabaseBackend
  9. class SomeClass(object):
  10. def __init__(self, data):
  11. self.data = data
  12. class test_DatabaseBackend(unittest.TestCase):
  13. def test_missing_dburi_raises_ImproperlyConfigured(self):
  14. conf = default_app.conf
  15. prev, conf.CELERY_RESULT_DBURI = conf.CELERY_RESULT_DBURI, None
  16. try:
  17. self.assertRaises(ImproperlyConfigured, DatabaseBackend)
  18. finally:
  19. conf.CELERY_RESULT_DBURI = prev
  20. def test_missing_task_id_is_PENDING(self):
  21. tb = DatabaseBackend()
  22. self.assertEqual(tb.get_status("xxx-does-not-exist"), states.PENDING)
  23. def test_mark_as_done(self):
  24. tb = DatabaseBackend()
  25. tid = gen_unique_id()
  26. self.assertEqual(tb.get_status(tid), states.PENDING)
  27. self.assertIsNone(tb.get_result(tid))
  28. tb.mark_as_done(tid, 42)
  29. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  30. self.assertEqual(tb.get_result(tid), 42)
  31. def test_is_pickled(self):
  32. tb = DatabaseBackend()
  33. tid2 = gen_unique_id()
  34. result = {"foo": "baz", "bar": SomeClass(12345)}
  35. tb.mark_as_done(tid2, result)
  36. # is serialized properly.
  37. rindb = tb.get_result(tid2)
  38. self.assertEqual(rindb.get("foo"), "baz")
  39. self.assertEqual(rindb.get("bar").data, 12345)
  40. def test_mark_as_started(self):
  41. tb = DatabaseBackend()
  42. tid = gen_unique_id()
  43. tb.mark_as_started(tid)
  44. self.assertEqual(tb.get_status(tid), states.STARTED)
  45. def test_mark_as_revoked(self):
  46. tb = DatabaseBackend()
  47. tid = gen_unique_id()
  48. tb.mark_as_revoked(tid)
  49. self.assertEqual(tb.get_status(tid), states.REVOKED)
  50. def test_mark_as_retry(self):
  51. tb = DatabaseBackend()
  52. tid = gen_unique_id()
  53. try:
  54. raise KeyError("foo")
  55. except KeyError, exception:
  56. import traceback
  57. trace = "\n".join(traceback.format_stack())
  58. tb.mark_as_retry(tid, exception, traceback=trace)
  59. self.assertEqual(tb.get_status(tid), states.RETRY)
  60. self.assertIsInstance(tb.get_result(tid), KeyError)
  61. self.assertEqual(tb.get_traceback(tid), trace)
  62. def test_mark_as_failure(self):
  63. tb = DatabaseBackend()
  64. tid3 = gen_unique_id()
  65. try:
  66. raise KeyError("foo")
  67. except KeyError, exception:
  68. import traceback
  69. trace = "\n".join(traceback.format_stack())
  70. tb.mark_as_failure(tid3, exception, traceback=trace)
  71. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  72. self.assertIsInstance(tb.get_result(tid3), KeyError)
  73. self.assertEqual(tb.get_traceback(tid3), trace)
  74. def test_process_cleanup(self):
  75. tb = DatabaseBackend()
  76. tb.process_cleanup()
  77. def test_save___restore_taskset(self):
  78. tb = DatabaseBackend()
  79. tid = gen_unique_id()
  80. res = {u"something": "special"}
  81. self.assertEqual(tb.save_taskset(tid, res), res)
  82. res2 = tb.restore_taskset(tid)
  83. self.assertEqual(res2, res)
  84. self.assertIsNone(tb.restore_taskset("xxx-nonexisting-id"))
  85. def test_cleanup(self):
  86. tb = DatabaseBackend()
  87. for i in range(10):
  88. tb.mark_as_done(gen_unique_id(), 42)
  89. tb.save_taskset(gen_unique_id(), {"foo": "bar"})
  90. s = tb.ResultSession()
  91. for t in s.query(Task).all():
  92. t.date_done = datetime.now() - tb.result_expires * 2
  93. for t in s.query(TaskSet).all():
  94. t.date_done = datetime.now() - tb.result_expires * 2
  95. s.commit()
  96. s.close()
  97. tb.cleanup()
  98. s2 = tb.ResultSession()
  99. self.assertEqual(s2.query(Task).count(), 0)
  100. self.assertEqual(s2.query(TaskSet).count(), 0)
  101. def test_Task__repr__(self):
  102. self.assertIn("foo", repr(Task("foo")))
  103. def test_TaskSet__repr__(self):
  104. self.assertIn("foo", repr(TaskSet("foo", None)))