test_redis.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import sys
  4. import socket
  5. from celery.tests.utils import unittest
  6. from nose import SkipTest
  7. from celery.exceptions import ImproperlyConfigured
  8. from celery import states
  9. from celery.utils import gen_unique_id
  10. from celery.backends import redis
  11. from celery.backends.redis import RedisBackend
  12. from celery.tests.utils import mask_modules
  13. _no_redis_msg = "* Redis %s. Will not execute related tests."
  14. _no_redis_msg_emitted = False
  15. try:
  16. from redis.exceptions import ConnectionError
  17. except ImportError:
  18. class ConnectionError(socket.error): # noqa
  19. pass
  20. class SomeClass(object):
  21. def __init__(self, data):
  22. self.data = data
  23. def get_redis_or_SkipTest():
  24. def emit_no_redis_msg(reason):
  25. global _no_redis_msg_emitted
  26. if not _no_redis_msg_emitted:
  27. sys.stderr.write("\n" + _no_redis_msg % reason + "\n")
  28. _no_redis_msg_emitted = True
  29. if redis.redis is None:
  30. emit_no_redis_msg("not installed")
  31. raise SkipTest("redis library not installed")
  32. try:
  33. tb = RedisBackend(redis_db="celery_unittest")
  34. try:
  35. # Evaluate lazy connection
  36. tb.client.info()
  37. except ConnectionError, exc:
  38. emit_no_redis_msg("not running")
  39. raise SkipTest("can't connect to redis: %s" % (exc, ))
  40. return tb
  41. except ImproperlyConfigured, exc:
  42. if "need to install" in str(exc):
  43. return emit_no_redis_msg("not installed")
  44. return emit_no_redis_msg("not configured")
  45. class TestRedisBackend(unittest.TestCase):
  46. def test_mark_as_done(self):
  47. tb = get_redis_or_SkipTest()
  48. tid = gen_unique_id()
  49. self.assertEqual(tb.get_status(tid), states.PENDING)
  50. self.assertIsNone(tb.get_result(tid))
  51. tb.mark_as_done(tid, 42)
  52. self.assertEqual(tb.get_status(tid), states.SUCCESS)
  53. self.assertEqual(tb.get_result(tid), 42)
  54. def test_is_pickled(self):
  55. tb = get_redis_or_SkipTest()
  56. tid2 = gen_unique_id()
  57. result = {"foo": "baz", "bar": SomeClass(12345)}
  58. tb.mark_as_done(tid2, result)
  59. # is serialized properly.
  60. rindb = tb.get_result(tid2)
  61. self.assertEqual(rindb.get("foo"), "baz")
  62. self.assertEqual(rindb.get("bar").data, 12345)
  63. def test_mark_as_failure(self):
  64. tb = get_redis_or_SkipTest()
  65. tid3 = gen_unique_id()
  66. try:
  67. raise KeyError("foo")
  68. except KeyError, exception:
  69. pass
  70. tb.mark_as_failure(tid3, exception)
  71. self.assertEqual(tb.get_status(tid3), states.FAILURE)
  72. self.assertIsInstance(tb.get_result(tid3), KeyError)
  73. class TestRedisBackendNoRedis(unittest.TestCase):
  74. def test_redis_None_if_redis_not_installed(self):
  75. prev = sys.modules.pop("celery.backends.redis")
  76. try:
  77. with mask_modules("redis"):
  78. from celery.backends.redis import redis
  79. self.assertIsNone(redis)
  80. finally:
  81. sys.modules["celery.backends.redis"] = prev
  82. def test_constructor_raises_if_redis_not_installed(self):
  83. from celery.backends import redis
  84. prev = redis.RedisBackend.redis
  85. redis.RedisBackend.redis = None
  86. try:
  87. self.assertRaises(ImproperlyConfigured, redis.RedisBackend)
  88. finally:
  89. redis.RedisBackend.redis = prev