test_chord.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. from __future__ import absolute_import
  2. from mock import patch
  3. from contextlib import contextmanager
  4. from celery import canvas
  5. from celery import current_app
  6. from celery import result
  7. from celery.five import range
  8. from celery.result import AsyncResult, GroupResult
  9. from celery.task import task, TaskSet
  10. from celery.tests.utils import AppCase, Mock
  11. passthru = lambda x: x
  12. @current_app.task
  13. def add(x, y):
  14. return x + y
  15. @current_app.task
  16. def callback(r):
  17. return r
  18. class TSR(GroupResult):
  19. is_ready = True
  20. value = None
  21. def ready(self):
  22. return self.is_ready
  23. def join(self, **kwargs):
  24. return self.value
  25. def join_native(self, **kwargs):
  26. return self.value
  27. @contextmanager
  28. def patch_unlock_retry():
  29. unlock = current_app.tasks['celery.chord_unlock']
  30. retry = Mock()
  31. prev, unlock.retry = unlock.retry, retry
  32. try:
  33. yield unlock, retry
  34. finally:
  35. unlock.retry = prev
  36. class test_unlock_chord_task(AppCase):
  37. @patch('celery.result.GroupResult')
  38. def test_unlock_ready(self, GroupResult):
  39. class AlwaysReady(TSR):
  40. is_ready = True
  41. value = [2, 4, 8, 6]
  42. @task()
  43. def callback(*args, **kwargs):
  44. pass
  45. pts, result.GroupResult = result.GroupResult, AlwaysReady
  46. callback.apply_async = Mock()
  47. callback_s = callback.s()
  48. try:
  49. with patch_unlock_retry() as (unlock, retry):
  50. subtask, canvas.maybe_subtask = canvas.maybe_subtask, passthru
  51. try:
  52. unlock('group_id', callback_s,
  53. result=[AsyncResult(r) for r in ['1', 2, 3]],
  54. GroupResult=AlwaysReady)
  55. finally:
  56. canvas.maybe_subtask = subtask
  57. callback.apply_async.assert_called_with(([2, 4, 8, 6], ), {})
  58. # did not retry
  59. self.assertFalse(retry.call_count)
  60. finally:
  61. result.GroupResult = pts
  62. @patch('celery.result.GroupResult')
  63. def test_when_not_ready(self, GroupResult):
  64. with patch_unlock_retry() as (unlock, retry):
  65. class NeverReady(TSR):
  66. is_ready = False
  67. pts, result.GroupResult = result.GroupResult, NeverReady
  68. try:
  69. callback = Mock()
  70. unlock('group_id', callback, interval=10, max_retries=30,
  71. result=[AsyncResult(x) for x in ['1', '2', '3']],
  72. GroupResult=NeverReady)
  73. self.assertFalse(callback.delay.call_count)
  74. # did retry
  75. unlock.retry.assert_called_with(countdown=10, max_retries=30)
  76. finally:
  77. result.GroupResult = pts
  78. def test_is_in_registry(self):
  79. self.assertIn('celery.chord_unlock', current_app.tasks)
  80. class test_chord(AppCase):
  81. def test_eager(self):
  82. from celery import chord
  83. @task()
  84. def addX(x, y):
  85. return x + y
  86. @task()
  87. def sumX(n):
  88. return sum(n)
  89. self.app.conf.CELERY_ALWAYS_EAGER = True
  90. try:
  91. x = chord(addX.s(i, i) for i in range(10))
  92. body = sumX.s()
  93. result = x(body)
  94. self.assertEqual(result.get(), sum(i + i for i in range(10)))
  95. finally:
  96. self.app.conf.CELERY_ALWAYS_EAGER = False
  97. def test_apply(self):
  98. self.app.conf.CELERY_ALWAYS_EAGER = False
  99. from celery import chord
  100. m = Mock()
  101. m.app.conf.CELERY_ALWAYS_EAGER = False
  102. m.AsyncResult = AsyncResult
  103. prev, chord._type = chord._type, m
  104. try:
  105. x = chord(add.s(i, i) for i in range(10))
  106. body = add.s(2)
  107. result = x(body)
  108. self.assertTrue(result.id)
  109. # does not modify original subtask
  110. with self.assertRaises(KeyError):
  111. body.options['task_id']
  112. self.assertTrue(chord._type.called)
  113. finally:
  114. chord._type = prev
  115. class test_Chord_task(AppCase):
  116. def test_run(self):
  117. prev, current_app.backend = current_app.backend, Mock()
  118. current_app.backend.cleanup = Mock()
  119. current_app.backend.cleanup.__name__ = 'cleanup'
  120. try:
  121. Chord = current_app.tasks['celery.chord']
  122. body = dict()
  123. Chord(TaskSet(add.subtask((i, i)) for i in range(5)), body)
  124. Chord([add.subtask((j, j)) for j in range(5)], body)
  125. self.assertEqual(current_app.backend.on_chord_apply.call_count, 2)
  126. finally:
  127. current_app.backend = prev