test_request.py 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. import numbers
  4. import os
  5. import signal
  6. import socket
  7. import sys
  8. from datetime import datetime, timedelta
  9. from time import time
  10. import pytest
  11. from billiard.einfo import ExceptionInfo
  12. from case import Mock, patch
  13. from kombu.utils.encoding import (default_encode, from_utf8, safe_repr,
  14. safe_str)
  15. from kombu.utils.uuid import uuid
  16. from celery import states
  17. from celery.app.trace import (TraceInfo, _trace_task_ret, build_tracer,
  18. mro_lookup, reset_worker_optimizations,
  19. setup_worker_optimizations, trace_task)
  20. from celery.exceptions import (Ignore, InvalidTaskError, Reject, Retry,
  21. TaskRevokedError, Terminated, WorkerLostError)
  22. from celery.five import monotonic
  23. from celery.signals import task_revoked
  24. from celery.worker import request as module
  25. from celery.worker import strategy
  26. from celery.worker.request import Request, create_request_cls
  27. from celery.worker.request import logger as req_logger
  28. from celery.worker.state import revoked
  29. class RequestCase:
  30. def setup(self):
  31. self.app.conf.result_serializer = 'pickle'
  32. @self.app.task(shared=False)
  33. def add(x, y, **kw_):
  34. return x + y
  35. self.add = add
  36. @self.app.task(shared=False)
  37. def mytask(i, **kwargs):
  38. return i ** i
  39. self.mytask = mytask
  40. @self.app.task(shared=False)
  41. def mytask_raising(i):
  42. raise KeyError(i)
  43. self.mytask_raising = mytask_raising
  44. def xRequest(self, name=None, id=None, args=None, kwargs=None,
  45. on_ack=None, on_reject=None, Request=Request, **head):
  46. args = [1] if args is None else args
  47. kwargs = {'f': 'x'} if kwargs is None else kwargs
  48. on_ack = on_ack or Mock(name='on_ack')
  49. on_reject = on_reject or Mock(name='on_reject')
  50. message = self.TaskMessage(
  51. name or self.mytask.name, id, args=args, kwargs=kwargs, **head
  52. )
  53. return Request(message, app=self.app,
  54. on_ack=on_ack, on_reject=on_reject)
  55. class test_mro_lookup:
  56. def test_order(self):
  57. class A(object):
  58. pass
  59. class B(A):
  60. pass
  61. class C(B):
  62. pass
  63. class D(C):
  64. @classmethod
  65. def mro(cls):
  66. return ()
  67. A.x = 10
  68. assert mro_lookup(C, 'x') == A
  69. assert mro_lookup(C, 'x', stop={A}) is None
  70. B.x = 10
  71. assert mro_lookup(C, 'x') == B
  72. C.x = 10
  73. assert mro_lookup(C, 'x') == C
  74. assert mro_lookup(D, 'x') is None
  75. def jail(app, task_id, name, args, kwargs):
  76. request = {'id': task_id}
  77. task = app.tasks[name]
  78. task.__trace__ = None # rebuild
  79. return trace_task(
  80. task, task_id, args, kwargs, request=request, eager=False, app=app,
  81. ).retval
  82. @pytest.mark.skipif(sys.version_info[0] > 3, reason='Py2 only')
  83. class test_default_encode:
  84. def test_jython(self):
  85. prev, sys.platform = sys.platform, 'java 1.6.1'
  86. try:
  87. assert default_encode(b'foo') == b'foo'
  88. finally:
  89. sys.platform = prev
  90. def test_cpython(self):
  91. prev, sys.platform = sys.platform, 'darwin'
  92. gfe, sys.getfilesystemencoding = (
  93. sys.getfilesystemencoding,
  94. lambda: 'utf-8',
  95. )
  96. try:
  97. assert default_encode(b'foo') == b'foo'
  98. finally:
  99. sys.platform = prev
  100. sys.getfilesystemencoding = gfe
  101. class test_Retry:
  102. def test_retry_semipredicate(self):
  103. try:
  104. raise Exception('foo')
  105. except Exception as exc:
  106. ret = Retry('Retrying task', exc)
  107. assert ret.exc == exc
  108. class test_trace_task(RequestCase):
  109. def test_process_cleanup_fails(self, patching):
  110. _logger = patching('celery.app.trace.logger')
  111. self.mytask.backend = Mock()
  112. self.mytask.backend.process_cleanup = Mock(side_effect=KeyError())
  113. tid = uuid()
  114. ret = jail(self.app, tid, self.mytask.name, [2], {})
  115. assert ret == 4
  116. self.mytask.backend.mark_as_done.assert_called()
  117. assert 'Process cleanup failed' in _logger.error.call_args[0][0]
  118. def test_process_cleanup_BaseException(self):
  119. self.mytask.backend = Mock()
  120. self.mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
  121. with pytest.raises(SystemExit):
  122. jail(self.app, uuid(), self.mytask.name, [2], {})
  123. def test_execute_jail_success(self):
  124. ret = jail(self.app, uuid(), self.mytask.name, [2], {})
  125. assert ret == 4
  126. def test_marked_as_started(self):
  127. _started = []
  128. def store_result(tid, meta, state, **kwargs):
  129. if state == states.STARTED:
  130. _started.append(tid)
  131. self.mytask.backend.store_result = Mock(name='store_result')
  132. self.mytask.backend.store_result.side_effect = store_result
  133. self.mytask.track_started = True
  134. tid = uuid()
  135. jail(self.app, tid, self.mytask.name, [2], {})
  136. assert tid in _started
  137. self.mytask.ignore_result = True
  138. tid = uuid()
  139. jail(self.app, tid, self.mytask.name, [2], {})
  140. assert tid not in _started
  141. def test_execute_jail_failure(self):
  142. ret = jail(
  143. self.app, uuid(), self.mytask_raising.name, [4], {},
  144. )
  145. assert isinstance(ret, ExceptionInfo)
  146. assert ret.exception.args == (4,)
  147. def test_execute_ignore_result(self):
  148. @self.app.task(shared=False, ignore_result=True)
  149. def ignores_result(i):
  150. return i ** i
  151. task_id = uuid()
  152. ret = jail(self.app, task_id, ignores_result.name, [4], {})
  153. assert ret == 256
  154. assert not self.app.AsyncResult(task_id).ready()
  155. class test_Request(RequestCase):
  156. def get_request(self,
  157. sig,
  158. Request=Request,
  159. exclude_headers=None,
  160. **kwargs):
  161. msg = self.task_message_from_sig(self.app, sig)
  162. headers = None
  163. if exclude_headers:
  164. headers = msg.headers
  165. for header in exclude_headers:
  166. headers.pop(header)
  167. return Request(
  168. msg,
  169. on_ack=Mock(name='on_ack'),
  170. on_reject=Mock(name='on_reject'),
  171. eventer=Mock(name='eventer'),
  172. app=self.app,
  173. connection_errors=(socket.error,),
  174. task=sig.type,
  175. headers=headers,
  176. **kwargs
  177. )
  178. def test_shadow(self):
  179. assert self.get_request(
  180. self.add.s(2, 2).set(shadow='fooxyz')).name == 'fooxyz'
  181. def test_no_shadow_header(self):
  182. request = self.get_request(self.add.s(2, 2),
  183. exclude_headers=['shadow'])
  184. assert request.name == 't.unit.worker.test_request.add'
  185. def test_invalid_eta_raises_InvalidTaskError(self):
  186. with pytest.raises(InvalidTaskError):
  187. self.get_request(self.add.s(2, 2).set(eta='12345'))
  188. def test_invalid_expires_raises_InvalidTaskError(self):
  189. with pytest.raises(InvalidTaskError):
  190. self.get_request(self.add.s(2, 2).set(expires='12345'))
  191. def test_valid_expires_with_utc_makes_aware(self):
  192. with patch('celery.worker.request.maybe_make_aware') as mma:
  193. self.get_request(self.add.s(2, 2).set(expires=10),
  194. maybe_make_aware=mma)
  195. mma.assert_called()
  196. def test_maybe_expire_when_expires_is_None(self):
  197. req = self.get_request(self.add.s(2, 2))
  198. assert not req.maybe_expire()
  199. def test_on_retry_acks_if_late(self):
  200. self.add.acks_late = True
  201. req = self.get_request(self.add.s(2, 2))
  202. req.on_retry(Mock())
  203. req.on_ack.assert_called_with(req_logger, req.connection_errors)
  204. def test_on_failure_Terminated(self):
  205. einfo = None
  206. try:
  207. raise Terminated('9')
  208. except Terminated:
  209. einfo = ExceptionInfo()
  210. assert einfo is not None
  211. req = self.get_request(self.add.s(2, 2))
  212. req.on_failure(einfo)
  213. req.eventer.send.assert_called_with(
  214. 'task-revoked',
  215. uuid=req.id, terminated=True, signum='9', expired=False,
  216. )
  217. def test_on_failure_propagates_MemoryError(self):
  218. einfo = None
  219. try:
  220. raise MemoryError()
  221. except MemoryError:
  222. einfo = ExceptionInfo(internal=True)
  223. assert einfo is not None
  224. req = self.get_request(self.add.s(2, 2))
  225. with pytest.raises(MemoryError):
  226. req.on_failure(einfo)
  227. def test_on_failure_Ignore_acknowledges(self):
  228. einfo = None
  229. try:
  230. raise Ignore()
  231. except Ignore:
  232. einfo = ExceptionInfo(internal=True)
  233. assert einfo is not None
  234. req = self.get_request(self.add.s(2, 2))
  235. req.on_failure(einfo)
  236. req.on_ack.assert_called_with(req_logger, req.connection_errors)
  237. def test_on_failure_Reject_rejects(self):
  238. einfo = None
  239. try:
  240. raise Reject()
  241. except Reject:
  242. einfo = ExceptionInfo(internal=True)
  243. assert einfo is not None
  244. req = self.get_request(self.add.s(2, 2))
  245. req.on_failure(einfo)
  246. req.on_reject.assert_called_with(
  247. req_logger, req.connection_errors, False,
  248. )
  249. def test_on_failure_Reject_rejects_with_requeue(self):
  250. einfo = None
  251. try:
  252. raise Reject(requeue=True)
  253. except Reject:
  254. einfo = ExceptionInfo(internal=True)
  255. assert einfo is not None
  256. req = self.get_request(self.add.s(2, 2))
  257. req.on_failure(einfo)
  258. req.on_reject.assert_called_with(
  259. req_logger, req.connection_errors, True,
  260. )
  261. def test_on_failure_WorkerLostError_rejects_with_requeue(self):
  262. einfo = None
  263. try:
  264. raise WorkerLostError()
  265. except:
  266. einfo = ExceptionInfo(internal=True)
  267. req = self.get_request(self.add.s(2, 2))
  268. req.task.acks_late = True
  269. req.task.reject_on_worker_lost = True
  270. req.delivery_info['redelivered'] = False
  271. req.on_failure(einfo)
  272. req.on_reject.assert_called_with(
  273. req_logger, req.connection_errors, True)
  274. def test_on_failure_WorkerLostError_redelivered_None(self):
  275. einfo = None
  276. try:
  277. raise WorkerLostError()
  278. except:
  279. einfo = ExceptionInfo(internal=True)
  280. req = self.get_request(self.add.s(2, 2))
  281. req.task.acks_late = True
  282. req.task.reject_on_worker_lost = True
  283. req.delivery_info['redelivered'] = None
  284. req.on_failure(einfo)
  285. req.on_reject.assert_called_with(
  286. req_logger, req.connection_errors, True)
  287. def test_tzlocal_is_cached(self):
  288. req = self.get_request(self.add.s(2, 2))
  289. req._tzlocal = 'foo'
  290. assert req.tzlocal == 'foo'
  291. def test_task_wrapper_repr(self):
  292. assert repr(self.xRequest())
  293. def test_sets_store_errors(self):
  294. self.mytask.ignore_result = True
  295. job = self.xRequest()
  296. assert not job.store_errors
  297. self.mytask.store_errors_even_if_ignored = True
  298. job = self.xRequest()
  299. assert job.store_errors
  300. def test_send_event(self):
  301. job = self.xRequest()
  302. job.eventer = Mock(name='.eventer')
  303. job.send_event('task-frobulated')
  304. job.eventer.send.assert_called_with('task-frobulated', uuid=job.id)
  305. def test_send_events__disabled_at_task_level(self):
  306. job = self.xRequest()
  307. job.task.send_events = False
  308. job.eventer = Mock(name='.eventer')
  309. job.send_event('task-frobulated')
  310. job.eventer.send.assert_not_called()
  311. def test_on_retry(self):
  312. job = self.get_request(self.mytask.s(1, f='x'))
  313. job.eventer = Mock(name='.eventer')
  314. try:
  315. raise Retry('foo', KeyError('moofoobar'))
  316. except:
  317. einfo = ExceptionInfo()
  318. job.on_failure(einfo)
  319. job.eventer.send.assert_called_with(
  320. 'task-retried',
  321. uuid=job.id,
  322. exception=safe_repr(einfo.exception.exc),
  323. traceback=safe_str(einfo.traceback),
  324. )
  325. prev, module._does_info = module._does_info, False
  326. try:
  327. job.on_failure(einfo)
  328. finally:
  329. module._does_info = prev
  330. einfo.internal = True
  331. job.on_failure(einfo)
  332. def test_compat_properties(self):
  333. job = self.xRequest()
  334. assert job.task_id == job.id
  335. assert job.task_name == job.name
  336. job.task_id = 'ID'
  337. assert job.id == 'ID'
  338. job.task_name = 'NAME'
  339. assert job.name == 'NAME'
  340. def test_terminate__pool_ref(self):
  341. pool = Mock()
  342. signum = signal.SIGTERM
  343. job = self.get_request(self.mytask.s(1, f='x'))
  344. job._apply_result = Mock(name='_apply_result')
  345. with self.assert_signal_called(
  346. task_revoked, sender=job.task, request=job,
  347. terminated=True, expired=False, signum=signum):
  348. job.time_start = monotonic()
  349. job.worker_pid = 314
  350. job.terminate(pool, signal='TERM')
  351. job._apply_result().terminate.assert_called_with(signum)
  352. job._apply_result = Mock(name='_apply_result2')
  353. job._apply_result.return_value = None
  354. job.terminate(pool, signal='TERM')
  355. def test_terminate__task_started(self):
  356. pool = Mock()
  357. signum = signal.SIGTERM
  358. job = self.get_request(self.mytask.s(1, f='x'))
  359. with self.assert_signal_called(
  360. task_revoked, sender=job.task, request=job,
  361. terminated=True, expired=False, signum=signum):
  362. job.time_start = monotonic()
  363. job.worker_pid = 313
  364. job.terminate(pool, signal='TERM')
  365. pool.terminate_job.assert_called_with(job.worker_pid, signum)
  366. def test_terminate__task_reserved(self):
  367. pool = Mock()
  368. job = self.get_request(self.mytask.s(1, f='x'))
  369. job.time_start = None
  370. job.terminate(pool, signal='TERM')
  371. pool.terminate_job.assert_not_called()
  372. assert job._terminate_on_ack == (pool, 15)
  373. job.terminate(pool, signal='TERM')
  374. def test_revoked_expires_expired(self):
  375. job = self.get_request(self.mytask.s(1, f='x').set(
  376. expires=datetime.utcnow() - timedelta(days=1)
  377. ))
  378. with self.assert_signal_called(
  379. task_revoked, sender=job.task, request=job,
  380. terminated=False, expired=True, signum=None):
  381. job.revoked()
  382. assert job.id in revoked
  383. self.app.set_current()
  384. assert self.mytask.backend.get_status(job.id) == states.REVOKED
  385. def test_revoked_expires_not_expired(self):
  386. job = self.xRequest(
  387. expires=datetime.utcnow() + timedelta(days=1),
  388. )
  389. job.revoked()
  390. assert job.id not in revoked
  391. assert self.mytask.backend.get_status(job.id) != states.REVOKED
  392. def test_revoked_expires_ignore_result(self):
  393. self.mytask.ignore_result = True
  394. job = self.xRequest(
  395. expires=datetime.utcnow() - timedelta(days=1),
  396. )
  397. job.revoked()
  398. assert job.id in revoked
  399. assert self.mytask.backend.get_status(job.id) != states.REVOKED
  400. def test_already_revoked(self):
  401. job = self.xRequest()
  402. job._already_revoked = True
  403. assert job.revoked()
  404. def test_revoked(self):
  405. job = self.xRequest()
  406. with self.assert_signal_called(
  407. task_revoked, sender=job.task, request=job,
  408. terminated=False, expired=False, signum=None):
  409. revoked.add(job.id)
  410. assert job.revoked()
  411. assert job._already_revoked
  412. assert job.acknowledged
  413. def test_execute_does_not_execute_revoked(self):
  414. job = self.xRequest()
  415. revoked.add(job.id)
  416. job.execute()
  417. def test_execute_acks_late(self):
  418. self.mytask_raising.acks_late = True
  419. job = self.xRequest(
  420. name=self.mytask_raising.name,
  421. kwargs={},
  422. )
  423. job.execute()
  424. assert job.acknowledged
  425. job.execute()
  426. def test_execute_using_pool_does_not_execute_revoked(self):
  427. job = self.xRequest()
  428. revoked.add(job.id)
  429. with pytest.raises(TaskRevokedError):
  430. job.execute_using_pool(None)
  431. def test_on_accepted_acks_early(self):
  432. job = self.xRequest()
  433. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  434. assert job.acknowledged
  435. prev, module._does_debug = module._does_debug, False
  436. try:
  437. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  438. finally:
  439. module._does_debug = prev
  440. def test_on_accepted_acks_late(self):
  441. job = self.xRequest()
  442. self.mytask.acks_late = True
  443. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  444. assert not job.acknowledged
  445. def test_on_accepted_terminates(self):
  446. signum = signal.SIGTERM
  447. pool = Mock()
  448. job = self.xRequest()
  449. with self.assert_signal_called(
  450. task_revoked, sender=job.task, request=job,
  451. terminated=True, expired=False, signum=signum):
  452. job.terminate(pool, signal='TERM')
  453. assert not pool.terminate_job.call_count
  454. job.on_accepted(pid=314, time_accepted=monotonic())
  455. pool.terminate_job.assert_called_with(314, signum)
  456. def test_on_accepted_time_start(self):
  457. job = self.xRequest()
  458. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  459. assert time() - job.time_start < 1
  460. def test_on_success_acks_early(self):
  461. job = self.xRequest()
  462. job.time_start = 1
  463. job.on_success((0, 42, 0.001))
  464. prev, module._does_info = module._does_info, False
  465. try:
  466. job.on_success((0, 42, 0.001))
  467. assert not job.acknowledged
  468. finally:
  469. module._does_info = prev
  470. def test_on_success_BaseException(self):
  471. job = self.xRequest()
  472. job.time_start = 1
  473. with pytest.raises(SystemExit):
  474. try:
  475. raise SystemExit()
  476. except SystemExit:
  477. job.on_success((1, ExceptionInfo(), 0.01))
  478. else:
  479. assert False
  480. def test_on_success_eventer(self):
  481. job = self.xRequest()
  482. job.time_start = 1
  483. job.eventer = Mock()
  484. job.eventer.send = Mock()
  485. job.on_success((0, 42, 0.001))
  486. job.eventer.send.assert_called()
  487. def test_on_success_when_failure(self):
  488. job = self.xRequest()
  489. job.time_start = 1
  490. job.on_failure = Mock()
  491. try:
  492. raise KeyError('foo')
  493. except Exception:
  494. job.on_success((1, ExceptionInfo(), 0.001))
  495. job.on_failure.assert_called()
  496. def test_on_success_acks_late(self):
  497. job = self.xRequest()
  498. job.time_start = 1
  499. self.mytask.acks_late = True
  500. job.on_success((0, 42, 0.001))
  501. assert job.acknowledged
  502. def test_on_failure_WorkerLostError(self):
  503. def get_ei():
  504. try:
  505. raise WorkerLostError('do re mi')
  506. except WorkerLostError:
  507. return ExceptionInfo()
  508. job = self.xRequest()
  509. exc_info = get_ei()
  510. job.on_failure(exc_info)
  511. self.app.set_current()
  512. assert self.mytask.backend.get_status(job.id) == states.FAILURE
  513. self.mytask.ignore_result = True
  514. exc_info = get_ei()
  515. job = self.xRequest()
  516. job.on_failure(exc_info)
  517. assert self.mytask.backend.get_status(job.id) == states.PENDING
  518. def test_on_failure_acks_late(self):
  519. job = self.xRequest()
  520. job.time_start = 1
  521. self.mytask.acks_late = True
  522. try:
  523. raise KeyError('foo')
  524. except KeyError:
  525. exc_info = ExceptionInfo()
  526. job.on_failure(exc_info)
  527. assert job.acknowledged
  528. def test_on_failure_acks_on_failure_or_timeout(self):
  529. job = self.xRequest()
  530. job.time_start = 1
  531. self.mytask.acks_late = True
  532. self.mytask.acks_on_failure_or_timeout = False
  533. try:
  534. raise KeyError('foo')
  535. except KeyError:
  536. exc_info = ExceptionInfo()
  537. job.on_failure(exc_info)
  538. assert job.acknowledged is False
  539. def test_from_message_invalid_kwargs(self):
  540. m = self.TaskMessage(self.mytask.name, args=(), kwargs='foo')
  541. req = Request(m, app=self.app)
  542. with pytest.raises(InvalidTaskError):
  543. raise req.execute().exception
  544. def test_on_hard_timeout_acks_late(self, patching):
  545. error = patching('celery.worker.request.error')
  546. job = self.xRequest()
  547. job.acknowledge = Mock(name='ack')
  548. job.task.acks_late = True
  549. job.on_timeout(soft=False, timeout=1337)
  550. assert 'Hard time limit' in error.call_args[0][0]
  551. assert self.mytask.backend.get_status(job.id) == states.FAILURE
  552. job.acknowledge.assert_called_with()
  553. job = self.xRequest()
  554. job.acknowledge = Mock(name='ack')
  555. job.task.acks_late = False
  556. job.on_timeout(soft=False, timeout=1335)
  557. job.acknowledge.assert_not_called()
  558. def test_on_hard_timeout_acks_on_failure_or_timeout(self, patching):
  559. error = patching('celery.worker.request.error')
  560. job = self.xRequest()
  561. job.acknowledge = Mock(name='ack')
  562. job.task.acks_late = True
  563. job.task.acks_on_failure_or_timeout = True
  564. job.on_timeout(soft=False, timeout=1337)
  565. assert 'Hard time limit' in error.call_args[0][0]
  566. assert self.mytask.backend.get_status(job.id) == states.FAILURE
  567. job.acknowledge.assert_called_with()
  568. job = self.xRequest()
  569. job.acknowledge = Mock(name='ack')
  570. job.task.acks_late = True
  571. job.task.acks_on_failure_or_timeout = False
  572. job.on_timeout(soft=False, timeout=1337)
  573. assert 'Hard time limit' in error.call_args[0][0]
  574. assert self.mytask.backend.get_status(job.id) == states.FAILURE
  575. job.acknowledge.assert_not_called()
  576. job = self.xRequest()
  577. job.acknowledge = Mock(name='ack')
  578. job.task.acks_late = False
  579. job.task.acks_on_failure_or_timeout = True
  580. job.on_timeout(soft=False, timeout=1335)
  581. job.acknowledge.assert_not_called()
  582. def test_on_soft_timeout(self, patching):
  583. warn = patching('celery.worker.request.warn')
  584. job = self.xRequest()
  585. job.acknowledge = Mock(name='ack')
  586. job.task.acks_late = True
  587. job.on_timeout(soft=True, timeout=1337)
  588. assert 'Soft time limit' in warn.call_args[0][0]
  589. assert self.mytask.backend.get_status(job.id) == states.PENDING
  590. job.acknowledge.assert_not_called()
  591. self.mytask.ignore_result = True
  592. job = self.xRequest()
  593. job.on_timeout(soft=True, timeout=1336)
  594. assert self.mytask.backend.get_status(job.id) == states.PENDING
  595. def test_fast_trace_task(self):
  596. from celery.app import trace
  597. setup_worker_optimizations(self.app)
  598. assert trace.trace_task_ret is trace._fast_trace_task
  599. tid = uuid()
  600. message = self.TaskMessage(self.mytask.name, tid, args=[4])
  601. assert len(message.payload) == 3
  602. try:
  603. self.mytask.__trace__ = build_tracer(
  604. self.mytask.name, self.mytask, self.app.loader, 'test',
  605. app=self.app,
  606. )
  607. failed, res, runtime = trace.trace_task_ret(
  608. self.mytask.name, tid, message.headers, message.body,
  609. message.content_type, message.content_encoding)
  610. assert not failed
  611. assert res == repr(4 ** 4)
  612. assert runtime is not None
  613. assert isinstance(runtime, numbers.Real)
  614. finally:
  615. reset_worker_optimizations()
  616. assert trace.trace_task_ret is trace._trace_task_ret
  617. delattr(self.mytask, '__trace__')
  618. failed, res, runtime = trace.trace_task_ret(
  619. self.mytask.name, tid, message.headers, message.body,
  620. message.content_type, message.content_encoding, app=self.app,
  621. )
  622. assert not failed
  623. assert res == repr(4 ** 4)
  624. assert runtime is not None
  625. assert isinstance(runtime, numbers.Real)
  626. def test_trace_task_ret(self):
  627. self.mytask.__trace__ = build_tracer(
  628. self.mytask.name, self.mytask, self.app.loader, 'test',
  629. app=self.app,
  630. )
  631. tid = uuid()
  632. message = self.TaskMessage(self.mytask.name, tid, args=[4])
  633. _, R, _ = _trace_task_ret(
  634. self.mytask.name, tid, message.headers,
  635. message.body, message.content_type,
  636. message.content_encoding, app=self.app,
  637. )
  638. assert R == repr(4 ** 4)
  639. def test_trace_task_ret__no_trace(self):
  640. try:
  641. delattr(self.mytask, '__trace__')
  642. except AttributeError:
  643. pass
  644. tid = uuid()
  645. message = self.TaskMessage(self.mytask.name, tid, args=[4])
  646. _, R, _ = _trace_task_ret(
  647. self.mytask.name, tid, message.headers,
  648. message.body, message.content_type,
  649. message.content_encoding, app=self.app,
  650. )
  651. assert R == repr(4 ** 4)
  652. def test_trace_catches_exception(self):
  653. @self.app.task(request=None, shared=False)
  654. def raising():
  655. raise KeyError('baz')
  656. with pytest.warns(RuntimeWarning):
  657. res = trace_task(raising, uuid(), [], {}, app=self.app)[0]
  658. assert isinstance(res, ExceptionInfo)
  659. def test_worker_task_trace_handle_retry(self):
  660. tid = uuid()
  661. self.mytask.push_request(id=tid)
  662. try:
  663. raise ValueError('foo')
  664. except Exception as exc:
  665. try:
  666. raise Retry(str(exc), exc=exc)
  667. except Retry as exc:
  668. w = TraceInfo(states.RETRY, exc)
  669. w.handle_retry(
  670. self.mytask, self.mytask.request, store_errors=False,
  671. )
  672. assert self.mytask.backend.get_status(tid) == states.PENDING
  673. w.handle_retry(
  674. self.mytask, self.mytask.request, store_errors=True,
  675. )
  676. assert self.mytask.backend.get_status(tid) == states.RETRY
  677. finally:
  678. self.mytask.pop_request()
  679. def test_worker_task_trace_handle_failure(self):
  680. tid = uuid()
  681. self.mytask.push_request()
  682. try:
  683. self.mytask.request.id = tid
  684. try:
  685. raise ValueError('foo')
  686. except Exception as exc:
  687. w = TraceInfo(states.FAILURE, exc)
  688. w.handle_failure(
  689. self.mytask, self.mytask.request, store_errors=False,
  690. )
  691. assert self.mytask.backend.get_status(tid) == states.PENDING
  692. w.handle_failure(
  693. self.mytask, self.mytask.request, store_errors=True,
  694. )
  695. assert self.mytask.backend.get_status(tid) == states.FAILURE
  696. finally:
  697. self.mytask.pop_request()
  698. def test_from_message(self):
  699. us = 'æØåveéðƒeæ'
  700. tid = uuid()
  701. m = self.TaskMessage(
  702. self.mytask.name, tid, args=[2], kwargs={us: 'bar'},
  703. )
  704. job = Request(m, app=self.app)
  705. assert isinstance(job, Request)
  706. assert job.name == self.mytask.name
  707. assert job.id == tid
  708. assert job.message is m
  709. def test_from_message_empty_args(self):
  710. tid = uuid()
  711. m = self.TaskMessage(self.mytask.name, tid, args=[], kwargs={})
  712. job = Request(m, app=self.app)
  713. assert isinstance(job, Request)
  714. def test_from_message_missing_required_fields(self):
  715. m = self.TaskMessage(self.mytask.name)
  716. m.headers.clear()
  717. with pytest.raises(KeyError):
  718. Request(m, app=self.app)
  719. def test_from_message_nonexistant_task(self):
  720. m = self.TaskMessage(
  721. 'cu.mytask.doesnotexist',
  722. args=[2], kwargs={'æØåveéðƒeæ': 'bar'},
  723. )
  724. with pytest.raises(KeyError):
  725. Request(m, app=self.app)
  726. def test_execute(self):
  727. tid = uuid()
  728. job = self.xRequest(id=tid, args=[4], kwargs={})
  729. assert job.execute() == 256
  730. meta = self.mytask.backend.get_task_meta(tid)
  731. assert meta['status'] == states.SUCCESS
  732. assert meta['result'] == 256
  733. def test_execute_success_no_kwargs(self):
  734. @self.app.task # traverses coverage for decorator without parens
  735. def mytask_no_kwargs(i):
  736. return i ** i
  737. tid = uuid()
  738. job = self.xRequest(
  739. name=mytask_no_kwargs.name,
  740. id=tid,
  741. args=[4],
  742. kwargs={},
  743. )
  744. assert job.execute() == 256
  745. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  746. assert meta['result'] == 256
  747. assert meta['status'] == states.SUCCESS
  748. def test_execute_ack(self):
  749. scratch = {'ACK': False}
  750. def on_ack(*args, **kwargs):
  751. scratch['ACK'] = True
  752. tid = uuid()
  753. job = self.xRequest(id=tid, args=[4], on_ack=on_ack)
  754. assert job.execute() == 256
  755. meta = self.mytask.backend.get_task_meta(tid)
  756. assert scratch['ACK']
  757. assert meta['result'] == 256
  758. assert meta['status'] == states.SUCCESS
  759. def test_execute_fail(self):
  760. tid = uuid()
  761. job = self.xRequest(
  762. name=self.mytask_raising.name,
  763. id=tid,
  764. args=[4],
  765. kwargs={},
  766. )
  767. assert isinstance(job.execute(), ExceptionInfo)
  768. assert self.mytask_raising.backend.serializer == 'pickle'
  769. meta = self.mytask_raising.backend.get_task_meta(tid)
  770. assert meta['status'] == states.FAILURE
  771. assert isinstance(meta['result'], KeyError)
  772. def test_execute_using_pool(self):
  773. tid = uuid()
  774. job = self.xRequest(id=tid, args=[4])
  775. p = Mock()
  776. job.execute_using_pool(p)
  777. p.apply_async.assert_called_once()
  778. args = p.apply_async.call_args[1]['args']
  779. assert args[0] == self.mytask.name
  780. assert args[1] == tid
  781. assert args[2] == job.request_dict
  782. assert args[3] == job.message.body
  783. def _test_on_failure(self, exception, **kwargs):
  784. tid = uuid()
  785. job = self.xRequest(id=tid, args=[4])
  786. job.send_event = Mock(name='send_event')
  787. job.task.backend.mark_as_failure = Mock(name='mark_as_failure')
  788. try:
  789. raise exception
  790. except type(exception):
  791. exc_info = ExceptionInfo()
  792. job.on_failure(exc_info, **kwargs)
  793. job.send_event.assert_called()
  794. return job
  795. def test_on_failure(self):
  796. self._test_on_failure(Exception('Inside unit tests'))
  797. def test_on_failure__unicode_exception(self):
  798. self._test_on_failure(Exception('Бобры атакуют'))
  799. def test_on_failure__utf8_exception(self):
  800. self._test_on_failure(Exception(
  801. from_utf8('Бобры атакуют')))
  802. def test_on_failure__WorkerLostError(self):
  803. exc = WorkerLostError()
  804. job = self._test_on_failure(exc)
  805. job.task.backend.mark_as_failure.assert_called_with(
  806. job.id, exc, request=job, store_result=True,
  807. )
  808. def test_on_failure__return_ok(self):
  809. self._test_on_failure(KeyError(), return_ok=True)
  810. def test_reject(self):
  811. job = self.xRequest(id=uuid())
  812. job.on_reject = Mock(name='on_reject')
  813. job.reject(requeue=True)
  814. job.on_reject.assert_called_with(
  815. req_logger, job.connection_errors, True,
  816. )
  817. assert job.acknowledged
  818. job.on_reject.reset_mock()
  819. job.reject(requeue=True)
  820. job.on_reject.assert_not_called()
  821. def test_group(self):
  822. gid = uuid()
  823. job = self.xRequest(id=uuid(), group=gid)
  824. assert job.group == gid
  825. class test_create_request_class(RequestCase):
  826. def setup(self):
  827. self.task = Mock(name='task')
  828. self.pool = Mock(name='pool')
  829. self.eventer = Mock(name='eventer')
  830. RequestCase.setup(self)
  831. def create_request_cls(self, **kwargs):
  832. return create_request_cls(
  833. Request, self.task, self.pool, 'foo', self.eventer, **kwargs
  834. )
  835. def zRequest(self, Request=None, revoked_tasks=None, ref=None, **kwargs):
  836. return self.xRequest(
  837. Request=Request or self.create_request_cls(
  838. ref=ref,
  839. revoked_tasks=revoked_tasks,
  840. ),
  841. **kwargs)
  842. def test_on_success(self):
  843. self.zRequest(id=uuid()).on_success((False, 'hey', 3.1222))
  844. def test_on_success__SystemExit(self,
  845. errors=(SystemExit, KeyboardInterrupt)):
  846. for exc in errors:
  847. einfo = None
  848. try:
  849. raise exc()
  850. except exc:
  851. einfo = ExceptionInfo()
  852. with pytest.raises(exc):
  853. self.zRequest(id=uuid()).on_success((True, einfo, 1.0))
  854. def test_on_success__calls_failure(self):
  855. job = self.zRequest(id=uuid())
  856. einfo = Mock(name='einfo')
  857. job.on_failure = Mock(name='on_failure')
  858. job.on_success((True, einfo, 1.0))
  859. job.on_failure.assert_called_with(einfo, return_ok=True)
  860. def test_on_success__acks_late_enabled(self):
  861. self.task.acks_late = True
  862. job = self.zRequest(id=uuid())
  863. job.acknowledge = Mock(name='ack')
  864. job.on_success((False, 'foo', 1.0))
  865. job.acknowledge.assert_called_with()
  866. def test_on_success__acks_late_disabled(self):
  867. self.task.acks_late = False
  868. job = self.zRequest(id=uuid())
  869. job.acknowledge = Mock(name='ack')
  870. job.on_success((False, 'foo', 1.0))
  871. job.acknowledge.assert_not_called()
  872. def test_on_success__no_events(self):
  873. self.eventer = None
  874. job = self.zRequest(id=uuid())
  875. job.send_event = Mock(name='send_event')
  876. job.on_success((False, 'foo', 1.0))
  877. job.send_event.assert_not_called()
  878. def test_on_success__with_events(self):
  879. job = self.zRequest(id=uuid())
  880. job.send_event = Mock(name='send_event')
  881. job.on_success((False, 'foo', 1.0))
  882. job.send_event.assert_called_with(
  883. 'task-succeeded', result='foo', runtime=1.0,
  884. )
  885. def test_execute_using_pool__revoked(self):
  886. tid = uuid()
  887. job = self.zRequest(id=tid, revoked_tasks={tid})
  888. job.revoked = Mock()
  889. job.revoked.return_value = True
  890. with pytest.raises(TaskRevokedError):
  891. job.execute_using_pool(self.pool)
  892. def test_execute_using_pool__expired(self):
  893. tid = uuid()
  894. job = self.zRequest(id=tid, revoked_tasks=set())
  895. job.expires = 1232133
  896. job.revoked = Mock()
  897. job.revoked.return_value = True
  898. with pytest.raises(TaskRevokedError):
  899. job.execute_using_pool(self.pool)
  900. def test_execute_using_pool(self):
  901. from celery.app.trace import trace_task_ret as trace
  902. weakref_ref = Mock(name='weakref.ref')
  903. job = self.zRequest(id=uuid(), revoked_tasks=set(), ref=weakref_ref)
  904. job.execute_using_pool(self.pool)
  905. self.pool.apply_async.assert_called_with(
  906. trace,
  907. args=(job.type, job.id, job.request_dict, job.body,
  908. job.content_type, job.content_encoding),
  909. accept_callback=job.on_accepted,
  910. timeout_callback=job.on_timeout,
  911. callback=job.on_success,
  912. error_callback=job.on_failure,
  913. soft_timeout=self.task.soft_time_limit,
  914. timeout=self.task.time_limit,
  915. correlation_id=job.id,
  916. )
  917. assert job._apply_result
  918. weakref_ref.assert_called_with(self.pool.apply_async())
  919. assert job._apply_result is weakref_ref()
  920. def test_execute_using_pool_with_none_timelimit_header(self):
  921. from celery.app.trace import trace_task_ret as trace
  922. weakref_ref = Mock(name='weakref.ref')
  923. job = self.zRequest(id=uuid(),
  924. revoked_tasks=set(),
  925. ref=weakref_ref,
  926. headers={'timelimit': None})
  927. job.execute_using_pool(self.pool)
  928. self.pool.apply_async.assert_called_with(
  929. trace,
  930. args=(job.type, job.id, job.request_dict, job.body,
  931. job.content_type, job.content_encoding),
  932. accept_callback=job.on_accepted,
  933. timeout_callback=job.on_timeout,
  934. callback=job.on_success,
  935. error_callback=job.on_failure,
  936. soft_timeout=self.task.soft_time_limit,
  937. timeout=self.task.time_limit,
  938. correlation_id=job.id,
  939. )
  940. assert job._apply_result
  941. weakref_ref.assert_called_with(self.pool.apply_async())
  942. assert job._apply_result is weakref_ref()
  943. def test_execute_using_pool__defaults_of_hybrid_to_proto2(self):
  944. weakref_ref = Mock(name='weakref.ref')
  945. headers = strategy.hybrid_to_proto2('', {'id': uuid(),
  946. 'task': self.mytask.name})[1]
  947. job = self.zRequest(revoked_tasks=set(), ref=weakref_ref, **headers)
  948. job.execute_using_pool(self.pool)
  949. assert job._apply_result
  950. weakref_ref.assert_called_with(self.pool.apply_async())
  951. assert job._apply_result is weakref_ref()