# test_registry.py (2.3 KB)

  1. import pytest
  2. from celery.app.registry import _unpickle_task, _unpickle_task_v2
  3. from celery.exceptions import InvalidTaskError
  4. def returns():
  5. return 1
  6. @pytest.mark.usefixtures('depends_on_current_app')
  7. class test_unpickle_task:
  8. def test_unpickle_v1(self, app):
  9. app.tasks['txfoo'] = 'bar'
  10. assert _unpickle_task('txfoo') == 'bar'
  11. def test_unpickle_v2(self, app):
  12. app.tasks['txfoo1'] = 'bar1'
  13. assert _unpickle_task_v2('txfoo1') == 'bar1'
  14. assert _unpickle_task_v2('txfoo1', module='celery') == 'bar1'
  15. class test_TaskRegistry:
  16. def setup(self):
  17. self.mytask = self.app.task(name='A', shared=False)(returns)
  18. self.missing_name_task = self.app.task(
  19. name=None, shared=False)(returns)
  20. self.missing_name_task.name = None # name is overridden with path
  21. self.myperiodic = self.app.task(
  22. name='B', shared=False, type='periodic',
  23. )(returns)
  24. def test_NotRegistered_str(self):
  25. assert repr(self.app.tasks.NotRegistered('tasks.add'))
  26. def assert_register_unregister_cls(self, r, task):
  27. r.unregister(task)
  28. with pytest.raises(r.NotRegistered):
  29. r.unregister(task)
  30. r.register(task)
  31. assert task.name in r
  32. def test_task_registry(self):
  33. r = self.app._tasks
  34. assert isinstance(r, dict)
  35. self.assert_register_unregister_cls(r, self.mytask)
  36. self.assert_register_unregister_cls(r, self.myperiodic)
  37. with pytest.raises(InvalidTaskError):
  38. r.register(self.missing_name_task)
  39. r.register(self.myperiodic)
  40. r.unregister(self.myperiodic.name)
  41. assert self.myperiodic not in r
  42. r.register(self.myperiodic)
  43. tasks = dict(r)
  44. assert tasks.get(self.mytask.name) is self.mytask
  45. assert tasks.get(self.myperiodic.name) is self.myperiodic
  46. assert r[self.mytask.name] is self.mytask
  47. assert r[self.myperiodic.name] is self.myperiodic
  48. r.unregister(self.mytask)
  49. assert self.mytask.name not in r
  50. r.unregister(self.myperiodic)
  51. assert self.myperiodic.name not in r
  52. assert self.mytask.run()
  53. assert self.myperiodic.run()
  54. def test_compat(self):
  55. assert self.app.tasks.regular()
  56. assert self.app.tasks.periodic()