test_messaging.py 420 B

1234567891011121314151617
  1. from celery import messaging
  2. from celery.tests.utils import unittest
  3. class test_compat_messaging_module(unittest.TestCase):
  4. def test_with_connection(self):
  5. def foo(**kwargs):
  6. pass
  7. self.assertTrue(messaging.with_connection(foo))
  8. def test_get_consume_set(self):
  9. conn = messaging.establish_connection()
  10. messaging.get_consumer_set(conn).close()
  11. conn.close()