test_registry.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from celery.app.registry import _unpickle_task, _unpickle_task_v2
  4. from celery.exceptions import InvalidTaskError
  5. def returns():
  6. return 1
  7. @pytest.mark.usefixtures('depends_on_current_app')
  8. class test_unpickle_task:
  9. def test_unpickle_v1(self, app):
  10. app.tasks['txfoo'] = 'bar'
  11. assert _unpickle_task('txfoo') == 'bar'
  12. def test_unpickle_v2(self, app):
  13. app.tasks['txfoo1'] = 'bar1'
  14. assert _unpickle_task_v2('txfoo1') == 'bar1'
  15. assert _unpickle_task_v2('txfoo1', module='celery') == 'bar1'
  16. class test_TaskRegistry:
  17. def setup(self):
  18. self.mytask = self.app.task(name='A', shared=False)(returns)
  19. self.missing_name_task = self.app.task(
  20. name=None, shared=False)(returns)
  21. self.missing_name_task.name = None # name is overridden with path
  22. self.myperiodic = self.app.task(
  23. name='B', shared=False, type='periodic',
  24. )(returns)
  25. def test_NotRegistered_str(self):
  26. assert repr(self.app.tasks.NotRegistered('tasks.add'))
  27. def assert_register_unregister_cls(self, r, task):
  28. r.unregister(task)
  29. with pytest.raises(r.NotRegistered):
  30. r.unregister(task)
  31. r.register(task)
  32. assert task.name in r
  33. def test_task_registry(self):
  34. r = self.app._tasks
  35. assert isinstance(r, dict)
  36. self.assert_register_unregister_cls(r, self.mytask)
  37. self.assert_register_unregister_cls(r, self.myperiodic)
  38. with pytest.raises(InvalidTaskError):
  39. r.register(self.missing_name_task)
  40. r.register(self.myperiodic)
  41. r.unregister(self.myperiodic.name)
  42. assert self.myperiodic not in r
  43. r.register(self.myperiodic)
  44. tasks = dict(r)
  45. assert tasks.get(self.mytask.name) is self.mytask
  46. assert tasks.get(self.myperiodic.name) is self.myperiodic
  47. assert r[self.mytask.name] is self.mytask
  48. assert r[self.myperiodic.name] is self.myperiodic
  49. r.unregister(self.mytask)
  50. assert self.mytask.name not in r
  51. r.unregister(self.myperiodic)
  52. assert self.myperiodic.name not in r
  53. assert self.mytask.run()
  54. assert self.myperiodic.run()
  55. def test_compat(self):
  56. assert self.app.tasks.regular()
  57. assert self.app.tasks.periodic()