test_redis.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import sys
  2. import errno
  3. import socket
  4. import unittest
  5. from django.core.exceptions import ImproperlyConfigured
  6. from celery import states
  7. from celery.utils import gen_unique_id
  8. from celery.backends import pyredis
  9. from celery.backends.pyredis import RedisBackend
  10. from testunits.utils import execute_context, mask_modules
  11. _no_redis_msg = "* Redis %s. Will not execute related tests."
  12. _no_redis_msg_emitted = False
  13. try:
  14. from redis.exceptions import ConnectionError
  15. except ImportError:
  16. class ConnectionError(socket.error):
  17. pass
  18. class SomeClass(object):
  19. def __init__(self, data):
  20. self.data = data
  21. def get_redis_or_None():
  22. def emit_no_redis_msg(reason):
  23. global _no_redis_msg_emitted
  24. if not _no_redis_msg_emitted:
  25. sys.stderr.write("\n" + _no_redis_msg % reason + "\n")
  26. _no_redis_msg_emitted = True
  27. if pyredis.redis is None:
  28. return emit_no_redis_msg("not installed")
  29. try:
  30. tb = RedisBackend(redis_db="celery_unittest")
  31. try:
  32. tb.open()
  33. # Evaluate lazy connection
  34. tb._connection.connection.connect(tb._connection)
  35. except ConnectionError, exc:
  36. return emit_no_redis_msg("not running")
  37. return tb
  38. except ImproperlyConfigured, exc:
  39. if "need to install" in str(exc):
  40. return emit_no_redis_msg("not installed")
  41. return emit_no_redis_msg("not configured")
  42. class TestRedisBackend(unittest.TestCase):
  43. def test_cached_connection(self):
  44. tb = get_redis_or_None()
  45. if not tb:
  46. return # Skip test
  47. self.assertTrue(tb._connection is not None)
  48. tb.close()
  49. self.assertTrue(tb._connection is None)
  50. tb.open()
  51. self.assertTrue(tb._connection is not None)
  52. def test_mark_as_done(self):
  53. tb = get_redis_or_None()
  54. if not tb:
  55. return
  56. tid = gen_unique_id()
  57. self.assertFalse(tb.is_successful(tid))
  58. self.assertEquals(tb.get_status(tid), states.PENDING)
  59. self.assertEquals(tb.get_result(tid), None)
  60. tb.mark_as_done(tid, 42)
  61. self.assertTrue(tb.is_successful(tid))
  62. self.assertEquals(tb.get_status(tid), states.SUCCESS)
  63. self.assertEquals(tb.get_result(tid), 42)
  64. self.assertTrue(tb.get_result(tid), 42)
  65. def test_is_pickled(self):
  66. tb = get_redis_or_None()
  67. if not tb:
  68. return
  69. tid2 = gen_unique_id()
  70. result = {"foo": "baz", "bar": SomeClass(12345)}
  71. tb.mark_as_done(tid2, result)
  72. # is serialized properly.
  73. rindb = tb.get_result(tid2)
  74. self.assertEquals(rindb.get("foo"), "baz")
  75. self.assertEquals(rindb.get("bar").data, 12345)
  76. def test_mark_as_failure(self):
  77. tb = get_redis_or_None()
  78. if not tb:
  79. return
  80. tid3 = gen_unique_id()
  81. try:
  82. raise KeyError("foo")
  83. except KeyError, exception:
  84. pass
  85. tb.mark_as_failure(tid3, exception)
  86. self.assertFalse(tb.is_successful(tid3))
  87. self.assertEquals(tb.get_status(tid3), states.FAILURE)
  88. self.assertTrue(isinstance(tb.get_result(tid3), KeyError))
  89. def test_process_cleanup(self):
  90. tb = get_redis_or_None()
  91. if not tb:
  92. return
  93. tb.process_cleanup()
  94. self.assertTrue(tb._connection is None)
  95. def test_connection_close_if_connected(self):
  96. tb = get_redis_or_None()
  97. if not tb:
  98. return
  99. tb.open()
  100. self.assertTrue(tb._connection is not None)
  101. tb.close()
  102. self.assertTrue(tb._connection is None)
  103. tb.close()
  104. self.assertTrue(tb._connection is None)
  105. class TestTyrantBackendNoTyrant(unittest.TestCase):
  106. def test_tyrant_None_if_tyrant_not_installed(self):
  107. prev = sys.modules.pop("celery.backends.pyredis")
  108. try:
  109. def with_redis_masked(_val):
  110. from celery.backends.pyredis import redis
  111. self.assertTrue(redis is None)
  112. context = mask_modules("redis")
  113. execute_context(context, with_redis_masked)
  114. finally:
  115. sys.modules["celery.backends.pyredis"] = prev
  116. def test_constructor_raises_if_tyrant_not_installed(self):
  117. from celery.backends import pyredis
  118. prev = pyredis.redis
  119. pyredis.redis = None
  120. try:
  121. self.assertRaises(ImproperlyConfigured, pyredis.RedisBackend)
  122. finally:
  123. pyredis.redis = prev
  124. def test_constructor_raises_if_not_host_or_port(self):
  125. from celery.backends import pyredis
  126. prev_host = pyredis.RedisBackend.redis_host
  127. prev_port = pyredis.RedisBackend.redis_port
  128. pyredis.RedisBackend.redis_host = None
  129. pyredis.RedisBackend.redis_port = None
  130. try:
  131. self.assertRaises(ImproperlyConfigured, pyredis.RedisBackend)
  132. finally:
  133. pyredis.RedisBackend.redis_host = prev_host
  134. pyredis.RedisBackend.redis_port = prev_port