test_request.py 35 KB


  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. import numbers
  4. import os
  5. import signal
  6. import socket
  7. import sys
  8. from datetime import datetime, timedelta
  9. from billiard.einfo import ExceptionInfo
  10. from kombu.utils import uuid
  11. from kombu.utils.encoding import default_encode, from_utf8, safe_str, safe_repr
  12. from celery import states
  13. from celery.app.trace import (
  14. trace_task,
  15. _trace_task_ret,
  16. TraceInfo,
  17. mro_lookup,
  18. build_tracer,
  19. setup_worker_optimizations,
  20. reset_worker_optimizations,
  21. )
  22. from celery.concurrency.base import BasePool
  23. from celery.exceptions import (
  24. Ignore,
  25. InvalidTaskError,
  26. Reject,
  27. Retry,
  28. TaskRevokedError,
  29. Terminated,
  30. WorkerLostError,
  31. )
  32. from celery.five import monotonic
  33. from celery.signals import task_revoked
  34. from celery.worker import request as module
  35. from celery.worker.request import (
  36. Request, create_request_cls, logger as req_logger,
  37. )
  38. from celery.worker.state import revoked
  39. from celery.tests.case import (
  40. AppCase,
  41. Case,
  42. Mock,
  43. TaskMessage,
  44. task_message_from_sig,
  45. patch,
  46. skip,
  47. )
  48. class RequestCase(AppCase):
  49. def setup(self):
  50. self.app.conf.result_serializer = 'pickle'
  51. @self.app.task(shared=False)
  52. def add(x, y, **kw_):
  53. return x + y
  54. self.add = add
  55. @self.app.task(shared=False)
  56. def mytask(i, **kwargs):
  57. return i ** i
  58. self.mytask = mytask
  59. @self.app.task(shared=False)
  60. def mytask_raising(i):
  61. raise KeyError(i)
  62. self.mytask_raising = mytask_raising
  63. def xRequest(self, name=None, id=None, args=None, kwargs=None,
  64. on_ack=None, on_reject=None, Request=Request, **head):
  65. args = [1] if args is None else args
  66. kwargs = {'f': 'x'} if kwargs is None else kwargs
  67. on_ack = on_ack or Mock(name='on_ack')
  68. on_reject = on_reject or Mock(name='on_reject')
  69. message = TaskMessage(
  70. name or self.mytask.name, id, args=args, kwargs=kwargs, **head
  71. )
  72. return Request(message, app=self.app,
  73. on_ack=on_ack, on_reject=on_reject)
  74. class test_mro_lookup(Case):
  75. def test_order(self):
  76. class A:
  77. pass
  78. class B(A):
  79. pass
  80. class C(B):
  81. pass
  82. class D(C):
  83. @classmethod
  84. def mro(cls):
  85. return ()
  86. A.x = 10
  87. self.assertEqual(mro_lookup(C, 'x'), A)
  88. self.assertIsNone(mro_lookup(C, 'x', stop={A}))
  89. B.x = 10
  90. self.assertEqual(mro_lookup(C, 'x'), B)
  91. C.x = 10
  92. self.assertEqual(mro_lookup(C, 'x'), C)
  93. self.assertIsNone(mro_lookup(D, 'x'))
  94. def jail(app, task_id, name, args, kwargs):
  95. request = {'id': task_id}
  96. task = app.tasks[name]
  97. task.__trace__ = None # rebuild
  98. return trace_task(
  99. task, task_id, args, kwargs, request=request, eager=False, app=app,
  100. ).retval
  101. @skip.if_python3()
  102. class test_default_encode(AppCase):
  103. def test_jython(self):
  104. prev, sys.platform = sys.platform, 'java 1.6.1'
  105. try:
  106. self.assertEqual(default_encode(bytes('foo')), 'foo')
  107. finally:
  108. sys.platform = prev
  109. def test_cpython(self):
  110. prev, sys.platform = sys.platform, 'darwin'
  111. gfe, sys.getfilesystemencoding = (
  112. sys.getfilesystemencoding,
  113. lambda: 'utf-8',
  114. )
  115. try:
  116. self.assertEqual(default_encode(bytes('foo')), 'foo')
  117. finally:
  118. sys.platform = prev
  119. sys.getfilesystemencoding = gfe
  120. class test_Retry(AppCase):
  121. def test_retry_semipredicate(self):
  122. try:
  123. raise Exception('foo')
  124. except Exception as exc:
  125. ret = Retry('Retrying task', exc)
  126. self.assertEqual(ret.exc, exc)
  127. class test_trace_task(RequestCase):
  128. def setup(self):
  129. @self.app.task(shared=False)
  130. def mytask(i, **kwargs):
  131. return i ** i
  132. self.mytask = mytask
  133. @self.app.task(shared=False)
  134. def mytask_raising(i):
  135. raise KeyError(i)
  136. self.mytask_raising = mytask_raising
  137. @patch('celery.app.trace.logger')
  138. def test_process_cleanup_fails(self, _logger):
  139. self.mytask.backend = Mock()
  140. self.mytask.backend.process_cleanup = Mock(side_effect=KeyError())
  141. tid = uuid()
  142. ret = jail(self.app, tid, self.mytask.name, [2], {})
  143. self.assertEqual(ret, 4)
  144. self.mytask.backend.mark_as_done.assert_called()
  145. self.assertIn('Process cleanup failed', _logger.error.call_args[0][0])
  146. def test_process_cleanup_BaseException(self):
  147. self.mytask.backend = Mock()
  148. self.mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
  149. with self.assertRaises(SystemExit):
  150. jail(self.app, uuid(), self.mytask.name, [2], {})
  151. def test_execute_jail_success(self):
  152. ret = jail(self.app, uuid(), self.mytask.name, [2], {})
  153. self.assertEqual(ret, 4)
  154. def test_marked_as_started(self):
  155. _started = []
  156. def store_result(tid, meta, state, **kwargs):
  157. if state == states.STARTED:
  158. _started.append(tid)
  159. self.mytask.backend.store_result = Mock(name='store_result')
  160. self.mytask.backend.store_result.side_effect = store_result
  161. self.mytask.track_started = True
  162. tid = uuid()
  163. jail(self.app, tid, self.mytask.name, [2], {})
  164. self.assertIn(tid, _started)
  165. self.mytask.ignore_result = True
  166. tid = uuid()
  167. jail(self.app, tid, self.mytask.name, [2], {})
  168. self.assertNotIn(tid, _started)
  169. def test_execute_jail_failure(self):
  170. ret = jail(
  171. self.app, uuid(), self.mytask_raising.name, [4], {},
  172. )
  173. self.assertIsInstance(ret, ExceptionInfo)
  174. self.assertTupleEqual(ret.exception.args, (4,))
  175. def test_execute_ignore_result(self):
  176. @self.app.task(shared=False, ignore_result=True)
  177. def ignores_result(i):
  178. return i ** i
  179. task_id = uuid()
  180. ret = jail(self.app, task_id, ignores_result.name, [4], {})
  181. self.assertEqual(ret, 256)
  182. self.assertFalse(self.app.AsyncResult(task_id).ready())
  183. class test_Request(RequestCase):
  184. def get_request(self, sig, Request=Request, **kwargs):
  185. return Request(
  186. task_message_from_sig(self.app, sig),
  187. on_ack=Mock(name='on_ack'),
  188. on_reject=Mock(name='on_reject'),
  189. eventer=Mock(name='eventer'),
  190. app=self.app,
  191. connection_errors=(socket.error,),
  192. task=sig.type,
  193. **kwargs
  194. )
  195. def test_shadow(self):
  196. self.assertEqual(
  197. self.get_request(self.add.s(2, 2).set(shadow='fooxyz')).name,
  198. 'fooxyz',
  199. )
  200. def test_invalid_eta_raises_InvalidTaskError(self):
  201. with self.assertRaises(InvalidTaskError):
  202. self.get_request(self.add.s(2, 2).set(eta='12345'))
  203. def test_invalid_expires_raises_InvalidTaskError(self):
  204. with self.assertRaises(InvalidTaskError):
  205. self.get_request(self.add.s(2, 2).set(expires='12345'))
  206. def test_valid_expires_with_utc_makes_aware(self):
  207. with patch('celery.worker.request.maybe_make_aware') as mma:
  208. self.get_request(self.add.s(2, 2).set(expires=10),
  209. maybe_make_aware=mma)
  210. mma.assert_called()
  211. def test_maybe_expire_when_expires_is_None(self):
  212. req = self.get_request(self.add.s(2, 2))
  213. self.assertFalse(req.maybe_expire())
  214. def test_on_retry_acks_if_late(self):
  215. self.add.acks_late = True
  216. req = self.get_request(self.add.s(2, 2))
  217. req.on_retry(Mock())
  218. req.on_ack.assert_called_with(req_logger, req.connection_errors)
  219. def test_on_failure_Termianted(self):
  220. einfo = None
  221. try:
  222. raise Terminated('9')
  223. except Terminated:
  224. einfo = ExceptionInfo()
  225. self.assertIsNotNone(einfo)
  226. req = self.get_request(self.add.s(2, 2))
  227. req.on_failure(einfo)
  228. req.eventer.send.assert_called_with(
  229. 'task-revoked',
  230. uuid=req.id, terminated=True, signum='9', expired=False,
  231. )
  232. def test_on_failure_propagates_MemoryError(self):
  233. einfo = None
  234. try:
  235. raise MemoryError()
  236. except MemoryError:
  237. einfo = ExceptionInfo(internal=True)
  238. self.assertIsNotNone(einfo)
  239. req = self.get_request(self.add.s(2, 2))
  240. with self.assertRaises(MemoryError):
  241. req.on_failure(einfo)
  242. def test_on_failure_Ignore_acknowledges(self):
  243. einfo = None
  244. try:
  245. raise Ignore()
  246. except Ignore:
  247. einfo = ExceptionInfo(internal=True)
  248. self.assertIsNotNone(einfo)
  249. req = self.get_request(self.add.s(2, 2))
  250. req.on_failure(einfo)
  251. req.on_ack.assert_called_with(req_logger, req.connection_errors)
  252. def test_on_failure_Reject_rejects(self):
  253. einfo = None
  254. try:
  255. raise Reject()
  256. except Reject:
  257. einfo = ExceptionInfo(internal=True)
  258. self.assertIsNotNone(einfo)
  259. req = self.get_request(self.add.s(2, 2))
  260. req.on_failure(einfo)
  261. req.on_reject.assert_called_with(
  262. req_logger, req.connection_errors, False,
  263. )
  264. def test_on_failure_Reject_rejects_with_requeue(self):
  265. einfo = None
  266. try:
  267. raise Reject(requeue=True)
  268. except Reject:
  269. einfo = ExceptionInfo(internal=True)
  270. self.assertIsNotNone(einfo)
  271. req = self.get_request(self.add.s(2, 2))
  272. req.on_failure(einfo)
  273. req.on_reject.assert_called_with(
  274. req_logger, req.connection_errors, True,
  275. )
  276. def test_on_failure_WorkerLostError_rejects_with_requeue(self):
  277. einfo = None
  278. try:
  279. raise WorkerLostError()
  280. except:
  281. einfo = ExceptionInfo(internal=True)
  282. req = self.get_request(self.add.s(2, 2))
  283. req.task.acks_late = True
  284. req.task.reject_on_worker_lost = True
  285. req.delivery_info['redelivered'] = False
  286. req.on_failure(einfo)
  287. req.on_reject.assert_called_with(
  288. req_logger, req.connection_errors, True)
  289. def test_on_failure_WorkerLostError_redelivered_None(self):
  290. einfo = None
  291. try:
  292. raise WorkerLostError()
  293. except:
  294. einfo = ExceptionInfo(internal=True)
  295. req = self.get_request(self.add.s(2, 2))
  296. req.task.acks_late = True
  297. req.task.reject_on_worker_lost = True
  298. req.delivery_info['redelivered'] = None
  299. req.on_failure(einfo)
  300. req.on_reject.assert_called_with(
  301. req_logger, req.connection_errors, False)
  302. def test_tzlocal_is_cached(self):
  303. req = self.get_request(self.add.s(2, 2))
  304. req._tzlocal = 'foo'
  305. self.assertEqual(req.tzlocal, 'foo')
  306. def test_task_wrapper_repr(self):
  307. self.assertTrue(repr(self.xRequest()))
  308. def test_sets_store_errors(self):
  309. self.mytask.ignore_result = True
  310. job = self.xRequest()
  311. self.assertFalse(job.store_errors)
  312. self.mytask.store_errors_even_if_ignored = True
  313. job = self.xRequest()
  314. self.assertTrue(job.store_errors)
  315. def test_send_event(self):
  316. job = self.xRequest()
  317. job.eventer = Mock(name='.eventer')
  318. job.send_event('task-frobulated')
  319. job.eventer.send.assert_called_with('task-frobulated', uuid=job.id)
  320. def test_on_retry(self):
  321. job = self.get_request(self.mytask.s(1, f='x'))
  322. job.eventer = Mock(name='.eventer')
  323. try:
  324. raise Retry('foo', KeyError('moofoobar'))
  325. except:
  326. einfo = ExceptionInfo()
  327. job.on_failure(einfo)
  328. job.eventer.send.assert_called_with(
  329. 'task-retried',
  330. uuid=job.id,
  331. exception=safe_repr(einfo.exception.exc),
  332. traceback=safe_str(einfo.traceback),
  333. )
  334. prev, module._does_info = module._does_info, False
  335. try:
  336. job.on_failure(einfo)
  337. finally:
  338. module._does_info = prev
  339. einfo.internal = True
  340. job.on_failure(einfo)
  341. def test_compat_properties(self):
  342. job = self.xRequest()
  343. self.assertEqual(job.task_id, job.id)
  344. self.assertEqual(job.task_name, job.name)
  345. job.task_id = 'ID'
  346. self.assertEqual(job.id, 'ID')
  347. job.task_name = 'NAME'
  348. self.assertEqual(job.name, 'NAME')
  349. def test_terminate__pool_ref(self):
  350. pool = Mock()
  351. signum = signal.SIGTERM
  352. job = self.get_request(self.mytask.s(1, f='x'))
  353. job._apply_result = Mock(name='_apply_result')
  354. with self.assert_signal_called(
  355. task_revoked, sender=job.task, request=job,
  356. terminated=True, expired=False, signum=signum):
  357. job.time_start = monotonic()
  358. job.worker_pid = 314
  359. job.terminate(pool, signal='TERM')
  360. job._apply_result().terminate.assert_called_with(signum)
  361. job._apply_result = Mock(name='_apply_result2')
  362. job._apply_result.return_value = None
  363. job.terminate(pool, signal='TERM')
  364. def test_terminate__task_started(self):
  365. pool = Mock()
  366. signum = signal.SIGTERM
  367. job = self.get_request(self.mytask.s(1, f='x'))
  368. with self.assert_signal_called(
  369. task_revoked, sender=job.task, request=job,
  370. terminated=True, expired=False, signum=signum):
  371. job.time_start = monotonic()
  372. job.worker_pid = 313
  373. job.terminate(pool, signal='TERM')
  374. pool.terminate_job.assert_called_with(job.worker_pid, signum)
  375. def test_terminate__task_reserved(self):
  376. pool = Mock()
  377. job = self.get_request(self.mytask.s(1, f='x'))
  378. job.time_start = None
  379. job.terminate(pool, signal='TERM')
  380. pool.terminate_job.assert_not_called()
  381. self.assertTupleEqual(job._terminate_on_ack, (pool, 15))
  382. job.terminate(pool, signal='TERM')
  383. def test_revoked_expires_expired(self):
  384. job = self.get_request(self.mytask.s(1, f='x').set(
  385. expires=datetime.utcnow() - timedelta(days=1)
  386. ))
  387. with self.assert_signal_called(
  388. task_revoked, sender=job.task, request=job,
  389. terminated=False, expired=True, signum=None):
  390. job.revoked()
  391. self.assertIn(job.id, revoked)
  392. self.assertEqual(
  393. self.mytask.backend.get_status(job.id),
  394. states.REVOKED,
  395. )
  396. def test_revoked_expires_not_expired(self):
  397. job = self.xRequest(
  398. expires=datetime.utcnow() + timedelta(days=1),
  399. )
  400. job.revoked()
  401. self.assertNotIn(job.id, revoked)
  402. self.assertNotEqual(
  403. self.mytask.backend.get_status(job.id),
  404. states.REVOKED,
  405. )
  406. def test_revoked_expires_ignore_result(self):
  407. self.mytask.ignore_result = True
  408. job = self.xRequest(
  409. expires=datetime.utcnow() - timedelta(days=1),
  410. )
  411. job.revoked()
  412. self.assertIn(job.id, revoked)
  413. self.assertNotEqual(
  414. self.mytask.backend.get_status(job.id), states.REVOKED,
  415. )
  416. def test_already_revoked(self):
  417. job = self.xRequest()
  418. job._already_revoked = True
  419. self.assertTrue(job.revoked())
  420. def test_revoked(self):
  421. job = self.xRequest()
  422. with self.assert_signal_called(
  423. task_revoked, sender=job.task, request=job,
  424. terminated=False, expired=False, signum=None):
  425. revoked.add(job.id)
  426. self.assertTrue(job.revoked())
  427. self.assertTrue(job._already_revoked)
  428. self.assertTrue(job.acknowledged)
  429. def test_execute_does_not_execute_revoked(self):
  430. job = self.xRequest()
  431. revoked.add(job.id)
  432. job.execute()
  433. def test_execute_acks_late(self):
  434. self.mytask_raising.acks_late = True
  435. job = self.xRequest(
  436. name=self.mytask_raising.name,
  437. kwargs={},
  438. )
  439. job.execute()
  440. self.assertTrue(job.acknowledged)
  441. job.execute()
  442. def test_execute_using_pool_does_not_execute_revoked(self):
  443. job = self.xRequest()
  444. revoked.add(job.id)
  445. with self.assertRaises(TaskRevokedError):
  446. job.execute_using_pool(None)
  447. def test_on_accepted_acks_early(self):
  448. job = self.xRequest()
  449. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  450. self.assertTrue(job.acknowledged)
  451. prev, module._does_debug = module._does_debug, False
  452. try:
  453. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  454. finally:
  455. module._does_debug = prev
  456. def test_on_accepted_acks_late(self):
  457. job = self.xRequest()
  458. self.mytask.acks_late = True
  459. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  460. self.assertFalse(job.acknowledged)
  461. def test_on_accepted_terminates(self):
  462. signum = signal.SIGTERM
  463. pool = Mock()
  464. job = self.xRequest()
  465. with self.assert_signal_called(
  466. task_revoked, sender=job.task, request=job,
  467. terminated=True, expired=False, signum=signum):
  468. job.terminate(pool, signal='TERM')
  469. self.assertFalse(pool.terminate_job.call_count)
  470. job.on_accepted(pid=314, time_accepted=monotonic())
  471. pool.terminate_job.assert_called_with(314, signum)
  472. def test_on_success_acks_early(self):
  473. job = self.xRequest()
  474. job.time_start = 1
  475. job.on_success((0, 42, 0.001))
  476. prev, module._does_info = module._does_info, False
  477. try:
  478. job.on_success((0, 42, 0.001))
  479. self.assertFalse(job.acknowledged)
  480. finally:
  481. module._does_info = prev
  482. def test_on_success_BaseException(self):
  483. job = self.xRequest()
  484. job.time_start = 1
  485. with self.assertRaises(SystemExit):
  486. try:
  487. raise SystemExit()
  488. except SystemExit:
  489. job.on_success((1, ExceptionInfo(), 0.01))
  490. else:
  491. assert False
  492. def test_on_success_eventer(self):
  493. job = self.xRequest()
  494. job.time_start = 1
  495. job.eventer = Mock()
  496. job.eventer.send = Mock()
  497. job.on_success((0, 42, 0.001))
  498. job.eventer.send.assert_called()
  499. def test_on_success_when_failure(self):
  500. job = self.xRequest()
  501. job.time_start = 1
  502. job.on_failure = Mock()
  503. try:
  504. raise KeyError('foo')
  505. except Exception:
  506. job.on_success((1, ExceptionInfo(), 0.001))
  507. job.on_failure.assert_called()
  508. def test_on_success_acks_late(self):
  509. job = self.xRequest()
  510. job.time_start = 1
  511. self.mytask.acks_late = True
  512. job.on_success((0, 42, 0.001))
  513. self.assertTrue(job.acknowledged)
  514. def test_on_failure_WorkerLostError(self):
  515. def get_ei():
  516. try:
  517. raise WorkerLostError('do re mi')
  518. except WorkerLostError:
  519. return ExceptionInfo()
  520. job = self.xRequest()
  521. exc_info = get_ei()
  522. job.on_failure(exc_info)
  523. self.assertEqual(
  524. self.mytask.backend.get_status(job.id), states.FAILURE,
  525. )
  526. self.mytask.ignore_result = True
  527. exc_info = get_ei()
  528. job = self.xRequest()
  529. job.on_failure(exc_info)
  530. self.assertEqual(
  531. self.mytask.backend.get_status(job.id), states.PENDING,
  532. )
  533. def test_on_failure_acks_late(self):
  534. job = self.xRequest()
  535. job.time_start = 1
  536. self.mytask.acks_late = True
  537. try:
  538. raise KeyError('foo')
  539. except KeyError:
  540. exc_info = ExceptionInfo()
  541. job.on_failure(exc_info)
  542. self.assertTrue(job.acknowledged)
  543. def test_from_message_invalid_kwargs(self):
  544. m = TaskMessage(self.mytask.name, args=(), kwargs='foo')
  545. req = Request(m, app=self.app)
  546. with self.assertRaises(InvalidTaskError):
  547. raise req.execute().exception
  548. @patch('celery.worker.request.error')
  549. @patch('celery.worker.request.warn')
  550. def test_on_timeout(self, warn, error):
  551. job = self.xRequest()
  552. job.acknowledge = Mock(name='ack')
  553. job.task.acks_late = True
  554. job.on_timeout(soft=True, timeout=1337)
  555. self.assertIn('Soft time limit', warn.call_args[0][0])
  556. job.on_timeout(soft=False, timeout=1337)
  557. self.assertIn('Hard time limit', error.call_args[0][0])
  558. self.assertEqual(
  559. self.mytask.backend.get_status(job.id), states.FAILURE,
  560. )
  561. job.acknowledge.assert_called_with()
  562. self.mytask.ignore_result = True
  563. job = self.xRequest()
  564. job.on_timeout(soft=True, timeout=1336)
  565. self.assertEqual(
  566. self.mytask.backend.get_status(job.id), states.PENDING,
  567. )
  568. job = self.xRequest()
  569. job.acknowledge = Mock(name='ack')
  570. job.task.acks_late = False
  571. job.on_timeout(soft=True, timeout=1335)
  572. job.acknowledge.assert_not_called()
  573. def test_fast_trace_task(self):
  574. from celery.app import trace
  575. setup_worker_optimizations(self.app)
  576. self.assertIs(trace.trace_task_ret, trace._fast_trace_task)
  577. tid = uuid()
  578. message = TaskMessage(self.mytask.name, tid, args=[4])
  579. assert len(message.payload) == 3
  580. try:
  581. self.mytask.__trace__ = build_tracer(
  582. self.mytask.name, self.mytask, self.app.loader, 'test',
  583. app=self.app,
  584. )
  585. failed, res, runtime = trace.trace_task_ret(
  586. self.mytask.name, tid, message.headers, message.body,
  587. message.content_type, message.content_encoding)
  588. self.assertFalse(failed)
  589. self.assertEqual(res, repr(4 ** 4))
  590. self.assertIsNotNone(runtime)
  591. self.assertIsInstance(runtime, numbers.Real)
  592. finally:
  593. reset_worker_optimizations()
  594. self.assertIs(trace.trace_task_ret, trace._trace_task_ret)
  595. delattr(self.mytask, '__trace__')
  596. failed, res, runtime = trace.trace_task_ret(
  597. self.mytask.name, tid, message.headers, message.body,
  598. message.content_type, message.content_encoding, app=self.app,
  599. )
  600. self.assertFalse(failed)
  601. self.assertEqual(res, repr(4 ** 4))
  602. self.assertIsNotNone(runtime)
  603. self.assertIsInstance(runtime, numbers.Real)
  604. def test_trace_task_ret(self):
  605. self.mytask.__trace__ = build_tracer(
  606. self.mytask.name, self.mytask, self.app.loader, 'test',
  607. app=self.app,
  608. )
  609. tid = uuid()
  610. message = TaskMessage(self.mytask.name, tid, args=[4])
  611. _, R, _ = _trace_task_ret(
  612. self.mytask.name, tid, message.headers,
  613. message.body, message.content_type,
  614. message.content_encoding, app=self.app,
  615. )
  616. self.assertEqual(R, repr(4 ** 4))
  617. def test_trace_task_ret__no_trace(self):
  618. try:
  619. delattr(self.mytask, '__trace__')
  620. except AttributeError:
  621. pass
  622. tid = uuid()
  623. message = TaskMessage(self.mytask.name, tid, args=[4])
  624. _, R, _ = _trace_task_ret(
  625. self.mytask.name, tid, message.headers,
  626. message.body, message.content_type,
  627. message.content_encoding, app=self.app,
  628. )
  629. self.assertEqual(R, repr(4 ** 4))
  630. def test_trace_catches_exception(self):
  631. @self.app.task(request=None, shared=False)
  632. def raising():
  633. raise KeyError('baz')
  634. with self.assertWarnsRegex(RuntimeWarning,
  635. r'Exception raised outside'):
  636. res = trace_task(raising, uuid(), [], {}, app=self.app)[0]
  637. self.assertIsInstance(res, ExceptionInfo)
  638. def test_worker_task_trace_handle_retry(self):
  639. tid = uuid()
  640. self.mytask.push_request(id=tid)
  641. try:
  642. raise ValueError('foo')
  643. except Exception as exc:
  644. try:
  645. raise Retry(str(exc), exc=exc)
  646. except Retry as exc:
  647. w = TraceInfo(states.RETRY, exc)
  648. w.handle_retry(
  649. self.mytask, self.mytask.request, store_errors=False,
  650. )
  651. self.assertEqual(
  652. self.mytask.backend.get_status(tid), states.PENDING,
  653. )
  654. w.handle_retry(
  655. self.mytask, self.mytask.request, store_errors=True,
  656. )
  657. self.assertEqual(
  658. self.mytask.backend.get_status(tid), states.RETRY,
  659. )
  660. finally:
  661. self.mytask.pop_request()
  662. def test_worker_task_trace_handle_failure(self):
  663. tid = uuid()
  664. self.mytask.push_request()
  665. try:
  666. self.mytask.request.id = tid
  667. try:
  668. raise ValueError('foo')
  669. except Exception as exc:
  670. w = TraceInfo(states.FAILURE, exc)
  671. w.handle_failure(
  672. self.mytask, self.mytask.request, store_errors=False,
  673. )
  674. self.assertEqual(
  675. self.mytask.backend.get_status(tid), states.PENDING,
  676. )
  677. w.handle_failure(
  678. self.mytask, self.mytask.request, store_errors=True,
  679. )
  680. self.assertEqual(
  681. self.mytask.backend.get_status(tid), states.FAILURE,
  682. )
  683. finally:
  684. self.mytask.pop_request()
  685. def test_from_message(self):
  686. us = 'æØåveéðƒeæ'
  687. tid = uuid()
  688. m = TaskMessage(self.mytask.name, tid, args=[2], kwargs={us: 'bar'})
  689. job = Request(m, app=self.app)
  690. self.assertIsInstance(job, Request)
  691. self.assertEqual(job.name, self.mytask.name)
  692. self.assertEqual(job.id, tid)
  693. self.assertIs(job.message, m)
  694. def test_from_message_empty_args(self):
  695. tid = uuid()
  696. m = TaskMessage(self.mytask.name, tid, args=[], kwargs={})
  697. job = Request(m, app=self.app)
  698. self.assertIsInstance(job, Request)
  699. def test_from_message_missing_required_fields(self):
  700. m = TaskMessage(self.mytask.name)
  701. m.headers.clear()
  702. with self.assertRaises(KeyError):
  703. Request(m, app=self.app)
  704. def test_from_message_nonexistant_task(self):
  705. m = TaskMessage(
  706. 'cu.mytask.doesnotexist',
  707. args=[2], kwargs={'æØåveéðƒeæ': 'bar'},
  708. )
  709. with self.assertRaises(KeyError):
  710. Request(m, app=self.app)
  711. def test_execute(self):
  712. tid = uuid()
  713. job = self.xRequest(id=tid, args=[4], kwargs={})
  714. self.assertEqual(job.execute(), 256)
  715. meta = self.mytask.backend.get_task_meta(tid)
  716. self.assertEqual(meta['status'], states.SUCCESS)
  717. self.assertEqual(meta['result'], 256)
  718. def test_execute_success_no_kwargs(self):
  719. @self.app.task # traverses coverage for decorator without parens
  720. def mytask_no_kwargs(i):
  721. return i ** i
  722. tid = uuid()
  723. job = self.xRequest(
  724. name=mytask_no_kwargs.name,
  725. id=tid,
  726. args=[4],
  727. kwargs={},
  728. )
  729. self.assertEqual(job.execute(), 256)
  730. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  731. self.assertEqual(meta['result'], 256)
  732. self.assertEqual(meta['status'], states.SUCCESS)
  733. def test_execute_ack(self):
  734. scratch = {'ACK': False}
  735. def on_ack(*args, **kwargs):
  736. scratch['ACK'] = True
  737. tid = uuid()
  738. job = self.xRequest(id=tid, args=[4], on_ack=on_ack)
  739. self.assertEqual(job.execute(), 256)
  740. meta = self.mytask.backend.get_task_meta(tid)
  741. self.assertTrue(scratch['ACK'])
  742. self.assertEqual(meta['result'], 256)
  743. self.assertEqual(meta['status'], states.SUCCESS)
  744. def test_execute_fail(self):
  745. tid = uuid()
  746. job = self.xRequest(
  747. name=self.mytask_raising.name,
  748. id=tid,
  749. args=[4],
  750. kwargs={},
  751. )
  752. self.assertIsInstance(job.execute(), ExceptionInfo)
  753. assert self.mytask_raising.backend.serializer == 'pickle'
  754. meta = self.mytask_raising.backend.get_task_meta(tid)
  755. self.assertEqual(meta['status'], states.FAILURE)
  756. self.assertIsInstance(meta['result'], KeyError)
  757. def test_execute_using_pool(self):
  758. tid = uuid()
  759. job = self.xRequest(id=tid, args=[4])
  760. class MockPool(BasePool):
  761. target = None
  762. args = None
  763. kwargs = None
  764. def __init__(self, *args, **kwargs):
  765. pass
  766. def apply_async(self, target, args=None, kwargs=None,
  767. *margs, **mkwargs):
  768. self.target = target
  769. self.args = args
  770. self.kwargs = kwargs
  771. p = MockPool()
  772. job.execute_using_pool(p)
  773. self.assertTrue(p.target)
  774. self.assertEqual(p.args[0], self.mytask.name)
  775. self.assertEqual(p.args[1], tid)
  776. self.assertEqual(p.args[3], job.message.body)
  777. def _test_on_failure(self, exception, **kwargs):
  778. tid = uuid()
  779. job = self.xRequest(id=tid, args=[4])
  780. job.send_event = Mock(name='send_event')
  781. job.task.backend.mark_as_failure = Mock(name='mark_as_failure')
  782. try:
  783. raise exception
  784. except type(exception):
  785. exc_info = ExceptionInfo()
  786. job.on_failure(exc_info, **kwargs)
  787. job.send_event.assert_called()
  788. return job
  789. def test_on_failure(self):
  790. self._test_on_failure(Exception('Inside unit tests'))
  791. def test_on_failure__unicode_exception(self):
  792. self._test_on_failure(Exception('Бобры атакуют'))
  793. def test_on_failure__utf8_exception(self):
  794. self._test_on_failure(Exception(
  795. from_utf8('Бобры атакуют')))
  796. def test_on_failure__WorkerLostError(self):
  797. exc = WorkerLostError()
  798. job = self._test_on_failure(exc)
  799. job.task.backend.mark_as_failure.assert_called_with(
  800. job.id, exc, request=job, store_result=True,
  801. )
  802. def test_on_failure__return_ok(self):
  803. self._test_on_failure(KeyError(), return_ok=True)
  804. def test_reject(self):
  805. job = self.xRequest(id=uuid())
  806. job.on_reject = Mock(name='on_reject')
  807. job.reject(requeue=True)
  808. job.on_reject.assert_called_with(
  809. req_logger, job.connection_errors, True,
  810. )
  811. self.assertTrue(job.acknowledged)
  812. job.on_reject.reset_mock()
  813. job.reject(requeue=True)
  814. job.on_reject.assert_not_called()
  815. def test_group(self):
  816. gid = uuid()
  817. job = self.xRequest(id=uuid(), group=gid)
  818. self.assertEqual(job.group, gid)
  819. class test_create_request_class(RequestCase):
  820. def setup(self):
  821. RequestCase.setup(self)
  822. self.task = Mock(name='task')
  823. self.pool = Mock(name='pool')
  824. self.eventer = Mock(name='eventer')
  825. def create_request_cls(self, **kwargs):
  826. return create_request_cls(
  827. Request, self.task, self.pool, 'foo', self.eventer, **kwargs
  828. )
  829. def zRequest(self, Request=None, revoked_tasks=None, ref=None, **kwargs):
  830. return self.xRequest(
  831. Request=Request or self.create_request_cls(
  832. ref=ref,
  833. revoked_tasks=revoked_tasks,
  834. ),
  835. **kwargs)
  836. def test_on_success(self):
  837. self.zRequest(id=uuid()).on_success((False, 'hey', 3.1222))
  838. def test_on_success__SystemExit(self,
  839. errors=(SystemExit, KeyboardInterrupt)):
  840. for exc in errors:
  841. einfo = None
  842. try:
  843. raise exc()
  844. except exc:
  845. einfo = ExceptionInfo()
  846. with self.assertRaises(exc):
  847. self.zRequest(id=uuid()).on_success((True, einfo, 1.0))
  848. def test_on_success__calls_failure(self):
  849. job = self.zRequest(id=uuid())
  850. einfo = Mock(name='einfo')
  851. job.on_failure = Mock(name='on_failure')
  852. job.on_success((True, einfo, 1.0))
  853. job.on_failure.assert_called_with(einfo, return_ok=True)
  854. def test_on_success__acks_late_enabled(self):
  855. self.task.acks_late = True
  856. job = self.zRequest(id=uuid())
  857. job.acknowledge = Mock(name='ack')
  858. job.on_success((False, 'foo', 1.0))
  859. job.acknowledge.assert_called_with()
  860. def test_on_success__acks_late_disabled(self):
  861. self.task.acks_late = False
  862. job = self.zRequest(id=uuid())
  863. job.acknowledge = Mock(name='ack')
  864. job.on_success((False, 'foo', 1.0))
  865. job.acknowledge.assert_not_called()
  866. def test_on_success__no_events(self):
  867. self.eventer = None
  868. job = self.zRequest(id=uuid())
  869. job.send_event = Mock(name='send_event')
  870. job.on_success((False, 'foo', 1.0))
  871. job.send_event.assert_not_called()
  872. def test_on_success__with_events(self):
  873. job = self.zRequest(id=uuid())
  874. job.send_event = Mock(name='send_event')
  875. job.on_success((False, 'foo', 1.0))
  876. job.send_event.assert_called_with(
  877. 'task-succeeded', result='foo', runtime=1.0,
  878. )
  879. def test_execute_using_pool__revoked(self):
  880. tid = uuid()
  881. job = self.zRequest(id=tid, revoked_tasks={tid})
  882. job.revoked = Mock()
  883. job.revoked.return_value = True
  884. with self.assertRaises(TaskRevokedError):
  885. job.execute_using_pool(self.pool)
  886. def test_execute_using_pool__expired(self):
  887. tid = uuid()
  888. job = self.zRequest(id=tid, revoked_tasks=set())
  889. job.expires = 1232133
  890. job.revoked = Mock()
  891. job.revoked.return_value = True
  892. with self.assertRaises(TaskRevokedError):
  893. job.execute_using_pool(self.pool)
  894. def test_execute_using_pool(self):
  895. from celery.app.trace import trace_task_ret as trace
  896. weakref_ref = Mock(name='weakref.ref')
  897. job = self.zRequest(id=uuid(), revoked_tasks=set(), ref=weakref_ref)
  898. job.execute_using_pool(self.pool)
  899. self.pool.apply_async.assert_called_with(
  900. trace,
  901. args=(job.type, job.id, job.request_dict, job.body,
  902. job.content_type, job.content_encoding),
  903. accept_callback=job.on_accepted,
  904. timeout_callback=job.on_timeout,
  905. callback=job.on_success,
  906. error_callback=job.on_failure,
  907. soft_timeout=self.task.soft_time_limit,
  908. timeout=self.task.time_limit,
  909. correlation_id=job.id,
  910. )
  911. self.assertTrue(job._apply_result)
  912. weakref_ref.assert_called_with(self.pool.apply_async())
  913. self.assertIs(job._apply_result, weakref_ref())