test_amqp.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. from __future__ import with_statement
  2. import sys
  3. import unittest
  4. import errno
  5. from django.core.exceptions import ImproperlyConfigured
  6. from celery.backends.amqp import AMQPBackend
  7. from celery.utils import gen_unique_id
  8. from celery.datastructures import ExceptionInfo
  9. class SomeClass(object):
  10. def __init__(self, data):
  11. self.data = data
  12. class TestRedisBackend(unittest.TestCase):
  13. def setUp(self):
  14. self.backend = AMQPBackend()
  15. def test_mark_as_done(self):
  16. tb = self.backend
  17. tid = gen_unique_id()
  18. tb.mark_as_done(tid, 42)
  19. self.assertTrue(tb.is_successful(tid))
  20. self.assertEquals(tb.get_status(tid), "SUCCESS")
  21. self.assertEquals(tb.get_result(tid), 42)
  22. self.assertTrue(tb._cache.get(tid))
  23. self.assertTrue(tb.get_result(tid), 42)
  24. def test_is_pickled(self):
  25. tb = self.backend
  26. tid2 = gen_unique_id()
  27. result = {"foo": "baz", "bar": SomeClass(12345)}
  28. tb.mark_as_done(tid2, result)
  29. # is serialized properly.
  30. rindb = tb.get_result(tid2)
  31. self.assertEquals(rindb.get("foo"), "baz")
  32. self.assertEquals(rindb.get("bar").data, 12345)
  33. def test_mark_as_failure(self):
  34. tb = self.backend
  35. tid3 = gen_unique_id()
  36. try:
  37. raise KeyError("foo")
  38. except KeyError, exception:
  39. einfo = ExceptionInfo(sys.exc_info())
  40. tb.mark_as_failure(tid3, exception, traceback=einfo.traceback)
  41. self.assertFalse(tb.is_successful(tid3))
  42. self.assertEquals(tb.get_status(tid3), "FAILURE")
  43. self.assertTrue(isinstance(tb.get_result(tid3), KeyError))
  44. self.assertEquals(tb.get_traceback(tid3), einfo.traceback)
  45. def test_process_cleanup(self):
  46. self.backend.process_cleanup()