test_request.py 33 KB


  1. # -*- coding: utf-8 -*-
  2. import numbers
  3. import os
  4. import pytest
  5. import signal
  6. import socket
  7. import sys
  8. from datetime import datetime, timedelta
  9. from time import monotonic
  10. from case import Mock, patch
  11. from billiard.einfo import ExceptionInfo
  12. from kombu.utils.encoding import default_encode, from_utf8, safe_str, safe_repr
  13. from kombu.utils.uuid import uuid
  14. from celery import states
  15. from celery.app.trace import (
  16. trace_task,
  17. _trace_task_ret,
  18. TraceInfo,
  19. mro_lookup,
  20. build_tracer,
  21. setup_worker_optimizations,
  22. reset_worker_optimizations,
  23. )
  24. from celery.exceptions import (
  25. Ignore,
  26. InvalidTaskError,
  27. Reject,
  28. Retry,
  29. TaskRevokedError,
  30. Terminated,
  31. WorkerLostError,
  32. )
  33. from celery.signals import task_revoked
  34. from celery.worker import request as module
  35. from celery.worker.request import (
  36. Request, create_request_cls, logger as req_logger,
  37. )
  38. from celery.worker.state import revoked
  39. class RequestCase:
  40. def setup(self):
  41. self.app.conf.result_serializer = 'pickle'
  42. @self.app.task(shared=False)
  43. def add(x, y, **kw_):
  44. return x + y
  45. self.add = add
  46. @self.app.task(shared=False)
  47. def mytask(i, **kwargs):
  48. return i ** i
  49. self.mytask = mytask
  50. @self.app.task(shared=False)
  51. def mytask_raising(i):
  52. raise KeyError(i)
  53. self.mytask_raising = mytask_raising
  54. def xRequest(self, name=None, id=None, args=None, kwargs=None,
  55. on_ack=None, on_reject=None, Request=Request, **head):
  56. args = [1] if args is None else args
  57. kwargs = {'f': 'x'} if kwargs is None else kwargs
  58. on_ack = on_ack or Mock(name='on_ack')
  59. on_reject = on_reject or Mock(name='on_reject')
  60. message = self.TaskMessage(
  61. name or self.mytask.name, id, args=args, kwargs=kwargs, **head
  62. )
  63. return Request(message, app=self.app,
  64. on_ack=on_ack, on_reject=on_reject)
  65. class test_mro_lookup:
  66. def test_order(self):
  67. class A:
  68. pass
  69. class B(A):
  70. pass
  71. class C(B):
  72. pass
  73. class D(C):
  74. @classmethod
  75. def mro(cls):
  76. return ()
  77. A.x = 10
  78. assert mro_lookup(C, 'x') == A
  79. assert mro_lookup(C, 'x', stop={A}) is None
  80. B.x = 10
  81. assert mro_lookup(C, 'x') == B
  82. C.x = 10
  83. assert mro_lookup(C, 'x') == C
  84. assert mro_lookup(D, 'x') is None
  85. def jail(app, task_id, name, args, kwargs):
  86. request = {'id': task_id}
  87. task = app.tasks[name]
  88. task.__trace__ = None # rebuild
  89. return trace_task(
  90. task, task_id, args, kwargs, request=request, eager=False, app=app,
  91. ).retval
  92. @pytest.mark.skipif(sys.version_info[0] > 3, reason='Py2 only')
  93. class test_default_encode:
  94. def test_jython(self):
  95. prev, sys.platform = sys.platform, 'java 1.6.1'
  96. try:
  97. assert default_encode(b'foo') == b'foo'
  98. finally:
  99. sys.platform = prev
  100. def test_cpython(self):
  101. prev, sys.platform = sys.platform, 'darwin'
  102. gfe, sys.getfilesystemencoding = (
  103. sys.getfilesystemencoding,
  104. lambda: 'utf-8',
  105. )
  106. try:
  107. assert default_encode(b'foo') == b'foo'
  108. finally:
  109. sys.platform = prev
  110. sys.getfilesystemencoding = gfe
  111. class test_Retry:
  112. def test_retry_semipredicate(self):
  113. try:
  114. raise Exception('foo')
  115. except Exception as exc:
  116. ret = Retry('Retrying task', exc)
  117. assert ret.exc == exc
  118. class test_trace_task(RequestCase):
  119. def test_process_cleanup_fails(self, patching):
  120. _logger = patching('celery.app.trace.logger')
  121. self.mytask.backend = Mock()
  122. self.mytask.backend.process_cleanup = Mock(side_effect=KeyError())
  123. tid = uuid()
  124. ret = jail(self.app, tid, self.mytask.name, [2], {})
  125. assert ret == 4
  126. self.mytask.backend.mark_as_done.assert_called()
  127. assert 'Process cleanup failed' in _logger.error.call_args[0][0]
  128. def test_process_cleanup_BaseException(self):
  129. self.mytask.backend = Mock()
  130. self.mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
  131. with pytest.raises(SystemExit):
  132. jail(self.app, uuid(), self.mytask.name, [2], {})
  133. def test_execute_jail_success(self):
  134. ret = jail(self.app, uuid(), self.mytask.name, [2], {})
  135. assert ret == 4
  136. def test_marked_as_started(self):
  137. _started = []
  138. def store_result(tid, meta, state, **kwargs):
  139. if state == states.STARTED:
  140. _started.append(tid)
  141. self.mytask.backend.store_result = Mock(name='store_result')
  142. self.mytask.backend.store_result.side_effect = store_result
  143. self.mytask.track_started = True
  144. tid = uuid()
  145. jail(self.app, tid, self.mytask.name, [2], {})
  146. assert tid in _started
  147. self.mytask.ignore_result = True
  148. tid = uuid()
  149. jail(self.app, tid, self.mytask.name, [2], {})
  150. assert tid not in _started
  151. def test_execute_jail_failure(self):
  152. ret = jail(
  153. self.app, uuid(), self.mytask_raising.name, [4], {},
  154. )
  155. assert isinstance(ret, ExceptionInfo)
  156. assert ret.exception.args == (4,)
  157. def test_execute_ignore_result(self):
  158. @self.app.task(shared=False, ignore_result=True)
  159. def ignores_result(i):
  160. return i ** i
  161. task_id = uuid()
  162. ret = jail(self.app, task_id, ignores_result.name, [4], {})
  163. assert ret == 256
  164. assert not self.app.AsyncResult(task_id).ready()
  165. class test_Request(RequestCase):
  166. def get_request(self, sig, Request=Request, **kwargs):
  167. return Request(
  168. self.task_message_from_sig(self.app, sig),
  169. on_ack=Mock(name='on_ack'),
  170. on_reject=Mock(name='on_reject'),
  171. eventer=Mock(name='eventer'),
  172. app=self.app,
  173. connection_errors=(socket.error,),
  174. task=sig.type,
  175. **kwargs
  176. )
  177. def test_shadow(self):
  178. assert self.get_request(
  179. self.add.s(2, 2).set(shadow='fooxyz')).name == 'fooxyz'
  180. def test_invalid_eta_raises_InvalidTaskError(self):
  181. with pytest.raises(InvalidTaskError):
  182. self.get_request(self.add.s(2, 2).set(eta='12345'))
  183. def test_invalid_expires_raises_InvalidTaskError(self):
  184. with pytest.raises(InvalidTaskError):
  185. self.get_request(self.add.s(2, 2).set(expires='12345'))
  186. def test_valid_expires_with_utc_makes_aware(self):
  187. with patch('celery.worker.request.maybe_make_aware') as mma:
  188. self.get_request(self.add.s(2, 2).set(expires=10),
  189. maybe_make_aware=mma)
  190. mma.assert_called()
  191. def test_maybe_expire_when_expires_is_None(self):
  192. req = self.get_request(self.add.s(2, 2))
  193. assert not req.maybe_expire()
  194. def test_on_retry_acks_if_late(self):
  195. self.add.acks_late = True
  196. req = self.get_request(self.add.s(2, 2))
  197. req.on_retry(Mock())
  198. req.on_ack.assert_called_with(req_logger, req.connection_errors)
  199. def test_on_failure_Termianted(self):
  200. einfo = None
  201. try:
  202. raise Terminated('9')
  203. except Terminated:
  204. einfo = ExceptionInfo()
  205. assert einfo is not None
  206. req = self.get_request(self.add.s(2, 2))
  207. req.on_failure(einfo)
  208. req.eventer.send.assert_called_with(
  209. 'task-revoked',
  210. uuid=req.id, terminated=True, signum='9', expired=False,
  211. )
  212. def test_on_failure_propagates_MemoryError(self):
  213. einfo = None
  214. try:
  215. raise MemoryError()
  216. except MemoryError:
  217. einfo = ExceptionInfo(internal=True)
  218. assert einfo is not None
  219. req = self.get_request(self.add.s(2, 2))
  220. with pytest.raises(MemoryError):
  221. req.on_failure(einfo)
  222. def test_on_failure_Ignore_acknowledges(self):
  223. einfo = None
  224. try:
  225. raise Ignore()
  226. except Ignore:
  227. einfo = ExceptionInfo(internal=True)
  228. assert einfo is not None
  229. req = self.get_request(self.add.s(2, 2))
  230. req.on_failure(einfo)
  231. req.on_ack.assert_called_with(req_logger, req.connection_errors)
  232. def test_on_failure_Reject_rejects(self):
  233. einfo = None
  234. try:
  235. raise Reject()
  236. except Reject:
  237. einfo = ExceptionInfo(internal=True)
  238. assert einfo is not None
  239. req = self.get_request(self.add.s(2, 2))
  240. req.on_failure(einfo)
  241. req.on_reject.assert_called_with(
  242. req_logger, req.connection_errors, False,
  243. )
  244. def test_on_failure_Reject_rejects_with_requeue(self):
  245. einfo = None
  246. try:
  247. raise Reject(requeue=True)
  248. except Reject:
  249. einfo = ExceptionInfo(internal=True)
  250. assert einfo is not None
  251. req = self.get_request(self.add.s(2, 2))
  252. req.on_failure(einfo)
  253. req.on_reject.assert_called_with(
  254. req_logger, req.connection_errors, True,
  255. )
  256. def test_on_failure_WorkerLostError_rejects_with_requeue(self):
  257. einfo = None
  258. try:
  259. raise WorkerLostError()
  260. except:
  261. einfo = ExceptionInfo(internal=True)
  262. req = self.get_request(self.add.s(2, 2))
  263. req.task.acks_late = True
  264. req.task.reject_on_worker_lost = True
  265. req.delivery_info['redelivered'] = False
  266. req.on_failure(einfo)
  267. req.on_reject.assert_called_with(
  268. req_logger, req.connection_errors, True)
  269. def test_on_failure_WorkerLostError_redelivered_None(self):
  270. einfo = None
  271. try:
  272. raise WorkerLostError()
  273. except:
  274. einfo = ExceptionInfo(internal=True)
  275. req = self.get_request(self.add.s(2, 2))
  276. req.task.acks_late = True
  277. req.task.reject_on_worker_lost = True
  278. req.delivery_info['redelivered'] = None
  279. req.on_failure(einfo)
  280. req.on_reject.assert_called_with(
  281. req_logger, req.connection_errors, True)
  282. def test_tzlocal_is_cached(self):
  283. req = self.get_request(self.add.s(2, 2))
  284. req._tzlocal = 'foo'
  285. assert req.tzlocal == 'foo'
  286. def test_task_wrapper_repr(self):
  287. assert repr(self.xRequest())
  288. def test_sets_store_errors(self):
  289. self.mytask.ignore_result = True
  290. job = self.xRequest()
  291. assert not job.store_errors
  292. self.mytask.store_errors_even_if_ignored = True
  293. job = self.xRequest()
  294. assert job.store_errors
  295. def test_send_event(self):
  296. job = self.xRequest()
  297. job.eventer = Mock(name='.eventer')
  298. job.send_event('task-frobulated')
  299. job.eventer.send.assert_called_with('task-frobulated', uuid=job.id)
  300. def test_send_events__disabled_at_task_level(self):
  301. job = self.xRequest()
  302. job.task.send_events = False
  303. job.eventer = Mock(name='.eventer')
  304. job.send_event('task-frobulated')
  305. job.eventer.send.assert_not_called()
  306. def test_on_retry(self):
  307. job = self.get_request(self.mytask.s(1, f='x'))
  308. job.eventer = Mock(name='.eventer')
  309. try:
  310. raise Retry('foo', KeyError('moofoobar'))
  311. except:
  312. einfo = ExceptionInfo()
  313. job.on_failure(einfo)
  314. job.eventer.send.assert_called_with(
  315. 'task-retried',
  316. uuid=job.id,
  317. exception=safe_repr(einfo.exception.exc),
  318. traceback=safe_str(einfo.traceback),
  319. )
  320. prev, module._does_info = module._does_info, False
  321. try:
  322. job.on_failure(einfo)
  323. finally:
  324. module._does_info = prev
  325. einfo.internal = True
  326. job.on_failure(einfo)
  327. def test_terminate__pool_ref(self):
  328. pool = Mock()
  329. signum = signal.SIGTERM
  330. job = self.get_request(self.mytask.s(1, f='x'))
  331. job._apply_result = Mock(name='_apply_result')
  332. with self.assert_signal_called(
  333. task_revoked, sender=job.task, request=job,
  334. terminated=True, expired=False, signum=signum):
  335. job.time_start = monotonic()
  336. job.worker_pid = 314
  337. job.terminate(pool, signal='TERM')
  338. job._apply_result().terminate.assert_called_with(signum)
  339. job._apply_result = Mock(name='_apply_result2')
  340. job._apply_result.return_value = None
  341. job.terminate(pool, signal='TERM')
  342. def test_terminate__task_started(self):
  343. pool = Mock()
  344. signum = signal.SIGTERM
  345. job = self.get_request(self.mytask.s(1, f='x'))
  346. with self.assert_signal_called(
  347. task_revoked, sender=job.task, request=job,
  348. terminated=True, expired=False, signum=signum):
  349. job.time_start = monotonic()
  350. job.worker_pid = 313
  351. job.terminate(pool, signal='TERM')
  352. pool.terminate_job.assert_called_with(job.worker_pid, signum)
  353. def test_terminate__task_reserved(self):
  354. pool = Mock()
  355. job = self.get_request(self.mytask.s(1, f='x'))
  356. job.time_start = None
  357. job.terminate(pool, signal='TERM')
  358. pool.terminate_job.assert_not_called()
  359. assert job._terminate_on_ack == (pool, 15)
  360. job.terminate(pool, signal='TERM')
  361. def test_revoked_expires_expired(self):
  362. job = self.get_request(self.mytask.s(1, f='x').set(
  363. expires=datetime.utcnow() - timedelta(days=1)
  364. ))
  365. with self.assert_signal_called(
  366. task_revoked, sender=job.task, request=job,
  367. terminated=False, expired=True, signum=None):
  368. job.revoked()
  369. assert job.id in revoked
  370. assert self.mytask.backend.get_status(job.id) == states.REVOKED
  371. def test_revoked_expires_not_expired(self):
  372. job = self.xRequest(
  373. expires=datetime.utcnow() + timedelta(days=1),
  374. )
  375. job.revoked()
  376. assert job.id not in revoked
  377. assert self.mytask.backend.get_state(job.id) != states.REVOKED
  378. def test_revoked_expires_ignore_result(self):
  379. self.mytask.ignore_result = True
  380. job = self.xRequest(
  381. expires=datetime.utcnow() - timedelta(days=1),
  382. )
  383. job.revoked()
  384. assert job.id in revoked
  385. assert self.mytask.backend.get_state(job.id) != states.REVOKED
  386. def test_already_revoked(self):
  387. job = self.xRequest()
  388. job._already_revoked = True
  389. assert job.revoked()
  390. def test_revoked(self):
  391. job = self.xRequest()
  392. with self.assert_signal_called(
  393. task_revoked, sender=job.task, request=job,
  394. terminated=False, expired=False, signum=None):
  395. revoked.add(job.id)
  396. assert job.revoked()
  397. assert job._already_revoked
  398. assert job.acknowledged
  399. def test_execute_does_not_execute_revoked(self):
  400. job = self.xRequest()
  401. revoked.add(job.id)
  402. job.execute()
  403. def test_execute_acks_late(self):
  404. self.mytask_raising.acks_late = True
  405. job = self.xRequest(
  406. name=self.mytask_raising.name,
  407. kwargs={},
  408. )
  409. job.execute()
  410. assert job.acknowledged
  411. job.execute()
  412. def test_execute_using_pool_does_not_execute_revoked(self):
  413. job = self.xRequest()
  414. revoked.add(job.id)
  415. with pytest.raises(TaskRevokedError):
  416. job.execute_using_pool(None)
  417. def test_on_accepted_acks_early(self):
  418. job = self.xRequest()
  419. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  420. assert job.acknowledged
  421. prev, module._does_debug = module._does_debug, False
  422. try:
  423. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  424. finally:
  425. module._does_debug = prev
  426. def test_on_accepted_acks_late(self):
  427. job = self.xRequest()
  428. self.mytask.acks_late = True
  429. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  430. assert not job.acknowledged
  431. def test_on_accepted_terminates(self):
  432. signum = signal.SIGTERM
  433. pool = Mock()
  434. job = self.xRequest()
  435. with self.assert_signal_called(
  436. task_revoked, sender=job.task, request=job,
  437. terminated=True, expired=False, signum=signum):
  438. job.terminate(pool, signal='TERM')
  439. assert not pool.terminate_job.call_count
  440. job.on_accepted(pid=314, time_accepted=monotonic())
  441. pool.terminate_job.assert_called_with(314, signum)
  442. def test_on_success_acks_early(self):
  443. job = self.xRequest()
  444. job.time_start = 1
  445. job.on_success((0, 42, 0.001))
  446. prev, module._does_info = module._does_info, False
  447. try:
  448. job.on_success((0, 42, 0.001))
  449. assert not job.acknowledged
  450. finally:
  451. module._does_info = prev
  452. def test_on_success_BaseException(self):
  453. job = self.xRequest()
  454. job.time_start = 1
  455. with pytest.raises(SystemExit):
  456. try:
  457. raise SystemExit()
  458. except SystemExit:
  459. job.on_success((1, ExceptionInfo(), 0.01))
  460. else:
  461. assert False
  462. def test_on_success_eventer(self):
  463. job = self.xRequest()
  464. job.time_start = 1
  465. job.eventer = Mock()
  466. job.eventer.send = Mock()
  467. job.on_success((0, 42, 0.001))
  468. job.eventer.send.assert_called()
  469. def test_on_success_when_failure(self):
  470. job = self.xRequest()
  471. job.time_start = 1
  472. job.on_failure = Mock()
  473. try:
  474. raise KeyError('foo')
  475. except Exception:
  476. job.on_success((1, ExceptionInfo(), 0.001))
  477. job.on_failure.assert_called()
  478. def test_on_success_acks_late(self):
  479. job = self.xRequest()
  480. job.time_start = 1
  481. self.mytask.acks_late = True
  482. job.on_success((0, 42, 0.001))
  483. assert job.acknowledged
  484. def test_on_failure_WorkerLostError(self):
  485. def get_ei():
  486. try:
  487. raise WorkerLostError('do re mi')
  488. except WorkerLostError:
  489. return ExceptionInfo()
  490. job = self.xRequest()
  491. exc_info = get_ei()
  492. job.on_failure(exc_info)
  493. assert self.mytask.backend.get_state(job.id) == states.FAILURE
  494. self.mytask.ignore_result = True
  495. exc_info = get_ei()
  496. job = self.xRequest()
  497. job.on_failure(exc_info)
  498. assert self.mytask.backend.get_state(job.id) == states.PENDING
  499. def test_on_failure_acks_late(self):
  500. job = self.xRequest()
  501. job.time_start = 1
  502. self.mytask.acks_late = True
  503. try:
  504. raise KeyError('foo')
  505. except KeyError:
  506. exc_info = ExceptionInfo()
  507. job.on_failure(exc_info)
  508. assert job.acknowledged
  509. def test_from_message_invalid_kwargs(self):
  510. m = self.TaskMessage(self.mytask.name, args=(), kwargs='foo')
  511. req = Request(m, app=self.app)
  512. with pytest.raises(InvalidTaskError):
  513. raise req.execute().exception
  514. def test_on_timeout(self, patching):
  515. warn = patching('celery.worker.request.warn')
  516. error = patching('celery.worker.request.error')
  517. job = self.xRequest()
  518. job.acknowledge = Mock(name='ack')
  519. job.task.acks_late = True
  520. job.on_timeout(soft=True, timeout=1337)
  521. assert 'Soft time limit' in warn.call_args[0][0]
  522. job.on_timeout(soft=False, timeout=1337)
  523. assert 'Hard time limit' in error.call_args[0][0]
  524. assert self.mytask.backend.get_state(job.id) == states.FAILURE
  525. job.acknowledge.assert_called_with()
  526. self.mytask.ignore_result = True
  527. job = self.xRequest()
  528. job.on_timeout(soft=True, timeout=1336)
  529. assert self.mytask.backend.get_state(job.id) == states.PENDING
  530. job = self.xRequest()
  531. job.acknowledge = Mock(name='ack')
  532. job.task.acks_late = False
  533. job.on_timeout(soft=True, timeout=1335)
  534. job.acknowledge.assert_not_called()
  535. def test_fast_trace_task(self):
  536. from celery.app import trace
  537. setup_worker_optimizations(self.app)
  538. assert trace.trace_task_ret is trace._fast_trace_task
  539. tid = uuid()
  540. message = self.TaskMessage(self.mytask.name, tid, args=[4])
  541. assert len(message.payload) == 3
  542. try:
  543. self.mytask.__trace__ = build_tracer(
  544. self.mytask.name, self.mytask, self.app.loader, 'test',
  545. app=self.app,
  546. )
  547. failed, res, runtime = trace.trace_task_ret(
  548. self.mytask.name, tid, message.headers, message.body,
  549. message.content_type, message.content_encoding)
  550. assert not failed
  551. assert res == repr(4 ** 4)
  552. assert runtime is not None
  553. assert isinstance(runtime, numbers.Real)
  554. finally:
  555. reset_worker_optimizations()
  556. assert trace.trace_task_ret is trace._trace_task_ret
  557. delattr(self.mytask, '__trace__')
  558. failed, res, runtime = trace.trace_task_ret(
  559. self.mytask.name, tid, message.headers, message.body,
  560. message.content_type, message.content_encoding, app=self.app,
  561. )
  562. assert not failed
  563. assert res == repr(4 ** 4)
  564. assert runtime is not None
  565. assert isinstance(runtime, numbers.Real)
  566. def test_trace_task_ret(self):
  567. self.mytask.__trace__ = build_tracer(
  568. self.mytask.name, self.mytask, self.app.loader, 'test',
  569. app=self.app,
  570. )
  571. tid = uuid()
  572. message = self.TaskMessage(self.mytask.name, tid, args=[4])
  573. _, R, _ = _trace_task_ret(
  574. self.mytask.name, tid, message.headers,
  575. message.body, message.content_type,
  576. message.content_encoding, app=self.app,
  577. )
  578. assert R == repr(4 ** 4)
  579. def test_trace_task_ret__no_trace(self):
  580. try:
  581. delattr(self.mytask, '__trace__')
  582. except AttributeError:
  583. pass
  584. tid = uuid()
  585. message = self.TaskMessage(self.mytask.name, tid, args=[4])
  586. _, R, _ = _trace_task_ret(
  587. self.mytask.name, tid, message.headers,
  588. message.body, message.content_type,
  589. message.content_encoding, app=self.app,
  590. )
  591. assert R == repr(4 ** 4)
  592. def test_trace_catches_exception(self):
  593. @self.app.task(request=None, shared=False)
  594. def raising():
  595. raise KeyError('baz')
  596. with pytest.warns(RuntimeWarning):
  597. res = trace_task(raising, uuid(), [], {}, app=self.app)[0]
  598. assert isinstance(res, ExceptionInfo)
  599. def test_worker_task_trace_handle_retry(self):
  600. tid = uuid()
  601. self.mytask.push_request(id=tid)
  602. try:
  603. raise ValueError('foo')
  604. except Exception as exc:
  605. try:
  606. raise Retry(str(exc), exc=exc)
  607. except Retry as exc:
  608. w = TraceInfo(states.RETRY, exc)
  609. w.handle_retry(
  610. self.mytask, self.mytask.request, store_errors=False,
  611. )
  612. assert self.mytask.backend.get_state(tid) == states.PENDING
  613. w.handle_retry(
  614. self.mytask, self.mytask.request, store_errors=True,
  615. )
  616. assert self.mytask.backend.get_state(tid) == states.RETRY
  617. finally:
  618. self.mytask.pop_request()
  619. def test_worker_task_trace_handle_failure(self):
  620. tid = uuid()
  621. self.mytask.push_request()
  622. try:
  623. self.mytask.request.id = tid
  624. try:
  625. raise ValueError('foo')
  626. except Exception as exc:
  627. w = TraceInfo(states.FAILURE, exc)
  628. w.handle_failure(
  629. self.mytask, self.mytask.request, store_errors=False,
  630. )
  631. assert self.mytask.backend.get_state(tid) == states.PENDING
  632. w.handle_failure(
  633. self.mytask, self.mytask.request, store_errors=True,
  634. )
  635. assert self.mytask.backend.get_state(tid) == states.FAILURE
  636. finally:
  637. self.mytask.pop_request()
  638. def test_from_message(self):
  639. us = 'æØåveéðƒeæ'
  640. tid = uuid()
  641. m = self.TaskMessage(
  642. self.mytask.name, tid, args=[2], kwargs={us: 'bar'},
  643. )
  644. job = Request(m, app=self.app)
  645. assert isinstance(job, Request)
  646. assert job.name == self.mytask.name
  647. assert job.id == tid
  648. assert job.message is m
  649. def test_from_message_empty_args(self):
  650. tid = uuid()
  651. m = self.TaskMessage(self.mytask.name, tid, args=[], kwargs={})
  652. job = Request(m, app=self.app)
  653. assert isinstance(job, Request)
  654. def test_from_message_missing_required_fields(self):
  655. m = self.TaskMessage(self.mytask.name)
  656. m.headers.clear()
  657. with pytest.raises(KeyError):
  658. Request(m, app=self.app)
  659. def test_from_message_nonexistant_task(self):
  660. m = self.TaskMessage(
  661. 'cu.mytask.doesnotexist',
  662. args=[2], kwargs={'æØåveéðƒeæ': 'bar'},
  663. )
  664. with pytest.raises(KeyError):
  665. Request(m, app=self.app)
  666. def test_execute(self):
  667. tid = uuid()
  668. job = self.xRequest(id=tid, args=[4], kwargs={})
  669. assert job.execute() == 256
  670. meta = self.mytask.backend.get_task_meta(tid)
  671. assert meta['status'] == states.SUCCESS
  672. assert meta['result'] == 256
  673. def test_execute_success_no_kwargs(self):
  674. @self.app.task # traverses coverage for decorator without parens
  675. def mytask_no_kwargs(i):
  676. return i ** i
  677. tid = uuid()
  678. job = self.xRequest(
  679. name=mytask_no_kwargs.name,
  680. id=tid,
  681. args=[4],
  682. kwargs={},
  683. )
  684. assert job.execute() == 256
  685. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  686. assert meta['result'] == 256
  687. assert meta['status'] == states.SUCCESS
  688. def test_execute_ack(self):
  689. scratch = {'ACK': False}
  690. def on_ack(*args, **kwargs):
  691. scratch['ACK'] = True
  692. tid = uuid()
  693. job = self.xRequest(id=tid, args=[4], on_ack=on_ack)
  694. assert job.execute() == 256
  695. meta = self.mytask.backend.get_task_meta(tid)
  696. assert scratch['ACK']
  697. assert meta['result'] == 256
  698. assert meta['status'] == states.SUCCESS
  699. def test_execute_fail(self):
  700. tid = uuid()
  701. job = self.xRequest(
  702. name=self.mytask_raising.name,
  703. id=tid,
  704. args=[4],
  705. kwargs={},
  706. )
  707. assert isinstance(job.execute(), ExceptionInfo)
  708. assert self.mytask_raising.backend.serializer == 'pickle'
  709. meta = self.mytask_raising.backend.get_task_meta(tid)
  710. assert meta['status'] == states.FAILURE
  711. assert isinstance(meta['result'], KeyError)
  712. def test_execute_using_pool(self):
  713. tid = uuid()
  714. job = self.xRequest(id=tid, args=[4])
  715. p = Mock()
  716. job.execute_using_pool(p)
  717. p.apply_async.assert_called_once()
  718. args = p.apply_async.call_args[1]['args']
  719. assert args[0] == self.mytask.name
  720. assert args[1] == tid
  721. assert args[2] == job.request_dict
  722. assert args[3] == job.message.body
  723. def _test_on_failure(self, exception, **kwargs):
  724. tid = uuid()
  725. job = self.xRequest(id=tid, args=[4])
  726. job.send_event = Mock(name='send_event')
  727. job.task.backend.mark_as_failure = Mock(name='mark_as_failure')
  728. try:
  729. raise exception
  730. except type(exception):
  731. exc_info = ExceptionInfo()
  732. job.on_failure(exc_info, **kwargs)
  733. job.send_event.assert_called()
  734. return job
  735. def test_on_failure(self):
  736. self._test_on_failure(Exception('Inside unit tests'))
  737. def test_on_failure__unicode_exception(self):
  738. self._test_on_failure(Exception('Бобры атакуют'))
  739. def test_on_failure__utf8_exception(self):
  740. self._test_on_failure(Exception(
  741. from_utf8('Бобры атакуют')))
  742. def test_on_failure__WorkerLostError(self):
  743. exc = WorkerLostError()
  744. job = self._test_on_failure(exc)
  745. job.task.backend.mark_as_failure.assert_called_with(
  746. job.id, exc, request=job, store_result=True,
  747. )
  748. def test_on_failure__return_ok(self):
  749. self._test_on_failure(KeyError(), return_ok=True)
  750. def test_reject(self):
  751. job = self.xRequest(id=uuid())
  752. job.on_reject = Mock(name='on_reject')
  753. job.reject(requeue=True)
  754. job.on_reject.assert_called_with(
  755. req_logger, job.connection_errors, True,
  756. )
  757. assert job.acknowledged
  758. job.on_reject.reset_mock()
  759. job.reject(requeue=True)
  760. job.on_reject.assert_not_called()
  761. def test_group(self):
  762. gid = uuid()
  763. job = self.xRequest(id=uuid(), group=gid)
  764. assert job.group == gid
  765. class test_create_request_class(RequestCase):
  766. def setup(self):
  767. self.task = Mock(name='task')
  768. self.pool = Mock(name='pool')
  769. self.eventer = Mock(name='eventer')
  770. RequestCase.setup(self)
  771. def create_request_cls(self, **kwargs):
  772. return create_request_cls(
  773. Request, self.task, self.pool, 'foo', self.eventer, **kwargs
  774. )
  775. def zRequest(self, Request=None, revoked_tasks=None, ref=None, **kwargs):
  776. return self.xRequest(
  777. Request=Request or self.create_request_cls(
  778. ref=ref,
  779. revoked_tasks=revoked_tasks,
  780. ),
  781. **kwargs)
  782. def test_on_success(self):
  783. self.zRequest(id=uuid()).on_success((False, 'hey', 3.1222))
  784. def test_on_success__SystemExit(self,
  785. errors=(SystemExit, KeyboardInterrupt)):
  786. for exc in errors:
  787. einfo = None
  788. try:
  789. raise exc()
  790. except exc:
  791. einfo = ExceptionInfo()
  792. with pytest.raises(exc):
  793. self.zRequest(id=uuid()).on_success((True, einfo, 1.0))
  794. def test_on_success__calls_failure(self):
  795. job = self.zRequest(id=uuid())
  796. einfo = Mock(name='einfo')
  797. job.on_failure = Mock(name='on_failure')
  798. job.on_success((True, einfo, 1.0))
  799. job.on_failure.assert_called_with(einfo, return_ok=True)
  800. def test_on_success__acks_late_enabled(self):
  801. self.task.acks_late = True
  802. job = self.zRequest(id=uuid())
  803. job.acknowledge = Mock(name='ack')
  804. job.on_success((False, 'foo', 1.0))
  805. job.acknowledge.assert_called_with()
  806. def test_on_success__acks_late_disabled(self):
  807. self.task.acks_late = False
  808. job = self.zRequest(id=uuid())
  809. job.acknowledge = Mock(name='ack')
  810. job.on_success((False, 'foo', 1.0))
  811. job.acknowledge.assert_not_called()
  812. def test_on_success__no_events(self):
  813. self.eventer = None
  814. job = self.zRequest(id=uuid())
  815. job.send_event = Mock(name='send_event')
  816. job.on_success((False, 'foo', 1.0))
  817. job.send_event.assert_not_called()
  818. def test_on_success__with_events(self):
  819. job = self.zRequest(id=uuid())
  820. job.send_event = Mock(name='send_event')
  821. job.on_success((False, 'foo', 1.0))
  822. job.send_event.assert_called_with(
  823. 'task-succeeded', result='foo', runtime=1.0,
  824. )
  825. def test_execute_using_pool__revoked(self):
  826. tid = uuid()
  827. job = self.zRequest(id=tid, revoked_tasks={tid})
  828. job.revoked = Mock()
  829. job.revoked.return_value = True
  830. with pytest.raises(TaskRevokedError):
  831. job.execute_using_pool(self.pool)
  832. def test_execute_using_pool__expired(self):
  833. tid = uuid()
  834. job = self.zRequest(id=tid, revoked_tasks=set())
  835. job.expires = 1232133
  836. job.revoked = Mock()
  837. job.revoked.return_value = True
  838. with pytest.raises(TaskRevokedError):
  839. job.execute_using_pool(self.pool)
  840. def test_execute_using_pool(self):
  841. from celery.app.trace import trace_task_ret as trace
  842. weakref_ref = Mock(name='weakref.ref')
  843. job = self.zRequest(id=uuid(), revoked_tasks=set(), ref=weakref_ref)
  844. job.execute_using_pool(self.pool)
  845. self.pool.apply_async.assert_called_with(
  846. trace,
  847. args=(job.type, job.id, job.request_dict, job.body,
  848. job.content_type, job.content_encoding),
  849. accept_callback=job.on_accepted,
  850. timeout_callback=job.on_timeout,
  851. callback=job.on_success,
  852. error_callback=job.on_failure,
  853. soft_timeout=self.task.soft_time_limit,
  854. timeout=self.task.time_limit,
  855. correlation_id=job.id,
  856. )
  857. assert job._apply_result
  858. weakref_ref.assert_called_with(self.pool.apply_async())
  859. assert job._apply_result is weakref_ref()