test_registry.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from celery.app.registry import _unpickle_task, _unpickle_task_v2
  4. def returns():
  5. return 1
  6. @pytest.mark.usefixtures('depends_on_current_app')
  7. class test_unpickle_task:
  8. def test_unpickle_v1(self, app):
  9. app.tasks['txfoo'] = 'bar'
  10. assert _unpickle_task('txfoo') == 'bar'
  11. def test_unpickle_v2(self, app):
  12. app.tasks['txfoo1'] = 'bar1'
  13. assert _unpickle_task_v2('txfoo1') == 'bar1'
  14. assert _unpickle_task_v2('txfoo1', module='celery') == 'bar1'
  15. class test_TaskRegistry:
  16. def setup(self):
  17. self.mytask = self.app.task(name='A', shared=False)(returns)
  18. self.myperiodic = self.app.task(
  19. name='B', shared=False, type='periodic',
  20. )(returns)
  21. def test_NotRegistered_str(self):
  22. assert repr(self.app.tasks.NotRegistered('tasks.add'))
  23. def assert_register_unregister_cls(self, r, task):
  24. r.unregister(task)
  25. with pytest.raises(r.NotRegistered):
  26. r.unregister(task)
  27. r.register(task)
  28. assert task.name in r
  29. def test_task_registry(self):
  30. r = self.app._tasks
  31. assert isinstance(r, dict)
  32. self.assert_register_unregister_cls(r, self.mytask)
  33. self.assert_register_unregister_cls(r, self.myperiodic)
  34. r.register(self.myperiodic)
  35. r.unregister(self.myperiodic.name)
  36. assert self.myperiodic not in r
  37. r.register(self.myperiodic)
  38. tasks = dict(r)
  39. assert tasks.get(self.mytask.name) is self.mytask
  40. assert tasks.get(self.myperiodic.name) is self.myperiodic
  41. assert r[self.mytask.name] is self.mytask
  42. assert r[self.myperiodic.name] is self.myperiodic
  43. r.unregister(self.mytask)
  44. assert self.mytask.name not in r
  45. r.unregister(self.myperiodic)
  46. assert self.myperiodic.name not in r
  47. assert self.mytask.run()
  48. assert self.myperiodic.run()
  49. def test_compat(self):
  50. assert self.app.tasks.regular()
  51. assert self.app.tasks.periodic()