test_actors.py 657 B

123456789101112131415161718192021222324252627
  1. from __future__ import absolute_import
  2. from kombu import BrokerConnection
  3. from celery.actors import Actor, Agent, AwareAgent
  4. from celery.tests.utils import AppCase
  5. class test_using_connection(AppCase):
  6. def assertConnection(self, cls):
  7. x = cls(app=self.app)
  8. self.assertTrue(x.connection)
  9. conn = BrokerConnection(transport="memory")
  10. x = cls(app=self.app, connection=conn)
  11. self.assertIs(x.connection, conn)
  12. def test_Actor(self):
  13. self.assertConnection(Actor)
  14. def test_Agent(self):
  15. self.assertConnection(Agent)
  16. def test_AwareAgent(self):
  17. self.assertConnection(AwareAgent)