# test_canvas.py (3.3 KB)
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from celery import chain, group, uuid
  4. from .tasks import add, collect_ids, ids
  5. TIMEOUT = 30
  6. class test_chain:
  7. def test_simple_chain(self, manager):
  8. c = add.s(4, 4) | add.s(8) | add.s(16)
  9. assert c().get(timeout=TIMEOUT) == 32
  10. def test_complex_chain(self, manager):
  11. c = (
  12. add.s(2, 2) | (
  13. add.s(4) | add.s(8) | add.s(16)
  14. ) |
  15. group(add.s(i) for i in range(4))
  16. )
  17. res = c()
  18. assert res.get(timeout=TIMEOUT) == [32, 33, 34, 35]
  19. def test_parent_ids(self, manager, num=10):
  20. assert manager.inspect().ping()
  21. c = chain(ids.si(i) for i in range(num))
  22. c.freeze()
  23. res = c()
  24. res.get(timeout=TIMEOUT)
  25. self.assert_ids(res, num - 1)
  26. def assert_ids(self, res, size):
  27. i, root = size, res
  28. while root.parent:
  29. root = root.parent
  30. node = res
  31. while node:
  32. root_id, parent_id, value = node.get(timeout=30)
  33. assert value == i
  34. assert root_id == root.id
  35. if node.parent:
  36. assert parent_id == node.parent.id
  37. node = node.parent
  38. i -= 1
  39. class test_group:
  40. def test_parent_ids(self, manager):
  41. assert manager.inspect().ping()
  42. g = ids.si(1) | ids.si(2) | group(ids.si(i) for i in range(2, 50))
  43. res = g()
  44. expected_root_id = res.parent.parent.id
  45. expected_parent_id = res.parent.id
  46. values = res.get(timeout=TIMEOUT)
  47. for i, r in enumerate(values):
  48. root_id, parent_id, value = r
  49. assert root_id == expected_root_id
  50. assert parent_id == expected_parent_id
  51. assert value == i + 2
  52. class xxx_chord:
  53. @pytest.mark.celery(result_backend='redis://')
  54. def test_parent_ids(self, manager):
  55. self.assert_parentids_chord()
  56. self.assert_parentids_chord(uuid(), uuid())
  57. def assert_parentids_chord(self, base_root=None, base_parent=None):
  58. g = (
  59. ids.si(1) |
  60. ids.si(2) |
  61. group(ids.si(i) for i in range(3, 50)) |
  62. collect_ids.s(i=50) |
  63. ids.si(51)
  64. )
  65. g.freeze(root_id=base_root, parent_id=base_parent)
  66. res = g.apply_async(root_id=base_root, parent_id=base_parent)
  67. expected_root_id = base_root or res.parent.parent.parent.id
  68. root_id, parent_id, value = res.get(timeout=30)
  69. assert value == 51
  70. assert root_id == expected_root_id
  71. assert parent_id == res.parent.id
  72. prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
  73. assert value == 50
  74. assert root_id == expected_root_id
  75. assert parent_id == res.parent.parent.id
  76. for i, p in enumerate(prev):
  77. root_id, parent_id, value = p
  78. assert root_id == expected_root_id
  79. assert parent_id == res.parent.parent.id
  80. root_id, parent_id, value = res.parent.parent.get(timeout=30)
  81. assert value == 2
  82. assert parent_id == res.parent.parent.parent.id
  83. assert root_id == expected_root_id
  84. root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
  85. assert value == 1
  86. assert root_id == expected_root_id
  87. assert parent_id == base_parent