test_redis.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import sys
  2. import socket
  3. import unittest2 as unittest
  4. from nose import SkipTest
  5. from celery.exceptions import ImproperlyConfigured
  6. from celery import states
  7. from celery.utils import gen_unique_id
  8. from celery.backends import pyredis
  9. from celery.backends.pyredis import RedisBackend
  10. from celery.tests.utils import execute_context, mask_modules
  11. _no_redis_msg = "* Redis %s. Will not execute related tests."
  12. _no_redis_msg_emitted = False
  13. try:
  14. from redis.exceptions import ConnectionError
  15. except ImportError:
  16. class ConnectionError(socket.error):
  17. pass
  18. class SomeClass(object):
  19. def __init__(self, data):
  20. self.data = data
  21. def get_redis_or_SkipTest():
  22. def emit_no_redis_msg(reason):
  23. global _no_redis_msg_emitted
  24. if not _no_redis_msg_emitted:
  25. sys.stderr.write("\n" + _no_redis_msg % reason + "\n")
  26. _no_redis_msg_emitted = True
  27. if pyredis.redis is None:
  28. emit_no_redis_msg("not installed")
  29. raise SkipTest("redis library not installed")
  30. try:
  31. tb = RedisBackend(redis_db="celery_unittest")
  32. try:
  33. tb.open()
  34. # Evaluate lazy connection
  35. tb._connection.connection.connect(tb._connection)
  36. except ConnectionError, exc:
  37. emit_no_redis_msg("not running")
  38. raise SkipTest("can't connect to redis: %s" % (exc, ))
  39. return tb
  40. except ImproperlyConfigured, exc:
  41. if "need to install" in str(exc):
  42. return emit_no_redis_msg("not installed")
  43. return emit_no_redis_msg("not configured")
  44. class TestRedisBackend(unittest.TestCase):
  45. def test_cached_connection(self):
  46. tb = get_redis_or_SkipTest()
  47. self.assertIsNotNone(tb._connection)
  48. tb.close()
  49. self.assertIsNone(tb._connection)
  50. tb.open()
  51. self.assertIsNotNone(tb._connection)
  52. def test_mark_as_done(self):
  53. tb = get_redis_or_SkipTest()
  54. tid = gen_unique_id()
  55. self.assertEqual(tb.get_status(tid), states.PENDING)
  56. self.assertIsNone(tb.get_result(tid))
  57. tb.mark_as_done(tid, 42)
  58. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  59. self.assertEqual(tb.get_result(tid), 42)
  60. def test_is_pickled(self):
  61. tb = get_redis_or_SkipTest()
  62. tid2 = gen_unique_id()
  63. result = {"foo": "baz", "bar": SomeClass(12345)}
  64. tb.mark_as_done(tid2, result)
  65. # is serialized properly.
  66. rindb = tb.get_result(tid2)
  67. self.assertEqual(rindb.get("foo"), "baz")
  68. self.assertEqual(rindb.get("bar").data, 12345)
  69. def test_mark_as_failure(self):
  70. tb = get_redis_or_SkipTest()
  71. tid3 = gen_unique_id()
  72. try:
  73. raise KeyError("foo")
  74. except KeyError, exception:
  75. pass
  76. tb.mark_as_failure(tid3, exception)
  77. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  78. self.assertIsInstance(tb.get_result(tid3), KeyError)
  79. def test_process_cleanup(self):
  80. tb = get_redis_or_SkipTest()
  81. tb.process_cleanup()
  82. self.assertIsNone(tb._connection)
  83. def test_connection_close_if_connected(self):
  84. tb = get_redis_or_SkipTest()
  85. tb.open()
  86. self.assertIsNotNone(tb._connection)
  87. tb.close()
  88. self.assertIsNone(tb._connection)
  89. tb.close()
  90. self.assertIsNone(tb._connection)
  91. class TestTyrantBackendNoTyrant(unittest.TestCase):
  92. def test_tyrant_None_if_tyrant_not_installed(self):
  93. prev = sys.modules.pop("celery.backends.pyredis")
  94. try:
  95. def with_redis_masked(_val):
  96. from celery.backends.pyredis import redis
  97. self.assertIsNone(redis)
  98. context = mask_modules("redis")
  99. execute_context(context, with_redis_masked)
  100. finally:
  101. sys.modules["celery.backends.pyredis"] = prev
  102. def test_constructor_raises_if_tyrant_not_installed(self):
  103. from celery.backends import pyredis
  104. prev = pyredis.redis
  105. pyredis.redis = None
  106. try:
  107. self.assertRaises(ImproperlyConfigured, pyredis.RedisBackend)
  108. finally:
  109. pyredis.redis = prev
  110. def test_constructor_raises_if_not_host_or_port(self):
  111. from celery.backends import pyredis
  112. prev_host = pyredis.RedisBackend.redis_host
  113. prev_port = pyredis.RedisBackend.redis_port
  114. pyredis.RedisBackend.redis_host = None
  115. pyredis.RedisBackend.redis_port = None
  116. try:
  117. self.assertRaises(ImproperlyConfigured, pyredis.RedisBackend)
  118. finally:
  119. pyredis.RedisBackend.redis_host = prev_host
  120. pyredis.RedisBackend.redis_port = prev_port