test_tasks.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. from __future__ import absolute_import
  2. from datetime import datetime, timedelta
  3. from kombu import Queue
  4. from celery import Task
  5. from celery import group
  6. from celery.app.task import _reprtask
  7. from celery.exceptions import Ignore, Retry
  8. from celery.five import items, range, string_t
  9. from celery.result import EagerResult
  10. from celery.utils import uuid
  11. from celery.utils.timeutils import parse_iso8601
  12. from celery.tests.case import (
  13. AppCase, ContextMock, Mock, depends_on_current_app, patch,
  14. )
  15. def return_True(*args, **kwargs):
  16. # Task run functions can't be closures/lambdas, as they're pickled.
  17. return True
  18. def raise_exception(self, **kwargs):
  19. raise Exception('%s error' % self.__class__)
  20. class MockApplyTask(Task):
  21. abstract = True
  22. applied = 0
  23. def run(self, x, y):
  24. return x * y
  25. def apply_async(self, *args, **kwargs):
  26. self.applied += 1
  27. class TasksCase(AppCase):
  28. def setup(self):
  29. self.mytask = self.app.task(shared=False)(return_True)
  30. @self.app.task(bind=True, count=0, shared=False)
  31. def increment_counter(self, increment_by=1):
  32. self.count += increment_by or 1
  33. return self.count
  34. self.increment_counter = increment_counter
  35. @self.app.task(shared=False)
  36. def raising():
  37. raise KeyError('foo')
  38. self.raising = raising
  39. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  40. def retry_task(self, arg1, arg2, kwarg=1, max_retries=None, care=True):
  41. self.iterations += 1
  42. rmax = self.max_retries if max_retries is None else max_retries
  43. assert repr(self.request)
  44. retries = self.request.retries
  45. if care and retries >= rmax:
  46. return arg1
  47. else:
  48. raise self.retry(countdown=0, max_retries=rmax)
  49. self.retry_task = retry_task
  50. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  51. def retry_task_noargs(self, **kwargs):
  52. self.iterations += 1
  53. if self.request.retries >= 3:
  54. return 42
  55. else:
  56. raise self.retry(countdown=0)
  57. self.retry_task_noargs = retry_task_noargs
  58. @self.app.task(bind=True, max_retries=3, iterations=0,
  59. base=MockApplyTask, shared=False)
  60. def retry_task_mockapply(self, arg1, arg2, kwarg=1):
  61. self.iterations += 1
  62. retries = self.request.retries
  63. if retries >= 3:
  64. return arg1
  65. raise self.retry(countdown=0)
  66. self.retry_task_mockapply = retry_task_mockapply
  67. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  68. def retry_task_customexc(self, arg1, arg2, kwarg=1, **kwargs):
  69. self.iterations += 1
  70. retries = self.request.retries
  71. if retries >= 3:
  72. return arg1 + kwarg
  73. else:
  74. try:
  75. raise MyCustomException('Elaine Marie Benes')
  76. except MyCustomException as exc:
  77. kwargs.update(kwarg=kwarg)
  78. raise self.retry(countdown=0, exc=exc)
  79. self.retry_task_customexc = retry_task_customexc
  80. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  81. shared=False)
  82. def autoretry_task_no_kwargs(self, a, b):
  83. self.iterations += 1
  84. return a/b
  85. self.autoretry_task_no_kwargs = autoretry_task_no_kwargs
  86. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  87. retry_kwargs={'max_retries': 5}, shared=False)
  88. def autoretry_task(self, a, b):
  89. self.iterations += 1
  90. return a/b
  91. self.autoretry_task = autoretry_task
  92. class MyCustomException(Exception):
  93. """Random custom exception."""
  94. class test_task_retries(TasksCase):
  95. def test_retry(self):
  96. self.retry_task.max_retries = 3
  97. self.retry_task.iterations = 0
  98. self.retry_task.apply([0xFF, 0xFFFF])
  99. self.assertEqual(self.retry_task.iterations, 4)
  100. self.retry_task.max_retries = 3
  101. self.retry_task.iterations = 0
  102. self.retry_task.apply([0xFF, 0xFFFF], {'max_retries': 10})
  103. self.assertEqual(self.retry_task.iterations, 11)
  104. def test_retry_no_args(self):
  105. self.retry_task_noargs.max_retries = 3
  106. self.retry_task_noargs.iterations = 0
  107. self.retry_task_noargs.apply(propagate=True).get()
  108. self.assertEqual(self.retry_task_noargs.iterations, 4)
  109. def test_signature_from_request__passes_headers(self):
  110. self.retry_task.push_request()
  111. self.retry_task.request.headers = {'custom': 10.1}
  112. sig = self.retry_task.signature_from_request()
  113. self.assertEqual(sig.options['headers']['custom'], 10.1)
  114. def test_signature_from_request__delivery_info(self):
  115. self.retry_task.push_request()
  116. self.retry_task.request.delivery_info = {
  117. 'exchange': 'testex',
  118. 'routing_key': 'testrk',
  119. }
  120. sig = self.retry_task.signature_from_request()
  121. self.assertEqual(sig.options['exchange'], 'testex')
  122. self.assertEqual(sig.options['routing_key'], 'testrk')
  123. def test_retry_kwargs_can_be_empty(self):
  124. self.retry_task_mockapply.push_request()
  125. try:
  126. with self.assertRaises(Retry):
  127. import sys
  128. try:
  129. sys.exc_clear()
  130. except AttributeError:
  131. pass
  132. self.retry_task_mockapply.retry(args=[4, 4], kwargs=None)
  133. finally:
  134. self.retry_task_mockapply.pop_request()
  135. def test_retry_not_eager(self):
  136. self.retry_task_mockapply.push_request()
  137. try:
  138. self.retry_task_mockapply.request.called_directly = False
  139. exc = Exception('baz')
  140. try:
  141. self.retry_task_mockapply.retry(
  142. args=[4, 4], kwargs={'task_retries': 0},
  143. exc=exc, throw=False,
  144. )
  145. self.assertTrue(self.retry_task_mockapply.applied)
  146. finally:
  147. self.retry_task_mockapply.applied = 0
  148. try:
  149. with self.assertRaises(Retry):
  150. self.retry_task_mockapply.retry(
  151. args=[4, 4], kwargs={'task_retries': 0},
  152. exc=exc, throw=True)
  153. self.assertTrue(self.retry_task_mockapply.applied)
  154. finally:
  155. self.retry_task_mockapply.applied = 0
  156. finally:
  157. self.retry_task_mockapply.pop_request()
  158. def test_retry_with_kwargs(self):
  159. self.retry_task_customexc.max_retries = 3
  160. self.retry_task_customexc.iterations = 0
  161. self.retry_task_customexc.apply([0xFF, 0xFFFF], {'kwarg': 0xF})
  162. self.assertEqual(self.retry_task_customexc.iterations, 4)
  163. def test_retry_with_custom_exception(self):
  164. self.retry_task_customexc.max_retries = 2
  165. self.retry_task_customexc.iterations = 0
  166. result = self.retry_task_customexc.apply(
  167. [0xFF, 0xFFFF], {'kwarg': 0xF},
  168. )
  169. with self.assertRaises(MyCustomException):
  170. result.get()
  171. self.assertEqual(self.retry_task_customexc.iterations, 3)
  172. def test_max_retries_exceeded(self):
  173. self.retry_task.max_retries = 2
  174. self.retry_task.iterations = 0
  175. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  176. with self.assertRaises(self.retry_task.MaxRetriesExceededError):
  177. result.get()
  178. self.assertEqual(self.retry_task.iterations, 3)
  179. self.retry_task.max_retries = 1
  180. self.retry_task.iterations = 0
  181. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  182. with self.assertRaises(self.retry_task.MaxRetriesExceededError):
  183. result.get()
  184. self.assertEqual(self.retry_task.iterations, 2)
  185. def test_autoretry_no_kwargs(self):
  186. self.autoretry_task_no_kwargs.max_retries = 3
  187. self.autoretry_task_no_kwargs.iterations = 0
  188. self.autoretry_task_no_kwargs.apply((1, 0))
  189. self.assertEqual(self.autoretry_task_no_kwargs.iterations, 4)
  190. def test_autoretry(self):
  191. self.autoretry_task.max_retries = 3
  192. self.autoretry_task.iterations = 0
  193. self.autoretry_task.apply((1, 0))
  194. self.assertEqual(self.autoretry_task.iterations, 6)
  195. class test_canvas_utils(TasksCase):
  196. def test_si(self):
  197. self.assertTrue(self.retry_task.si())
  198. self.assertTrue(self.retry_task.si().immutable)
  199. def test_chunks(self):
  200. self.assertTrue(self.retry_task.chunks(range(100), 10))
  201. def test_map(self):
  202. self.assertTrue(self.retry_task.map(range(100)))
  203. def test_starmap(self):
  204. self.assertTrue(self.retry_task.starmap(range(100)))
  205. def test_on_success(self):
  206. self.retry_task.on_success(1, 1, (), {})
  207. class test_tasks(TasksCase):
  208. def now(self):
  209. return self.app.now()
  210. @depends_on_current_app
  211. def test_unpickle_task(self):
  212. import pickle
  213. @self.app.task(shared=True)
  214. def xxx():
  215. pass
  216. self.assertIs(pickle.loads(pickle.dumps(xxx)), xxx.app.tasks[xxx.name])
  217. @patch('celery.app.task.current_app')
  218. @depends_on_current_app
  219. def test_bind__no_app(self, current_app):
  220. class XTask(Task):
  221. _app = None
  222. XTask._app = None
  223. XTask.__bound__ = False
  224. XTask.bind = Mock(name='bind')
  225. self.assertIs(XTask.app, current_app)
  226. XTask.bind.assert_called_with(current_app)
  227. def test_reprtask__no_fmt(self):
  228. self.assertTrue(_reprtask(self.mytask))
  229. def test_AsyncResult(self):
  230. task_id = uuid()
  231. result = self.retry_task.AsyncResult(task_id)
  232. self.assertEqual(result.backend, self.retry_task.backend)
  233. self.assertEqual(result.id, task_id)
  234. def assertNextTaskDataEqual(self, consumer, presult, task_name,
  235. test_eta=False, test_expires=False, **kwargs):
  236. next_task = consumer.queues[0].get(accept=['pickle', 'json'])
  237. task_data = next_task.decode()
  238. self.assertEqual(task_data['id'], presult.id)
  239. self.assertEqual(task_data['task'], task_name)
  240. task_kwargs = task_data.get('kwargs', {})
  241. if test_eta:
  242. self.assertIsInstance(task_data.get('eta'), string_t)
  243. to_datetime = parse_iso8601(task_data.get('eta'))
  244. self.assertIsInstance(to_datetime, datetime)
  245. if test_expires:
  246. self.assertIsInstance(task_data.get('expires'), string_t)
  247. to_datetime = parse_iso8601(task_data.get('expires'))
  248. self.assertIsInstance(to_datetime, datetime)
  249. for arg_name, arg_value in items(kwargs):
  250. self.assertEqual(task_kwargs.get(arg_name), arg_value)
  251. def test_incomplete_task_cls(self):
  252. class IncompleteTask(Task):
  253. app = self.app
  254. name = 'c.unittest.t.itask'
  255. with self.assertRaises(NotImplementedError):
  256. IncompleteTask().run()
  257. def test_task_kwargs_must_be_dictionary(self):
  258. with self.assertRaises(TypeError):
  259. self.increment_counter.apply_async([], 'str')
  260. def test_task_args_must_be_list(self):
  261. with self.assertRaises(ValueError):
  262. self.increment_counter.apply_async('s', {})
  263. def test_regular_task(self):
  264. self.assertIsInstance(self.mytask, Task)
  265. self.assertTrue(self.mytask.run())
  266. self.assertTrue(
  267. callable(self.mytask), 'Task class is callable()',
  268. )
  269. self.assertTrue(self.mytask(), 'Task class runs run() when called')
  270. with self.app.connection_or_acquire() as conn:
  271. consumer = self.app.amqp.TaskConsumer(conn)
  272. with self.assertRaises(NotImplementedError):
  273. consumer.receive('foo', 'foo')
  274. consumer.purge()
  275. self.assertIsNone(consumer.queues[0].get())
  276. self.app.amqp.TaskConsumer(conn, queues=[Queue('foo')])
  277. # Without arguments.
  278. presult = self.mytask.delay()
  279. self.assertNextTaskDataEqual(consumer, presult, self.mytask.name)
  280. # With arguments.
  281. presult2 = self.mytask.apply_async(
  282. kwargs=dict(name='George Costanza'),
  283. )
  284. self.assertNextTaskDataEqual(
  285. consumer, presult2, self.mytask.name, name='George Costanza',
  286. )
  287. # send_task
  288. sresult = self.app.send_task(self.mytask.name,
  289. kwargs=dict(name='Elaine M. Benes'))
  290. self.assertNextTaskDataEqual(
  291. consumer, sresult, self.mytask.name, name='Elaine M. Benes',
  292. )
  293. # With eta.
  294. presult2 = self.mytask.apply_async(
  295. kwargs=dict(name='George Costanza'),
  296. eta=self.now() + timedelta(days=1),
  297. expires=self.now() + timedelta(days=2),
  298. )
  299. self.assertNextTaskDataEqual(
  300. consumer, presult2, self.mytask.name,
  301. name='George Costanza', test_eta=True, test_expires=True,
  302. )
  303. # With countdown.
  304. presult2 = self.mytask.apply_async(
  305. kwargs=dict(name='George Costanza'), countdown=10, expires=12,
  306. )
  307. self.assertNextTaskDataEqual(
  308. consumer, presult2, self.mytask.name,
  309. name='George Costanza', test_eta=True, test_expires=True,
  310. )
  311. # Discarding all tasks.
  312. consumer.purge()
  313. self.mytask.apply_async()
  314. self.assertEqual(consumer.purge(), 1)
  315. self.assertIsNone(consumer.queues[0].get())
  316. self.assertFalse(presult.successful())
  317. self.mytask.backend.mark_as_done(presult.id, result=None)
  318. self.assertTrue(presult.successful())
  319. def test_send_event(self):
  320. mytask = self.mytask._get_current_object()
  321. mytask.app.events = Mock(name='events')
  322. mytask.app.events.attach_mock(ContextMock(), 'default_dispatcher')
  323. mytask.request.id = 'fb'
  324. mytask.send_event('task-foo', id=3122)
  325. mytask.app.events.default_dispatcher().send.assert_called_with(
  326. 'task-foo', uuid='fb', id=3122,
  327. )
  328. def test_replace(self):
  329. sig1 = Mock(name='sig1')
  330. with self.assertRaises(Ignore):
  331. self.mytask.replace(sig1)
  332. def test_replace__group(self):
  333. c = group([self.mytask.s()], app=self.app)
  334. c.freeze = Mock(name='freeze')
  335. c.delay = Mock(name='delay')
  336. self.mytask.request.id = 'id'
  337. self.mytask.request.group = 'group'
  338. self.mytask.request.root_id = 'root_id',
  339. with self.assertRaises(Ignore):
  340. self.mytask.replace(c)
  341. def test_send_error_email_enabled(self):
  342. mytask = self.increment_counter._get_current_object()
  343. mytask.send_error_emails = True
  344. mytask.disable_error_emails = False
  345. mytask.ErrorMail = Mock(name='ErrorMail')
  346. context = Mock(name='context')
  347. exc = Mock(name='context')
  348. mytask.send_error_email(context, exc, foo=1)
  349. mytask.ErrorMail.assert_called_with(mytask, foo=1)
  350. mytask.ErrorMail().send.assert_called_with(context, exc)
  351. def test_add_trail__no_trail(self):
  352. mytask = self.increment_counter._get_current_object()
  353. mytask.trail = False
  354. mytask.add_trail('foo')
  355. def test_repr_v2_compat(self):
  356. self.mytask.__v2_compat__ = True
  357. self.assertIn('v2 compatible', repr(self.mytask))
  358. def test_apply_with_self(self):
  359. @self.app.task(__self__=42, shared=False)
  360. def tawself(self):
  361. return self
  362. self.assertEqual(tawself.apply().get(), 42)
  363. self.assertEqual(tawself(), 42)
  364. def test_context_get(self):
  365. self.mytask.push_request()
  366. try:
  367. request = self.mytask.request
  368. request.foo = 32
  369. self.assertEqual(request.get('foo'), 32)
  370. self.assertEqual(request.get('bar', 36), 36)
  371. request.clear()
  372. finally:
  373. self.mytask.pop_request()
  374. def test_annotate(self):
  375. with patch('celery.app.task.resolve_all_annotations') as anno:
  376. anno.return_value = [{'FOO': 'BAR'}]
  377. @self.app.task(shared=False)
  378. def task():
  379. pass
  380. task.annotate()
  381. self.assertEqual(task.FOO, 'BAR')
  382. def test_after_return(self):
  383. self.mytask.push_request()
  384. try:
  385. self.mytask.request.chord = self.mytask.s()
  386. self.mytask.after_return('SUCCESS', 1.0, 'foobar', (), {}, None)
  387. self.mytask.request.clear()
  388. finally:
  389. self.mytask.pop_request()
  390. def test_update_state(self):
  391. @self.app.task(shared=False)
  392. def yyy():
  393. pass
  394. yyy.push_request()
  395. try:
  396. tid = uuid()
  397. yyy.update_state(tid, 'FROBULATING', {'fooz': 'baaz'})
  398. self.assertEqual(yyy.AsyncResult(tid).status, 'FROBULATING')
  399. self.assertDictEqual(yyy.AsyncResult(tid).result, {'fooz': 'baaz'})
  400. yyy.request.id = tid
  401. yyy.update_state(state='FROBUZATING', meta={'fooz': 'baaz'})
  402. self.assertEqual(yyy.AsyncResult(tid).status, 'FROBUZATING')
  403. self.assertDictEqual(yyy.AsyncResult(tid).result, {'fooz': 'baaz'})
  404. finally:
  405. yyy.pop_request()
  406. def test_repr(self):
  407. @self.app.task(shared=False)
  408. def task_test_repr():
  409. pass
  410. self.assertIn('task_test_repr', repr(task_test_repr))
  411. def test_has___name__(self):
  412. @self.app.task(shared=False)
  413. def yyy2():
  414. pass
  415. self.assertTrue(yyy2.__name__)
  416. class test_apply_task(TasksCase):
  417. def test_apply_throw(self):
  418. with self.assertRaises(KeyError):
  419. self.raising.apply(throw=True)
  420. def test_apply_with_task_eager_propagates(self):
  421. self.app.conf.task_eager_propagates = True
  422. with self.assertRaises(KeyError):
  423. self.raising.apply()
  424. def test_apply(self):
  425. self.increment_counter.count = 0
  426. e = self.increment_counter.apply()
  427. self.assertIsInstance(e, EagerResult)
  428. self.assertEqual(e.get(), 1)
  429. e = self.increment_counter.apply(args=[1])
  430. self.assertEqual(e.get(), 2)
  431. e = self.increment_counter.apply(kwargs={'increment_by': 4})
  432. self.assertEqual(e.get(), 6)
  433. self.assertTrue(e.successful())
  434. self.assertTrue(e.ready())
  435. self.assertTrue(repr(e).startswith('<EagerResult:'))
  436. f = self.raising.apply()
  437. self.assertTrue(f.ready())
  438. self.assertFalse(f.successful())
  439. self.assertTrue(f.traceback)
  440. with self.assertRaises(KeyError):
  441. f.get()