# test_tyrant.py (2.9 KB)
  1. import sys
  2. import socket
  3. import unittest2 as unittest
  4. from celery.exceptions import ImproperlyConfigured
  5. from celery import states
  6. from celery.utils import gen_unique_id
  7. from celery.backends import tyrant
  8. from celery.backends.tyrant import TyrantBackend
  9. _no_tyrant_msg = "* Tokyo Tyrant %s. Will not execute related tests."
  10. _no_tyrant_msg_emitted = False
  11. class SomeClass(object):
  12. def __init__(self, data):
  13. self.data = data
  14. def get_tyrant_or_None():
  15. def emit_no_tyrant_msg(reason):
  16. global _no_tyrant_msg_emitted
  17. if not _no_tyrant_msg_emitted:
  18. sys.stderr.write("\n" + _no_tyrant_msg % reason + "\n")
  19. _no_tyrant_msg_emitted = True
  20. if tyrant.pytyrant is None:
  21. return emit_no_tyrant_msg("not installed")
  22. try:
  23. tb = TyrantBackend()
  24. try:
  25. tb.open()
  26. except socket.error:
  27. return emit_no_tyrant_msg("not running")
  28. return tb
  29. except ImproperlyConfigured, exc:
  30. if "need to install" in str(exc):
  31. return emit_no_tyrant_msg("not installed")
  32. return emit_no_tyrant_msg("not configured")
  33. class TestTyrantBackend(unittest.TestCase):
  34. def test_cached_connection(self):
  35. tb = get_tyrant_or_None()
  36. if not tb:
  37. return # Skip test
  38. self.assertIsNotNone(tb._connection)
  39. tb.close()
  40. self.assertIsNone(tb._connection)
  41. tb.open()
  42. self.assertIsNone(tb._connection)
  43. def test_mark_as_done(self):
  44. tb = get_tyrant_or_None()
  45. if not tb:
  46. return
  47. tid = gen_unique_id()
  48. self.assertFalse(tb.is_successful(tid))
  49. self.assertEqual(tb.get_status(tid), states.PENDING)
  50. self.assertIsNone(tb.get_result(tid), None)
  51. tb.mark_as_done(tid, 42)
  52. self.assertTrue(tb.is_successful(tid))
  53. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  54. self.assertEqual(tb.get_result(tid), 42)
  55. def test_is_pickled(self):
  56. tb = get_tyrant_or_None()
  57. if not tb:
  58. return
  59. tid2 = gen_unique_id()
  60. result = {"foo": "baz", "bar": SomeClass(12345)}
  61. tb.mark_as_done(tid2, result)
  62. # is serialized properly.
  63. rindb = tb.get_result(tid2)
  64. self.assertEqual(rindb.get("foo"), "baz")
  65. self.assertEqual(rindb.get("bar").data, 12345)
  66. def test_mark_as_failure(self):
  67. tb = get_tyrant_or_None()
  68. if not tb:
  69. return
  70. tid3 = gen_unique_id()
  71. try:
  72. raise KeyError("foo")
  73. except KeyError, exception:
  74. pass
  75. tb.mark_as_failure(tid3, exception)
  76. self.assertFalse(tb.is_successful(tid3))
  77. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  78. self.assertIsInstance(tb.get_result(tid3), KeyError)
  79. def test_process_cleanup(self):
  80. tb = get_tyrant_or_None()
  81. if not tb:
  82. return
  83. tb.process_cleanup()
  84. self.assertIsNone(tb._connection)