test_sets.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import anyjson
  4. from celery import current_app
  5. from celery.task import Task
  6. from celery.task.sets import subtask, TaskSet
  7. from celery.canvas import Signature
  8. from celery.tests.utils import Case
  9. class MockTask(Task):
  10. name = 'tasks.add'
  11. def run(self, x, y, **kwargs):
  12. return x + y
  13. @classmethod
  14. def apply_async(cls, args, kwargs, **options):
  15. return (args, kwargs, options)
  16. @classmethod
  17. def apply(cls, args, kwargs, **options):
  18. return (args, kwargs, options)
  19. class test_subtask(Case):
  20. def test_behaves_like_type(self):
  21. s = subtask('tasks.add', (2, 2), {'cache': True},
  22. {'routing_key': 'CPU-bound'})
  23. self.assertDictEqual(subtask(s), s)
  24. def test_task_argument_can_be_task_cls(self):
  25. s = subtask(MockTask, (2, 2))
  26. self.assertEqual(s.task, MockTask.name)
  27. def test_apply_async(self):
  28. s = MockTask.subtask(
  29. (2, 2), {'cache': True}, {'routing_key': 'CPU-bound'},
  30. )
  31. args, kwargs, options = s.apply_async()
  32. self.assertTupleEqual(args, (2, 2))
  33. self.assertDictEqual(kwargs, {'cache': True})
  34. self.assertDictEqual(options, {'routing_key': 'CPU-bound'})
  35. def test_delay_argmerge(self):
  36. s = MockTask.subtask(
  37. (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
  38. )
  39. args, kwargs, options = s.delay(10, cache=False, other='foo')
  40. self.assertTupleEqual(args, (10, 2))
  41. self.assertDictEqual(kwargs, {'cache': False, 'other': 'foo'})
  42. self.assertDictEqual(options, {'routing_key': 'CPU-bound'})
  43. def test_apply_async_argmerge(self):
  44. s = MockTask.subtask(
  45. (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
  46. )
  47. args, kwargs, options = s.apply_async((10, ),
  48. {'cache': False, 'other': 'foo'},
  49. routing_key='IO-bound',
  50. exchange='fast')
  51. self.assertTupleEqual(args, (10, 2))
  52. self.assertDictEqual(kwargs, {'cache': False, 'other': 'foo'})
  53. self.assertDictEqual(options, {'routing_key': 'IO-bound',
  54. 'exchange': 'fast'})
  55. def test_apply_argmerge(self):
  56. s = MockTask.subtask(
  57. (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
  58. )
  59. args, kwargs, options = s.apply((10, ),
  60. {'cache': False, 'other': 'foo'},
  61. routing_key='IO-bound',
  62. exchange='fast')
  63. self.assertTupleEqual(args, (10, 2))
  64. self.assertDictEqual(kwargs, {'cache': False, 'other': 'foo'})
  65. self.assertDictEqual(
  66. options, {'routing_key': 'IO-bound', 'exchange': 'fast'},
  67. )
  68. def test_is_JSON_serializable(self):
  69. s = MockTask.subtask(
  70. (2, ), {'cache': True}, {'routing_key': 'CPU-bound'},
  71. )
  72. s.args = list(s.args) # tuples are not preserved
  73. # but this doesn't matter.
  74. self.assertEqual(s, subtask(anyjson.loads(anyjson.dumps(s))))
  75. def test_repr(self):
  76. s = MockTask.subtask((2, ), {'cache': True})
  77. self.assertIn('2', repr(s))
  78. self.assertIn('cache=True', repr(s))
  79. def test_reduce(self):
  80. s = MockTask.subtask((2, ), {'cache': True})
  81. cls, args = s.__reduce__()
  82. self.assertDictEqual(dict(cls(*args)), dict(s))
  83. class test_TaskSet(Case):
  84. def test_task_arg_can_be_iterable__compat(self):
  85. ts = TaskSet([MockTask.subtask((i, i))
  86. for i in (2, 4, 8)])
  87. self.assertEqual(len(ts), 3)
  88. def test_respects_ALWAYS_EAGER(self):
  89. app = current_app
  90. class MockTaskSet(TaskSet):
  91. applied = 0
  92. def apply(self, *args, **kwargs):
  93. self.applied += 1
  94. ts = MockTaskSet(
  95. [MockTask.subtask((i, i)) for i in (2, 4, 8)],
  96. )
  97. app.conf.CELERY_ALWAYS_EAGER = True
  98. try:
  99. ts.apply_async()
  100. finally:
  101. app.conf.CELERY_ALWAYS_EAGER = False
  102. self.assertEqual(ts.applied, 1)
  103. def test_apply_async(self):
  104. applied = [0]
  105. class mocksubtask(Signature):
  106. def apply_async(self, *args, **kwargs):
  107. applied[0] += 1
  108. ts = TaskSet([mocksubtask(MockTask, (i, i))
  109. for i in (2, 4, 8)])
  110. ts.apply_async()
  111. self.assertEqual(applied[0], 3)
  112. class Publisher(object):
  113. def send(self, *args, **kwargs):
  114. pass
  115. ts.apply_async(publisher=Publisher())
  116. # setting current_task
  117. @current_app.task
  118. def xyz():
  119. pass
  120. from celery._state import _task_stack
  121. xyz.push_request()
  122. _task_stack.push(xyz)
  123. try:
  124. ts.apply_async(publisher=Publisher())
  125. finally:
  126. _task_stack.pop()
  127. xyz.pop_request()
  128. def test_apply(self):
  129. applied = [0]
  130. class mocksubtask(Signature):
  131. def apply(self, *args, **kwargs):
  132. applied[0] += 1
  133. ts = TaskSet([mocksubtask(MockTask, (i, i))
  134. for i in (2, 4, 8)])
  135. ts.apply()
  136. self.assertEqual(applied[0], 3)
  137. def test_set_app(self):
  138. ts = TaskSet([])
  139. ts.app = 42
  140. self.assertEqual(ts.app, 42)
  141. def test_set_tasks(self):
  142. ts = TaskSet([])
  143. ts.tasks = [1, 2, 3]
  144. self.assertEqual(ts, [1, 2, 3])
  145. def test_set_Publisher(self):
  146. ts = TaskSet([])
  147. ts.Publisher = 42
  148. self.assertEqual(ts.Publisher, 42)