# test_compat.py (1.6 KB)
  1. from __future__ import absolute_import
  2. import celery
  3. from celery.task.base import Task
  4. from celery.tests.utils import Case
  5. class test_MagicModule(Case):
  6. def test_class_property_set_without_type(self):
  7. self.assertTrue(Task.__dict__["app"].__get__(Task()))
  8. def test_class_property_set_on_class(self):
  9. self.assertIs(Task.__dict__["app"].__set__(None, None),
  10. Task.__dict__["app"])
  11. def test_class_property_set(self):
  12. class X(Task):
  13. pass
  14. app = celery.Celery(set_as_current=False)
  15. Task.__dict__["app"].__set__(X(), app)
  16. self.assertEqual(X.app, app)
  17. def test_dir(self):
  18. self.assertTrue(dir(celery.messaging))
  19. def test_direct(self):
  20. import sys
  21. prev_celery = sys.modules.pop("celery", None)
  22. prev_task = sys.modules.pop("celery.task", None)
  23. try:
  24. import celery
  25. self.assertTrue(celery.task)
  26. finally:
  27. sys.modules["celery"] = prev_celery
  28. sys.modules["celery.task"] = prev_task
  29. def test_app_attrs(self):
  30. self.assertEqual(celery.task.control.broadcast,
  31. celery.current_app.control.broadcast)
  32. def test_decorators_task(self):
  33. @celery.decorators.task
  34. def _test_decorators_task():
  35. pass
  36. self.assertTrue(_test_decorators_task.accept_magic_kwargs)
  37. def test_decorators_periodic_task(self):
  38. @celery.decorators.periodic_task(run_every=3600)
  39. def _test_decorators_ptask():
  40. pass
  41. self.assertTrue(_test_decorators_ptask.accept_magic_kwargs)