test_builtins.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from __future__ import absolute_import
  2. from mock import Mock
  3. from celery import current_app as app, group, task, chord
  4. from celery.app import builtins
  5. from celery.state import _task_stack
  6. from celery.tests.utils import Case
  7. @task
  8. def add(x, y):
  9. return x + y
  10. @task
  11. def xsum(x):
  12. return sum(x)
  13. class test_backend_cleanup(Case):
  14. def test_run(self):
  15. prev = app.backend
  16. app.backend.cleanup = Mock()
  17. app.backend.cleanup.__name__ = "cleanup"
  18. try:
  19. cleanup_task = builtins.add_backend_cleanup_task(app)
  20. cleanup_task()
  21. self.assertTrue(app.backend.cleanup.called)
  22. finally:
  23. app.backend = prev
  24. class test_group(Case):
  25. def setUp(self):
  26. self.prev = app.tasks.get("celery.group")
  27. self.task = builtins.add_group_task(app)()
  28. def tearDown(self):
  29. app.tasks["celery.group"] = self.prev
  30. def test_apply_async_eager(self):
  31. self.task.apply = Mock()
  32. app.conf.CELERY_ALWAYS_EAGER = True
  33. try:
  34. self.task.apply_async()
  35. finally:
  36. app.conf.CELERY_ALWAYS_EAGER = False
  37. self.assertTrue(self.task.apply.called)
  38. def test_apply(self):
  39. x = group([add.s(4, 4), add.s(8, 8)])
  40. x.name = self.task.name
  41. res = x.apply()
  42. self.assertEqual(res.get().join(), [8, 16])
  43. def test_apply_async(self):
  44. x = group([add.s(4, 4), add.s(8, 8)])
  45. x.apply_async()
  46. def test_apply_async_with_parent(self):
  47. _task_stack.push(add)
  48. try:
  49. x = group([add.s(4, 4), add.s(8, 8)])
  50. x.apply_async()
  51. self.assertTrue(add.request.children)
  52. finally:
  53. _task_stack.pop()
  54. class test_chain(Case):
  55. def setUp(self):
  56. self.prev = app.tasks.get("celery.chain")
  57. self.task = builtins.add_chain_task(app)()
  58. def tearDown(self):
  59. app.tasks["celery.chain"] = self.prev
  60. def test_apply_async(self):
  61. c = add.s(2, 2) | add.s(4) | add.s(8)
  62. result = c.apply_async()
  63. self.assertTrue(result.parent)
  64. self.assertTrue(result.parent.parent)
  65. self.assertIsNone(result.parent.parent.parent)
  66. class test_chord(Case):
  67. def setUp(self):
  68. self.prev = app.tasks.get("celery.chord")
  69. self.task = builtins.add_chain_task(app)()
  70. def tearDown(self):
  71. app.tasks["celery.chord"] = self.prev
  72. def test_apply_async(self):
  73. x = chord([add.s(i, i) for i in xrange(10)], body=xsum.s())
  74. r = x.apply_async()
  75. self.assertTrue(r)
  76. self.assertTrue(r.parent)
  77. def test_apply_eager(self):
  78. app.conf.CELERY_ALWAYS_EAGER = True
  79. try:
  80. x = chord([add.s(i, i) for i in xrange(10)], body=xsum.s())
  81. r = x.apply_async()
  82. self.assertEqual(r.get(), 90)
  83. finally:
  84. app.conf.CELERY_ALWAYS_EAGER = False