test_redis_unit.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from datetime import timedelta
  2. from celery import current_app
  3. from celery import states
  4. from celery.utils import gen_unique_id
  5. from celery.utils.timeutils import timedelta_seconds
  6. from celery.tests.utils import unittest
  7. class Redis(object):
  8. class Connection(object):
  9. connected = True
  10. def disconnect(self):
  11. self.connected = False
  12. def __init__(self, host=None, port=None, db=None, password=None, **kw):
  13. self.host = host
  14. self.port = port
  15. self.db = db
  16. self.password = password
  17. self.connection = self.Connection()
  18. self.keyspace = {}
  19. self.expiry = {}
  20. def get(self, key):
  21. return self.keyspace.get(key)
  22. def set(self, key, value):
  23. self.keyspace[key] = value
  24. def expire(self, key, expires):
  25. self.expiry[key] = expires
  26. def delete(self, key):
  27. self.keyspace.pop(key)
  28. class redis(object):
  29. Redis = Redis
  30. class test_RedisBackend(unittest.TestCase):
  31. def get_backend(self):
  32. from celery.backends import redis
  33. class RedisBackend(redis.RedisBackend):
  34. redis = redis
  35. return RedisBackend
  36. def setUp(self):
  37. self.Backend = self.get_backend()
  38. def test_expires_defaults_to_config(self):
  39. conf = current_app.conf
  40. prev = conf.CELERY_TASK_RESULT_EXPIRES
  41. conf.CELERY_TASK_RESULT_EXPIRES = 10
  42. try:
  43. b = self.Backend(expires=None)
  44. self.assertEqual(b.expires, 10)
  45. finally:
  46. conf.CELERY_TASK_RESULT_EXPIRES = prev
  47. def test_expires_is_int(self):
  48. b = self.Backend(expires=48)
  49. self.assertEqual(b.expires, 48)
  50. def test_expires_is_None(self):
  51. b = self.Backend(expires=None)
  52. self.assertEqual(b.expires, timedelta_seconds(
  53. current_app.conf.CELERY_TASK_RESULT_EXPIRES))
  54. def test_expires_is_timedelta(self):
  55. b = self.Backend(expires=timedelta(minutes=1))
  56. self.assertEqual(b.expires, 60)
  57. def test_get_set_forget(self):
  58. b = self.Backend()
  59. uuid = gen_unique_id()
  60. b.store_result(uuid, 42, states.SUCCESS)
  61. self.assertEqual(b.get_status(uuid), states.SUCCESS)
  62. self.assertEqual(b.get_result(uuid), 42)
  63. b.forget(uuid)
  64. self.assertEqual(b.get_status(uuid), states.PENDING)
  65. def test_set_expires(self):
  66. b = self.Backend(expires=512)
  67. uuid = gen_unique_id()
  68. key = b.get_key_for_task(uuid)
  69. b.store_result(uuid, 42, states.SUCCESS)
  70. self.assertEqual(b.client.expiry[key], 512)