test_request.py 28 KB


  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. import anyjson
  4. import os
  5. import signal
  6. import sys
  7. import time
  8. from datetime import datetime, timedelta
  9. from kombu.transport.base import Message
  10. from kombu.utils.encoding import from_utf8, default_encode
  11. from mock import Mock, patch
  12. from nose import SkipTest
  13. from celery import states
  14. from celery.concurrency.base import BasePool
  15. from celery.datastructures import ExceptionInfo
  16. from celery.exceptions import (
  17. RetryTaskError,
  18. WorkerLostError,
  19. InvalidTaskError,
  20. TaskRevokedError,
  21. )
  22. from celery.five import keys
  23. from celery.task.trace import (
  24. trace_task,
  25. _trace_task_ret,
  26. TraceInfo,
  27. mro_lookup,
  28. build_tracer,
  29. setup_worker_optimizations,
  30. reset_worker_optimizations,
  31. )
  32. from celery.result import AsyncResult
  33. from celery.signals import task_revoked
  34. from celery.task import task as task_dec
  35. from celery.task.base import Task
  36. from celery.utils import uuid
  37. from celery.worker import job as module
  38. from celery.worker.job import Request, TaskRequest
  39. from celery.worker.state import revoked
  40. from celery.tests.utils import AppCase, Case, assert_signal_called
  41. scratch = {'ACK': False}
  42. some_kwargs_scratchpad = {}
  43. class test_mro_lookup(Case):
  44. def test_order(self):
  45. class A(object):
  46. pass
  47. class B(A):
  48. pass
  49. class C(B):
  50. pass
  51. class D(C):
  52. @classmethod
  53. def mro(cls):
  54. return ()
  55. A.x = 10
  56. self.assertEqual(mro_lookup(C, 'x'), A)
  57. self.assertIsNone(mro_lookup(C, 'x', stop=(A, )))
  58. B.x = 10
  59. self.assertEqual(mro_lookup(C, 'x'), B)
  60. C.x = 10
  61. self.assertEqual(mro_lookup(C, 'x'), C)
  62. self.assertIsNone(mro_lookup(D, 'x'))
  63. def jail(app, task_id, name, args, kwargs):
  64. request = {'id': task_id}
  65. task = app.tasks[name]
  66. task.__trace__ = None # rebuild
  67. return trace_task(
  68. task, task_id, args, kwargs, request=request, eager=False,
  69. )
  70. def on_ack(*args, **kwargs):
  71. scratch['ACK'] = True
  72. @task_dec(accept_magic_kwargs=False)
  73. def mytask(i, **kwargs):
  74. return i ** i
  75. @task_dec # traverses coverage for decorator without parens
  76. def mytask_no_kwargs(i):
  77. return i ** i
  78. class MyTaskIgnoreResult(Task):
  79. ignore_result = True
  80. def run(self, i):
  81. return i ** i
  82. @task_dec(accept_magic_kwargs=True)
  83. def mytask_some_kwargs(i, task_id):
  84. some_kwargs_scratchpad['task_id'] = task_id
  85. return i ** i
  86. @task_dec(accept_magic_kwargs=False)
  87. def mytask_raising(i):
  88. raise KeyError(i)
  89. class test_default_encode(AppCase):
  90. def setup(self):
  91. if sys.version_info >= (3, 0):
  92. raise SkipTest('py3k: not relevant')
  93. def test_jython(self):
  94. prev, sys.platform = sys.platform, 'java 1.6.1'
  95. try:
  96. self.assertEqual(default_encode(bytes('foo')), 'foo')
  97. finally:
  98. sys.platform = prev
  99. def test_cpython(self):
  100. prev, sys.platform = sys.platform, 'darwin'
  101. gfe, sys.getfilesystemencoding = (
  102. sys.getfilesystemencoding,
  103. lambda: 'utf-8',
  104. )
  105. try:
  106. self.assertEqual(default_encode(bytes('foo')), 'foo')
  107. finally:
  108. sys.platform = prev
  109. sys.getfilesystemencoding = gfe
  110. class test_RetryTaskError(AppCase):
  111. def test_retry_task_error(self):
  112. try:
  113. raise Exception('foo')
  114. except Exception as exc:
  115. ret = RetryTaskError('Retrying task', exc)
  116. self.assertEqual(ret.exc, exc)
  117. class test_trace_task(AppCase):
  118. @patch('celery.task.trace._logger')
  119. def test_process_cleanup_fails(self, _logger):
  120. backend = mytask.backend
  121. mytask.backend = Mock()
  122. mytask.backend.process_cleanup = Mock(side_effect=KeyError())
  123. try:
  124. tid = uuid()
  125. ret = jail(self.app, tid, mytask.name, [2], {})
  126. self.assertEqual(ret, 4)
  127. mytask.backend.store_result.assert_called_with(tid, 4,
  128. states.SUCCESS)
  129. self.assertIn('Process cleanup failed',
  130. _logger.error.call_args[0][0])
  131. finally:
  132. mytask.backend = backend
  133. def test_process_cleanup_BaseException(self):
  134. backend = mytask.backend
  135. mytask.backend = Mock()
  136. mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
  137. try:
  138. with self.assertRaises(SystemExit):
  139. jail(self.app, uuid(), mytask.name, [2], {})
  140. finally:
  141. mytask.backend = backend
  142. def test_execute_jail_success(self):
  143. ret = jail(self.app, uuid(), mytask.name, [2], {})
  144. self.assertEqual(ret, 4)
  145. def test_marked_as_started(self):
  146. class Backend(mytask.backend.__class__):
  147. _started = []
  148. def store_result(self, tid, meta, state):
  149. if state == states.STARTED:
  150. self._started.append(tid)
  151. prev, mytask.backend = mytask.backend, Backend()
  152. mytask.track_started = True
  153. try:
  154. tid = uuid()
  155. jail(self.app, tid, mytask.name, [2], {})
  156. self.assertIn(tid, Backend._started)
  157. mytask.ignore_result = True
  158. tid = uuid()
  159. jail(self.app, tid, mytask.name, [2], {})
  160. self.assertNotIn(tid, Backend._started)
  161. finally:
  162. mytask.backend = prev
  163. mytask.track_started = False
  164. mytask.ignore_result = False
  165. def test_execute_jail_failure(self):
  166. ret = jail(self.app, uuid(), mytask_raising.name,
  167. [4], {})
  168. self.assertIsInstance(ret, ExceptionInfo)
  169. self.assertTupleEqual(ret.exception.args, (4, ))
  170. def test_execute_ignore_result(self):
  171. task_id = uuid()
  172. ret = jail(self.app, task_id, MyTaskIgnoreResult.name, [4], {})
  173. self.assertEqual(ret, 256)
  174. self.assertFalse(AsyncResult(task_id).ready())
  175. class MockEventDispatcher(object):
  176. def __init__(self):
  177. self.sent = []
  178. self.enabled = True
  179. def send(self, event, **fields):
  180. self.sent.append(event)
  181. class test_TaskRequest(AppCase):
  182. def test_task_wrapper_repr(self):
  183. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  184. self.assertTrue(repr(tw))
  185. @patch('celery.worker.job.kwdict')
  186. def test_kwdict(self, kwdict):
  187. prev, module.NEEDS_KWDICT = module.NEEDS_KWDICT, True
  188. try:
  189. TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  190. self.assertTrue(kwdict.called)
  191. finally:
  192. module.NEEDS_KWDICT = prev
  193. def test_sets_store_errors(self):
  194. mytask.ignore_result = True
  195. try:
  196. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  197. self.assertFalse(tw.store_errors)
  198. mytask.store_errors_even_if_ignored = True
  199. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  200. self.assertTrue(tw.store_errors)
  201. finally:
  202. mytask.ignore_result = False
  203. mytask.store_errors_even_if_ignored = False
  204. def test_send_event(self):
  205. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  206. tw.eventer = MockEventDispatcher()
  207. tw.send_event('task-frobulated')
  208. self.assertIn('task-frobulated', tw.eventer.sent)
  209. def test_on_retry(self):
  210. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  211. tw.eventer = MockEventDispatcher()
  212. try:
  213. raise RetryTaskError('foo', KeyError('moofoobar'))
  214. except:
  215. einfo = ExceptionInfo()
  216. tw.on_failure(einfo)
  217. self.assertIn('task-retried', tw.eventer.sent)
  218. prev, module._does_info = module._does_info, False
  219. try:
  220. tw.on_failure(einfo)
  221. finally:
  222. module._does_info = prev
  223. einfo.internal = True
  224. tw.on_failure(einfo)
  225. def test_compat_properties(self):
  226. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  227. self.assertEqual(tw.task_id, tw.id)
  228. self.assertEqual(tw.task_name, tw.name)
  229. tw.task_id = 'ID'
  230. self.assertEqual(tw.id, 'ID')
  231. tw.task_name = 'NAME'
  232. self.assertEqual(tw.name, 'NAME')
  233. def test_terminate__task_started(self):
  234. pool = Mock()
  235. signum = signal.SIGKILL
  236. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  237. with assert_signal_called(task_revoked, sender=tw.task,
  238. terminated=True,
  239. expired=False,
  240. signum=signum):
  241. tw.time_start = time.time()
  242. tw.worker_pid = 313
  243. tw.terminate(pool, signal='KILL')
  244. pool.terminate_job.assert_called_with(tw.worker_pid, signum)
  245. def test_terminate__task_reserved(self):
  246. pool = Mock()
  247. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  248. tw.time_start = None
  249. tw.terminate(pool, signal='KILL')
  250. self.assertFalse(pool.terminate_job.called)
  251. self.assertTupleEqual(tw._terminate_on_ack, (pool, 'KILL'))
  252. tw.terminate(pool, signal='KILL')
  253. def test_revoked_expires_expired(self):
  254. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'},
  255. expires=datetime.utcnow() - timedelta(days=1))
  256. with assert_signal_called(task_revoked, sender=tw.task,
  257. terminated=False,
  258. expired=True,
  259. signum=None):
  260. tw.revoked()
  261. self.assertIn(tw.id, revoked)
  262. self.assertEqual(mytask.backend.get_status(tw.id),
  263. states.REVOKED)
  264. def test_revoked_expires_not_expired(self):
  265. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'},
  266. expires=datetime.utcnow() + timedelta(days=1))
  267. tw.revoked()
  268. self.assertNotIn(tw.id, revoked)
  269. self.assertNotEqual(
  270. mytask.backend.get_status(tw.id),
  271. states.REVOKED,
  272. )
  273. def test_revoked_expires_ignore_result(self):
  274. mytask.ignore_result = True
  275. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'},
  276. expires=datetime.utcnow() - timedelta(days=1))
  277. try:
  278. tw.revoked()
  279. self.assertIn(tw.id, revoked)
  280. self.assertNotEqual(mytask.backend.get_status(tw.id),
  281. states.REVOKED)
  282. finally:
  283. mytask.ignore_result = False
  284. def test_send_email(self):
  285. app = self.app
  286. old_mail_admins = app.mail_admins
  287. old_enable_mails = mytask.send_error_emails
  288. mail_sent = [False]
  289. def mock_mail_admins(*args, **kwargs):
  290. mail_sent[0] = True
  291. def get_ei():
  292. try:
  293. raise KeyError('moofoobar')
  294. except:
  295. return ExceptionInfo()
  296. app.mail_admins = mock_mail_admins
  297. mytask.send_error_emails = True
  298. try:
  299. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  300. einfo = get_ei()
  301. tw.on_failure(einfo)
  302. self.assertTrue(mail_sent[0])
  303. einfo = get_ei()
  304. mail_sent[0] = False
  305. mytask.send_error_emails = False
  306. tw.on_failure(einfo)
  307. self.assertFalse(mail_sent[0])
  308. einfo = get_ei()
  309. mail_sent[0] = False
  310. mytask.send_error_emails = True
  311. tw.on_failure(einfo)
  312. self.assertTrue(mail_sent[0])
  313. finally:
  314. app.mail_admins = old_mail_admins
  315. mytask.send_error_emails = old_enable_mails
  316. def test_already_revoked(self):
  317. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  318. tw._already_revoked = True
  319. self.assertTrue(tw.revoked())
  320. def test_revoked(self):
  321. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  322. with assert_signal_called(task_revoked, sender=tw.task,
  323. terminated=False,
  324. expired=False,
  325. signum=None):
  326. revoked.add(tw.id)
  327. self.assertTrue(tw.revoked())
  328. self.assertTrue(tw._already_revoked)
  329. self.assertTrue(tw.acknowledged)
  330. def test_execute_does_not_execute_revoked(self):
  331. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  332. revoked.add(tw.id)
  333. tw.execute()
  334. def test_execute_acks_late(self):
  335. mytask_raising.acks_late = True
  336. tw = TaskRequest(mytask_raising.name, uuid(), [1])
  337. try:
  338. tw.execute()
  339. self.assertTrue(tw.acknowledged)
  340. tw.task.accept_magic_kwargs = False
  341. tw.execute()
  342. finally:
  343. mytask_raising.acks_late = False
  344. def test_execute_using_pool_does_not_execute_revoked(self):
  345. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  346. revoked.add(tw.id)
  347. with self.assertRaises(TaskRevokedError):
  348. tw.execute_using_pool(None)
  349. def test_on_accepted_acks_early(self):
  350. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  351. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  352. self.assertTrue(tw.acknowledged)
  353. prev, module._does_debug = module._does_debug, False
  354. try:
  355. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  356. finally:
  357. module._does_debug = prev
  358. def test_on_accepted_acks_late(self):
  359. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  360. mytask.acks_late = True
  361. try:
  362. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  363. self.assertFalse(tw.acknowledged)
  364. finally:
  365. mytask.acks_late = False
  366. def test_on_accepted_terminates(self):
  367. signum = signal.SIGKILL
  368. pool = Mock()
  369. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  370. with assert_signal_called(task_revoked, sender=tw.task,
  371. terminated=True,
  372. expired=False,
  373. signum=signum):
  374. tw.terminate(pool, signal='KILL')
  375. self.assertFalse(pool.terminate_job.call_count)
  376. tw.on_accepted(pid=314, time_accepted=time.time())
  377. pool.terminate_job.assert_called_with(314, signum)
  378. def test_on_success_acks_early(self):
  379. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  380. tw.time_start = 1
  381. tw.on_success(42)
  382. prev, module._does_info = module._does_info, False
  383. try:
  384. tw.on_success(42)
  385. self.assertFalse(tw.acknowledged)
  386. finally:
  387. module._does_info = prev
  388. def test_on_success_BaseException(self):
  389. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  390. tw.time_start = 1
  391. with self.assertRaises(SystemExit):
  392. try:
  393. raise SystemExit()
  394. except SystemExit:
  395. tw.on_success(ExceptionInfo())
  396. else:
  397. assert False
  398. def test_on_success_eventer(self):
  399. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  400. tw.time_start = 1
  401. tw.eventer = Mock()
  402. tw.send_event = Mock()
  403. tw.on_success(42)
  404. self.assertTrue(tw.send_event.called)
  405. def test_on_success_when_failure(self):
  406. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  407. tw.time_start = 1
  408. tw.on_failure = Mock()
  409. try:
  410. raise KeyError('foo')
  411. except Exception:
  412. tw.on_success(ExceptionInfo())
  413. self.assertTrue(tw.on_failure.called)
  414. def test_on_success_acks_late(self):
  415. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  416. tw.time_start = 1
  417. mytask.acks_late = True
  418. try:
  419. tw.on_success(42)
  420. self.assertTrue(tw.acknowledged)
  421. finally:
  422. mytask.acks_late = False
  423. def test_on_failure_WorkerLostError(self):
  424. def get_ei():
  425. try:
  426. raise WorkerLostError('do re mi')
  427. except WorkerLostError:
  428. return ExceptionInfo()
  429. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  430. exc_info = get_ei()
  431. tw.on_failure(exc_info)
  432. self.assertEqual(mytask.backend.get_status(tw.id),
  433. states.FAILURE)
  434. mytask.ignore_result = True
  435. try:
  436. exc_info = get_ei()
  437. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  438. tw.on_failure(exc_info)
  439. self.assertEqual(mytask.backend.get_status(tw.id),
  440. states.PENDING)
  441. finally:
  442. mytask.ignore_result = False
  443. def test_on_failure_acks_late(self):
  444. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  445. tw.time_start = 1
  446. mytask.acks_late = True
  447. try:
  448. try:
  449. raise KeyError('foo')
  450. except KeyError:
  451. exc_info = ExceptionInfo()
  452. tw.on_failure(exc_info)
  453. self.assertTrue(tw.acknowledged)
  454. finally:
  455. mytask.acks_late = False
  456. def test_from_message_invalid_kwargs(self):
  457. body = dict(task=mytask.name, id=1, args=(), kwargs='foo')
  458. with self.assertRaises(InvalidTaskError):
  459. TaskRequest.from_message(None, body)
  460. @patch('celery.worker.job.error')
  461. @patch('celery.worker.job.warn')
  462. def test_on_timeout(self, warn, error):
  463. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  464. tw.on_timeout(soft=True, timeout=1337)
  465. self.assertIn('Soft time limit', warn.call_args[0][0])
  466. tw.on_timeout(soft=False, timeout=1337)
  467. self.assertIn('Hard time limit', error.call_args[0][0])
  468. self.assertEqual(mytask.backend.get_status(tw.id),
  469. states.FAILURE)
  470. mytask.ignore_result = True
  471. try:
  472. tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
  473. tw.on_timeout(soft=True, timeout=1336)
  474. self.assertEqual(mytask.backend.get_status(tw.id),
  475. states.PENDING)
  476. finally:
  477. mytask.ignore_result = False
  478. def test_fast_trace_task(self):
  479. from celery.task import trace
  480. setup_worker_optimizations(self.app)
  481. self.assertIs(trace.trace_task_ret, trace._fast_trace_task)
  482. try:
  483. mytask.__trace__ = build_tracer(mytask.name, mytask,
  484. self.app.loader, 'test')
  485. res = trace.trace_task_ret(mytask.name, uuid(), [4], {})
  486. self.assertEqual(res, 4 ** 4)
  487. finally:
  488. reset_worker_optimizations()
  489. self.assertIs(trace.trace_task_ret, trace._trace_task_ret)
  490. delattr(mytask, '__trace__')
  491. res = trace.trace_task_ret(mytask.name, uuid(), [4], {})
  492. self.assertEqual(res, 4 ** 4)
  493. def test_trace_task_ret(self):
  494. mytask.__trace__ = build_tracer(mytask.name, mytask,
  495. self.app.loader, 'test')
  496. res = _trace_task_ret(mytask.name, uuid(), [4], {})
  497. self.assertEqual(res, 4 ** 4)
  498. def test_trace_task_ret__no_trace(self):
  499. try:
  500. delattr(mytask, '__trace__')
  501. except AttributeError:
  502. pass
  503. res = _trace_task_ret(mytask.name, uuid(), [4], {})
  504. self.assertEqual(res, 4 ** 4)
  505. def test_execute_safe_catches_exception(self):
  506. def _error_exec(self, *args, **kwargs):
  507. raise KeyError('baz')
  508. @task_dec(request=None)
  509. def raising():
  510. raise KeyError('baz')
  511. with self.assertWarnsRegex(
  512. RuntimeWarning, r'Exception raised outside'):
  513. res = trace_task(raising, uuid(), [], {})
  514. self.assertIsInstance(res, ExceptionInfo)
  515. def test_worker_task_trace_handle_retry(self):
  516. from celery.exceptions import RetryTaskError
  517. tid = uuid()
  518. mytask.push_request(id=tid)
  519. try:
  520. raise ValueError('foo')
  521. except Exception as exc:
  522. try:
  523. raise RetryTaskError(str(exc), exc=exc)
  524. except RetryTaskError as exc:
  525. w = TraceInfo(states.RETRY, exc)
  526. w.handle_retry(mytask, store_errors=False)
  527. self.assertEqual(mytask.backend.get_status(tid),
  528. states.PENDING)
  529. w.handle_retry(mytask, store_errors=True)
  530. self.assertEqual(mytask.backend.get_status(tid),
  531. states.RETRY)
  532. finally:
  533. mytask.pop_request()
  534. def test_worker_task_trace_handle_failure(self):
  535. tid = uuid()
  536. mytask.push_request()
  537. try:
  538. mytask.request.id = tid
  539. try:
  540. raise ValueError('foo')
  541. except Exception as exc:
  542. w = TraceInfo(states.FAILURE, exc)
  543. w.handle_failure(mytask, store_errors=False)
  544. self.assertEqual(mytask.backend.get_status(tid),
  545. states.PENDING)
  546. w.handle_failure(mytask, store_errors=True)
  547. self.assertEqual(mytask.backend.get_status(tid),
  548. states.FAILURE)
  549. finally:
  550. mytask.pop_request()
  551. def test_task_wrapper_mail_attrs(self):
  552. tw = TaskRequest(mytask.name, uuid(), [], {})
  553. x = tw.success_msg % {
  554. 'name': tw.name,
  555. 'id': tw.id,
  556. 'return_value': 10,
  557. 'runtime': 0.3641,
  558. }
  559. self.assertTrue(x)
  560. x = tw.error_msg % {
  561. 'name': tw.name,
  562. 'id': tw.id,
  563. 'exc': 'FOOBARBAZ',
  564. 'traceback': 'foobarbaz',
  565. }
  566. self.assertTrue(x)
  567. def test_from_message(self):
  568. us = 'æØåveéðƒeæ'
  569. body = {'task': mytask.name, 'id': uuid(),
  570. 'args': [2], 'kwargs': {us: 'bar'}}
  571. m = Message(None, body=anyjson.dumps(body), backend='foo',
  572. content_type='application/json',
  573. content_encoding='utf-8')
  574. tw = TaskRequest.from_message(m, m.decode())
  575. self.assertIsInstance(tw, Request)
  576. self.assertEqual(tw.name, body['task'])
  577. self.assertEqual(tw.id, body['id'])
  578. self.assertEqual(tw.args, body['args'])
  579. us = from_utf8(us)
  580. if sys.version_info < (2, 6):
  581. self.assertEqual(next(keys(tw.kwargs)), us)
  582. self.assertIsInstance(next(keys(tw.kwargs)), str)
  583. def test_from_message_empty_args(self):
  584. body = {'task': mytask.name, 'id': uuid()}
  585. m = Message(None, body=anyjson.dumps(body), backend='foo',
  586. content_type='application/json',
  587. content_encoding='utf-8')
  588. tw = TaskRequest.from_message(m, m.decode())
  589. self.assertIsInstance(tw, Request)
  590. self.assertEquals(tw.args, [])
  591. self.assertEquals(tw.kwargs, {})
  592. def test_from_message_missing_required_fields(self):
  593. body = {}
  594. m = Message(None, body=anyjson.dumps(body), backend='foo',
  595. content_type='application/json',
  596. content_encoding='utf-8')
  597. with self.assertRaises(KeyError):
  598. TaskRequest.from_message(m, m.decode())
  599. def test_from_message_nonexistant_task(self):
  600. body = {'task': 'cu.mytask.doesnotexist', 'id': uuid(),
  601. 'args': [2], 'kwargs': {'æØåveéðƒeæ': 'bar'}}
  602. m = Message(None, body=anyjson.dumps(body), backend='foo',
  603. content_type='application/json',
  604. content_encoding='utf-8')
  605. with self.assertRaises(KeyError):
  606. TaskRequest.from_message(m, m.decode())
  607. def test_execute(self):
  608. tid = uuid()
  609. tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'})
  610. self.assertEqual(tw.execute(), 256)
  611. meta = mytask.backend.get_task_meta(tid)
  612. self.assertEqual(meta['result'], 256)
  613. self.assertEqual(meta['status'], states.SUCCESS)
  614. def test_execute_success_no_kwargs(self):
  615. tid = uuid()
  616. tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
  617. self.assertEqual(tw.execute(), 256)
  618. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  619. self.assertEqual(meta['result'], 256)
  620. self.assertEqual(meta['status'], states.SUCCESS)
  621. def test_execute_success_some_kwargs(self):
  622. tid = uuid()
  623. tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
  624. self.assertEqual(tw.execute(), 256)
  625. meta = mytask_some_kwargs.backend.get_task_meta(tid)
  626. self.assertEqual(some_kwargs_scratchpad.get('task_id'), tid)
  627. self.assertEqual(meta['result'], 256)
  628. self.assertEqual(meta['status'], states.SUCCESS)
  629. def test_execute_ack(self):
  630. tid = uuid()
  631. tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'},
  632. on_ack=on_ack)
  633. self.assertEqual(tw.execute(), 256)
  634. meta = mytask.backend.get_task_meta(tid)
  635. self.assertTrue(scratch['ACK'])
  636. self.assertEqual(meta['result'], 256)
  637. self.assertEqual(meta['status'], states.SUCCESS)
  638. def test_execute_fail(self):
  639. tid = uuid()
  640. tw = TaskRequest(mytask_raising.name, tid, [4])
  641. self.assertIsInstance(tw.execute(), ExceptionInfo)
  642. meta = mytask_raising.backend.get_task_meta(tid)
  643. self.assertEqual(meta['status'], states.FAILURE)
  644. self.assertIsInstance(meta['result'], KeyError)
  645. def test_execute_using_pool(self):
  646. tid = uuid()
  647. tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'})
  648. class MockPool(BasePool):
  649. target = None
  650. args = None
  651. kwargs = None
  652. def __init__(self, *args, **kwargs):
  653. pass
  654. def apply_async(self, target, args=None, kwargs=None,
  655. *margs, **mkwargs):
  656. self.target = target
  657. self.args = args
  658. self.kwargs = kwargs
  659. p = MockPool()
  660. tw.execute_using_pool(p)
  661. self.assertTrue(p.target)
  662. self.assertEqual(p.args[0], mytask.name)
  663. self.assertEqual(p.args[1], tid)
  664. self.assertEqual(p.args[2], [4])
  665. self.assertIn('f', p.args[3])
  666. self.assertIn([4], p.args)
  667. tw.task.accept_magic_kwargs = False
  668. tw.execute_using_pool(p)
  669. def test_default_kwargs(self):
  670. tid = uuid()
  671. tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'})
  672. self.assertDictEqual(
  673. tw.extend_with_default_kwargs(), {
  674. 'f': 'x',
  675. 'logfile': None,
  676. 'loglevel': None,
  677. 'task_id': tw.id,
  678. 'task_retries': 0,
  679. 'task_is_eager': False,
  680. 'delivery_info': {
  681. 'exchange': None,
  682. 'routing_key': None,
  683. 'priority': None,
  684. },
  685. 'task_name': tw.name})
  686. @patch('celery.worker.job.logger')
  687. def _test_on_failure(self, exception, logger):
  688. app = self.app
  689. tid = uuid()
  690. tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'})
  691. try:
  692. raise exception
  693. except Exception:
  694. exc_info = ExceptionInfo()
  695. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  696. try:
  697. tw.on_failure(exc_info)
  698. self.assertTrue(logger.log.called)
  699. context = logger.log.call_args[0][2]
  700. self.assertEqual(mytask.name, context['name'])
  701. self.assertIn(tid, context['id'])
  702. finally:
  703. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
  704. def test_on_failure(self):
  705. self._test_on_failure(Exception('Inside unit tests'))
  706. def test_on_failure_unicode_exception(self):
  707. self._test_on_failure(Exception('Бобры атакуют'))
  708. def test_on_failure_utf8_exception(self):
  709. self._test_on_failure(Exception(
  710. from_utf8('Бобры атакуют')))