test_task.py 10 KB


  1. import unittest
  2. from StringIO import StringIO
  3. from celery import task
  4. from celery import registry
  5. from celery import messaging
  6. from celery.result import EagerResult
  7. from celery.backends import default_backend
  8. from datetime import datetime, timedelta
  9. from celery.decorators import task as task_dec
  10. def return_True(*args, **kwargs):
  11. # Task run functions can't be closures/lambdas, as they're pickled.
  12. return True
  13. return_True_task = task_dec()(return_True)
  14. def raise_exception(self, **kwargs):
  15. raise Exception("%s error" % self.__class__)
  16. class IncrementCounterTask(task.Task):
  17. name = "c.unittest.increment_counter_task"
  18. count = 0
  19. def run(self, increment_by=1, **kwargs):
  20. increment_by = increment_by or 1
  21. self.__class__.count += increment_by
  22. return self.__class__.count
  23. class RaisingTask(task.Task):
  24. name = "c.unittest.raising_task"
  25. def run(self, **kwargs):
  26. raise KeyError("foo")
  27. class RetryTask(task.Task):
  28. max_retries = 3
  29. iterations = 0
  30. def run(self, arg1, arg2, kwarg=1, **kwargs):
  31. self.__class__.iterations += 1
  32. retries = kwargs["task_retries"]
  33. if retries >= 3:
  34. return arg1
  35. else:
  36. kwargs.update({"kwarg": kwarg})
  37. return self.retry(args=[arg1, arg2], kwargs=kwargs, countdown=0)
  38. class MyCustomException(Exception):
  39. """Random custom exception."""
  40. class RetryTaskCustomExc(task.Task):
  41. max_retries = 3
  42. iterations = 0
  43. def run(self, arg1, arg2, kwarg=1, **kwargs):
  44. self.__class__.iterations += 1
  45. retries = kwargs["task_retries"]
  46. if retries >= 3:
  47. return arg1 + kwarg
  48. else:
  49. try:
  50. raise MyCustomException("Elaine Marie Benes")
  51. except MyCustomException, exc:
  52. kwargs.update({"kwarg": kwarg})
  53. return self.retry(args=[arg1, arg2], kwargs=kwargs,
  54. countdown=0, exc=exc)
  55. class TestTaskRetries(unittest.TestCase):
  56. def test_retry(self):
  57. RetryTask.max_retries = 3
  58. RetryTask.iterations = 0
  59. result = RetryTask.apply([0xFF, 0xFFFF])
  60. self.assertEquals(result.get(), 0xFF)
  61. self.assertEquals(RetryTask.iterations, 4)
  62. def test_retry_with_kwargs(self):
  63. RetryTaskCustomExc.max_retries = 3
  64. RetryTaskCustomExc.iterations = 0
  65. result = RetryTaskCustomExc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
  66. self.assertEquals(result.get(), 0xFF + 0xF)
  67. self.assertEquals(RetryTaskCustomExc.iterations, 4)
  68. def test_retry_with_custom_exception(self):
  69. RetryTaskCustomExc.max_retries = 2
  70. RetryTaskCustomExc.iterations = 0
  71. result = RetryTaskCustomExc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
  72. self.assertRaises(MyCustomException,
  73. result.get)
  74. self.assertEquals(RetryTaskCustomExc.iterations, 3)
  75. def test_max_retries_exceeded(self):
  76. RetryTask.max_retries = 2
  77. RetryTask.iterations = 0
  78. result = RetryTask.apply([0xFF, 0xFFFF])
  79. self.assertRaises(RetryTask.MaxRetriesExceededError,
  80. result.get)
  81. self.assertEquals(RetryTask.iterations, 3)
  82. RetryTask.max_retries = 1
  83. RetryTask.iterations = 0
  84. result = RetryTask.apply([0xFF, 0xFFFF])
  85. self.assertRaises(RetryTask.MaxRetriesExceededError,
  86. result.get)
  87. self.assertEquals(RetryTask.iterations, 2)
  88. class TestCeleryTasks(unittest.TestCase):
  89. def createTaskCls(self, cls_name, task_name=None):
  90. attrs = {"__module__": self.__module__}
  91. if task_name:
  92. attrs["name"] = task_name
  93. cls = type(cls_name, (task.Task, ), attrs)
  94. cls.run = return_True
  95. return cls
  96. def test_ping(self):
  97. from celery import conf
  98. conf.ALWAYS_EAGER = True
  99. self.assertEquals(task.ping(), 'pong')
  100. conf.ALWAYS_EAGER = False
  101. def test_execute_remote(self):
  102. from celery import conf
  103. conf.ALWAYS_EAGER = True
  104. self.assertEquals(task.execute_remote(return_True, ["foo"]).get(),
  105. True)
  106. conf.ALWAYS_EAGER = False
  107. def test_dmap(self):
  108. from celery import conf
  109. import operator
  110. conf.ALWAYS_EAGER = True
  111. res = task.dmap(operator.add, zip(xrange(10), xrange(10)))
  112. self.assertTrue(res, sum([operator.add(x, x)
  113. for x in xrange(10)]))
  114. conf.ALWAYS_EAGER = False
  115. def test_dmap_async(self):
  116. from celery import conf
  117. import operator
  118. conf.ALWAYS_EAGER = True
  119. res = task.dmap_async(operator.add, zip(xrange(10), xrange(10)))
  120. self.assertTrue(res.get(), sum([operator.add(x, x)
  121. for x in xrange(10)]))
  122. conf.ALWAYS_EAGER = False
  123. def assertNextTaskDataEquals(self, consumer, presult, task_name,
  124. test_eta=False, **kwargs):
  125. next_task = consumer.fetch()
  126. task_data = next_task.decode()
  127. self.assertEquals(task_data["id"], presult.task_id)
  128. self.assertEquals(task_data["task"], task_name)
  129. task_kwargs = task_data.get("kwargs", {})
  130. if test_eta:
  131. self.assertTrue(isinstance(task_data.get("eta"), datetime))
  132. for arg_name, arg_value in kwargs.items():
  133. self.assertEquals(task_kwargs.get(arg_name), arg_value)
  134. def test_incomplete_task_cls(self):
  135. class IncompleteTask(task.Task):
  136. name = "c.unittest.t.itask"
  137. self.assertRaises(NotImplementedError, IncompleteTask().run)
  138. def test_regular_task(self):
  139. T1 = self.createTaskCls("T1", "c.unittest.t.t1")
  140. self.assertTrue(isinstance(T1(), T1))
  141. self.assertTrue(T1().run())
  142. self.assertTrue(callable(T1()),
  143. "Task class is callable()")
  144. self.assertTrue(T1()(),
  145. "Task class runs run() when called")
  146. # task name generated out of class module + name.
  147. T2 = self.createTaskCls("T2")
  148. self.assertEquals(T2().name, "celery.tests.test_task.T2")
  149. t1 = T1()
  150. consumer = t1.get_consumer()
  151. self.assertRaises(NotImplementedError, consumer.receive, "foo", "foo")
  152. consumer.discard_all()
  153. self.assertTrue(consumer.fetch() is None)
  154. # Without arguments.
  155. presult = t1.delay()
  156. self.assertNextTaskDataEquals(consumer, presult, t1.name)
  157. # With arguments.
  158. presult2 = task.apply_async(t1, name="George Constanza")
  159. self.assertNextTaskDataEquals(consumer, presult2, t1.name,
  160. name="George Constanza")
  161. # With eta.
  162. presult2 = task.apply_async(t1, kwargs=dict(name="George Constanza"),
  163. eta=datetime.now() + timedelta(days=1))
  164. self.assertNextTaskDataEquals(consumer, presult2, t1.name,
  165. name="George Constanza", test_eta=True)
  166. # With countdown.
  167. presult2 = task.apply_async(t1, kwargs=dict(name="George Constanza"),
  168. countdown=10)
  169. self.assertNextTaskDataEquals(consumer, presult2, t1.name,
  170. name="George Constanza", test_eta=True)
  171. # Discarding all tasks.
  172. task.discard_all()
  173. tid3 = task.apply_async(t1)
  174. self.assertEquals(task.discard_all(), 1)
  175. self.assertTrue(consumer.fetch() is None)
  176. self.assertFalse(task.is_successful(presult.task_id))
  177. self.assertFalse(presult.successful())
  178. default_backend.mark_as_done(presult.task_id, result=None)
  179. self.assertTrue(task.is_successful(presult.task_id))
  180. self.assertTrue(presult.successful())
  181. publisher = t1.get_publisher()
  182. self.assertTrue(isinstance(publisher, messaging.TaskPublisher))
  183. def test_get_logger(self):
  184. T1 = self.createTaskCls("T1", "c.unittest.t.t1")
  185. t1 = T1()
  186. logfh = StringIO()
  187. logger = t1.get_logger(logfile=logfh, loglevel=0)
  188. self.assertTrue(logger)
  189. class TestTaskSet(unittest.TestCase):
  190. def test_function_taskset(self):
  191. from celery import conf
  192. conf.ALWAYS_EAGER = True
  193. ts = task.TaskSet(return_True_task.name, [
  194. [[1], {}], [[2], {}], [[3], {}], [[4], {}], [[5], {}]])
  195. res = ts.run()
  196. self.assertEquals(res.join(), [True, True, True, True, True])
  197. conf.ALWAYS_EAGER = False
  198. def test_counter_taskset(self):
  199. IncrementCounterTask.count = 0
  200. ts = task.TaskSet(IncrementCounterTask, [
  201. [[], {}],
  202. [[], {"increment_by": 2}],
  203. [[], {"increment_by": 3}],
  204. [[], {"increment_by": 4}],
  205. [[], {"increment_by": 5}],
  206. [[], {"increment_by": 6}],
  207. [[], {"increment_by": 7}],
  208. [[], {"increment_by": 8}],
  209. [[], {"increment_by": 9}],
  210. ])
  211. self.assertEquals(ts.task_name, IncrementCounterTask.name)
  212. self.assertEquals(ts.total, 9)
  213. consumer = IncrementCounterTask().get_consumer()
  214. consumer.discard_all()
  215. taskset_res = ts.run()
  216. subtasks = taskset_res.subtasks
  217. taskset_id = taskset_res.taskset_id
  218. for subtask in subtasks:
  219. m = consumer.decoder(consumer.fetch().body)
  220. self.assertEquals(m.get("taskset"), taskset_id)
  221. self.assertEquals(m.get("task"), IncrementCounterTask.name)
  222. self.assertEquals(m.get("id"), subtask.task_id)
  223. IncrementCounterTask().run(
  224. increment_by=m.get("kwargs", {}).get("increment_by"))
  225. self.assertEquals(IncrementCounterTask.count, sum(xrange(1, 10)))
  226. class TestTaskApply(unittest.TestCase):
  227. def test_apply(self):
  228. IncrementCounterTask.count = 0
  229. e = IncrementCounterTask.apply()
  230. self.assertTrue(isinstance(e, EagerResult))
  231. self.assertEquals(e.get(), 1)
  232. e = IncrementCounterTask.apply(args=[1])
  233. self.assertEquals(e.get(), 2)
  234. e = IncrementCounterTask.apply(kwargs={"increment_by": 4})
  235. self.assertEquals(e.get(), 6)
  236. self.assertTrue(e.successful())
  237. self.assertTrue(e.ready())
  238. self.assertTrue(repr(e).startswith("<EagerResult:"))
  239. f = RaisingTask.apply()
  240. self.assertTrue(f.ready())
  241. self.assertFalse(f.successful())
  242. self.assertTrue(f.traceback)
  243. self.assertRaises(KeyError, f.get)