test_canvas.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from mock import Mock
  4. from celery import task
  5. from celery.canvas import Signature, chain, group, chord, subtask
  6. from celery.tests.utils import Case
  7. SIG = Signature({"task": "TASK",
  8. "args": ("A1", ),
  9. "kwargs": {"K1": "V1"},
  10. "options": {"task_id": "TASK_ID"},
  11. "subtask_type": ""})
  12. @task
  13. def add(x, y):
  14. return x + y
  15. @task
  16. def mul(x, y):
  17. return x * y
  18. @task
  19. def div(x, y):
  20. return x / y
  21. class test_Signature(Case):
  22. def test_getitem_property_class(self):
  23. self.assertTrue(Signature.task)
  24. self.assertTrue(Signature.args)
  25. self.assertTrue(Signature.kwargs)
  26. self.assertTrue(Signature.options)
  27. self.assertTrue(Signature.subtask_type)
  28. def test_getitem_property(self):
  29. self.assertEqual(SIG.task, "TASK")
  30. self.assertEqual(SIG.args, ("A1", ))
  31. self.assertEqual(SIG.kwargs, {"K1": "V1"})
  32. self.assertEqual(SIG.options, {"task_id": "TASK_ID"})
  33. self.assertEqual(SIG.subtask_type, "")
  34. def test_replace(self):
  35. x = Signature("TASK", ("A"), {})
  36. self.assertTupleEqual(x.replace(args=("B", )).args, ("B", ))
  37. self.assertDictEqual(x.replace(kwargs={"FOO": "BAR"}).kwargs,
  38. {"FOO": "BAR"})
  39. self.assertDictEqual(x.replace(options={"task_id": "123"}).options,
  40. {"task_id": "123"})
  41. def test_set(self):
  42. self.assertDictEqual(Signature("TASK", x=1).set(task_id="2").options,
  43. {"x": 1, "task_id": "2"})
  44. def test_link(self):
  45. x = subtask(SIG)
  46. x.link(SIG)
  47. x.link(SIG)
  48. self.assertIn(SIG, x.options["link"])
  49. self.assertEqual(len(x.options["link"]), 1)
  50. def test_link_error(self):
  51. x = subtask(SIG)
  52. x.link_error(SIG)
  53. x.link_error(SIG)
  54. self.assertIn(SIG, x.options["link_error"])
  55. self.assertEqual(len(x.options["link_error"]), 1)
  56. def test_flatten_links(self):
  57. tasks = [add.s(2, 2), mul.s(4), div.s(2)]
  58. tasks[0].link(tasks[1])
  59. tasks[1].link(tasks[2])
  60. self.assertEqual(tasks[0].flatten_links(), tasks)
  61. def test_OR(self):
  62. x = add.s(2, 2) | mul.s(4)
  63. self.assertIsInstance(x, chain)
  64. y = add.s(4, 4) | div.s(2)
  65. z = x | y
  66. self.assertIsInstance(y, chain)
  67. self.assertIsInstance(z, chain)
  68. self.assertEqual(len(z.tasks), 4)
  69. with self.assertRaises(TypeError):
  70. x | 10
  71. def test_INVERT(self):
  72. x = add.s(2, 2)
  73. x.apply_async = Mock()
  74. x.apply_async.return_value = Mock()
  75. x.apply_async.return_value.get = Mock()
  76. x.apply_async.return_value.get.return_value = 4
  77. self.assertEqual(~x, 4)
  78. self.assertTrue(x.apply_async.called)
  79. class test_chain(Case):
  80. def test_repr(self):
  81. x = add.s(2, 2) | add.s(2)
  82. self.assertEqual(repr(x), '%s(2, 2) | %s(2)' % (add.name, add.name))
  83. def test_reverse(self):
  84. x = add.s(2, 2) | add.s(2)
  85. self.assertIsInstance(subtask(x), chain)
  86. self.assertIsInstance(subtask(dict(x)), chain)
  87. class test_group(Case):
  88. def test_repr(self):
  89. x = group([add.s(2, 2), add.s(4, 4)])
  90. self.assertEqual(repr(x), repr(x.tasks))
  91. def test_reverse(self):
  92. x = group([add.s(2, 2), add.s(4, 4)])
  93. self.assertIsInstance(subtask(x), group)
  94. self.assertIsInstance(subtask(dict(x)), group)
  95. class test_chord(Case):
  96. def test_reverse(self):
  97. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  98. self.assertIsInstance(subtask(x), chord)
  99. self.assertIsInstance(subtask(dict(x)), chord)
  100. def test_clone_clones_body(self):
  101. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  102. y = x.clone()
  103. self.assertIsNot(x.kwargs["body"], y.kwargs["body"])
  104. y.kwargs.pop("body")
  105. z = y.clone()
  106. self.assertIsNone(z.kwargs.get("body"))
  107. def test_links_to_body(self):
  108. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  109. x.link(div.s(2))
  110. self.assertFalse(x.options.get("link"))
  111. self.assertTrue(x.kwargs["body"].options["link"])
  112. x.link_error(div.s(2))
  113. self.assertFalse(x.options.get("link_error"))
  114. self.assertTrue(x.kwargs["body"].options["link_error"])
  115. self.assertTrue(x.tasks)
  116. self.assertTrue(x.body)
  117. def test_repr(self):
  118. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  119. self.assertTrue(repr(x))
  120. x.kwargs["body"] = None
  121. self.assertIn("without body", repr(x))