test_base.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import sys
  2. import types
  3. from celery.tests.utils import unittest
  4. from celery.utils.serialization import subclass_exception
  5. from celery.utils.serialization import \
  6. find_nearest_pickleable_exception as fnpe
  7. from celery.utils.serialization import UnpickleableExceptionWrapper
  8. from celery.utils.serialization import get_pickleable_exception as gpe
  9. from celery import states
  10. from celery.backends.base import BaseBackend, KeyValueStoreBackend
  11. from celery.backends.base import BaseDictBackend
  12. from celery.utils import gen_unique_id
  13. class wrapobject(object):
  14. def __init__(self, *args, **kwargs):
  15. self.args = args
  16. Oldstyle = types.ClassType("Oldstyle", (), {})
  17. Unpickleable = subclass_exception("Unpickleable", KeyError, "foo.module")
  18. Impossible = subclass_exception("Impossible", object, "foo.module")
  19. Lookalike = subclass_exception("Lookalike", wrapobject, "foo.module")
  20. b = BaseBackend()
  21. class test_BaseBackend_interface(unittest.TestCase):
  22. def test_get_status(self):
  23. self.assertRaises(NotImplementedError,
  24. b.get_status, "SOMExx-N0Nex1stant-IDxx-")
  25. def test_store_result(self):
  26. self.assertRaises(NotImplementedError,
  27. b.store_result, "SOMExx-N0nex1stant-IDxx-", 42, states.SUCCESS)
  28. def test_reload_task_result(self):
  29. self.assertRaises(NotImplementedError,
  30. b.reload_task_result, "SOMExx-N0nex1stant-IDxx-")
  31. def test_reload_taskset_result(self):
  32. self.assertRaises(NotImplementedError,
  33. b.reload_taskset_result, "SOMExx-N0nex1stant-IDxx-")
  34. def test_get_result(self):
  35. self.assertRaises(NotImplementedError,
  36. b.get_result, "SOMExx-N0nex1stant-IDxx-")
  37. def test_restore_taskset(self):
  38. self.assertRaises(NotImplementedError,
  39. b.restore_taskset, "SOMExx-N0nex1stant-IDxx-")
  40. def test_save_taskset(self):
  41. self.assertRaises(NotImplementedError,
  42. b.save_taskset, "SOMExx-N0nex1stant-IDxx-", "blergh")
  43. def test_get_traceback(self):
  44. self.assertRaises(NotImplementedError,
  45. b.get_traceback, "SOMExx-N0nex1stant-IDxx-")
  46. def test_forget(self):
  47. self.assertRaises(NotImplementedError,
  48. b.forget, "SOMExx-N0nex1stant-IDxx-")
  49. class test_exception_pickle(unittest.TestCase):
  50. def test_oldstyle(self):
  51. self.assertIsNone(fnpe(Oldstyle()))
  52. def test_BaseException(self):
  53. self.assertIsNone(fnpe(Exception()))
  54. def test_get_pickleable_exception(self):
  55. exc = Exception("foo")
  56. self.assertEqual(gpe(exc), exc)
  57. def test_unpickleable(self):
  58. self.assertIsInstance(fnpe(Unpickleable()), KeyError)
  59. self.assertIsNone(fnpe(Impossible()))
  60. class test_prepare_exception(unittest.TestCase):
  61. def test_unpickleable(self):
  62. x = b.prepare_exception(Unpickleable(1, 2, "foo"))
  63. self.assertIsInstance(x, KeyError)
  64. y = b.exception_to_python(x)
  65. self.assertIsInstance(y, KeyError)
  66. def test_impossible(self):
  67. x = b.prepare_exception(Impossible())
  68. self.assertIsInstance(x, UnpickleableExceptionWrapper)
  69. y = b.exception_to_python(x)
  70. self.assertEqual(y.__class__.__name__, "Impossible")
  71. if sys.version_info < (2, 5):
  72. self.assertTrue(y.__class__.__module__)
  73. else:
  74. self.assertEqual(y.__class__.__module__, "foo.module")
  75. def test_regular(self):
  76. x = b.prepare_exception(KeyError("baz"))
  77. self.assertIsInstance(x, KeyError)
  78. y = b.exception_to_python(x)
  79. self.assertIsInstance(y, KeyError)
  80. class KVBackend(KeyValueStoreBackend):
  81. def __init__(self, *args, **kwargs):
  82. self.db = {}
  83. super(KVBackend, self).__init__(KeyValueStoreBackend)
  84. def get(self, key):
  85. return self.db.get(key)
  86. def set(self, key, value):
  87. self.db[key] = value
  88. def delete(self, key):
  89. self.db.pop(key, None)
  90. class DictBackend(BaseDictBackend):
  91. def _save_taskset(self, taskset_id, result):
  92. return "taskset-saved"
  93. def _restore_taskset(self, taskset_id):
  94. if taskset_id == "exists":
  95. return {"result": "taskset"}
  96. def _get_task_meta_for(self, task_id):
  97. if task_id == "task-exists":
  98. return {"result": "task"}
  99. class test_BaseDictBackend(unittest.TestCase):
  100. def setUp(self):
  101. self.b = DictBackend()
  102. def test_save_taskset(self):
  103. self.assertEqual(self.b.save_taskset("foofoo", "xxx"),
  104. "taskset-saved")
  105. def test_restore_taskset(self):
  106. self.assertIsNone(self.b.restore_taskset("missing"))
  107. self.assertIsNone(self.b.restore_taskset("missing"))
  108. self.assertEqual(self.b.restore_taskset("exists"), "taskset")
  109. self.assertEqual(self.b.restore_taskset("exists"), "taskset")
  110. self.assertEqual(self.b.restore_taskset("exists", cache=False),
  111. "taskset")
  112. def test_reload_taskset_result(self):
  113. self.b._cache = {}
  114. self.b.reload_taskset_result("exists")
  115. self.b._cache["exists"] = {"result": "taskset"}
  116. def test_reload_task_result(self):
  117. self.b._cache = {}
  118. self.b.reload_task_result("task-exists")
  119. self.b._cache["task-exists"] = {"result": "task"}
  120. class test_KeyValueStoreBackend(unittest.TestCase):
  121. def setUp(self):
  122. self.b = KVBackend()
  123. def test_get_store_delete_result(self):
  124. tid = gen_unique_id()
  125. self.b.mark_as_done(tid, "Hello world")
  126. self.assertEqual(self.b.get_result(tid), "Hello world")
  127. self.assertEqual(self.b.get_status(tid), states.SUCCESS)
  128. self.b.forget(tid)
  129. self.assertEqual(self.b.get_status(tid), states.PENDING)
  130. def test_get_missing_meta(self):
  131. self.assertIsNone(self.b.get_result("xxx-missing"))
  132. self.assertEqual(self.b.get_status("xxx-missing"), states.PENDING)
  133. def test_save_restore_taskset(self):
  134. tid = gen_unique_id()
  135. self.b.save_taskset(tid, "Hello world")
  136. self.assertEqual(self.b.restore_taskset(tid), "Hello world")
  137. def test_restore_missing_taskset(self):
  138. self.assertIsNone(self.b.restore_taskset("xxx-nonexistant"))
  139. class test_KeyValueStoreBackend_interface(unittest.TestCase):
  140. def test_get(self):
  141. self.assertRaises(NotImplementedError, KeyValueStoreBackend().get,
  142. "a")
  143. def test_set(self):
  144. self.assertRaises(NotImplementedError, KeyValueStoreBackend().set,
  145. "a", 1)
  146. def test_cleanup(self):
  147. self.assertFalse(KeyValueStoreBackend().cleanup())
  148. def test_delete(self):
  149. self.assertRaises(NotImplementedError, KeyValueStoreBackend().delete,
  150. "a")
  151. def test_forget(self):
  152. self.assertRaises(NotImplementedError, KeyValueStoreBackend().forget,
  153. "a")