test_chord.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. from __future__ import absolute_import
  2. from mock import patch
  3. from contextlib import contextmanager
  4. from celery import canvas
  5. from celery import current_app
  6. from celery import result
  7. from celery.result import AsyncResult, GroupResult
  8. from celery.task import task, TaskSet
  9. from celery.tests.utils import AppCase, Mock
  10. passthru = lambda x: x
  11. @current_app.task
  12. def add(x, y):
  13. return x + y
  14. @current_app.task
  15. def callback(r):
  16. return r
  17. class TSR(GroupResult):
  18. is_ready = True
  19. value = None
  20. def ready(self):
  21. return self.is_ready
  22. def join(self, **kwargs):
  23. return self.value
  24. def join_native(self, **kwargs):
  25. return self.value
  26. @contextmanager
  27. def patch_unlock_retry():
  28. unlock = current_app.tasks['celery.chord_unlock']
  29. retry = Mock()
  30. prev, unlock.retry = unlock.retry, retry
  31. yield unlock, retry
  32. unlock.retry = prev
  33. class test_unlock_chord_task(AppCase):
  34. @patch('celery.result.GroupResult')
  35. def test_unlock_ready(self, GroupResult):
  36. class AlwaysReady(TSR):
  37. is_ready = True
  38. value = [2, 4, 8, 6]
  39. @task()
  40. def callback(*args, **kwargs):
  41. pass
  42. pts, result.GroupResult = result.GroupResult, AlwaysReady
  43. callback.apply_async = Mock()
  44. callback_s = callback.s()
  45. try:
  46. with patch_unlock_retry() as (unlock, retry):
  47. subtask, canvas.maybe_subtask = canvas.maybe_subtask, passthru
  48. try:
  49. unlock('group_id', callback_s,
  50. result=map(AsyncResult, [1, 2, 3]))
  51. finally:
  52. canvas.maybe_subtask = subtask
  53. callback.apply_async.assert_called_with(([2, 4, 8, 6], ), {})
  54. # did not retry
  55. self.assertFalse(retry.call_count)
  56. finally:
  57. result.GroupResult = pts
  58. @patch('celery.result.GroupResult')
  59. def test_when_not_ready(self, GroupResult):
  60. with patch_unlock_retry() as (unlock, retry):
  61. class NeverReady(TSR):
  62. is_ready = False
  63. pts, result.GroupResult = result.GroupResult, NeverReady
  64. try:
  65. callback = Mock()
  66. unlock('group_id', callback, interval=10, max_retries=30,
  67. result=map(AsyncResult, [1, 2, 3]))
  68. self.assertFalse(callback.delay.call_count)
  69. # did retry
  70. unlock.retry.assert_called_with(countdown=10, max_retries=30)
  71. finally:
  72. result.GroupResult = pts
  73. def test_is_in_registry(self):
  74. self.assertIn('celery.chord_unlock', current_app.tasks)
  75. class test_chord(AppCase):
  76. def test_eager(self):
  77. from celery import chord
  78. @task()
  79. def addX(x, y):
  80. return x + y
  81. @task()
  82. def sumX(n):
  83. return sum(n)
  84. self.app.conf.CELERY_ALWAYS_EAGER = True
  85. try:
  86. x = chord(addX.s(i, i) for i in xrange(10))
  87. body = sumX.s()
  88. result = x(body)
  89. self.assertEqual(result.get(), sum(i + i for i in xrange(10)))
  90. finally:
  91. self.app.conf.CELERY_ALWAYS_EAGER = False
  92. def test_apply(self):
  93. self.app.conf.CELERY_ALWAYS_EAGER = False
  94. from celery import chord
  95. m = Mock()
  96. m.app.conf.CELERY_ALWAYS_EAGER = False
  97. m.AsyncResult = AsyncResult
  98. prev, chord.Chord = chord.Chord, m
  99. try:
  100. x = chord(add.s(i, i) for i in xrange(10))
  101. body = add.s(2)
  102. result = x(body)
  103. # does not modify original subtask
  104. with self.assertRaises(KeyError):
  105. body.options['task_id']
  106. self.assertTrue(chord.Chord.called)
  107. finally:
  108. chord.Chord = prev
  109. class test_Chord_task(AppCase):
  110. def test_run(self):
  111. prev, current_app.backend = current_app.backend, Mock()
  112. current_app.backend.cleanup = Mock()
  113. current_app.backend.cleanup.__name__ = 'cleanup'
  114. try:
  115. Chord = current_app.tasks['celery.chord']
  116. body = dict()
  117. Chord(TaskSet(add.subtask((i, i)) for i in xrange(5)), body)
  118. Chord([add.subtask((i, i)) for i in xrange(5)], body)
  119. self.assertEqual(current_app.backend.on_chord_apply.call_count, 2)
  120. finally:
  121. current_app.backend = prev