test_tyrant.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import sys
  2. import socket
  3. import unittest2 as unittest
  4. from nose import SkipTest
  5. from celery.exceptions import ImproperlyConfigured
  6. from celery import states
  7. from celery.utils import gen_unique_id
  8. from celery.backends import tyrant
  9. from celery.backends.tyrant import TyrantBackend
  10. _no_tyrant_msg = "* Tokyo Tyrant %s. Will not execute related tests."
  11. _no_tyrant_msg_emitted = False
  12. class SomeClass(object):
  13. def __init__(self, data):
  14. self.data = data
  15. def get_tyrant_or_SkipTest():
  16. def emit_no_tyrant_msg(reason):
  17. global _no_tyrant_msg_emitted
  18. if not _no_tyrant_msg_emitted:
  19. sys.stderr.write("\n" + _no_tyrant_msg % reason + "\n")
  20. _no_tyrant_msg_emitted = True
  21. if tyrant.pytyrant is None:
  22. emit_no_tyrant_msg("not installed")
  23. raise SkipTest("pytyrant library not installed")
  24. try:
  25. tb = TyrantBackend()
  26. try:
  27. tb.open()
  28. except socket.error, exc:
  29. emit_no_tyrant_msg("not running")
  30. raise SkipTest("Can't connect to Tokyo server: %s" % (exc, ))
  31. return tb
  32. except ImproperlyConfigured, exc:
  33. if "need to install" in str(exc):
  34. emit_no_tyrant_msg("not installed")
  35. raise SkipTest("Tokyo Tyrant is not installed")
  36. emit_no_tyrant_msg("not configured")
  37. raise SkipTest("Tokyo Tyrant not configured")
  38. class TestTyrantBackend(unittest.TestCase):
  39. def test_cached_connection(self):
  40. tb = get_tyrant_or_SkipTest()
  41. self.assertIsNotNone(tb._connection)
  42. tb.close()
  43. self.assertIsNone(tb._connection)
  44. tb.open()
  45. self.assertIsNone(tb._connection)
  46. def test_mark_as_done(self):
  47. tb = get_tyrant_or_SkipTest()
  48. tid = gen_unique_id()
  49. self.assertEqual(tb.get_status(tid), states.PENDING)
  50. self.assertIsNone(tb.get_result(tid), None)
  51. tb.mark_as_done(tid, 42)
  52. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  53. self.assertEqual(tb.get_result(tid), 42)
  54. def test_is_pickled(self):
  55. tb = get_tyrant_or_SkipTest()
  56. tid2 = gen_unique_id()
  57. result = {"foo": "baz", "bar": SomeClass(12345)}
  58. tb.mark_as_done(tid2, result)
  59. # is serialized properly.
  60. rindb = tb.get_result(tid2)
  61. self.assertEqual(rindb.get("foo"), "baz")
  62. self.assertEqual(rindb.get("bar").data, 12345)
  63. def test_mark_as_failure(self):
  64. tb = get_tyrant_or_SkipTest()
  65. tid3 = gen_unique_id()
  66. try:
  67. raise KeyError("foo")
  68. except KeyError, exception:
  69. pass
  70. tb.mark_as_failure(tid3, exception)
  71. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  72. self.assertIsInstance(tb.get_result(tid3), KeyError)
  73. def test_process_cleanup(self):
  74. tb = get_tyrant_or_SkipTest()
  75. tb.process_cleanup()
  76. self.assertIsNone(tb._connection)