test_tyrant.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. from __future__ import absolute_import
  2. import sys
  3. import socket
  4. from nose import SkipTest
  5. from celery.exceptions import ImproperlyConfigured
  6. from celery import states
  7. from celery.utils import uuid
  8. from celery.backends import tyrant
  9. from celery.backends.tyrant import TyrantBackend
  10. from celery.tests.utils import Case
  11. _no_tyrant_msg = "* Tokyo Tyrant %s. Will not execute related tests."
  12. _no_tyrant_msg_emitted = False
  13. class SomeClass(object):
  14. def __init__(self, data):
  15. self.data = data
  16. def get_tyrant_or_SkipTest():
  17. def emit_no_tyrant_msg(reason):
  18. global _no_tyrant_msg_emitted
  19. if not _no_tyrant_msg_emitted:
  20. sys.stderr.write("\n" + _no_tyrant_msg % reason + "\n")
  21. _no_tyrant_msg_emitted = True
  22. if tyrant.pytyrant is None:
  23. emit_no_tyrant_msg("not installed")
  24. raise SkipTest("pytyrant library not installed")
  25. try:
  26. tb = TyrantBackend()
  27. try:
  28. tb.open()
  29. except socket.error, exc:
  30. emit_no_tyrant_msg("not running")
  31. raise SkipTest("Can't connect to Tokyo server: %s" % (exc, ))
  32. return tb
  33. except ImproperlyConfigured, exc:
  34. if "need to install" in str(exc):
  35. emit_no_tyrant_msg("not installed")
  36. raise SkipTest("Tokyo Tyrant is not installed")
  37. emit_no_tyrant_msg("not configured")
  38. raise SkipTest("Tokyo Tyrant not configured")
  39. class TestTyrantBackend(Case):
  40. def test_cached_connection(self):
  41. tb = get_tyrant_or_SkipTest()
  42. self.assertIsNotNone(tb._connection)
  43. tb.close()
  44. self.assertIsNone(tb._connection)
  45. tb.open()
  46. self.assertIsNone(tb._connection)
  47. def test_mark_as_done(self):
  48. tb = get_tyrant_or_SkipTest()
  49. tid = uuid()
  50. self.assertEqual(tb.get_status(tid), states.PENDING)
  51. self.assertIsNone(tb.get_result(tid), None)
  52. tb.mark_as_done(tid, 42)
  53. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  54. self.assertEqual(tb.get_result(tid), 42)
  55. def test_is_pickled(self):
  56. tb = get_tyrant_or_SkipTest()
  57. tid2 = uuid()
  58. result = {"foo": "baz", "bar": SomeClass(12345)}
  59. tb.mark_as_done(tid2, result)
  60. # is serialized properly.
  61. rindb = tb.get_result(tid2)
  62. self.assertEqual(rindb.get("foo"), "baz")
  63. self.assertEqual(rindb.get("bar").data, 12345)
  64. def test_mark_as_failure(self):
  65. tb = get_tyrant_or_SkipTest()
  66. tid3 = uuid()
  67. try:
  68. raise KeyError("foo")
  69. except KeyError, exception:
  70. pass
  71. tb.mark_as_failure(tid3, exception)
  72. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  73. self.assertIsInstance(tb.get_result(tid3), KeyError)
  74. def test_process_cleanup(self):
  75. tb = get_tyrant_or_SkipTest()
  76. tb.process_cleanup()
  77. self.assertIsNone(tb._connection)