test_base.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. import sys
  2. import types
  3. import unittest2 as unittest
  4. from celery.serialization import subclass_exception
  5. from celery.serialization import find_nearest_pickleable_exception as fnpe
  6. from celery.serialization import UnpickleableExceptionWrapper
  7. from celery.serialization import get_pickleable_exception as gpe
  8. from celery import states
  9. from celery.backends.base import BaseBackend, KeyValueStoreBackend
  10. from celery.backends.base import BaseDictBackend
  11. from celery.utils import gen_unique_id
  12. class wrapobject(object):
  13. def __init__(self, *args, **kwargs):
  14. self.args = args
  15. Oldstyle = types.ClassType("Oldstyle", (), {})
  16. Unpickleable = subclass_exception("Unpickleable", KeyError, "foo.module")
  17. Impossible = subclass_exception("Impossible", object, "foo.module")
  18. Lookalike = subclass_exception("Lookalike", wrapobject, "foo.module")
  19. b = BaseBackend()
  20. class test_BaseBackend_interface(unittest.TestCase):
  21. def test_get_status(self):
  22. self.assertRaises(NotImplementedError,
  23. b.get_status, "SOMExx-N0Nex1stant-IDxx-")
  24. def test_store_result(self):
  25. self.assertRaises(NotImplementedError,
  26. b.store_result, "SOMExx-N0nex1stant-IDxx-", 42, states.SUCCESS)
  27. def test_reload_task_result(self):
  28. self.assertRaises(NotImplementedError,
  29. b.reload_task_result, "SOMExx-N0nex1stant-IDxx-")
  30. def test_reload_taskset_result(self):
  31. self.assertRaises(NotImplementedError,
  32. b.reload_taskset_result, "SOMExx-N0nex1stant-IDxx-")
  33. def test_get_result(self):
  34. self.assertRaises(NotImplementedError,
  35. b.get_result, "SOMExx-N0nex1stant-IDxx-")
  36. def test_restore_taskset(self):
  37. self.assertRaises(NotImplementedError,
  38. b.restore_taskset, "SOMExx-N0nex1stant-IDxx-")
  39. def test_save_taskset(self):
  40. self.assertRaises(NotImplementedError,
  41. b.save_taskset, "SOMExx-N0nex1stant-IDxx-", "blergh")
  42. def test_get_traceback(self):
  43. self.assertRaises(NotImplementedError,
  44. b.get_traceback, "SOMExx-N0nex1stant-IDxx-")
  45. class test_exception_pickle(unittest.TestCase):
  46. def test_oldstyle(self):
  47. self.assertIsNone(fnpe(Oldstyle()))
  48. def test_BaseException(self):
  49. self.assertIsNone(fnpe(Exception()))
  50. def test_get_pickleable_exception(self):
  51. exc = Exception("foo")
  52. self.assertEqual(gpe(exc), exc)
  53. def test_unpickleable(self):
  54. self.assertIsInstance(fnpe(Unpickleable()), KeyError)
  55. self.assertIsNone(fnpe(Impossible()))
  56. class test_prepare_exception(unittest.TestCase):
  57. def test_unpickleable(self):
  58. x = b.prepare_exception(Unpickleable(1, 2, "foo"))
  59. self.assertIsInstance(x, KeyError)
  60. y = b.exception_to_python(x)
  61. self.assertIsInstance(y, KeyError)
  62. def test_impossible(self):
  63. x = b.prepare_exception(Impossible())
  64. self.assertIsInstance(x, UnpickleableExceptionWrapper)
  65. y = b.exception_to_python(x)
  66. self.assertEqual(y.__class__.__name__, "Impossible")
  67. if sys.version_info < (2, 5):
  68. self.assertTrue(y.__class__.__module__)
  69. else:
  70. self.assertEqual(y.__class__.__module__, "foo.module")
  71. def test_regular(self):
  72. x = b.prepare_exception(KeyError("baz"))
  73. self.assertIsInstance(x, KeyError)
  74. y = b.exception_to_python(x)
  75. self.assertIsInstance(y, KeyError)
  76. class KVBackend(KeyValueStoreBackend):
  77. def __init__(self, *args, **kwargs):
  78. self.db = {}
  79. super(KVBackend, self).__init__(KeyValueStoreBackend)
  80. def get(self, key):
  81. return self.db.get(key)
  82. def set(self, key, value):
  83. self.db[key] = value
  84. class DictBackend(BaseDictBackend):
  85. def _save_taskset(self, taskset_id, result):
  86. return "taskset-saved"
  87. def _restore_taskset(self, taskset_id):
  88. if taskset_id == "exists":
  89. return {"result": "taskset"}
  90. def _get_task_meta_for(self, task_id):
  91. if task_id == "task-exists":
  92. return {"result": "task"}
  93. class test_BaseDictBackend(unittest.TestCase):
  94. def setUp(self):
  95. self.b = DictBackend()
  96. def test_save_taskset(self):
  97. self.assertEqual(self.b.save_taskset("foofoo", "xxx"),
  98. "taskset-saved")
  99. def test_restore_taskset(self):
  100. self.assertIsNone(self.b.restore_taskset("missing"))
  101. self.assertIsNone(self.b.restore_taskset("missing"))
  102. self.assertEqual(self.b.restore_taskset("exists"), "taskset")
  103. self.assertEqual(self.b.restore_taskset("exists"), "taskset")
  104. self.assertEqual(self.b.restore_taskset("exists", cache=False),
  105. "taskset")
  106. def test_reload_taskset_result(self):
  107. self.b._cache = {}
  108. self.b.reload_taskset_result("exists")
  109. self.b._cache["exists"] = {"result": "taskset"}
  110. def test_reload_task_result(self):
  111. self.b._cache = {}
  112. self.b.reload_taskset_result("task-exists")
  113. self.b._cache["task-exists"] = {"result": "task"}
  114. class test_KeyValueStoreBackend(unittest.TestCase):
  115. def setUp(self):
  116. self.b = KVBackend()
  117. def test_get_store_result(self):
  118. tid = gen_unique_id()
  119. self.b.mark_as_done(tid, "Hello world")
  120. self.assertEqual(self.b.get_result(tid), "Hello world")
  121. self.assertEqual(self.b.get_status(tid), states.SUCCESS)
  122. def test_get_missing_meta(self):
  123. self.assertIsNone(self.b.get_result("xxx-missing"))
  124. self.assertEqual(self.b.get_status("xxx-missing"), states.PENDING)
  125. def test_save_restore_taskset(self):
  126. tid = gen_unique_id()
  127. self.b.save_taskset(tid, "Hello world")
  128. self.assertEqual(self.b.restore_taskset(tid), "Hello world")
  129. def test_restore_missing_taskset(self):
  130. self.assertIsNone(self.b.restore_taskset("xxx-nonexistant"))
  131. class test_KeyValueStoreBackend_interface(unittest.TestCase):
  132. def test_get(self):
  133. self.assertRaises(NotImplementedError, KeyValueStoreBackend().get,
  134. "a")
  135. def test_set(self):
  136. self.assertRaises(NotImplementedError, KeyValueStoreBackend().set,
  137. "a", 1)
  138. def test_cleanup(self):
  139. self.assertFalse(KeyValueStoreBackend().cleanup())