# test_amqp.py
  1. import sys
  2. import errno
  3. import unittest
  4. from django.core.exceptions import ImproperlyConfigured
  5. from celery import states
  6. from celery.utils import gen_unique_id
  7. from celery.backends.amqp import AMQPBackend
  8. from celery.datastructures import ExceptionInfo
  9. class SomeClass(object):
  10. def __init__(self, data):
  11. self.data = data
  12. class TestRedisBackend(unittest.TestCase):
  13. def setUp(self):
  14. self.backend = AMQPBackend()
  15. self.backend._use_debug_tracking = True
  16. def test_mark_as_done(self):
  17. tb = self.backend
  18. tid = gen_unique_id()
  19. tb.mark_as_done(tid, 42)
  20. self.assertTrue(tb.is_successful(tid))
  21. self.assertEquals(tb.get_status(tid), states.SUCCESS)
  22. self.assertEquals(tb.get_result(tid), 42)
  23. self.assertTrue(tb._cache.get(tid))
  24. self.assertTrue(tb.get_result(tid), 42)
  25. def test_is_pickled(self):
  26. tb = self.backend
  27. tid2 = gen_unique_id()
  28. result = {"foo": "baz", "bar": SomeClass(12345)}
  29. tb.mark_as_done(tid2, result)
  30. # is serialized properly.
  31. rindb = tb.get_result(tid2)
  32. self.assertEquals(rindb.get("foo"), "baz")
  33. self.assertEquals(rindb.get("bar").data, 12345)
  34. def test_mark_as_failure(self):
  35. tb = self.backend
  36. tid3 = gen_unique_id()
  37. try:
  38. raise KeyError("foo")
  39. except KeyError, exception:
  40. einfo = ExceptionInfo(sys.exc_info())
  41. tb.mark_as_failure(tid3, exception, traceback=einfo.traceback)
  42. self.assertFalse(tb.is_successful(tid3))
  43. self.assertEquals(tb.get_status(tid3), states.FAILURE)
  44. self.assertTrue(isinstance(tb.get_result(tid3), KeyError))
  45. self.assertEquals(tb.get_traceback(tid3), einfo.traceback)
  46. def test_process_cleanup(self):
  47. self.backend.process_cleanup()