test_sets.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. from __future__ import absolute_import
  2. import anyjson
  3. from celery import current_app
  4. from celery.task import Task
  5. from celery.task.sets import subtask, TaskSet
  6. from celery.canvas import Signature
  7. from celery.tests.utils import Case
  8. class MockTask(Task):
  9. name = 'tasks.add'
  10. def run(self, x, y, **kwargs):
  11. return x + y
  12. @classmethod
  13. def apply_async(cls, args, kwargs, **options):
  14. return (args, kwargs, options)
  15. @classmethod
  16. def apply(cls, args, kwargs, **options):
  17. return (args, kwargs, options)
  18. class test_subtask(Case):
  19. def test_behaves_like_type(self):
  20. s = subtask('tasks.add', (2, 2), {'cache': True},
  21. {'routing_key': 'CPU-bound'})
  22. self.assertDictEqual(subtask(s), s)
  23. def test_task_argument_can_be_task_cls(self):
  24. s = subtask(MockTask, (2, 2))
  25. self.assertEqual(s.task, MockTask.name)
  26. def test_apply_async(self):
  27. s = MockTask.subtask((2, 2), {'cache': True},
  28. {'routing_key': 'CPU-bound'})
  29. args, kwargs, options = s.apply_async()
  30. self.assertTupleEqual(args, (2, 2))
  31. self.assertDictEqual(kwargs, {'cache': True})
  32. self.assertDictEqual(options, {'routing_key': 'CPU-bound'})
  33. def test_delay_argmerge(self):
  34. s = MockTask.subtask((2, ), {'cache': True},
  35. {'routing_key': 'CPU-bound'})
  36. args, kwargs, options = s.delay(10, cache=False, other='foo')
  37. self.assertTupleEqual(args, (10, 2))
  38. self.assertDictEqual(kwargs, {'cache': False, 'other': 'foo'})
  39. self.assertDictEqual(options, {'routing_key': 'CPU-bound'})
  40. def test_apply_async_argmerge(self):
  41. s = MockTask.subtask((2, ), {'cache': True},
  42. {'routing_key': 'CPU-bound'})
  43. args, kwargs, options = s.apply_async((10, ),
  44. {'cache': False, 'other': 'foo'},
  45. routing_key='IO-bound',
  46. exchange='fast')
  47. self.assertTupleEqual(args, (10, 2))
  48. self.assertDictEqual(kwargs, {'cache': False, 'other': 'foo'})
  49. self.assertDictEqual(options, {'routing_key': 'IO-bound',
  50. 'exchange': 'fast'})
  51. def test_apply_argmerge(self):
  52. s = MockTask.subtask((2, ), {'cache': True},
  53. {'routing_key': 'CPU-bound'})
  54. args, kwargs, options = s.apply((10, ),
  55. {'cache': False, 'other': 'foo'},
  56. routing_key='IO-bound',
  57. exchange='fast')
  58. self.assertTupleEqual(args, (10, 2))
  59. self.assertDictEqual(kwargs, {'cache': False, 'other': 'foo'})
  60. self.assertDictEqual(options, {'routing_key': 'IO-bound',
  61. 'exchange': 'fast'})
  62. def test_is_JSON_serializable(self):
  63. s = MockTask.subtask((2, ), {'cache': True},
  64. {'routing_key': 'CPU-bound'})
  65. s.args = list(s.args) # tuples are not preserved
  66. # but this doesn't matter.
  67. self.assertEqual(s, subtask(anyjson.loads(anyjson.dumps(s))))
  68. def test_repr(self):
  69. s = MockTask.subtask((2, ), {'cache': True})
  70. self.assertIn('2', repr(s))
  71. self.assertIn('cache=True', repr(s))
  72. def test_reduce(self):
  73. s = MockTask.subtask((2, ), {'cache': True})
  74. cls, args = s.__reduce__()
  75. self.assertDictEqual(dict(cls(*args)), dict(s))
  76. class test_TaskSet(Case):
  77. def test_task_arg_can_be_iterable__compat(self):
  78. ts = TaskSet([MockTask.subtask((i, i))
  79. for i in (2, 4, 8)])
  80. self.assertEqual(len(ts), 3)
  81. def test_respects_ALWAYS_EAGER(self):
  82. app = current_app
  83. class MockTaskSet(TaskSet):
  84. applied = 0
  85. def apply(self, *args, **kwargs):
  86. self.applied += 1
  87. ts = MockTaskSet([MockTask.subtask((i, i))
  88. for i in (2, 4, 8)])
  89. app.conf.CELERY_ALWAYS_EAGER = True
  90. try:
  91. ts.apply_async()
  92. finally:
  93. app.conf.CELERY_ALWAYS_EAGER = False
  94. self.assertEqual(ts.applied, 1)
  95. def test_apply_async(self):
  96. applied = [0]
  97. class mocksubtask(Signature):
  98. def apply_async(self, *args, **kwargs):
  99. applied[0] += 1
  100. ts = TaskSet([mocksubtask(MockTask, (i, i))
  101. for i in (2, 4, 8)])
  102. ts.apply_async()
  103. self.assertEqual(applied[0], 3)
  104. class Publisher(object):
  105. def send(self, *args, **kwargs):
  106. pass
  107. ts.apply_async(publisher=Publisher())
  108. # setting current_task
  109. @current_app.task
  110. def xyz():
  111. pass
  112. from celery._state import _task_stack
  113. xyz.push_request()
  114. _task_stack.push(xyz)
  115. try:
  116. ts.apply_async(publisher=Publisher())
  117. finally:
  118. _task_stack.pop()
  119. xyz.pop_request()
  120. def test_apply(self):
  121. applied = [0]
  122. class mocksubtask(Signature):
  123. def apply(self, *args, **kwargs):
  124. applied[0] += 1
  125. ts = TaskSet([mocksubtask(MockTask, (i, i))
  126. for i in (2, 4, 8)])
  127. ts.apply()
  128. self.assertEqual(applied[0], 3)
  129. def test_set_app(self):
  130. ts = TaskSet([])
  131. ts.app = 42
  132. self.assertEqual(ts.app, 42)
  133. def test_set_tasks(self):
  134. ts = TaskSet([])
  135. ts.tasks = [1, 2, 3]
  136. self.assertEqual(ts, [1, 2, 3])
  137. def test_set_Publisher(self):
  138. ts = TaskSet([])
  139. ts.Publisher = 42
  140. self.assertEqual(ts.Publisher, 42)