test_tyrant.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import sys
  2. import unittest
  3. import errno
  4. import socket
  5. from celery.backends import tyrant
  6. from celery.backends.tyrant import TyrantBackend
  7. from celery.utils import gen_unique_id
  8. from django.core.exceptions import ImproperlyConfigured
  9. _no_tyrant_msg = "* Tokyo Tyrant not running. Will not execute related tests."
  10. _no_tyrant_msg_emitted = False
  11. class SomeClass(object):
  12. def __init__(self, data):
  13. self.data = data
  14. def get_tyrant_or_None():
  15. def emit_no_tyrant_msg():
  16. global _no_tyrant_msg_emitted
  17. if not _no_tyrant_msg_emitted:
  18. sys.stderr.write("\n" + _no_tyrant_msg + "\n")
  19. _no_tyrant_msg_emitted = True
  20. if tyrant.pytyrant is None:
  21. emit_no_tyrant_msg()
  22. return None
  23. try:
  24. tb = TyrantBackend()
  25. try:
  26. tb.open()
  27. except socket.error, exc:
  28. if exc.errno == errno.ECONNREFUSED:
  29. emit_no_tyrant_msg()
  30. return None
  31. else:
  32. raise
  33. return tb
  34. except ImproperlyConfigured, exc:
  35. return None
  36. class TestTyrantBackend(unittest.TestCase):
  37. def test_cached_connection(self):
  38. tb = get_tyrant_or_None()
  39. if not tb:
  40. return # Skip test
  41. self.assertTrue(tb._connection is not None)
  42. tb.close()
  43. self.assertTrue(tb._connection is None)
  44. tb.open()
  45. self.assertTrue(tb._connection is not None)
  46. def test_mark_as_done(self):
  47. tb = get_tyrant_or_None()
  48. if not tb:
  49. return
  50. tid = gen_unique_id()
  51. self.assertFalse(tb.is_successful(tid))
  52. self.assertEquals(tb.get_status(tid), "PENDING")
  53. self.assertEquals(tb.get_result(tid), None)
  54. tb.mark_as_done(tid, 42)
  55. self.assertTrue(tb.is_successful(tid))
  56. self.assertEquals(tb.get_status(tid), "SUCCESS")
  57. self.assertEquals(tb.get_result(tid), 42)
  58. self.assertTrue(tb._cache.get(tid))
  59. self.assertTrue(tb.get_result(tid), 42)
  60. def test_is_pickled(self):
  61. tb = get_tyrant_or_None()
  62. if not tb:
  63. return
  64. tid2 = gen_unique_id()
  65. result = {"foo": "baz", "bar": SomeClass(12345)}
  66. tb.mark_as_done(tid2, result)
  67. # is serialized properly.
  68. rindb = tb.get_result(tid2)
  69. self.assertEquals(rindb.get("foo"), "baz")
  70. self.assertEquals(rindb.get("bar").data, 12345)
  71. def test_mark_as_failure(self):
  72. tb = get_tyrant_or_None()
  73. if not tb:
  74. return
  75. tid3 = gen_unique_id()
  76. try:
  77. raise KeyError("foo")
  78. except KeyError, exception:
  79. pass
  80. tb.mark_as_failure(tid3, exception)
  81. self.assertFalse(tb.is_successful(tid3))
  82. self.assertEquals(tb.get_status(tid3), "FAILURE")
  83. self.assertTrue(isinstance(tb.get_result(tid3), KeyError))
  84. def test_process_cleanup(self):
  85. tb = get_tyrant_or_None()
  86. if not tb:
  87. return
  88. tb.process_cleanup()
  89. self.assertTrue(tb._connection is None)