test_registry.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. from __future__ import absolute_import
  2. from celery import Celery
  3. from celery.app.registry import (
  4. TaskRegistry,
  5. _unpickle_task,
  6. _unpickle_task_v2,
  7. )
  8. from celery.task import Task, PeriodicTask
  9. from celery.tests.utils import AppCase, Case
  10. class MockTask(Task):
  11. name = 'celery.unittest.test_task'
  12. def run(self, **kwargs):
  13. return True
  14. class MockPeriodicTask(PeriodicTask):
  15. name = 'celery.unittest.test_periodic_task'
  16. run_every = 10
  17. def run(self, **kwargs):
  18. return True
  19. class test_unpickle_task(AppCase):
  20. def setup(self):
  21. self.app = Celery(set_as_current=True)
  22. def test_unpickle_v1(self):
  23. self.app.tasks['txfoo'] = 'bar'
  24. self.assertEqual(_unpickle_task('txfoo'), 'bar')
  25. def test_unpickle_v2(self):
  26. self.app.tasks['txfoo1'] = 'bar1'
  27. self.assertEqual(_unpickle_task_v2('txfoo1'), 'bar1')
  28. self.assertEqual(_unpickle_task_v2('txfoo1', module='celery'), 'bar1')
  29. class test_TaskRegistry(Case):
  30. def test_NotRegistered_str(self):
  31. self.assertTrue(repr(TaskRegistry.NotRegistered('tasks.add')))
  32. def assertRegisterUnregisterCls(self, r, task):
  33. with self.assertRaises(r.NotRegistered):
  34. r.unregister(task)
  35. r.register(task)
  36. self.assertIn(task.name, r)
  37. def assertRegisterUnregisterFunc(self, r, task, task_name):
  38. with self.assertRaises(r.NotRegistered):
  39. r.unregister(task_name)
  40. r.register(task, task_name)
  41. self.assertIn(task_name, r)
  42. def test_task_registry(self):
  43. r = TaskRegistry()
  44. self.assertIsInstance(r, dict, 'TaskRegistry is mapping')
  45. self.assertRegisterUnregisterCls(r, MockTask)
  46. self.assertRegisterUnregisterCls(r, MockPeriodicTask)
  47. r.register(MockPeriodicTask)
  48. r.unregister(MockPeriodicTask.name)
  49. self.assertNotIn(MockPeriodicTask, r)
  50. r.register(MockPeriodicTask)
  51. tasks = dict(r)
  52. self.assertIsInstance(tasks.get(MockTask.name), MockTask)
  53. self.assertIsInstance(tasks.get(MockPeriodicTask.name),
  54. MockPeriodicTask)
  55. self.assertIsInstance(r[MockTask.name], MockTask)
  56. self.assertIsInstance(r[MockPeriodicTask.name],
  57. MockPeriodicTask)
  58. r.unregister(MockTask)
  59. self.assertNotIn(MockTask.name, r)
  60. r.unregister(MockPeriodicTask)
  61. self.assertNotIn(MockPeriodicTask.name, r)
  62. self.assertTrue(MockTask().run())
  63. self.assertTrue(MockPeriodicTask().run())
  64. def test_compat(self):
  65. r = TaskRegistry()
  66. r.regular()
  67. r.periodic()