test_request.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. import anyjson
  4. import os
  5. import signal
  6. import socket
  7. import sys
  8. from datetime import datetime, timedelta
  9. from billiard.einfo import ExceptionInfo
  10. from kombu.transport.base import Message
  11. from kombu.utils.encoding import from_utf8, default_encode
  12. from mock import Mock, patch
  13. from nose import SkipTest
  14. from celery import states
  15. from celery.app.trace import (
  16. trace_task,
  17. _trace_task_ret,
  18. TraceInfo,
  19. mro_lookup,
  20. build_tracer,
  21. setup_worker_optimizations,
  22. reset_worker_optimizations,
  23. )
  24. from celery.concurrency.base import BasePool
  25. from celery.exceptions import (
  26. Ignore,
  27. InvalidTaskError,
  28. Retry,
  29. TaskRevokedError,
  30. Terminated,
  31. WorkerLostError,
  32. )
  33. from celery.five import keys, monotonic
  34. from celery.signals import task_revoked
  35. from celery.utils import uuid
  36. from celery.worker import job as module
  37. from celery.worker.job import Request, TaskRequest, logger as req_logger
  38. from celery.worker.state import revoked
  39. from celery.tests.case import (
  40. AppCase,
  41. Case,
  42. assert_signal_called,
  43. body_from_sig,
  44. )
  45. class test_mro_lookup(Case):
  46. def test_order(self):
  47. class A(object):
  48. pass
  49. class B(A):
  50. pass
  51. class C(B):
  52. pass
  53. class D(C):
  54. @classmethod
  55. def mro(cls):
  56. return ()
  57. A.x = 10
  58. self.assertEqual(mro_lookup(C, 'x'), A)
  59. self.assertIsNone(mro_lookup(C, 'x', stop=(A, )))
  60. B.x = 10
  61. self.assertEqual(mro_lookup(C, 'x'), B)
  62. C.x = 10
  63. self.assertEqual(mro_lookup(C, 'x'), C)
  64. self.assertIsNone(mro_lookup(D, 'x'))
  65. def jail(app, task_id, name, args, kwargs):
  66. request = {'id': task_id}
  67. task = app.tasks[name]
  68. task.__trace__ = None # rebuild
  69. return trace_task(
  70. task, task_id, args, kwargs, request=request, eager=False, app=app,
  71. )
  72. class test_default_encode(AppCase):
  73. def setup(self):
  74. if sys.version_info >= (3, 0):
  75. raise SkipTest('py3k: not relevant')
  76. def test_jython(self):
  77. prev, sys.platform = sys.platform, 'java 1.6.1'
  78. try:
  79. self.assertEqual(default_encode(bytes('foo')), 'foo')
  80. finally:
  81. sys.platform = prev
  82. def test_cpython(self):
  83. prev, sys.platform = sys.platform, 'darwin'
  84. gfe, sys.getfilesystemencoding = (
  85. sys.getfilesystemencoding,
  86. lambda: 'utf-8',
  87. )
  88. try:
  89. self.assertEqual(default_encode(bytes('foo')), 'foo')
  90. finally:
  91. sys.platform = prev
  92. sys.getfilesystemencoding = gfe
  93. class test_Retry(AppCase):
  94. def test_retry_semipredicate(self):
  95. try:
  96. raise Exception('foo')
  97. except Exception as exc:
  98. ret = Retry('Retrying task', exc)
  99. self.assertEqual(ret.exc, exc)
  100. class test_trace_task(AppCase):
  101. def setup(self):
  102. @self.app.task(shared=False)
  103. def mytask(i, **kwargs):
  104. return i ** i
  105. self.mytask = mytask
  106. @self.app.task(shared=False)
  107. def mytask_raising(i):
  108. raise KeyError(i)
  109. self.mytask_raising = mytask_raising
  110. @patch('celery.app.trace._logger')
  111. def test_process_cleanup_fails(self, _logger):
  112. self.mytask.backend = Mock()
  113. self.mytask.backend.process_cleanup = Mock(side_effect=KeyError())
  114. tid = uuid()
  115. ret = jail(self.app, tid, self.mytask.name, [2], {})
  116. self.assertEqual(ret, 4)
  117. self.assertTrue(self.mytask.backend.store_result.called)
  118. self.assertIn('Process cleanup failed', _logger.error.call_args[0][0])
  119. def test_process_cleanup_BaseException(self):
  120. self.mytask.backend = Mock()
  121. self.mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
  122. with self.assertRaises(SystemExit):
  123. jail(self.app, uuid(), self.mytask.name, [2], {})
  124. def test_execute_jail_success(self):
  125. ret = jail(self.app, uuid(), self.mytask.name, [2], {})
  126. self.assertEqual(ret, 4)
  127. def test_marked_as_started(self):
  128. _started = []
  129. def store_result(tid, meta, state, **kwars):
  130. if state == states.STARTED:
  131. _started.append(tid)
  132. self.mytask.backend.store_result = Mock(name='store_result')
  133. self.mytask.backend.store_result.side_effect = store_result
  134. self.mytask.track_started = True
  135. tid = uuid()
  136. jail(self.app, tid, self.mytask.name, [2], {})
  137. self.assertIn(tid, _started)
  138. self.mytask.ignore_result = True
  139. tid = uuid()
  140. jail(self.app, tid, self.mytask.name, [2], {})
  141. self.assertNotIn(tid, _started)
  142. def test_execute_jail_failure(self):
  143. ret = jail(
  144. self.app, uuid(), self.mytask_raising.name, [4], {},
  145. )
  146. self.assertIsInstance(ret, ExceptionInfo)
  147. self.assertTupleEqual(ret.exception.args, (4, ))
  148. def test_execute_ignore_result(self):
  149. @self.app.task(shared=False, ignore_result=True)
  150. def ignores_result(i):
  151. return i ** i
  152. task_id = uuid()
  153. ret = jail(self.app, task_id, ignores_result.name, [4], {})
  154. self.assertEqual(ret, 256)
  155. self.assertFalse(self.app.AsyncResult(task_id).ready())
  156. class MockEventDispatcher(object):
  157. def __init__(self):
  158. self.sent = []
  159. self.enabled = True
  160. def send(self, event, **fields):
  161. self.sent.append(event)
  162. class test_Request(AppCase):
  163. def setup(self):
  164. @self.app.task(shared=False)
  165. def add(x, y, **kw_):
  166. return x + y
  167. self.add = add
  168. @self.app.task(shared=False)
  169. def mytask(i, **kwargs):
  170. return i ** i
  171. self.mytask = mytask
  172. @self.app.task(shared=False)
  173. def mytask_raising(i):
  174. raise KeyError(i)
  175. self.mytask_raising = mytask_raising
  176. def get_request(self, sig, Request=Request, **kwargs):
  177. return Request(
  178. body_from_sig(self.app, sig),
  179. on_ack=Mock(),
  180. eventer=Mock(),
  181. app=self.app,
  182. connection_errors=(socket.error, ),
  183. task=sig.type,
  184. **kwargs
  185. )
  186. def test_invalid_eta_raises_InvalidTaskError(self):
  187. with self.assertRaises(InvalidTaskError):
  188. self.get_request(self.add.s(2, 2).set(eta='12345'))
  189. def test_invalid_expires_raises_InvalidTaskError(self):
  190. with self.assertRaises(InvalidTaskError):
  191. self.get_request(self.add.s(2, 2).set(expires='12345'))
  192. def test_valid_expires_with_utc_makes_aware(self):
  193. with patch('celery.worker.job.maybe_make_aware') as mma:
  194. self.get_request(self.add.s(2, 2).set(expires=10))
  195. self.assertTrue(mma.called)
  196. def test_maybe_expire_when_expires_is_None(self):
  197. req = self.get_request(self.add.s(2, 2))
  198. self.assertFalse(req.maybe_expire())
  199. def test_on_retry_acks_if_late(self):
  200. self.add.acks_late = True
  201. req = self.get_request(self.add.s(2, 2))
  202. req.on_retry(Mock())
  203. req.on_ack.assert_called_with(req_logger, req.connection_errors)
  204. def test_on_failure_Termianted(self):
  205. einfo = None
  206. try:
  207. raise Terminated('9')
  208. except Terminated:
  209. einfo = ExceptionInfo()
  210. self.assertIsNotNone(einfo)
  211. req = self.get_request(self.add.s(2, 2))
  212. req.on_failure(einfo)
  213. req.eventer.send.assert_called_with(
  214. 'task-revoked',
  215. uuid=req.id, terminated=True, signum='9', expired=False,
  216. )
  217. def test_log_error_propagates_MemoryError(self):
  218. einfo = None
  219. try:
  220. raise MemoryError()
  221. except MemoryError:
  222. einfo = ExceptionInfo(internal=True)
  223. self.assertIsNotNone(einfo)
  224. req = self.get_request(self.add.s(2, 2))
  225. with self.assertRaises(MemoryError):
  226. req._log_error(einfo)
  227. def test_log_error_when_Ignore(self):
  228. einfo = None
  229. try:
  230. raise Ignore()
  231. except Ignore:
  232. einfo = ExceptionInfo(internal=True)
  233. self.assertIsNotNone(einfo)
  234. req = self.get_request(self.add.s(2, 2))
  235. req._log_error(einfo)
  236. req.on_ack.assert_called_with(req_logger, req.connection_errors)
  237. def test_tzlocal_is_cached(self):
  238. req = self.get_request(self.add.s(2, 2))
  239. req._tzlocal = 'foo'
  240. self.assertEqual(req.tzlocal, 'foo')
  241. def test_execute_magic_kwargs(self):
  242. task = self.add.s(2, 2)
  243. task.freeze()
  244. req = self.get_request(task)
  245. self.add.accept_magic_kwargs = True
  246. pool = Mock()
  247. req.execute_using_pool(pool)
  248. self.assertTrue(pool.apply_async.called)
  249. args = pool.apply_async.call_args[1]['args']
  250. self.assertEqual(args[0], task.task)
  251. self.assertEqual(args[1], task.id)
  252. self.assertEqual(args[2], task.args)
  253. kwargs = args[3]
  254. self.assertEqual(kwargs.get('task_name'), task.task)
  255. def test_task_wrapper_repr(self):
  256. job = TaskRequest(
  257. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  258. )
  259. self.assertTrue(repr(job))
  260. @patch('celery.worker.job.kwdict')
  261. def test_kwdict(self, kwdict):
  262. prev, module.NEEDS_KWDICT = module.NEEDS_KWDICT, True
  263. try:
  264. TaskRequest(
  265. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  266. )
  267. self.assertTrue(kwdict.called)
  268. finally:
  269. module.NEEDS_KWDICT = prev
  270. def test_sets_store_errors(self):
  271. self.mytask.ignore_result = True
  272. job = TaskRequest(
  273. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  274. )
  275. self.assertFalse(job.store_errors)
  276. self.mytask.store_errors_even_if_ignored = True
  277. job = TaskRequest(
  278. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  279. )
  280. self.assertTrue(job.store_errors)
  281. def test_send_event(self):
  282. job = TaskRequest(
  283. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  284. )
  285. job.eventer = MockEventDispatcher()
  286. job.send_event('task-frobulated')
  287. self.assertIn('task-frobulated', job.eventer.sent)
  288. def test_on_retry(self):
  289. job = TaskRequest(
  290. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  291. )
  292. job.eventer = MockEventDispatcher()
  293. try:
  294. raise Retry('foo', KeyError('moofoobar'))
  295. except:
  296. einfo = ExceptionInfo()
  297. job.on_failure(einfo)
  298. self.assertIn('task-retried', job.eventer.sent)
  299. prev, module._does_info = module._does_info, False
  300. try:
  301. job.on_failure(einfo)
  302. finally:
  303. module._does_info = prev
  304. einfo.internal = True
  305. job.on_failure(einfo)
  306. def test_compat_properties(self):
  307. job = TaskRequest(
  308. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  309. )
  310. self.assertEqual(job.task_id, job.id)
  311. self.assertEqual(job.task_name, job.name)
  312. job.task_id = 'ID'
  313. self.assertEqual(job.id, 'ID')
  314. job.task_name = 'NAME'
  315. self.assertEqual(job.name, 'NAME')
  316. def test_terminate__task_started(self):
  317. pool = Mock()
  318. signum = signal.SIGKILL
  319. job = TaskRequest(
  320. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  321. )
  322. with assert_signal_called(
  323. task_revoked, sender=job.task, request=job,
  324. terminated=True, expired=False, signum=signum):
  325. job.time_start = monotonic()
  326. job.worker_pid = 313
  327. job.terminate(pool, signal='KILL')
  328. pool.terminate_job.assert_called_with(job.worker_pid, signum)
  329. def test_terminate__task_reserved(self):
  330. pool = Mock()
  331. job = TaskRequest(
  332. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  333. )
  334. job.time_start = None
  335. job.terminate(pool, signal='KILL')
  336. self.assertFalse(pool.terminate_job.called)
  337. self.assertTupleEqual(job._terminate_on_ack, (pool, 'KILL'))
  338. job.terminate(pool, signal='KILL')
  339. def test_revoked_expires_expired(self):
  340. job = TaskRequest(
  341. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  342. expires=datetime.utcnow() - timedelta(days=1),
  343. )
  344. with assert_signal_called(
  345. task_revoked, sender=job.task, request=job,
  346. terminated=False, expired=True, signum=None):
  347. job.revoked()
  348. self.assertIn(job.id, revoked)
  349. self.assertEqual(
  350. self.mytask.backend.get_status(job.id),
  351. states.REVOKED,
  352. )
  353. def test_revoked_expires_not_expired(self):
  354. job = TaskRequest(
  355. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  356. expires=datetime.utcnow() + timedelta(days=1),
  357. )
  358. job.revoked()
  359. self.assertNotIn(job.id, revoked)
  360. self.assertNotEqual(
  361. self.mytask.backend.get_status(job.id),
  362. states.REVOKED,
  363. )
  364. def test_revoked_expires_ignore_result(self):
  365. self.mytask.ignore_result = True
  366. job = TaskRequest(
  367. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  368. expires=datetime.utcnow() - timedelta(days=1),
  369. )
  370. job.revoked()
  371. self.assertIn(job.id, revoked)
  372. self.assertNotEqual(
  373. self.mytask.backend.get_status(job.id), states.REVOKED,
  374. )
  375. def test_send_email(self):
  376. app = self.app
  377. mail_sent = [False]
  378. def mock_mail_admins(*args, **kwargs):
  379. mail_sent[0] = True
  380. def get_ei():
  381. try:
  382. raise KeyError('moofoobar')
  383. except:
  384. return ExceptionInfo()
  385. app.mail_admins = mock_mail_admins
  386. self.mytask.send_error_emails = True
  387. job = TaskRequest(
  388. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  389. )
  390. einfo = get_ei()
  391. job.on_failure(einfo)
  392. self.assertTrue(mail_sent[0])
  393. einfo = get_ei()
  394. mail_sent[0] = False
  395. self.mytask.send_error_emails = False
  396. job.on_failure(einfo)
  397. self.assertFalse(mail_sent[0])
  398. einfo = get_ei()
  399. mail_sent[0] = False
  400. self.mytask.send_error_emails = True
  401. job.on_failure(einfo)
  402. self.assertTrue(mail_sent[0])
  403. def test_already_revoked(self):
  404. job = TaskRequest(
  405. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  406. )
  407. job._already_revoked = True
  408. self.assertTrue(job.revoked())
  409. def test_revoked(self):
  410. job = TaskRequest(
  411. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  412. )
  413. with assert_signal_called(
  414. task_revoked, sender=job.task, request=job,
  415. terminated=False, expired=False, signum=None):
  416. revoked.add(job.id)
  417. self.assertTrue(job.revoked())
  418. self.assertTrue(job._already_revoked)
  419. self.assertTrue(job.acknowledged)
  420. def test_execute_does_not_execute_revoked(self):
  421. job = TaskRequest(
  422. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  423. )
  424. revoked.add(job.id)
  425. job.execute()
  426. def test_execute_acks_late(self):
  427. self.mytask_raising.acks_late = True
  428. job = TaskRequest(self.mytask_raising.name, uuid(), [1], app=self.app)
  429. job.execute()
  430. self.assertTrue(job.acknowledged)
  431. job.execute()
  432. def test_execute_using_pool_does_not_execute_revoked(self):
  433. job = TaskRequest(
  434. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  435. )
  436. revoked.add(job.id)
  437. with self.assertRaises(TaskRevokedError):
  438. job.execute_using_pool(None)
  439. def test_on_accepted_acks_early(self):
  440. job = TaskRequest(
  441. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  442. )
  443. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  444. self.assertTrue(job.acknowledged)
  445. prev, module._does_debug = module._does_debug, False
  446. try:
  447. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  448. finally:
  449. module._does_debug = prev
  450. def test_on_accepted_acks_late(self):
  451. job = TaskRequest(
  452. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  453. )
  454. self.mytask.acks_late = True
  455. job.on_accepted(pid=os.getpid(), time_accepted=monotonic())
  456. self.assertFalse(job.acknowledged)
  457. def test_on_accepted_terminates(self):
  458. signum = signal.SIGKILL
  459. pool = Mock()
  460. job = TaskRequest(
  461. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  462. )
  463. with assert_signal_called(
  464. task_revoked, sender=job.task, request=job,
  465. terminated=True, expired=False, signum=signum):
  466. job.terminate(pool, signal='KILL')
  467. self.assertFalse(pool.terminate_job.call_count)
  468. job.on_accepted(pid=314, time_accepted=monotonic())
  469. pool.terminate_job.assert_called_with(314, signum)
  470. def test_on_success_acks_early(self):
  471. job = TaskRequest(
  472. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  473. )
  474. job.time_start = 1
  475. job.on_success(42)
  476. prev, module._does_info = module._does_info, False
  477. try:
  478. job.on_success(42)
  479. self.assertFalse(job.acknowledged)
  480. finally:
  481. module._does_info = prev
  482. def test_on_success_BaseException(self):
  483. job = TaskRequest(
  484. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  485. )
  486. job.time_start = 1
  487. with self.assertRaises(SystemExit):
  488. try:
  489. raise SystemExit()
  490. except SystemExit:
  491. job.on_success(ExceptionInfo())
  492. else:
  493. assert False
  494. def test_on_success_eventer(self):
  495. job = TaskRequest(
  496. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  497. )
  498. job.time_start = 1
  499. job.eventer = Mock()
  500. job.send_event = Mock()
  501. job.on_success(42)
  502. self.assertTrue(job.send_event.called)
  503. def test_on_success_when_failure(self):
  504. job = TaskRequest(
  505. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  506. )
  507. job.time_start = 1
  508. job.on_failure = Mock()
  509. try:
  510. raise KeyError('foo')
  511. except Exception:
  512. job.on_success(ExceptionInfo())
  513. self.assertTrue(job.on_failure.called)
  514. def test_on_success_acks_late(self):
  515. job = TaskRequest(
  516. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  517. )
  518. job.time_start = 1
  519. self.mytask.acks_late = True
  520. job.on_success(42)
  521. self.assertTrue(job.acknowledged)
  522. def test_on_failure_WorkerLostError(self):
  523. def get_ei():
  524. try:
  525. raise WorkerLostError('do re mi')
  526. except WorkerLostError:
  527. return ExceptionInfo()
  528. job = TaskRequest(
  529. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  530. )
  531. exc_info = get_ei()
  532. job.on_failure(exc_info)
  533. self.assertEqual(
  534. self.mytask.backend.get_status(job.id), states.FAILURE,
  535. )
  536. self.mytask.ignore_result = True
  537. exc_info = get_ei()
  538. job = TaskRequest(
  539. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  540. )
  541. job.on_failure(exc_info)
  542. self.assertEqual(
  543. self.mytask.backend.get_status(job.id), states.PENDING,
  544. )
  545. def test_on_failure_acks_late(self):
  546. job = TaskRequest(
  547. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  548. )
  549. job.time_start = 1
  550. self.mytask.acks_late = True
  551. try:
  552. raise KeyError('foo')
  553. except KeyError:
  554. exc_info = ExceptionInfo()
  555. job.on_failure(exc_info)
  556. self.assertTrue(job.acknowledged)
  557. def test_from_message_invalid_kwargs(self):
  558. body = dict(task=self.mytask.name, id=1, args=(), kwargs='foo')
  559. with self.assertRaises(InvalidTaskError):
  560. TaskRequest.from_message(None, body, app=self.app)
  561. @patch('celery.worker.job.error')
  562. @patch('celery.worker.job.warn')
  563. def test_on_timeout(self, warn, error):
  564. job = TaskRequest(
  565. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  566. )
  567. job.on_timeout(soft=True, timeout=1337)
  568. self.assertIn('Soft time limit', warn.call_args[0][0])
  569. job.on_timeout(soft=False, timeout=1337)
  570. self.assertIn('Hard time limit', error.call_args[0][0])
  571. self.assertEqual(
  572. self.mytask.backend.get_status(job.id), states.FAILURE,
  573. )
  574. self.mytask.ignore_result = True
  575. job = TaskRequest(
  576. self.mytask.name, uuid(), [1], {'f': 'x'}, app=self.app,
  577. )
  578. job.on_timeout(soft=True, timeout=1336)
  579. self.assertEqual(
  580. self.mytask.backend.get_status(job.id), states.PENDING,
  581. )
  582. def test_fast_trace_task(self):
  583. from celery.app import trace
  584. setup_worker_optimizations(self.app)
  585. self.assertIs(trace.trace_task_ret, trace._fast_trace_task)
  586. try:
  587. self.mytask.__trace__ = build_tracer(
  588. self.mytask.name, self.mytask, self.app.loader, 'test',
  589. app=self.app,
  590. )
  591. res = trace.trace_task_ret(self.mytask.name, uuid(), [4], {})
  592. self.assertEqual(res, 4 ** 4)
  593. finally:
  594. reset_worker_optimizations()
  595. self.assertIs(trace.trace_task_ret, trace._trace_task_ret)
  596. delattr(self.mytask, '__trace__')
  597. res = trace.trace_task_ret(
  598. self.mytask.name, uuid(), [4], {}, app=self.app,
  599. )
  600. self.assertEqual(res, 4 ** 4)
  601. def test_trace_task_ret(self):
  602. self.mytask.__trace__ = build_tracer(
  603. self.mytask.name, self.mytask, self.app.loader, 'test',
  604. app=self.app,
  605. )
  606. res = _trace_task_ret(self.mytask.name, uuid(), [4], {}, app=self.app)
  607. self.assertEqual(res, 4 ** 4)
  608. def test_trace_task_ret__no_trace(self):
  609. try:
  610. delattr(self.mytask, '__trace__')
  611. except AttributeError:
  612. pass
  613. res = _trace_task_ret(self.mytask.name, uuid(), [4], {}, app=self.app)
  614. self.assertEqual(res, 4 ** 4)
  615. def test_trace_catches_exception(self):
  616. def _error_exec(self, *args, **kwargs):
  617. raise KeyError('baz')
  618. @self.app.task(request=None, shared=False)
  619. def raising():
  620. raise KeyError('baz')
  621. with self.assertWarnsRegex(RuntimeWarning,
  622. r'Exception raised outside'):
  623. res = trace_task(raising, uuid(), [], {}, app=self.app)
  624. self.assertIsInstance(res, ExceptionInfo)
  625. def test_worker_task_trace_handle_retry(self):
  626. tid = uuid()
  627. self.mytask.push_request(id=tid)
  628. try:
  629. raise ValueError('foo')
  630. except Exception as exc:
  631. try:
  632. raise Retry(str(exc), exc=exc)
  633. except Retry as exc:
  634. w = TraceInfo(states.RETRY, exc)
  635. w.handle_retry(self.mytask, store_errors=False)
  636. self.assertEqual(
  637. self.mytask.backend.get_status(tid), states.PENDING,
  638. )
  639. w.handle_retry(self.mytask, store_errors=True)
  640. self.assertEqual(
  641. self.mytask.backend.get_status(tid), states.RETRY,
  642. )
  643. finally:
  644. self.mytask.pop_request()
  645. def test_worker_task_trace_handle_failure(self):
  646. tid = uuid()
  647. self.mytask.push_request()
  648. try:
  649. self.mytask.request.id = tid
  650. try:
  651. raise ValueError('foo')
  652. except Exception as exc:
  653. w = TraceInfo(states.FAILURE, exc)
  654. w.handle_failure(self.mytask, store_errors=False)
  655. self.assertEqual(
  656. self.mytask.backend.get_status(tid), states.PENDING,
  657. )
  658. w.handle_failure(self.mytask, store_errors=True)
  659. self.assertEqual(
  660. self.mytask.backend.get_status(tid), states.FAILURE,
  661. )
  662. finally:
  663. self.mytask.pop_request()
  664. def test_task_wrapper_mail_attrs(self):
  665. job = TaskRequest(self.mytask.name, uuid(), [], {}, app=self.app)
  666. x = job.success_msg % {
  667. 'name': job.name,
  668. 'id': job.id,
  669. 'return_value': 10,
  670. 'runtime': 0.3641,
  671. }
  672. self.assertTrue(x)
  673. x = job.error_msg % {
  674. 'name': job.name,
  675. 'id': job.id,
  676. 'exc': 'FOOBARBAZ',
  677. 'traceback': 'foobarbaz',
  678. }
  679. self.assertTrue(x)
  680. def test_from_message(self):
  681. us = 'æØåveéðƒeæ'
  682. body = {'task': self.mytask.name, 'id': uuid(),
  683. 'args': [2], 'kwargs': {us: 'bar'}}
  684. m = Message(None, body=anyjson.dumps(body), backend='foo',
  685. content_type='application/json',
  686. content_encoding='utf-8')
  687. job = TaskRequest.from_message(m, m.decode(), app=self.app)
  688. self.assertIsInstance(job, Request)
  689. self.assertEqual(job.name, body['task'])
  690. self.assertEqual(job.id, body['id'])
  691. self.assertEqual(job.args, body['args'])
  692. us = from_utf8(us)
  693. if sys.version_info < (2, 6):
  694. self.assertEqual(next(keys(job.kwargs)), us)
  695. self.assertIsInstance(next(keys(job.kwargs)), str)
  696. def test_from_message_empty_args(self):
  697. body = {'task': self.mytask.name, 'id': uuid()}
  698. m = Message(None, body=anyjson.dumps(body), backend='foo',
  699. content_type='application/json',
  700. content_encoding='utf-8')
  701. job = TaskRequest.from_message(m, m.decode(), app=self.app)
  702. self.assertIsInstance(job, Request)
  703. self.assertEqual(job.args, [])
  704. self.assertEqual(job.kwargs, {})
  705. def test_from_message_missing_required_fields(self):
  706. body = {}
  707. m = Message(None, body=anyjson.dumps(body), backend='foo',
  708. content_type='application/json',
  709. content_encoding='utf-8')
  710. with self.assertRaises(KeyError):
  711. TaskRequest.from_message(m, m.decode(), app=self.app)
  712. def test_from_message_nonexistant_task(self):
  713. body = {'task': 'cu.mytask.doesnotexist', 'id': uuid(),
  714. 'args': [2], 'kwargs': {'æØåveéðƒeæ': 'bar'}}
  715. m = Message(None, body=anyjson.dumps(body), backend='foo',
  716. content_type='application/json',
  717. content_encoding='utf-8')
  718. with self.assertRaises(KeyError):
  719. TaskRequest.from_message(m, m.decode(), app=self.app)
  720. def test_execute(self):
  721. tid = uuid()
  722. job = TaskRequest(self.mytask.name, tid, [4], {'f': 'x'}, app=self.app)
  723. self.assertEqual(job.execute(), 256)
  724. meta = self.mytask.backend.get_task_meta(tid)
  725. self.assertEqual(meta['result'], 256)
  726. self.assertEqual(meta['status'], states.SUCCESS)
  727. def test_execute_success_no_kwargs(self):
  728. @self.app.task # traverses coverage for decorator without parens
  729. def mytask_no_kwargs(i):
  730. return i ** i
  731. tid = uuid()
  732. job = TaskRequest(mytask_no_kwargs.name, tid, [4], {}, app=self.app)
  733. self.assertEqual(job.execute(), 256)
  734. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  735. self.assertEqual(meta['result'], 256)
  736. self.assertEqual(meta['status'], states.SUCCESS)
  737. def test_execute_success_some_kwargs(self):
  738. scratch = {'task_id': None}
  739. @self.app.task(shared=False, accept_magic_kwargs=True)
  740. def mytask_some_kwargs(i, task_id):
  741. scratch['task_id'] = task_id
  742. return i ** i
  743. tid = uuid()
  744. job = TaskRequest(mytask_some_kwargs.name, tid, [4], {}, app=self.app)
  745. self.assertEqual(job.execute(), 256)
  746. meta = mytask_some_kwargs.backend.get_task_meta(tid)
  747. self.assertEqual(scratch.get('task_id'), tid)
  748. self.assertEqual(meta['result'], 256)
  749. self.assertEqual(meta['status'], states.SUCCESS)
  750. def test_execute_ack(self):
  751. scratch = {'ACK': False}
  752. def on_ack(*args, **kwargs):
  753. scratch['ACK'] = True
  754. tid = uuid()
  755. job = TaskRequest(
  756. self.mytask.name, tid, [4], {'f': 'x'},
  757. on_ack=on_ack, app=self.app,
  758. )
  759. self.assertEqual(job.execute(), 256)
  760. meta = self.mytask.backend.get_task_meta(tid)
  761. self.assertTrue(scratch['ACK'])
  762. self.assertEqual(meta['result'], 256)
  763. self.assertEqual(meta['status'], states.SUCCESS)
  764. def test_execute_fail(self):
  765. tid = uuid()
  766. job = TaskRequest(self.mytask_raising.name, tid, [4], app=self.app)
  767. self.assertIsInstance(job.execute(), ExceptionInfo)
  768. meta = self.mytask_raising.backend.get_task_meta(tid)
  769. self.assertEqual(meta['status'], states.FAILURE)
  770. self.assertIsInstance(meta['result'], KeyError)
  771. def test_execute_using_pool(self):
  772. tid = uuid()
  773. job = TaskRequest(self.mytask.name, tid, [4], {'f': 'x'}, app=self.app)
  774. class MockPool(BasePool):
  775. target = None
  776. args = None
  777. kwargs = None
  778. def __init__(self, *args, **kwargs):
  779. pass
  780. def apply_async(self, target, args=None, kwargs=None,
  781. *margs, **mkwargs):
  782. self.target = target
  783. self.args = args
  784. self.kwargs = kwargs
  785. p = MockPool()
  786. job.execute_using_pool(p)
  787. self.assertTrue(p.target)
  788. self.assertEqual(p.args[0], self.mytask.name)
  789. self.assertEqual(p.args[1], tid)
  790. self.assertEqual(p.args[2], [4])
  791. self.assertIn('f', p.args[3])
  792. self.assertIn([4], p.args)
  793. job.task.accept_magic_kwargs = False
  794. job.execute_using_pool(p)
  795. def test_default_kwargs(self):
  796. tid = uuid()
  797. job = TaskRequest(self.mytask.name, tid, [4], {'f': 'x'}, app=self.app)
  798. self.assertDictEqual(
  799. job.extend_with_default_kwargs(), {
  800. 'f': 'x',
  801. 'logfile': None,
  802. 'loglevel': None,
  803. 'task_id': job.id,
  804. 'task_retries': 0,
  805. 'task_is_eager': False,
  806. 'delivery_info': {
  807. 'exchange': None,
  808. 'routing_key': None,
  809. 'priority': None,
  810. 'redelivered': None,
  811. },
  812. 'task_name': job.name})
  813. @patch('celery.worker.job.logger')
  814. def _test_on_failure(self, exception, logger):
  815. app = self.app
  816. tid = uuid()
  817. job = TaskRequest(self.mytask.name, tid, [4], {'f': 'x'}, app=self.app)
  818. try:
  819. raise exception
  820. except Exception:
  821. exc_info = ExceptionInfo()
  822. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  823. job.on_failure(exc_info)
  824. self.assertTrue(logger.log.called)
  825. context = logger.log.call_args[0][2]
  826. self.assertEqual(self.mytask.name, context['name'])
  827. self.assertIn(tid, context['id'])
  828. def test_on_failure(self):
  829. self._test_on_failure(Exception('Inside unit tests'))
  830. def test_on_failure_unicode_exception(self):
  831. self._test_on_failure(Exception('Бобры атакуют'))
  832. def test_on_failure_utf8_exception(self):
  833. self._test_on_failure(Exception(
  834. from_utf8('Бобры атакуют')))