test_sets.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. from __future__ import absolute_import
  2. import anyjson
  3. from mock import Mock, patch
  4. from celery.task import Task
  5. from celery.task.sets import subtask, TaskSet
  6. from celery.canvas import Signature
  7. from celery.tests.case import AppCase
  8. class MockTask(Task):
  9. name = 'tasks.add'
  10. def run(self, x, y, **kwargs):
  11. return x + y
  12. @classmethod
  13. def apply_async(cls, args, kwargs, **options):
  14. return (args, kwargs, options)
  15. @classmethod
  16. def apply(cls, args, kwargs, **options):
  17. return (args, kwargs, options)
  18. class test_subtask(AppCase):
  19. def test_behaves_like_type(self):
  20. s = subtask('tasks.add', (2, 2), {'cache': True},
  21. {'routing_key': 'CPU-bound'})
  22. self.assertDictEqual(subtask(s), s)
  23. def test_task_argument_can_be_task_cls(self):
  24. s = subtask(MockTask, (2, 2))
  25. self.assertEqual(s.task, MockTask.name)
  26. def test_apply_async(self):
  27. s = MockTask.subtask(
  28. (2, 2), {'cache': True}, {'routing_key': 'CPU-bound'},
  29. )
  30. args, kwargs, options = s.apply_async()
  31. self.assertTupleEqual(args, (2, 2))
  32. self.assertDictEqual(kwargs, {'cache': True})
  33. self.assertDictEqual(options, {'routing_key': 'CPU-bound'})
  34. def test_delay_argmerge(self):
  35. s = MockTask.subtask(
  36. (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
  37. )
  38. args, kwargs, options = s.delay(10, cache=False, other='foo')
  39. self.assertTupleEqual(args, (10, 2))
  40. self.assertDictEqual(kwargs, {'cache': False, 'other': 'foo'})
  41. self.assertDictEqual(options, {'routing_key': 'CPU-bound'})
  42. def test_apply_async_argmerge(self):
  43. s = MockTask.subtask(
  44. (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
  45. )
  46. args, kwargs, options = s.apply_async((10, ),
  47. {'cache': False, 'other': 'foo'},
  48. routing_key='IO-bound',
  49. exchange='fast')
  50. self.assertTupleEqual(args, (10, 2))
  51. self.assertDictEqual(kwargs, {'cache': False, 'other': 'foo'})
  52. self.assertDictEqual(options, {'routing_key': 'IO-bound',
  53. 'exchange': 'fast'})
  54. def test_apply_argmerge(self):
  55. s = MockTask.subtask(
  56. (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
  57. )
  58. args, kwargs, options = s.apply((10, ),
  59. {'cache': False, 'other': 'foo'},
  60. routing_key='IO-bound',
  61. exchange='fast')
  62. self.assertTupleEqual(args, (10, 2))
  63. self.assertDictEqual(kwargs, {'cache': False, 'other': 'foo'})
  64. self.assertDictEqual(
  65. options, {'routing_key': 'IO-bound', 'exchange': 'fast'},
  66. )
  67. def test_is_JSON_serializable(self):
  68. s = MockTask.subtask(
  69. (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
  70. )
  71. s.args = list(s.args) # tuples are not preserved
  72. # but this doesn't matter.
  73. self.assertEqual(s, subtask(anyjson.loads(anyjson.dumps(s))))
  74. def test_repr(self):
  75. s = MockTask.subtask((2, ), {'cache': True})
  76. self.assertIn('2', repr(s))
  77. self.assertIn('cache=True', repr(s))
  78. def test_reduce(self):
  79. s = MockTask.subtask((2, ), {'cache': True})
  80. cls, args = s.__reduce__()
  81. self.assertDictEqual(dict(cls(*args)), dict(s))
  82. class test_TaskSet(AppCase):
  83. def test_task_arg_can_be_iterable__compat(self):
  84. ts = TaskSet([MockTask.subtask((i, i))
  85. for i in (2, 4, 8)], app=self.app)
  86. self.assertEqual(len(ts), 3)
  87. def test_respects_ALWAYS_EAGER(self):
  88. app = self.app
  89. class MockTaskSet(TaskSet):
  90. applied = 0
  91. def apply(self, *args, **kwargs):
  92. self.applied += 1
  93. ts = MockTaskSet(
  94. [MockTask.subtask((i, i)) for i in (2, 4, 8)],
  95. app=self.app,
  96. )
  97. app.conf.CELERY_ALWAYS_EAGER = True
  98. try:
  99. ts.apply_async()
  100. finally:
  101. app.conf.CELERY_ALWAYS_EAGER = False
  102. self.assertEqual(ts.applied, 1)
  103. with patch('celery.task.sets.get_current_worker_task') as gwt:
  104. parent = gwt.return_value = Mock()
  105. parent.request.children = []
  106. ts.apply_async()
  107. self.assertTrue(parent.request.children)
  108. def test_apply_async(self):
  109. applied = [0]
  110. class mocksubtask(Signature):
  111. def apply_async(self, *args, **kwargs):
  112. applied[0] += 1
  113. ts = TaskSet([mocksubtask(MockTask, (i, i))
  114. for i in (2, 4, 8)], app=self.app)
  115. ts.apply_async()
  116. self.assertEqual(applied[0], 3)
  117. class Publisher(object):
  118. def send(self, *args, **kwargs):
  119. pass
  120. ts.apply_async(publisher=Publisher())
  121. # setting current_task
  122. @self.app.task
  123. def xyz():
  124. pass
  125. from celery._state import _task_stack
  126. xyz.push_request()
  127. _task_stack.push(xyz)
  128. try:
  129. ts.apply_async(publisher=Publisher())
  130. finally:
  131. _task_stack.pop()
  132. xyz.pop_request()
  133. def test_apply(self):
  134. applied = [0]
  135. class mocksubtask(Signature):
  136. def apply(self, *args, **kwargs):
  137. applied[0] += 1
  138. ts = TaskSet([mocksubtask(MockTask, (i, i))
  139. for i in (2, 4, 8)], app=self.app)
  140. ts.apply()
  141. self.assertEqual(applied[0], 3)
  142. def test_set_app(self):
  143. ts = TaskSet([], app=self.app)
  144. ts.app = 42
  145. self.assertEqual(ts.app, 42)
  146. def test_set_tasks(self):
  147. ts = TaskSet([], app=self.app)
  148. ts.tasks = [1, 2, 3]
  149. self.assertEqual(ts, [1, 2, 3])
  150. def test_set_Publisher(self):
  151. ts = TaskSet([], app=self.app)
  152. ts.Publisher = 42
  153. self.assertEqual(ts.Publisher, 42)