# disabled_amqp.py
  1. import sys
  2. import unittest2 as unittest
  3. from celery import states
  4. from celery.utils import gen_unique_id
  5. from celery.backends.amqp import AMQPBackend
  6. from celery.datastructures import ExceptionInfo
  7. class SomeClass(object):
  8. def __init__(self, data):
  9. self.data = data
  10. class test_AMQPBackend(unittest.TestCase):
  11. def create_backend(self):
  12. return AMQPBackend(serializer="pickle", persistent=False)
  13. def test_mark_as_done(self):
  14. tb1 = self.create_backend()
  15. tb2 = self.create_backend()
  16. tid = gen_unique_id()
  17. tb1.mark_as_done(tid, 42)
  18. self.assertEqual(tb2.get_status(tid), states.SUCCESS)
  19. self.assertEqual(tb2.get_result(tid), 42)
  20. self.assertTrue(tb2._cache.get(tid))
  21. self.assertTrue(tb2.get_result(tid), 42)
  22. def test_is_pickled(self):
  23. tb1 = self.create_backend()
  24. tb2 = self.create_backend()
  25. tid2 = gen_unique_id()
  26. result = {"foo": "baz", "bar": SomeClass(12345)}
  27. tb1.mark_as_done(tid2, result)
  28. # is serialized properly.
  29. rindb = tb2.get_result(tid2)
  30. self.assertEqual(rindb.get("foo"), "baz")
  31. self.assertEqual(rindb.get("bar").data, 12345)
  32. def test_mark_as_failure(self):
  33. tb1 = self.create_backend()
  34. tb2 = self.create_backend()
  35. tid3 = gen_unique_id()
  36. try:
  37. raise KeyError("foo")
  38. except KeyError, exception:
  39. einfo = ExceptionInfo(sys.exc_info())
  40. tb1.mark_as_failure(tid3, exception, traceback=einfo.traceback)
  41. self.assertEqual(tb2.get_status(tid3), states.FAILURE)
  42. self.assertIsInstance(tb2.get_result(tid3), KeyError)
  43. self.assertEqual(tb2.get_traceback(tid3), einfo.traceback)
  44. def test_process_cleanup(self):
  45. self.create_backend().process_cleanup()