test_canvas.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from mock import Mock
  4. from celery import current_app, task
  5. from celery.canvas import Signature, chain, group, chord, subtask
  6. from celery.result import EagerResult
  7. from celery.tests.utils import Case
  8. SIG = Signature({'task': 'TASK',
  9. 'args': ('A1', ),
  10. 'kwargs': {'K1': 'V1'},
  11. 'options': {'task_id': 'TASK_ID'},
  12. 'subtask_type': ''})
  13. @task()
  14. def add(x, y):
  15. return x + y
  16. @task()
  17. def mul(x, y):
  18. return x * y
  19. @task()
  20. def div(x, y):
  21. return x / y
  22. class test_Signature(Case):
  23. def test_getitem_property_class(self):
  24. self.assertTrue(Signature.task)
  25. self.assertTrue(Signature.args)
  26. self.assertTrue(Signature.kwargs)
  27. self.assertTrue(Signature.options)
  28. self.assertTrue(Signature.subtask_type)
  29. def test_getitem_property(self):
  30. self.assertEqual(SIG.task, 'TASK')
  31. self.assertEqual(SIG.args, ('A1', ))
  32. self.assertEqual(SIG.kwargs, {'K1': 'V1'})
  33. self.assertEqual(SIG.options, {'task_id': 'TASK_ID'})
  34. self.assertEqual(SIG.subtask_type, '')
  35. def test_replace(self):
  36. x = Signature('TASK', ('A'), {})
  37. self.assertTupleEqual(x.replace(args=('B', )).args, ('B', ))
  38. self.assertDictEqual(
  39. x.replace(kwargs={'FOO': 'BAR'}).kwargs,
  40. {'FOO': 'BAR'},
  41. )
  42. self.assertDictEqual(
  43. x.replace(options={'task_id': '123'}).options,
  44. {'task_id': '123'},
  45. )
  46. def test_set(self):
  47. self.assertDictEqual(
  48. Signature('TASK', x=1).set(task_id='2').options,
  49. {'x': 1, 'task_id': '2'},
  50. )
  51. def test_link(self):
  52. x = subtask(SIG)
  53. x.link(SIG)
  54. x.link(SIG)
  55. self.assertIn(SIG, x.options['link'])
  56. self.assertEqual(len(x.options['link']), 1)
  57. def test_link_error(self):
  58. x = subtask(SIG)
  59. x.link_error(SIG)
  60. x.link_error(SIG)
  61. self.assertIn(SIG, x.options['link_error'])
  62. self.assertEqual(len(x.options['link_error']), 1)
  63. def test_flatten_links(self):
  64. tasks = [add.s(2, 2), mul.s(4), div.s(2)]
  65. tasks[0].link(tasks[1])
  66. tasks[1].link(tasks[2])
  67. self.assertEqual(tasks[0].flatten_links(), tasks)
  68. def test_OR(self):
  69. x = add.s(2, 2) | mul.s(4)
  70. self.assertIsInstance(x, chain)
  71. y = add.s(4, 4) | div.s(2)
  72. z = x | y
  73. self.assertIsInstance(y, chain)
  74. self.assertIsInstance(z, chain)
  75. self.assertEqual(len(z.tasks), 4)
  76. with self.assertRaises(TypeError):
  77. x | 10
  78. def test_INVERT(self):
  79. x = add.s(2, 2)
  80. x.apply_async = Mock()
  81. x.apply_async.return_value = Mock()
  82. x.apply_async.return_value.get = Mock()
  83. x.apply_async.return_value.get.return_value = 4
  84. self.assertEqual(~x, 4)
  85. self.assertTrue(x.apply_async.called)
  86. class test_chain(Case):
  87. def test_repr(self):
  88. x = add.s(2, 2) | add.s(2)
  89. self.assertEqual(repr(x), '%s(2, 2) | %s(2)' % (add.name, add.name))
  90. def test_reverse(self):
  91. x = add.s(2, 2) | add.s(2)
  92. self.assertIsInstance(subtask(x), chain)
  93. self.assertIsInstance(subtask(dict(x)), chain)
  94. def test_always_eager(self):
  95. current_app.conf.CELERY_ALWAYS_EAGER = True
  96. try:
  97. self.assertEqual(~(add.s(4, 4) | add.s(8)), 16)
  98. finally:
  99. current_app.conf.CELERY_ALWAYS_EAGER = False
  100. def test_apply(self):
  101. x = chain(add.s(4, 4), add.s(8), add.s(10))
  102. res = x.apply()
  103. self.assertIsInstance(res, EagerResult)
  104. self.assertEqual(res.get(), 26)
  105. self.assertEqual(res.parent.get(), 16)
  106. self.assertEqual(res.parent.parent.get(), 8)
  107. self.assertIsNone(res.parent.parent.parent)
  108. class test_group(Case):
  109. def test_repr(self):
  110. x = group([add.s(2, 2), add.s(4, 4)])
  111. self.assertEqual(repr(x), repr(x.tasks))
  112. def test_reverse(self):
  113. x = group([add.s(2, 2), add.s(4, 4)])
  114. self.assertIsInstance(subtask(x), group)
  115. self.assertIsInstance(subtask(dict(x)), group)
  116. class test_chord(Case):
  117. def test_reverse(self):
  118. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  119. self.assertIsInstance(subtask(x), chord)
  120. self.assertIsInstance(subtask(dict(x)), chord)
  121. def test_clone_clones_body(self):
  122. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  123. y = x.clone()
  124. self.assertIsNot(x.kwargs['body'], y.kwargs['body'])
  125. y.kwargs.pop('body')
  126. z = y.clone()
  127. self.assertIsNone(z.kwargs.get('body'))
  128. def test_links_to_body(self):
  129. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  130. x.link(div.s(2))
  131. self.assertFalse(x.options.get('link'))
  132. self.assertTrue(x.kwargs['body'].options['link'])
  133. x.link_error(div.s(2))
  134. self.assertFalse(x.options.get('link_error'))
  135. self.assertTrue(x.kwargs['body'].options['link_error'])
  136. self.assertTrue(x.tasks)
  137. self.assertTrue(x.body)
  138. def test_repr(self):
  139. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  140. self.assertTrue(repr(x))
  141. x.kwargs['body'] = None
  142. self.assertIn('without body', repr(x))