test_request.py 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import with_statement
  4. import anyjson
  5. import os
  6. import signal
  7. import sys
  8. import time
  9. from datetime import datetime, timedelta
  10. from kombu.transport.base import Message
  11. from kombu.utils.encoding import from_utf8, default_encode
  12. from mock import Mock, patch
  13. from nose import SkipTest
  14. from celery import current_app
  15. from celery import states
  16. from celery.app import app_or_default
  17. from celery.concurrency.base import BasePool
  18. from celery.datastructures import ExceptionInfo
  19. from celery.exceptions import (RetryTaskError,
  20. WorkerLostError, InvalidTaskError)
  21. from celery.task.trace import (
  22. trace_task,
  23. trace_task_ret,
  24. TraceInfo,
  25. mro_lookup,
  26. build_tracer,
  27. )
  28. from celery.result import AsyncResult
  29. from celery.signals import task_revoked
  30. from celery.task import task as task_dec
  31. from celery.task.base import Task
  32. from celery.utils import uuid
  33. from celery.worker import job as module
  34. from celery.worker.job import Request, TaskRequest
  35. from celery.worker.state import revoked
  36. from celery.tests.utils import Case, assert_signal_called
  # Shared flag; the ``on_ack`` callback in this module flips 'ACK' to True.
  scratch = dict(ACK=False)
  # ``mytask_some_kwargs`` records the ``task_id`` it was called with here.
  some_kwargs_scratchpad = dict()
  class test_mro_lookup(Case):
      """Checks which class ``mro_lookup`` reports for an attribute."""

      def test_order(self):
          # Linear hierarchy Root <- Middle <- Leaf, plus a subclass whose
          # ``mro()`` classmethod returns an empty tuple.
          class Root(object):
              pass

          class Middle(Root):
              pass

          class Leaf(Middle):
              pass

          class EmptyMro(Leaf):

              @classmethod
              def mro(cls):
                  return ()

          # Attribute only on the root class.
          Root.x = 10
          self.assertEqual(mro_lookup(Leaf, 'x'), Root)
          self.assertIsNone(mro_lookup(Leaf, 'x', stop=(Root, )))

          # Defining it further down the hierarchy changes the answer.
          Middle.x = 10
          self.assertEqual(mro_lookup(Leaf, 'x'), Middle)
          Leaf.x = 10
          self.assertEqual(mro_lookup(Leaf, 'x'), Leaf)

          # A class whose mro() is empty yields no match.
          self.assertIsNone(mro_lookup(EmptyMro, 'x'))
  def jail(task_id, name, args, kwargs):
      # Look the task up by name in the current app's registry and run it
      # through ``trace_task`` (non-eager) with a request holding only the id.
      target = current_app.tasks[name]
      target.__trace__ = None  # rebuild
      return trace_task(target, task_id, args, kwargs,
                        request={'id': task_id}, eager=False)
  def on_ack(*args, **kwargs):
      # Ack callback for tests: ignores its arguments and records the ack
      # in the module-level ``scratch`` dict.
      scratch.update(ACK=True)
  @task_dec(accept_magic_kwargs=False)
  def mytask(i, **kwargs):
      # Test task: returns ``i`` raised to its own power; kwargs are unused.
      return pow(i, i)
  @task_dec  # traverses coverage for decorator without parens
  def mytask_no_kwargs(i):
      # Same computation as ``mytask`` but accepts no keyword arguments.
      return pow(i, i)
  class MyTaskIgnoreResult(Task):
      """Class-based test task with ``ignore_result`` switched on."""

      ignore_result = True

      def run(self, i):
          # Returns ``i`` raised to its own power.
          return pow(i, i)
  @task_dec(accept_magic_kwargs=True)
  def mytask_some_kwargs(i, task_id):
      # Records the received ``task_id`` in the module-level scratchpad so a
      # test can inspect it, then returns ``i`` raised to its own power.
      some_kwargs_scratchpad.update(task_id=task_id)
      return pow(i, i)
  @task_dec(accept_magic_kwargs=False)
  def mytask_raising(i):
      # Test task that always fails with ``KeyError(i)``.
      failure = KeyError(i)
      raise failure
  class test_default_encode(Case):
      """Runs ``default_encode`` under faked ``sys.platform`` values."""

      def setUp(self):
          # These checks are skipped on Python 3.
          if not sys.version_info < (3, 0):
              raise SkipTest('py3k: not relevant')

      def test_jython(self):
          # Pretend to be on a Java platform for the duration of the check.
          original_platform = sys.platform
          sys.platform = 'java 1.6.1'
          try:
              self.assertEqual(default_encode('foo'), 'foo')
          finally:
              sys.platform = original_platform

      def test_cython(self):
          # Pretend to be on darwin with a utf-8 filesystem encoding; both
          # patched attributes are restored afterwards.
          original_platform = sys.platform
          original_getfsenc = sys.getfilesystemencoding
          sys.platform = 'darwin'
          sys.getfilesystemencoding = lambda: 'utf-8'
          try:
              self.assertEqual(default_encode('foo'), 'foo')
          finally:
              sys.platform = original_platform
              sys.getfilesystemencoding = original_getfsenc
  class test_RetryTaskError(Case):
      """Checks that ``RetryTaskError`` keeps the exception it wraps."""

      def test_retry_task_error(self):
          try:
              raise Exception('foo')
          except Exception:
              # Fetch the active exception via sys.exc_info() rather than
              # the ``except X, e`` form, which is a SyntaxError on Python 3
              # (``except X as e`` is unavailable on Python 2.5).
              exc = sys.exc_info()[1]
              ret = RetryTaskError('Retrying task', exc)
              self.assertEqual(ret.exc, exc)
  class test_trace_task(Case):
      """Runs tasks through ``jail`` (i.e. ``trace_task``) and checks the
      return value and the calls made on the task backend."""

      @patch('celery.task.trace._logger')
      def test_process_cleanup_fails(self, _logger):
          # A KeyError from backend.process_cleanup must be logged, not
          # propagated: the task result is still returned and stored.
          original_backend = mytask.backend
          fake_backend = Mock()
          fake_backend.process_cleanup = Mock(side_effect=KeyError())
          mytask.backend = fake_backend
          try:
              task_id = uuid()
              result = jail(task_id, mytask.name, [2], {})
              self.assertEqual(result, 4)
              mytask.backend.store_result.assert_called_with(
                  task_id, 4, states.SUCCESS)
              logged_message = _logger.error.call_args[0][0]
              self.assertIn('Process cleanup failed', logged_message)
          finally:
              mytask.backend = original_backend

      def test_process_cleanup_BaseException(self):
          # A SystemExit raised by process_cleanup must propagate.
          original_backend = mytask.backend
          fake_backend = Mock()
          fake_backend.process_cleanup = Mock(side_effect=SystemExit())
          mytask.backend = fake_backend
          try:
              with self.assertRaises(SystemExit):
                  jail(uuid(), mytask.name, [2], {})
          finally:
              mytask.backend = original_backend

      def test_execute_jail_success(self):
          result = jail(uuid(), mytask.name, [2], {})
          self.assertEqual(result, 4)

      def test_marked_as_started(self):

          # Backend subclass that records every id stored as STARTED.
          class Backend(mytask.backend.__class__):
              _started = []

              def store_result(self, tid, meta, state):
                  if state == states.STARTED:
                      self._started.append(tid)

          original_backend = mytask.backend
          mytask.backend = Backend()
          mytask.track_started = True
          try:
              # With track_started on, the id is recorded as STARTED.
              first_id = uuid()
              jail(first_id, mytask.name, [2], {})
              self.assertIn(first_id, Backend._started)

              # With ignore_result also on, it is not.
              mytask.ignore_result = True
              second_id = uuid()
              jail(second_id, mytask.name, [2], {})
              self.assertNotIn(second_id, Backend._started)
          finally:
              mytask.backend = original_backend
              mytask.track_started = False
              mytask.ignore_result = False

      def test_execute_jail_failure(self):
          # A raising task yields an ExceptionInfo carrying the exception.
          outcome = jail(uuid(), mytask_raising.name, [4], {})
          self.assertIsInstance(outcome, ExceptionInfo)
          self.assertTupleEqual(outcome.exception.args, (4, ))

      def test_execute_ignore_result(self):
          # The value is returned, but the result never becomes ready.
          task_id = uuid()
          outcome = jail(task_id, MyTaskIgnoreResult.name, [4], {})
          self.assertEqual(outcome, 256)
          self.assertFalse(AsyncResult(task_id).ready())
  class MockEventDispatcher(object):
      """Stand-in event dispatcher that only remembers event names.

      ``sent`` lists the event names passed to ``send`` in call order;
      ``enabled`` is always True.
      """

      def __init__(self):
          self.enabled = True
          self.sent = []

      def send(self, event, **fields):
          # Only the event name is kept; the extra fields are discarded.
          self.sent += [event]
  class test_TaskRequest(Case):
      """Tests for ``TaskRequest``: construction (directly and from a
      message), revoke/terminate/ack handling, the success, failure and
      timeout callbacks, and execution in-process or through a pool."""

      def _json_message(self, body):
          # Helper: wrap ``body`` in a JSON-encoded Message, as needed by
          # the ``from_message`` tests below.
          return Message(None, body=anyjson.dumps(body), backend='foo',
                         content_type='application/json',
                         content_encoding='utf-8')

      def test_task_wrapper_repr(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          self.assertTrue(repr(req))

      @patch('celery.worker.job.kwdict')
      def test_kwdict(self, kwdict):
          # With NEEDS_KWDICT forced on, building a request calls kwdict.
          saved = module.NEEDS_KWDICT
          module.NEEDS_KWDICT = True
          try:
              TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
              self.assertTrue(kwdict.called)
          finally:
              module.NEEDS_KWDICT = saved

      def test_sets_store_errors(self):
          # store_errors is off for an ignore_result task unless
          # store_errors_even_if_ignored is also set.
          mytask.ignore_result = True
          try:
              req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
              self.assertFalse(req.store_errors)
              mytask.store_errors_even_if_ignored = True
              req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
              self.assertTrue(req.store_errors)
          finally:
              mytask.ignore_result = False
              mytask.store_errors_even_if_ignored = False

      def test_send_event(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.eventer = MockEventDispatcher()
          req.send_event('task-frobulated')
          self.assertIn('task-frobulated', req.eventer.sent)

      def test_on_retry(self):
          # on_failure with a RetryTaskError must emit 'task-retried'; it is
          # then re-run with info logging disabled and with an einfo marked
          # internal, only to check that neither raises.
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.eventer = MockEventDispatcher()
          try:
              raise RetryTaskError('foo', KeyError('moofoobar'))
          except RetryTaskError:
              einfo = ExceptionInfo()
              req.on_failure(einfo)
              self.assertIn('task-retried', req.eventer.sent)
              saved = module._does_info
              module._does_info = False
              try:
                  req.on_failure(einfo)
              finally:
                  module._does_info = saved
              einfo.internal = True
              req.on_failure(einfo)

      def test_compat_properties(self):
          # task_id / task_name are read-write aliases of id / name.
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          self.assertEqual(req.task_id, req.id)
          self.assertEqual(req.task_name, req.name)
          req.task_id = 'ID'
          self.assertEqual(req.id, 'ID')
          req.task_name = 'NAME'
          self.assertEqual(req.name, 'NAME')

      def test_terminate__task_started(self):
          # A request with time_start set is terminated through the pool
          # immediately, and task_revoked fires with terminated=True.
          pool = Mock()
          signum = signal.SIGKILL
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          with assert_signal_called(task_revoked, sender=req.task,
                                    terminated=True,
                                    expired=False,
                                    signum=signum):
              req.time_start = time.time()
              req.worker_pid = 313
              req.terminate(pool, signal='KILL')
              pool.terminate_job.assert_called_with(req.worker_pid, signum)

      def test_terminate__task_reserved(self):
          # A request that has not started only records the pending
          # termination; the pool is not asked to kill anything yet.
          pool = Mock()
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.time_start = None
          req.terminate(pool, signal='KILL')
          self.assertFalse(pool.terminate_job.called)
          self.assertTupleEqual(req._terminate_on_ack, (pool, 'KILL'))
          req.terminate(pool, signal='KILL')

      def test_revoked_expires_expired(self):
          # An already-expired request is revoked with expired=True.
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'},
                            expires=datetime.utcnow() - timedelta(days=1))
          with assert_signal_called(task_revoked, sender=req.task,
                                    terminated=False,
                                    expired=True,
                                    signum=None):
              req.revoked()
              self.assertIn(req.id, revoked)
              self.assertEqual(mytask.backend.get_status(req.id),
                               states.REVOKED)

      def test_revoked_expires_not_expired(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'},
                            expires=datetime.utcnow() + timedelta(days=1))
          req.revoked()
          self.assertNotIn(req.id, revoked)
          self.assertNotEqual(mytask.backend.get_status(req.id),
                              states.REVOKED)

      def test_revoked_expires_ignore_result(self):
          # With ignore_result the id is still revoked, but no REVOKED
          # state is written to the backend.
          mytask.ignore_result = True
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'},
                            expires=datetime.utcnow() - timedelta(days=1))
          try:
              req.revoked()
              self.assertIn(req.id, revoked)
              self.assertNotEqual(mytask.backend.get_status(req.id),
                                  states.REVOKED)
          finally:
              mytask.ignore_result = False

      def test_send_email(self):
          # Walks on_failure through four configurations and checks whether
          # app.mail_admins was invoked in each.
          app = app_or_default()
          saved_mail_admins = app.mail_admins
          saved_send_emails = mytask.send_error_emails
          # Restore whatever whitelist was in effect rather than assuming ().
          saved_whitelist = getattr(mytask, 'error_whitelist', ())
          mail_sent = [False]

          def mock_mail_admins(*args, **kwargs):
              mail_sent[0] = True

          def get_ei():
              try:
                  raise KeyError('moofoobar')
              except KeyError:
                  return ExceptionInfo()

          app.mail_admins = mock_mail_admins
          mytask.send_error_emails = True
          try:
              req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})

              # Emails enabled: mail is sent.
              einfo = get_ei()
              req.on_failure(einfo)
              self.assertTrue(mail_sent[0])

              # Emails disabled: no mail.
              einfo = get_ei()
              mail_sent[0] = False
              mytask.send_error_emails = False
              req.on_failure(einfo)
              self.assertFalse(mail_sent[0])

              # Enabled, and the raised type is whitelisted: mail is sent.
              einfo = get_ei()
              mail_sent[0] = False
              mytask.send_error_emails = True
              mytask.error_whitelist = [KeyError]
              req.on_failure(einfo)
              self.assertTrue(mail_sent[0])

              # Enabled, but the whitelist names another type: no mail.
              einfo = get_ei()
              mail_sent[0] = False
              mytask.send_error_emails = True
              mytask.error_whitelist = [SyntaxError]
              req.on_failure(einfo)
              self.assertFalse(mail_sent[0])
          finally:
              app.mail_admins = saved_mail_admins
              mytask.send_error_emails = saved_send_emails
              mytask.error_whitelist = saved_whitelist

      def test_already_revoked(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req._already_revoked = True
          self.assertTrue(req.revoked())

      def test_revoked(self):
          # An id present in the revoked set is reported as revoked and the
          # request is acknowledged.
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          with assert_signal_called(task_revoked, sender=req.task,
                                    terminated=False,
                                    expired=False,
                                    signum=None):
              revoked.add(req.id)
              self.assertTrue(req.revoked())
              self.assertTrue(req._already_revoked)
              self.assertTrue(req.acknowledged)

      def test_execute_does_not_execute_revoked(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          revoked.add(req.id)
          req.execute()

      def test_execute_acks_late(self):
          mytask_raising.acks_late = True
          req = TaskRequest(mytask_raising.name, uuid(), [1])
          try:
              req.execute()
              self.assertTrue(req.acknowledged)
              req.task.accept_magic_kwargs = False
              req.execute()
          finally:
              mytask_raising.acks_late = False

      def test_execute_using_pool_does_not_execute_revoked(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          revoked.add(req.id)
          req.execute_using_pool(None)

      def test_on_accepted_acks_early(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.on_accepted(pid=os.getpid(), time_accepted=time.time())
          self.assertTrue(req.acknowledged)
          # Run once more with debug logging switched off.
          saved = module._does_debug
          module._does_debug = False
          try:
              req.on_accepted(pid=os.getpid(), time_accepted=time.time())
          finally:
              module._does_debug = saved

      def test_on_accepted_acks_late(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          mytask.acks_late = True
          try:
              req.on_accepted(pid=os.getpid(), time_accepted=time.time())
              self.assertFalse(req.acknowledged)
          finally:
              mytask.acks_late = False

      def test_on_accepted_terminates(self):
          # terminate() before acceptance is deferred; the pool is only
          # asked to kill the job once on_accepted supplies the pid.
          signum = signal.SIGKILL
          pool = Mock()
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          with assert_signal_called(task_revoked, sender=req.task,
                                    terminated=True,
                                    expired=False,
                                    signum=signum):
              req.terminate(pool, signal='KILL')
              self.assertFalse(pool.terminate_job.call_count)
              req.on_accepted(pid=314, time_accepted=time.time())
              pool.terminate_job.assert_called_with(314, signum)

      def test_on_success_acks_early(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.time_start = 1
          req.on_success(42)
          # Run once more with info logging switched off.
          saved = module._does_info
          module._does_info = False
          try:
              req.on_success(42)
              self.assertFalse(req.acknowledged)
          finally:
              module._does_info = saved

      def test_on_success_BaseException(self):
          # An ExceptionInfo wrapping SystemExit handed to on_success must
          # be re-raised.
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.time_start = 1
          with self.assertRaises(SystemExit):
              try:
                  raise SystemExit()
              except SystemExit:
                  req.on_success(ExceptionInfo())
              else:
                  assert False

      def test_on_success_eventer(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.time_start = 1
          req.eventer = Mock()
          req.send_event = Mock()
          req.on_success(42)
          self.assertTrue(req.send_event.called)

      def test_on_success_when_failure(self):
          # An ExceptionInfo passed to on_success is routed to on_failure.
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.time_start = 1
          req.on_failure = Mock()
          try:
              raise KeyError('foo')
          except Exception:
              req.on_success(ExceptionInfo())
              self.assertTrue(req.on_failure.called)

      def test_on_success_acks_late(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.time_start = 1
          mytask.acks_late = True
          try:
              req.on_success(42)
              self.assertTrue(req.acknowledged)
          finally:
              mytask.acks_late = False

      def test_on_failure_WorkerLostError(self):

          def get_ei():
              try:
                  raise WorkerLostError('do re mi')
              except WorkerLostError:
                  return ExceptionInfo()

          # Normally the failure is stored in the backend ...
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          exc_info = get_ei()
          req.on_failure(exc_info)
          self.assertEqual(mytask.backend.get_status(req.id),
                           states.FAILURE)

          # ... but with ignore_result the state stays PENDING.
          mytask.ignore_result = True
          try:
              exc_info = get_ei()
              req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
              req.on_failure(exc_info)
              self.assertEqual(mytask.backend.get_status(req.id),
                               states.PENDING)
          finally:
              mytask.ignore_result = False

      def test_on_failure_acks_late(self):
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.time_start = 1
          mytask.acks_late = True
          try:
              try:
                  raise KeyError('foo')
              except KeyError:
                  exc_info = ExceptionInfo()
                  req.on_failure(exc_info)
                  self.assertTrue(req.acknowledged)
          finally:
              mytask.acks_late = False

      def test_from_message_invalid_kwargs(self):
          # kwargs given as a string is rejected.
          body = dict(task=mytask.name, id=1, args=(), kwargs='foo')
          with self.assertRaises(InvalidTaskError):
              TaskRequest.from_message(None, body)

      @patch('celery.worker.job.error')
      @patch('celery.worker.job.warn')
      def test_on_timeout(self, warn, error):
          # Soft timeouts go to warn(), hard timeouts to error() and the
          # task is stored as FAILURE.
          req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
          req.on_timeout(soft=True, timeout=1337)
          self.assertIn('Soft time limit', warn.call_args[0][0])
          req.on_timeout(soft=False, timeout=1337)
          self.assertIn('Hard time limit', error.call_args[0][0])
          self.assertEqual(mytask.backend.get_status(req.id),
                           states.FAILURE)

          # With ignore_result a soft timeout leaves the state PENDING.
          mytask.ignore_result = True
          try:
              req = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
              req.on_timeout(soft=True, timeout=1336)
              self.assertEqual(mytask.backend.get_status(req.id),
                               states.PENDING)
          finally:
              mytask.ignore_result = False

      def test_trace_task_ret(self):
          mytask.__trace__ = build_tracer(mytask.name, mytask,
                                          current_app.loader, 'test')
          res = trace_task_ret(mytask.name, uuid(), [4], {})
          self.assertEqual(res, 4 ** 4)

      def test_execute_safe_catches_exception(self):
          # A task created with request=None makes trace_task itself fail;
          # that must surface as a RuntimeWarning plus an ExceptionInfo
          # return value rather than an exception.
          @task_dec(request=None)
          def raising():
              raise KeyError('baz')

          with self.assertWarnsRegex(RuntimeWarning,
                                     r'Exception raised outside'):
              res = trace_task(raising, uuid(), [], {})
              self.assertIsInstance(res, ExceptionInfo)

      def test_worker_task_trace_handle_retry(self):
          # handle_retry only writes RETRY to the backend when
          # store_errors is true.
          task_id = uuid()
          mytask.request.update({'id': task_id})
          try:
              raise ValueError('foo')
          except Exception:
              # sys.exc_info() instead of ``except X, e``: valid on both
              # Python 2.5 and Python 3.
              cause = sys.exc_info()[1]
              try:
                  raise RetryTaskError(str(cause), exc=cause)
              except RetryTaskError:
                  info = TraceInfo(states.RETRY, sys.exc_info()[1])
                  info.handle_retry(mytask, store_errors=False)
                  self.assertEqual(mytask.backend.get_status(task_id),
                                   states.PENDING)
                  info.handle_retry(mytask, store_errors=True)
                  self.assertEqual(mytask.backend.get_status(task_id),
                                   states.RETRY)
          finally:
              mytask.request.clear()

      def test_worker_task_trace_handle_failure(self):
          # handle_failure only writes FAILURE to the backend when
          # store_errors is true.
          task_id = uuid()
          mytask.request.update({'id': task_id})
          try:
              try:
                  raise ValueError('foo')
              except Exception:
                  info = TraceInfo(states.FAILURE, sys.exc_info()[1])
                  info.handle_failure(mytask, store_errors=False)
                  self.assertEqual(mytask.backend.get_status(task_id),
                                   states.PENDING)
                  info.handle_failure(mytask, store_errors=True)
                  self.assertEqual(mytask.backend.get_status(task_id),
                                   states.FAILURE)
          finally:
              mytask.request.clear()

      def test_task_wrapper_mail_attrs(self):
          # Both message templates must format with these keys.
          req = TaskRequest(mytask.name, uuid(), [], {})
          rendered = req.success_msg % {'name': req.name,
                                        'id': req.id,
                                        'return_value': 10,
                                        'runtime': 0.3641}
          self.assertTrue(rendered)
          rendered = req.error_msg % {'name': req.name,
                                      'id': req.id,
                                      'exc': 'FOOBARBAZ',
                                      'traceback': 'foobarbaz'}
          self.assertTrue(rendered)

      def test_from_message(self):
          us = u'æØåveéðƒeæ'
          body = {'task': mytask.name, 'id': uuid(),
                  'args': [2], 'kwargs': {us: 'bar'}}
          m = self._json_message(body)
          req = TaskRequest.from_message(m, m.decode())
          self.assertIsInstance(req, Request)
          self.assertEqual(req.name, body['task'])
          self.assertEqual(req.id, body['id'])
          self.assertEqual(req.args, body['args'])
          us = from_utf8(us)
          # The kwargs key checks only run on Python older than 2.6.
          if sys.version_info < (2, 6):
              self.assertEqual(req.kwargs.keys()[0], us)
              self.assertIsInstance(req.kwargs.keys()[0], str)

      def test_from_message_empty_args(self):
          # Missing args/kwargs default to an empty list and dict.
          body = {'task': mytask.name, 'id': uuid()}
          m = self._json_message(body)
          req = TaskRequest.from_message(m, m.decode())
          self.assertIsInstance(req, Request)
          self.assertEqual(req.args, [])
          self.assertEqual(req.kwargs, {})

      def test_from_message_missing_required_fields(self):
          m = self._json_message({})
          with self.assertRaises(KeyError):
              TaskRequest.from_message(m, m.decode())

      def test_from_message_nonexistant_task(self):
          body = {'task': 'cu.mytask.doesnotexist', 'id': uuid(),
                  'args': [2], 'kwargs': {u'æØåveéðƒeæ': 'bar'}}
          m = self._json_message(body)
          with self.assertRaises(KeyError):
              TaskRequest.from_message(m, m.decode())

      def test_execute(self):
          task_id = uuid()
          req = TaskRequest(mytask.name, task_id, [4], {'f': 'x'})
          self.assertEqual(req.execute(), 256)
          meta = mytask.backend.get_task_meta(task_id)
          self.assertEqual(meta['result'], 256)
          self.assertEqual(meta['status'], states.SUCCESS)

      def test_execute_success_no_kwargs(self):
          task_id = uuid()
          req = TaskRequest(mytask_no_kwargs.name, task_id, [4], {})
          self.assertEqual(req.execute(), 256)
          meta = mytask_no_kwargs.backend.get_task_meta(task_id)
          self.assertEqual(meta['result'], 256)
          self.assertEqual(meta['status'], states.SUCCESS)

      def test_execute_success_some_kwargs(self):
          # The task stores the task_id it received in the scratchpad.
          task_id = uuid()
          req = TaskRequest(mytask_some_kwargs.name, task_id, [4], {})
          self.assertEqual(req.execute(), 256)
          meta = mytask_some_kwargs.backend.get_task_meta(task_id)
          self.assertEqual(some_kwargs_scratchpad.get('task_id'), task_id)
          self.assertEqual(meta['result'], 256)
          self.assertEqual(meta['status'], states.SUCCESS)

      def test_execute_ack(self):
          # Clear the shared flag first so the assertion below cannot be
          # satisfied by a value left behind by an earlier run.
          scratch['ACK'] = False
          task_id = uuid()
          req = TaskRequest(mytask.name, task_id, [4], {'f': 'x'},
                            on_ack=on_ack)
          self.assertEqual(req.execute(), 256)
          meta = mytask.backend.get_task_meta(task_id)
          self.assertTrue(scratch['ACK'])
          self.assertEqual(meta['result'], 256)
          self.assertEqual(meta['status'], states.SUCCESS)

      def test_execute_fail(self):
          task_id = uuid()
          req = TaskRequest(mytask_raising.name, task_id, [4])
          self.assertIsInstance(req.execute(), ExceptionInfo)
          meta = mytask_raising.backend.get_task_meta(task_id)
          self.assertEqual(meta['status'], states.FAILURE)
          self.assertIsInstance(meta['result'], KeyError)

      def test_execute_using_pool(self):
          task_id = uuid()
          req = TaskRequest(mytask.name, task_id, [4], {'f': 'x'})

          # Pool double that just remembers the last apply_async call.
          class MockPool(BasePool):
              target = None
              args = None
              kwargs = None

              def __init__(self, *args, **kwargs):
                  pass

              def apply_async(self, target, args=None, kwargs=None,
                              *margs, **mkwargs):
                  self.target = target
                  self.args = args
                  self.kwargs = kwargs

          pool = MockPool()
          req.execute_using_pool(pool)
          self.assertTrue(pool.target)
          self.assertEqual(pool.args[0], mytask.name)
          self.assertEqual(pool.args[1], task_id)
          self.assertEqual(pool.args[2], [4])
          self.assertIn('f', pool.args[3])
          self.assertIn([4], pool.args)

          req.task.accept_magic_kwargs = False
          req.execute_using_pool(pool)

      def test_default_kwargs(self):
          task_id = uuid()
          req = TaskRequest(mytask.name, task_id, [4], {'f': 'x'})
          self.assertDictEqual(
              req.extend_with_default_kwargs(), {
                  'f': 'x',
                  'logfile': None,
                  'loglevel': None,
                  'task_id': req.id,
                  'task_retries': 0,
                  'task_is_eager': False,
                  'delivery_info': {'exchange': None, 'routing_key': None},
                  'task_name': req.name})

      @patch('celery.worker.job.logger')
      def _test_on_failure(self, exception, logger):
          # Shared body for the test_on_failure* cases: raise ``exception``,
          # hand it to on_failure and inspect the context passed to
          # logger.log. ``logger`` is injected by the patch decorator.
          app = app_or_default()
          task_id = uuid()
          req = TaskRequest(mytask.name, task_id, [4], {'f': 'x'})
          try:
              raise exception
          except Exception:
              exc_info = ExceptionInfo()
              app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
              try:
                  req.on_failure(exc_info)
                  self.assertTrue(logger.log.called)
                  context = logger.log.call_args[0][2]
                  self.assertEqual(mytask.name, context['name'])
                  self.assertIn(task_id, context['id'])
              finally:
                  app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False

      def test_on_failure(self):
          self._test_on_failure(Exception('Inside unit tests'))

      def test_on_failure_unicode_exception(self):
          self._test_on_failure(Exception(u'Бобры атакуют'))

      def test_on_failure_utf8_exception(self):
          self._test_on_failure(Exception(
              from_utf8(u'Бобры атакуют')))