# test_request.py (26 KB)
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import with_statement
  4. import anyjson
  5. import os
  6. import sys
  7. import time
  8. from datetime import datetime, timedelta
  9. from kombu.transport.base import Message
  10. from kombu.utils.encoding import from_utf8, default_encode
  11. from mock import Mock, patch
  12. from nose import SkipTest
  13. from celery import current_app
  14. from celery import states
  15. from celery.app import app_or_default
  16. from celery.concurrency.base import BasePool
  17. from celery.datastructures import ExceptionInfo
  18. from celery.exceptions import (RetryTaskError,
  19. WorkerLostError, InvalidTaskError)
  20. from celery.task.trace import eager_trace_task, TraceInfo, mro_lookup
  21. from celery.result import AsyncResult
  22. from celery.task import task as task_dec
  23. from celery.task.base import Task
  24. from celery.utils import uuid
  25. from celery.worker import job as module
  26. from celery.worker.job import Request, TaskRequest, execute_and_trace
  27. from celery.worker.state import revoked
  28. from celery.tests.utils import Case
  29. scratch = {"ACK": False}
  30. some_kwargs_scratchpad = {}
  31. class test_mro_lookup(Case):
  32. def test_order(self):
  33. class A(object):
  34. pass
  35. class B(A):
  36. pass
  37. class C(B):
  38. pass
  39. class D(C):
  40. @classmethod
  41. def mro(cls):
  42. return ()
  43. A.x = 10
  44. self.assertEqual(mro_lookup(C, "x"), A)
  45. self.assertIsNone(mro_lookup(C, "x", stop=(A, )))
  46. B.x = 10
  47. self.assertEqual(mro_lookup(C, "x"), B)
  48. C.x = 10
  49. self.assertEqual(mro_lookup(C, "x"), C)
  50. self.assertIsNone(mro_lookup(D, "x"))
  51. def jail(task_id, name, args, kwargs):
  52. return eager_trace_task(current_app.tasks[name],
  53. task_id, args, kwargs, eager=False)[0]
  54. def on_ack(*args, **kwargs):
  55. scratch["ACK"] = True
  56. @task_dec(accept_magic_kwargs=False)
  57. def mytask(i, **kwargs):
  58. return i ** i
  59. @task_dec # traverses coverage for decorator without parens
  60. def mytask_no_kwargs(i):
  61. return i ** i
  62. class MyTaskIgnoreResult(Task):
  63. ignore_result = True
  64. def run(self, i):
  65. return i ** i
  66. @task_dec(accept_magic_kwargs=True)
  67. def mytask_some_kwargs(i, logfile):
  68. some_kwargs_scratchpad["logfile"] = logfile
  69. return i ** i
  70. @task_dec(accept_magic_kwargs=False)
  71. def mytask_raising(i):
  72. raise KeyError(i)
  73. class test_default_encode(Case):
  74. def setUp(self):
  75. if sys.version_info >= (3, 0):
  76. raise SkipTest("py3k: not relevant")
  77. def test_jython(self):
  78. prev, sys.platform = sys.platform, "java 1.6.1"
  79. try:
  80. self.assertEqual(default_encode("foo"), "foo")
  81. finally:
  82. sys.platform = prev
  83. def test_cython(self):
  84. prev, sys.platform = sys.platform, "darwin"
  85. gfe, sys.getfilesystemencoding = sys.getfilesystemencoding, \
  86. lambda: "utf-8"
  87. try:
  88. self.assertEqual(default_encode("foo"), "foo")
  89. finally:
  90. sys.platform = prev
  91. sys.getfilesystemencoding = gfe
  92. class test_RetryTaskError(Case):
  93. def test_retry_task_error(self):
  94. try:
  95. raise Exception("foo")
  96. except Exception, exc:
  97. ret = RetryTaskError("Retrying task", exc)
  98. self.assertEqual(ret.exc, exc)
  99. class test_trace_task(Case):
  100. @patch("celery.task.trace._logger")
  101. def test_process_cleanup_fails(self, _logger):
  102. backend = mytask.backend
  103. mytask.backend = Mock()
  104. mytask.backend.process_cleanup = Mock(side_effect=KeyError())
  105. try:
  106. tid = uuid()
  107. ret = jail(tid, mytask.name, [2], {})
  108. self.assertEqual(ret, 4)
  109. mytask.backend.store_result.assert_called_with(tid, 4,
  110. states.SUCCESS)
  111. self.assertIn("Process cleanup failed",
  112. _logger.error.call_args[0][0])
  113. finally:
  114. mytask.backend = backend
  115. def test_process_cleanup_BaseException(self):
  116. backend = mytask.backend
  117. mytask.backend = Mock()
  118. mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
  119. try:
  120. with self.assertRaises(SystemExit):
  121. jail(uuid(), mytask.name, [2], {})
  122. finally:
  123. mytask.backend = backend
  124. def test_execute_jail_success(self):
  125. ret = jail(uuid(), mytask.name, [2], {})
  126. self.assertEqual(ret, 4)
  127. def test_marked_as_started(self):
  128. class Backend(mytask.backend.__class__):
  129. _started = []
  130. def store_result(self, tid, meta, state):
  131. if state == states.STARTED:
  132. self._started.append(tid)
  133. prev, mytask.backend = mytask.backend, Backend()
  134. mytask.track_started = True
  135. try:
  136. tid = uuid()
  137. jail(tid, mytask.name, [2], {})
  138. self.assertIn(tid, Backend._started)
  139. mytask.ignore_result = True
  140. tid = uuid()
  141. jail(tid, mytask.name, [2], {})
  142. self.assertNotIn(tid, Backend._started)
  143. finally:
  144. mytask.backend = prev
  145. mytask.track_started = False
  146. mytask.ignore_result = False
  147. def test_execute_jail_failure(self):
  148. u = uuid()
  149. mytask_raising.request.update({"id": u})
  150. try:
  151. ret = jail(u, mytask_raising.name,
  152. [4], {})
  153. self.assertIsInstance(ret, ExceptionInfo)
  154. self.assertTupleEqual(ret.exception.args, (4, ))
  155. finally:
  156. mytask_raising.request.clear()
  157. def test_execute_ignore_result(self):
  158. task_id = uuid()
  159. MyTaskIgnoreResult.request.update({"id": task_id})
  160. try:
  161. ret = jail(task_id, MyTaskIgnoreResult.name,
  162. [4], {})
  163. self.assertEqual(ret, 256)
  164. self.assertFalse(AsyncResult(task_id).ready())
  165. finally:
  166. MyTaskIgnoreResult.request.clear()
  167. class MockEventDispatcher(object):
  168. def __init__(self):
  169. self.sent = []
  170. self.enabled = True
  171. def send(self, event, **fields):
  172. self.sent.append(event)
  173. class test_TaskRequest(Case):
  174. def test_task_wrapper_repr(self):
  175. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  176. self.assertTrue(repr(tw))
  177. @patch("celery.worker.job.kwdict")
  178. def test_kwdict(self, kwdict):
  179. prev, module.NEEDS_KWDICT = module.NEEDS_KWDICT, True
  180. try:
  181. TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  182. self.assertTrue(kwdict.called)
  183. finally:
  184. module.NEEDS_KWDICT = prev
  185. def test_sets_store_errors(self):
  186. mytask.ignore_result = True
  187. try:
  188. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  189. self.assertFalse(tw.store_errors)
  190. mytask.store_errors_even_if_ignored = True
  191. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  192. self.assertTrue(tw.store_errors)
  193. finally:
  194. mytask.ignore_result = False
  195. mytask.store_errors_even_if_ignored = False
  196. def test_send_event(self):
  197. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  198. tw.eventer = MockEventDispatcher()
  199. tw.send_event("task-frobulated")
  200. self.assertIn("task-frobulated", tw.eventer.sent)
  201. def test_on_retry(self):
  202. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  203. tw.eventer = MockEventDispatcher()
  204. try:
  205. raise RetryTaskError("foo", KeyError("moofoobar"))
  206. except:
  207. einfo = ExceptionInfo(sys.exc_info())
  208. tw.on_failure(einfo)
  209. self.assertIn("task-retried", tw.eventer.sent)
  210. tw._does_info = False
  211. tw.on_failure(einfo)
  212. einfo.internal = True
  213. tw.on_failure(einfo)
  214. def test_compat_properties(self):
  215. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  216. self.assertEqual(tw.task_id, tw.id)
  217. self.assertEqual(tw.task_name, tw.name)
  218. tw.task_id = "ID"
  219. self.assertEqual(tw.id, "ID")
  220. tw.task_name = "NAME"
  221. self.assertEqual(tw.name, "NAME")
  222. def test_terminate__task_started(self):
  223. pool = Mock()
  224. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  225. tw.time_start = time.time()
  226. tw.worker_pid = 313
  227. tw.terminate(pool, signal="KILL")
  228. pool.terminate_job.assert_called_with(tw.worker_pid, "KILL")
  229. def test_terminate__task_reserved(self):
  230. pool = Mock()
  231. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  232. tw.time_start = None
  233. tw.terminate(pool, signal="KILL")
  234. self.assertFalse(pool.terminate_job.call_count)
  235. self.assertTupleEqual(tw._terminate_on_ack, (True, pool, "KILL"))
  236. tw.terminate(pool, signal="KILL")
  237. def test_revoked_expires_expired(self):
  238. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
  239. expires=datetime.utcnow() - timedelta(days=1))
  240. tw.revoked()
  241. self.assertIn(tw.id, revoked)
  242. self.assertEqual(mytask.backend.get_status(tw.id),
  243. states.REVOKED)
  244. def test_revoked_expires_not_expired(self):
  245. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
  246. expires=datetime.utcnow() + timedelta(days=1))
  247. tw.revoked()
  248. self.assertNotIn(tw.id, revoked)
  249. self.assertNotEqual(mytask.backend.get_status(tw.id),
  250. states.REVOKED)
  251. def test_revoked_expires_ignore_result(self):
  252. mytask.ignore_result = True
  253. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
  254. expires=datetime.utcnow() - timedelta(days=1))
  255. try:
  256. tw.revoked()
  257. self.assertIn(tw.id, revoked)
  258. self.assertNotEqual(mytask.backend.get_status(tw.id),
  259. states.REVOKED)
  260. finally:
  261. mytask.ignore_result = False
  262. def test_send_email(self):
  263. app = app_or_default()
  264. old_mail_admins = app.mail_admins
  265. old_enable_mails = mytask.send_error_emails
  266. mail_sent = [False]
  267. def mock_mail_admins(*args, **kwargs):
  268. mail_sent[0] = True
  269. def get_ei():
  270. try:
  271. raise KeyError("moofoobar")
  272. except:
  273. return ExceptionInfo(sys.exc_info())
  274. app.mail_admins = mock_mail_admins
  275. mytask.send_error_emails = True
  276. try:
  277. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  278. einfo = get_ei()
  279. tw.on_failure(einfo)
  280. self.assertTrue(mail_sent[0])
  281. einfo = get_ei()
  282. mail_sent[0] = False
  283. mytask.send_error_emails = False
  284. tw.on_failure(einfo)
  285. self.assertFalse(mail_sent[0])
  286. einfo = get_ei()
  287. mail_sent[0] = False
  288. mytask.send_error_emails = True
  289. mytask.error_whitelist = [KeyError]
  290. tw.on_failure(einfo)
  291. self.assertTrue(mail_sent[0])
  292. einfo = get_ei()
  293. mail_sent[0] = False
  294. mytask.send_error_emails = True
  295. mytask.error_whitelist = [SyntaxError]
  296. tw.on_failure(einfo)
  297. self.assertFalse(mail_sent[0])
  298. finally:
  299. app.mail_admins = old_mail_admins
  300. mytask.send_error_emails = old_enable_mails
  301. mytask.error_whitelist = ()
  302. def test_already_revoked(self):
  303. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  304. tw._already_revoked = True
  305. self.assertTrue(tw.revoked())
  306. def test_revoked(self):
  307. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  308. revoked.add(tw.id)
  309. self.assertTrue(tw.revoked())
  310. self.assertTrue(tw._already_revoked)
  311. self.assertTrue(tw.acknowledged)
  312. def test_execute_does_not_execute_revoked(self):
  313. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  314. revoked.add(tw.id)
  315. tw.execute()
  316. def test_execute_acks_late(self):
  317. mytask_raising.acks_late = True
  318. tw = TaskRequest(mytask_raising.name, uuid(), [1])
  319. try:
  320. tw.execute()
  321. self.assertTrue(tw.acknowledged)
  322. tw.task.accept_magic_kwargs = False
  323. tw.execute()
  324. finally:
  325. mytask_raising.acks_late = False
  326. def test_execute_using_pool_does_not_execute_revoked(self):
  327. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  328. revoked.add(tw.id)
  329. tw.execute_using_pool(None)
  330. def test_on_accepted_acks_early(self):
  331. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  332. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  333. self.assertTrue(tw.acknowledged)
  334. tw._does_debug = False
  335. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  336. def test_on_accepted_acks_late(self):
  337. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  338. mytask.acks_late = True
  339. try:
  340. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  341. self.assertFalse(tw.acknowledged)
  342. finally:
  343. mytask.acks_late = False
  344. def test_on_accepted_terminates(self):
  345. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  346. pool = Mock()
  347. tw.terminate(pool, signal="KILL")
  348. self.assertFalse(pool.terminate_job.call_count)
  349. tw.on_accepted(pid=314, time_accepted=time.time())
  350. pool.terminate_job.assert_called_with(314, "KILL")
  351. def test_on_success_acks_early(self):
  352. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  353. tw.time_start = 1
  354. tw.on_success(42)
  355. tw._does_info = False
  356. tw.on_success(42)
  357. self.assertFalse(tw.acknowledged)
  358. def test_on_success_BaseException(self):
  359. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  360. tw.time_start = 1
  361. with self.assertRaises(SystemExit):
  362. try:
  363. raise SystemExit()
  364. except SystemExit:
  365. tw.on_success(ExceptionInfo(sys.exc_info()))
  366. else:
  367. assert False
  368. def test_on_success_eventer(self):
  369. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  370. tw.time_start = 1
  371. tw.eventer = Mock()
  372. tw.send_event = Mock()
  373. tw.on_success(42)
  374. self.assertTrue(tw.send_event.called)
  375. def test_on_success_when_failure(self):
  376. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  377. tw.time_start = 1
  378. tw.on_failure = Mock()
  379. try:
  380. raise KeyError("foo")
  381. except Exception:
  382. tw.on_success(ExceptionInfo(sys.exc_info()))
  383. self.assertTrue(tw.on_failure.called)
  384. def test_on_success_acks_late(self):
  385. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  386. tw.time_start = 1
  387. mytask.acks_late = True
  388. try:
  389. tw.on_success(42)
  390. self.assertTrue(tw.acknowledged)
  391. finally:
  392. mytask.acks_late = False
  393. def test_on_failure_WorkerLostError(self):
  394. def get_ei():
  395. try:
  396. raise WorkerLostError("do re mi")
  397. except WorkerLostError:
  398. return ExceptionInfo(sys.exc_info())
  399. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  400. exc_info = get_ei()
  401. tw.on_failure(exc_info)
  402. self.assertEqual(mytask.backend.get_status(tw.id),
  403. states.FAILURE)
  404. mytask.ignore_result = True
  405. try:
  406. exc_info = get_ei()
  407. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  408. tw.on_failure(exc_info)
  409. self.assertEqual(mytask.backend.get_status(tw.id),
  410. states.PENDING)
  411. finally:
  412. mytask.ignore_result = False
  413. def test_on_failure_acks_late(self):
  414. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  415. tw.time_start = 1
  416. mytask.acks_late = True
  417. try:
  418. try:
  419. raise KeyError("foo")
  420. except KeyError:
  421. exc_info = ExceptionInfo(sys.exc_info())
  422. tw.on_failure(exc_info)
  423. self.assertTrue(tw.acknowledged)
  424. finally:
  425. mytask.acks_late = False
  426. def test_from_message_invalid_kwargs(self):
  427. body = dict(task=mytask.name, id=1, args=(), kwargs="foo")
  428. with self.assertRaises(InvalidTaskError):
  429. TaskRequest.from_message(None, body)
  430. @patch("celery.worker.job.error")
  431. @patch("celery.worker.job.warn")
  432. def test_on_timeout(self, warn, error):
  433. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  434. tw.on_timeout(soft=True, timeout=1337)
  435. self.assertIn("Soft time limit", warn.call_args[0][0])
  436. tw.on_timeout(soft=False, timeout=1337)
  437. self.assertIn("Hard time limit", error.call_args[0][0])
  438. self.assertEqual(mytask.backend.get_status(tw.id),
  439. states.FAILURE)
  440. mytask.ignore_result = True
  441. try:
  442. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  443. tw.on_timeout(soft=True, timeout=1336)
  444. self.assertEqual(mytask.backend.get_status(tw.id),
  445. states.PENDING)
  446. finally:
  447. mytask.ignore_result = False
  448. def test_execute_and_trace(self):
  449. res = execute_and_trace(mytask.name, uuid(), [4], {})
  450. self.assertEqual(res, 4 ** 4)
  451. def test_execute_safe_catches_exception(self):
  452. def _error_exec(self, *args, **kwargs):
  453. raise KeyError("baz")
  454. @task_dec
  455. def raising():
  456. raise KeyError("baz")
  457. raising.request = None
  458. with self.assertWarnsRegex(RuntimeWarning,
  459. r'Exception raised outside'):
  460. res = execute_and_trace(raising.name, uuid(),
  461. [], {})
  462. self.assertIsInstance(res, ExceptionInfo)
  463. def create_exception(self, exc):
  464. try:
  465. raise exc
  466. except exc.__class__:
  467. return sys.exc_info()
  468. def test_worker_task_trace_handle_retry(self):
  469. from celery.exceptions import RetryTaskError
  470. tid = uuid()
  471. mytask.request.update({"id": tid})
  472. try:
  473. _, value_, _ = self.create_exception(ValueError("foo"))
  474. einfo = self.create_exception(RetryTaskError(str(value_),
  475. exc=value_))
  476. w = TraceInfo(states.RETRY, einfo[1], einfo)
  477. w.handle_retry(mytask, store_errors=False)
  478. self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
  479. w.handle_retry(mytask, store_errors=True)
  480. self.assertEqual(mytask.backend.get_status(tid), states.RETRY)
  481. finally:
  482. mytask.request.clear()
  483. def test_worker_task_trace_handle_failure(self):
  484. tid = uuid()
  485. mytask.request.update({"id": tid})
  486. try:
  487. einfo = self.create_exception(ValueError("foo"))
  488. w = TraceInfo(states.FAILURE, einfo[1], einfo)
  489. w.handle_failure(mytask, store_errors=False)
  490. self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
  491. w.handle_failure(mytask, store_errors=True)
  492. self.assertEqual(mytask.backend.get_status(tid), states.FAILURE)
  493. finally:
  494. mytask.request.clear()
  495. def test_task_wrapper_mail_attrs(self):
  496. tw = TaskRequest(mytask.name, uuid(), [], {})
  497. x = tw.success_msg % {"name": tw.name,
  498. "id": tw.id,
  499. "return_value": 10,
  500. "runtime": 0.3641}
  501. self.assertTrue(x)
  502. x = tw.error_msg % {"name": tw.name,
  503. "id": tw.id,
  504. "exc": "FOOBARBAZ",
  505. "traceback": "foobarbaz"}
  506. self.assertTrue(x)
  507. def test_from_message(self):
  508. us = u"æØåveéðƒeæ"
  509. body = {"task": mytask.name, "id": uuid(),
  510. "args": [2], "kwargs": {us: "bar"}}
  511. m = Message(None, body=anyjson.dumps(body), backend="foo",
  512. content_type="application/json",
  513. content_encoding="utf-8")
  514. tw = TaskRequest.from_message(m, m.decode())
  515. self.assertIsInstance(tw, Request)
  516. self.assertEqual(tw.name, body["task"])
  517. self.assertEqual(tw.id, body["id"])
  518. self.assertEqual(tw.args, body["args"])
  519. us = from_utf8(us)
  520. if sys.version_info < (2, 6):
  521. self.assertEqual(tw.kwargs.keys()[0], us)
  522. self.assertIsInstance(tw.kwargs.keys()[0], str)
  523. def test_from_message_empty_args(self):
  524. body = {"task": mytask.name, "id": uuid()}
  525. m = Message(None, body=anyjson.dumps(body), backend="foo",
  526. content_type="application/json",
  527. content_encoding="utf-8")
  528. tw = TaskRequest.from_message(m, m.decode())
  529. self.assertIsInstance(tw, Request)
  530. self.assertEquals(tw.args, [])
  531. self.assertEquals(tw.kwargs, {})
  532. def test_from_message_missing_required_fields(self):
  533. body = {}
  534. m = Message(None, body=anyjson.dumps(body), backend="foo",
  535. content_type="application/json",
  536. content_encoding="utf-8")
  537. with self.assertRaises(KeyError):
  538. TaskRequest.from_message(m, m.decode())
  539. def test_from_message_nonexistant_task(self):
  540. body = {"task": "cu.mytask.doesnotexist", "id": uuid(),
  541. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  542. m = Message(None, body=anyjson.dumps(body), backend="foo",
  543. content_type="application/json",
  544. content_encoding="utf-8")
  545. with self.assertRaises(KeyError):
  546. TaskRequest.from_message(m, m.decode())
  547. def test_execute(self):
  548. tid = uuid()
  549. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  550. self.assertEqual(tw.execute(), 256)
  551. meta = mytask.backend.get_task_meta(tid)
  552. self.assertEqual(meta["result"], 256)
  553. self.assertEqual(meta["status"], states.SUCCESS)
  554. def test_execute_success_no_kwargs(self):
  555. tid = uuid()
  556. tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
  557. self.assertEqual(tw.execute(), 256)
  558. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  559. self.assertEqual(meta["result"], 256)
  560. self.assertEqual(meta["status"], states.SUCCESS)
  561. def test_execute_success_some_kwargs(self):
  562. tid = uuid()
  563. tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
  564. self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
  565. meta = mytask_some_kwargs.backend.get_task_meta(tid)
  566. self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
  567. self.assertEqual(meta["result"], 256)
  568. self.assertEqual(meta["status"], states.SUCCESS)
  569. def test_execute_ack(self):
  570. tid = uuid()
  571. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
  572. on_ack=on_ack)
  573. self.assertEqual(tw.execute(), 256)
  574. meta = mytask.backend.get_task_meta(tid)
  575. self.assertTrue(scratch["ACK"])
  576. self.assertEqual(meta["result"], 256)
  577. self.assertEqual(meta["status"], states.SUCCESS)
  578. def test_execute_fail(self):
  579. tid = uuid()
  580. tw = TaskRequest(mytask_raising.name, tid, [4])
  581. self.assertIsInstance(tw.execute(), ExceptionInfo)
  582. meta = mytask_raising.backend.get_task_meta(tid)
  583. self.assertEqual(meta["status"], states.FAILURE)
  584. self.assertIsInstance(meta["result"], KeyError)
  585. def test_execute_using_pool(self):
  586. tid = uuid()
  587. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  588. class MockPool(BasePool):
  589. target = None
  590. args = None
  591. kwargs = None
  592. def __init__(self, *args, **kwargs):
  593. pass
  594. def apply_async(self, target, args=None, kwargs=None,
  595. *margs, **mkwargs):
  596. self.target = target
  597. self.args = args
  598. self.kwargs = kwargs
  599. p = MockPool()
  600. tw.execute_using_pool(p)
  601. self.assertTrue(p.target)
  602. self.assertEqual(p.args[0], mytask.name)
  603. self.assertEqual(p.args[1], tid)
  604. self.assertEqual(p.args[2], [4])
  605. self.assertIn("f", p.args[3])
  606. self.assertIn([4], p.args)
  607. tw.task.accept_magic_kwargs = False
  608. tw.execute_using_pool(p)
  609. def test_default_kwargs(self):
  610. tid = uuid()
  611. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  612. self.assertDictEqual(
  613. tw.extend_with_default_kwargs(10, "some_logfile"), {
  614. "f": "x",
  615. "logfile": "some_logfile",
  616. "loglevel": 10,
  617. "task_id": tw.id,
  618. "task_retries": 0,
  619. "task_is_eager": False,
  620. "delivery_info": {"exchange": None, "routing_key": None},
  621. "task_name": tw.name})
  622. @patch("celery.worker.job.logger")
  623. def _test_on_failure(self, exception, logger):
  624. app = app_or_default()
  625. tid = uuid()
  626. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  627. try:
  628. raise exception
  629. except Exception:
  630. exc_info = ExceptionInfo(sys.exc_info())
  631. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  632. try:
  633. tw.on_failure(exc_info)
  634. self.assertTrue(logger.log.called)
  635. context = logger.log.call_args[0][2]
  636. self.assertEqual(mytask.name, context["name"])
  637. self.assertIn(tid, context["id"])
  638. finally:
  639. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
  640. def test_on_failure(self):
  641. self._test_on_failure(Exception("Inside unit tests"))
  642. def test_on_failure_unicode_exception(self):
  643. self._test_on_failure(Exception(u"Бобры атакуют"))
  644. def test_on_failure_utf8_exception(self):
  645. self._test_on_failure(Exception(
  646. from_utf8(u"Бобры атакуют")))