# test_chord.py 4.5 KB
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from mock import patch
  4. from contextlib import contextmanager
  5. from celery import canvas
  6. from celery import current_app
  7. from celery import result
  8. from celery.result import AsyncResult, GroupResult
  9. from celery.task import task, TaskSet
  10. from celery.tests.utils import AppCase, Mock
  11. passthru = lambda x: x
  12. @current_app.task
  13. def add(x, y):
  14. return x + y
  15. @current_app.task
  16. def callback(r):
  17. return r
  18. class TSR(GroupResult):
  19. is_ready = True
  20. value = None
  21. def ready(self):
  22. return self.is_ready
  23. def join(self, **kwargs):
  24. return self.value
  25. def join_native(self, **kwargs):
  26. return self.value
  27. @contextmanager
  28. def patch_unlock_retry():
  29. unlock = current_app.tasks['celery.chord_unlock']
  30. retry = Mock()
  31. prev, unlock.retry = unlock.retry, retry
  32. try:
  33. yield unlock, retry
  34. finally:
  35. unlock.retry = prev
  36. class test_unlock_chord_task(AppCase):
  37. @patch('celery.result.GroupResult')
  38. def test_unlock_ready(self, GroupResult):
  39. class AlwaysReady(TSR):
  40. is_ready = True
  41. value = [2, 4, 8, 6]
  42. @task()
  43. def callback(*args, **kwargs):
  44. pass
  45. pts, result.GroupResult = result.GroupResult, AlwaysReady
  46. callback.apply_async = Mock()
  47. callback_s = callback.s()
  48. try:
  49. with patch_unlock_retry() as (unlock, retry):
  50. subtask, canvas.maybe_subtask = canvas.maybe_subtask, passthru
  51. try:
  52. unlock('group_id', callback_s,
  53. result=map(AsyncResult, ['1', '2', '3']),
  54. GroupResult=AlwaysReady)
  55. finally:
  56. canvas.maybe_subtask = subtask
  57. callback.apply_async.assert_called_with(([2, 4, 8, 6], ), {})
  58. # did not retry
  59. self.assertFalse(retry.call_count)
  60. finally:
  61. result.GroupResult = pts
  62. @patch('celery.result.GroupResult')
  63. def test_when_not_ready(self, GroupResult):
  64. with patch_unlock_retry() as (unlock, retry):
  65. class NeverReady(TSR):
  66. is_ready = False
  67. pts, result.GroupResult = result.GroupResult, NeverReady
  68. try:
  69. callback = Mock()
  70. unlock('group_id', callback, interval=10, max_retries=30,
  71. result=map(AsyncResult, [1, 2, 3]),
  72. GroupResult=NeverReady)
  73. self.assertFalse(callback.delay.call_count)
  74. # did retry
  75. unlock.retry.assert_called_with(countdown=10, max_retries=30)
  76. finally:
  77. result.GroupResult = pts
  78. def test_is_in_registry(self):
  79. self.assertIn('celery.chord_unlock', current_app.tasks)
  80. class test_chord(AppCase):
  81. def test_eager(self):
  82. from celery import chord
  83. @task()
  84. def addX(x, y):
  85. return x + y
  86. @task()
  87. def sumX(n):
  88. return sum(n)
  89. self.app.conf.CELERY_ALWAYS_EAGER = True
  90. try:
  91. x = chord(addX.s(i, i) for i in xrange(10))
  92. body = sumX.s()
  93. result = x(body)
  94. self.assertEqual(result.get(), sum(i + i for i in xrange(10)))
  95. finally:
  96. self.app.conf.CELERY_ALWAYS_EAGER = False
  97. def test_apply(self):
  98. self.app.conf.CELERY_ALWAYS_EAGER = False
  99. from celery import chord
  100. m = Mock()
  101. m.app.conf.CELERY_ALWAYS_EAGER = False
  102. m.AsyncResult = AsyncResult
  103. prev, chord._type = chord._type, m
  104. try:
  105. x = chord(add.s(i, i) for i in xrange(10))
  106. body = add.s(2)
  107. result = x(body)
  108. self.assertTrue(result.id)
  109. # does not modify original subtask
  110. with self.assertRaises(KeyError):
  111. body.options['task_id']
  112. self.assertTrue(chord._type.called)
  113. finally:
  114. chord._type = prev
  115. class test_Chord_task(AppCase):
  116. def test_run(self):
  117. prev, current_app.backend = current_app.backend, Mock()
  118. current_app.backend.cleanup = Mock()
  119. current_app.backend.cleanup.__name__ = 'cleanup'
  120. try:
  121. Chord = current_app.tasks['celery.chord']
  122. body = dict()
  123. Chord(TaskSet(add.subtask((i, i)) for i in xrange(5)), body)
  124. Chord([add.subtask((j, j)) for j in xrange(5)], body)
  125. self.assertEqual(current_app.backend.on_chord_apply.call_count, 2)
  126. finally:
  127. current_app.backend = prev