test_canvas.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from celery import chain, group, uuid
  4. from .tasks import add, collect_ids, ids
# Upper bound handed as ``timeout=`` to the result ``.get()`` calls in the
# chain and group tests below.
TIMEOUT = 120
  6. class test_chain:
  7. def test_simple_chain(self, manager):
  8. c = add.s(4, 4) | add.s(8) | add.s(16)
  9. assert c().get(timeout=TIMEOUT) == 32
  10. def test_complex_chain(self, manager):
  11. c = (
  12. add.s(2, 2) | (
  13. add.s(4) | add.s(8) | add.s(16)
  14. ) |
  15. group(add.s(i) for i in range(4))
  16. )
  17. res = c()
  18. assert res.get(timeout=TIMEOUT) == [32, 33, 34, 35]
  19. def test_parent_ids(self, manager, num=10):
  20. assert manager.inspect().ping()
  21. c = chain(ids.si(i) for i in range(num))
  22. c.freeze()
  23. res = c()
  24. try:
  25. res.get(timeout=TIMEOUT)
  26. except TimeoutError:
  27. print(manager.inspect.active())
  28. print(manager.inspect.reserved())
  29. print(manager.inspect.stats())
  30. raise
  31. self.assert_ids(res, num - 1)
  32. def assert_ids(self, res, size):
  33. i, root = size, res
  34. while root.parent:
  35. root = root.parent
  36. node = res
  37. while node:
  38. root_id, parent_id, value = node.get(timeout=30)
  39. assert value == i
  40. assert root_id == root.id
  41. if node.parent:
  42. assert parent_id == node.parent.id
  43. node = node.parent
  44. i -= 1
  45. class test_group:
  46. def test_parent_ids(self, manager):
  47. assert manager.inspect().ping()
  48. g = ids.si(1) | ids.si(2) | group(ids.si(i) for i in range(2, 50))
  49. res = g()
  50. expected_root_id = res.parent.parent.id
  51. expected_parent_id = res.parent.id
  52. values = res.get(timeout=TIMEOUT)
  53. for i, r in enumerate(values):
  54. root_id, parent_id, value = r
  55. assert root_id == expected_root_id
  56. assert parent_id == expected_parent_id
  57. assert value == i + 2
  58. @pytest.mark.celery(result_backend='redis://')
  59. class xxx_chord:
  60. def test_parent_ids(self, manager):
  61. self.assert_parentids_chord()
  62. def test_parent_ids__already_set(self, manager):
  63. self.assert_parentids_chord(uuid(), uuid())
  64. def assert_parentids_chord(self, base_root=None, base_parent=None):
  65. g = (
  66. ids.si(1) |
  67. ids.si(2) |
  68. group(ids.si(i) for i in range(3, 50)) |
  69. collect_ids.s(i=50) |
  70. ids.si(51)
  71. )
  72. g.freeze(root_id=base_root, parent_id=base_parent)
  73. res = g.apply_async(root_id=base_root, parent_id=base_parent)
  74. expected_root_id = base_root or res.parent.parent.parent.id
  75. root_id, parent_id, value = res.get(timeout=30)
  76. assert value == 51
  77. assert root_id == expected_root_id
  78. assert parent_id == res.parent.id
  79. prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
  80. assert value == 50
  81. assert root_id == expected_root_id
  82. assert parent_id == res.parent.parent.id
  83. for i, p in enumerate(prev):
  84. root_id, parent_id, value = p
  85. assert root_id == expected_root_id
  86. assert parent_id == res.parent.parent.id
  87. root_id, parent_id, value = res.parent.parent.get(timeout=30)
  88. assert value == 2
  89. assert parent_id == res.parent.parent.parent.id
  90. assert root_id == expected_root_id
  91. root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
  92. assert value == 1
  93. assert root_id == expected_root_id
  94. assert parent_id == base_parent