test_tyrant.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import sys
  2. import errno
  3. import socket
  4. import unittest
  5. from django.core.exceptions import ImproperlyConfigured
  6. from celery import states
  7. from celery.utils import gen_unique_id
  8. from celery.backends import tyrant
  9. from celery.backends.tyrant import TyrantBackend
  10. _no_tyrant_msg = "* Tokyo Tyrant %s. Will not execute related tests."
  11. _no_tyrant_msg_emitted = False
  12. class SomeClass(object):
  13. def __init__(self, data):
  14. self.data = data
  15. def get_tyrant_or_None():
  16. def emit_no_tyrant_msg(reason):
  17. global _no_tyrant_msg_emitted
  18. if not _no_tyrant_msg_emitted:
  19. sys.stderr.write("\n" + _no_tyrant_msg % reason + "\n")
  20. _no_tyrant_msg_emitted = True
  21. if tyrant.pytyrant is None:
  22. return emit_no_tyrant_msg("not installed")
  23. try:
  24. tb = TyrantBackend()
  25. try:
  26. tb.open()
  27. except socket.error:
  28. return emit_no_tyrant_msg("not running")
  29. return tb
  30. except ImproperlyConfigured, exc:
  31. if "need to install" in str(exc):
  32. return emit_no_tyrant_msg("not installed")
  33. return emit_no_tyrant_msg("not configured")
  34. class TestTyrantBackend(unittest.TestCase):
  35. def test_cached_connection(self):
  36. tb = get_tyrant_or_None()
  37. if not tb:
  38. return # Skip test
  39. self.assertTrue(tb._connection is not None)
  40. tb.close()
  41. self.assertTrue(tb._connection is None)
  42. tb.open()
  43. self.assertTrue(tb._connection is not None)
  44. def test_mark_as_done(self):
  45. tb = get_tyrant_or_None()
  46. if not tb:
  47. return
  48. tid = gen_unique_id()
  49. self.assertFalse(tb.is_successful(tid))
  50. self.assertEquals(tb.get_status(tid), states.PENDING)
  51. self.assertEquals(tb.get_result(tid), None)
  52. tb.mark_as_done(tid, 42)
  53. self.assertTrue(tb.is_successful(tid))
  54. self.assertEquals(tb.get_status(tid), states.SUCCESS)
  55. self.assertEquals(tb.get_result(tid), 42)
  56. self.assertTrue(tb.get_result(tid), 42)
  57. def test_is_pickled(self):
  58. tb = get_tyrant_or_None()
  59. if not tb:
  60. return
  61. tid2 = gen_unique_id()
  62. result = {"foo": "baz", "bar": SomeClass(12345)}
  63. tb.mark_as_done(tid2, result)
  64. # is serialized properly.
  65. rindb = tb.get_result(tid2)
  66. self.assertEquals(rindb.get("foo"), "baz")
  67. self.assertEquals(rindb.get("bar").data, 12345)
  68. def test_mark_as_failure(self):
  69. tb = get_tyrant_or_None()
  70. if not tb:
  71. return
  72. tid3 = gen_unique_id()
  73. try:
  74. raise KeyError("foo")
  75. except KeyError, exception:
  76. pass
  77. tb.mark_as_failure(tid3, exception)
  78. self.assertFalse(tb.is_successful(tid3))
  79. self.assertEquals(tb.get_status(tid3), states.FAILURE)
  80. self.assertTrue(isinstance(tb.get_result(tid3), KeyError))
  81. def test_process_cleanup(self):
  82. tb = get_tyrant_or_None()
  83. if not tb:
  84. return
  85. tb.process_cleanup()
  86. self.assertTrue(tb._connection is None)