test_worker_job.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import with_statement
  4. import anyjson
  5. import logging
  6. import os
  7. import sys
  8. import time
  9. from datetime import datetime, timedelta
  10. from kombu.transport.base import Message
  11. from mock import Mock
  12. from nose import SkipTest
  13. from celery import current_app
  14. from celery import states
  15. from celery.app import app_or_default
  16. from celery.concurrency.base import BasePool
  17. from celery.datastructures import ExceptionInfo
  18. from celery.exceptions import (RetryTaskError,
  19. WorkerLostError, InvalidTaskError)
  20. from celery.execute.trace import eager_trace_task, TraceInfo
  21. from celery.log import setup_logger
  22. from celery.result import AsyncResult
  23. from celery.task import task as task_dec
  24. from celery.task.base import Task
  25. from celery.utils import uuid
  26. from celery.utils.encoding import from_utf8, default_encode
  27. from celery.worker.job import Request, TaskRequest, execute_and_trace
  28. from celery.worker.state import revoked
  29. from celery.tests.utils import Case, WhateverIO, wrap_logger
  30. scratch = {"ACK": False}
  31. some_kwargs_scratchpad = {}
  32. def jail(task_id, name, args, kwargs):
  33. return eager_trace_task(current_app.tasks[name],
  34. task_id, args, kwargs, eager=False)[0]
  35. def on_ack(*args, **kwargs):
  36. scratch["ACK"] = True
  37. @task_dec(accept_magic_kwargs=True)
  38. def mytask(i, **kwargs):
  39. return i ** i
  40. @task_dec # traverses coverage for decorator without parens
  41. def mytask_no_kwargs(i):
  42. return i ** i
  43. class MyTaskIgnoreResult(Task):
  44. ignore_result = True
  45. def run(self, i):
  46. return i ** i
  47. @task_dec(accept_magic_kwargs=True)
  48. def mytask_some_kwargs(i, logfile):
  49. some_kwargs_scratchpad["logfile"] = logfile
  50. return i ** i
  51. @task_dec(accept_magic_kwargs=True)
  52. def mytask_raising(i, **kwargs):
  53. raise KeyError(i)
  54. class test_default_encode(Case):
  55. def setUp(self):
  56. if sys.version_info >= (3, 0):
  57. raise SkipTest("py3k: not relevant")
  58. def test_jython(self):
  59. prev, sys.platform = sys.platform, "java 1.6.1"
  60. try:
  61. self.assertEqual(default_encode("foo"), "foo")
  62. finally:
  63. sys.platform = prev
  64. def test_cython(self):
  65. prev, sys.platform = sys.platform, "darwin"
  66. gfe, sys.getfilesystemencoding = sys.getfilesystemencoding, \
  67. lambda: "utf-8"
  68. try:
  69. self.assertEqual(default_encode("foo"), "foo")
  70. finally:
  71. sys.platform = prev
  72. sys.getfilesystemencoding = gfe
  73. class test_RetryTaskError(Case):
  74. def test_retry_task_error(self):
  75. try:
  76. raise Exception("foo")
  77. except Exception, exc:
  78. ret = RetryTaskError("Retrying task", exc)
  79. self.assertEqual(ret.exc, exc)
  80. class test_trace_task(Case):
  81. def test_process_cleanup_fails(self):
  82. backend = mytask.backend
  83. mytask.backend = Mock()
  84. mytask.backend.process_cleanup = Mock(side_effect=KeyError())
  85. try:
  86. logger = mytask.app.log.get_default_logger()
  87. with wrap_logger(logger) as sio:
  88. tid = uuid()
  89. ret = jail(tid, mytask.name, [2], {})
  90. self.assertEqual(ret, 4)
  91. mytask.backend.store_result.assert_called_with(tid, 4,
  92. states.SUCCESS)
  93. logs = sio.getvalue().strip()
  94. self.assertIn("Process cleanup failed", logs)
  95. finally:
  96. mytask.backend = backend
  97. def test_process_cleanup_BaseException(self):
  98. backend = mytask.backend
  99. mytask.backend = Mock()
  100. mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
  101. try:
  102. with self.assertRaises(SystemExit):
  103. jail(uuid(), mytask.name, [2], {})
  104. finally:
  105. mytask.backend = backend
  106. def test_execute_jail_success(self):
  107. ret = jail(uuid(), mytask.name, [2], {})
  108. self.assertEqual(ret, 4)
  109. def test_marked_as_started(self):
  110. class Backend(mytask.backend.__class__):
  111. _started = []
  112. def store_result(self, tid, meta, state):
  113. if state == states.STARTED:
  114. self._started.append(tid)
  115. prev, mytask.backend = mytask.backend, Backend()
  116. mytask.track_started = True
  117. try:
  118. tid = uuid()
  119. jail(tid, mytask.name, [2], {})
  120. self.assertIn(tid, Backend._started)
  121. mytask.ignore_result = True
  122. tid = uuid()
  123. jail(tid, mytask.name, [2], {})
  124. self.assertNotIn(tid, Backend._started)
  125. finally:
  126. mytask.backend = prev
  127. mytask.track_started = False
  128. mytask.ignore_result = False
  129. def test_execute_jail_failure(self):
  130. u = uuid()
  131. mytask_raising.request.update({"id": u})
  132. try:
  133. ret = jail(u, mytask_raising.name,
  134. [4], {})
  135. self.assertIsInstance(ret, ExceptionInfo)
  136. self.assertTupleEqual(ret.exception.args, (4, ))
  137. finally:
  138. mytask_raising.request.clear()
  139. def test_execute_ignore_result(self):
  140. task_id = uuid()
  141. MyTaskIgnoreResult.request.update({"id": task_id})
  142. try:
  143. ret = jail(task_id, MyTaskIgnoreResult.name,
  144. [4], {})
  145. self.assertEqual(ret, 256)
  146. self.assertFalse(AsyncResult(task_id).ready())
  147. finally:
  148. MyTaskIgnoreResult.request.clear()
  149. class MockEventDispatcher(object):
  150. def __init__(self):
  151. self.sent = []
  152. self.enabled = True
  153. def send(self, event, **fields):
  154. self.sent.append(event)
  155. class test_TaskRequest(Case):
  156. def test_task_wrapper_repr(self):
  157. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  158. self.assertTrue(repr(tw))
  159. def test_sets_store_errors(self):
  160. mytask.ignore_result = True
  161. try:
  162. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  163. self.assertFalse(tw.store_errors)
  164. mytask.store_errors_even_if_ignored = True
  165. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  166. self.assertTrue(tw.store_errors)
  167. finally:
  168. mytask.ignore_result = False
  169. mytask.store_errors_even_if_ignored = False
  170. def test_send_event(self):
  171. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  172. tw.eventer = MockEventDispatcher()
  173. tw.send_event("task-frobulated")
  174. self.assertIn("task-frobulated", tw.eventer.sent)
  175. def test_on_retry(self):
  176. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  177. tw.eventer = MockEventDispatcher()
  178. try:
  179. raise RetryTaskError("foo", KeyError("moofoobar"))
  180. except:
  181. einfo = ExceptionInfo(sys.exc_info())
  182. tw.on_failure(einfo)
  183. self.assertIn("task-retried", tw.eventer.sent)
  184. def test_terminate__task_started(self):
  185. pool = Mock()
  186. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  187. tw.time_start = time.time()
  188. tw.worker_pid = 313
  189. tw.terminate(pool, signal="KILL")
  190. pool.terminate_job.assert_called_with(tw.worker_pid, "KILL")
  191. def test_terminate__task_reserved(self):
  192. pool = Mock()
  193. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  194. tw.time_start = None
  195. tw.terminate(pool, signal="KILL")
  196. self.assertFalse(pool.terminate_job.call_count)
  197. self.assertTupleEqual(tw._terminate_on_ack, (True, pool, "KILL"))
  198. tw.terminate(pool, signal="KILL")
  199. def test_revoked_expires_expired(self):
  200. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
  201. expires=datetime.utcnow() - timedelta(days=1))
  202. tw.revoked()
  203. self.assertIn(tw.task_id, revoked)
  204. self.assertEqual(mytask.backend.get_status(tw.task_id),
  205. states.REVOKED)
  206. def test_revoked_expires_not_expired(self):
  207. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
  208. expires=datetime.utcnow() + timedelta(days=1))
  209. tw.revoked()
  210. self.assertNotIn(tw.task_id, revoked)
  211. self.assertNotEqual(mytask.backend.get_status(tw.task_id),
  212. states.REVOKED)
  213. def test_revoked_expires_ignore_result(self):
  214. mytask.ignore_result = True
  215. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
  216. expires=datetime.utcnow() - timedelta(days=1))
  217. try:
  218. tw.revoked()
  219. self.assertIn(tw.task_id, revoked)
  220. self.assertNotEqual(mytask.backend.get_status(tw.task_id),
  221. states.REVOKED)
  222. finally:
  223. mytask.ignore_result = False
  224. def test_send_email(self):
  225. app = app_or_default()
  226. old_mail_admins = app.mail_admins
  227. old_enable_mails = mytask.send_error_emails
  228. mail_sent = [False]
  229. def mock_mail_admins(*args, **kwargs):
  230. mail_sent[0] = True
  231. def get_ei():
  232. try:
  233. raise KeyError("moofoobar")
  234. except:
  235. return ExceptionInfo(sys.exc_info())
  236. app.mail_admins = mock_mail_admins
  237. mytask.send_error_emails = True
  238. try:
  239. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  240. einfo = get_ei()
  241. tw.on_failure(einfo)
  242. self.assertTrue(mail_sent[0])
  243. einfo = get_ei()
  244. mail_sent[0] = False
  245. mytask.send_error_emails = False
  246. tw.on_failure(einfo)
  247. self.assertFalse(mail_sent[0])
  248. einfo = get_ei()
  249. mail_sent[0] = False
  250. mytask.send_error_emails = True
  251. mytask.error_whitelist = [KeyError]
  252. tw.on_failure(einfo)
  253. self.assertTrue(mail_sent[0])
  254. einfo = get_ei()
  255. mail_sent[0] = False
  256. mytask.send_error_emails = True
  257. mytask.error_whitelist = [SyntaxError]
  258. tw.on_failure(einfo)
  259. self.assertFalse(mail_sent[0])
  260. finally:
  261. app.mail_admins = old_mail_admins
  262. mytask.send_error_emails = old_enable_mails
  263. mytask.error_whitelist = ()
  264. def test_already_revoked(self):
  265. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  266. tw._already_revoked = True
  267. self.assertTrue(tw.revoked())
  268. def test_revoked(self):
  269. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  270. revoked.add(tw.task_id)
  271. self.assertTrue(tw.revoked())
  272. self.assertTrue(tw._already_revoked)
  273. self.assertTrue(tw.acknowledged)
  274. def test_execute_does_not_execute_revoked(self):
  275. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  276. revoked.add(tw.task_id)
  277. tw.execute()
  278. def test_execute_acks_late(self):
  279. mytask_raising.acks_late = True
  280. tw = TaskRequest(mytask_raising.name, uuid(), [1], {"f": "x"})
  281. try:
  282. tw.execute()
  283. self.assertTrue(tw.acknowledged)
  284. finally:
  285. mytask_raising.acks_late = False
  286. def test_execute_using_pool_does_not_execute_revoked(self):
  287. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  288. revoked.add(tw.task_id)
  289. tw.execute_using_pool(None)
  290. def test_on_accepted_acks_early(self):
  291. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  292. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  293. self.assertTrue(tw.acknowledged)
  294. def test_on_accepted_acks_late(self):
  295. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  296. mytask.acks_late = True
  297. try:
  298. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  299. self.assertFalse(tw.acknowledged)
  300. finally:
  301. mytask.acks_late = False
  302. def test_on_accepted_terminates(self):
  303. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  304. pool = Mock()
  305. tw.terminate(pool, signal="KILL")
  306. self.assertFalse(pool.terminate_job.call_count)
  307. tw.on_accepted(pid=314, time_accepted=time.time())
  308. pool.terminate_job.assert_called_with(314, "KILL")
  309. def test_on_success_acks_early(self):
  310. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  311. tw.time_start = 1
  312. tw.on_success(42)
  313. self.assertFalse(tw.acknowledged)
  314. def test_on_success_acks_late(self):
  315. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  316. tw.time_start = 1
  317. mytask.acks_late = True
  318. try:
  319. tw.on_success(42)
  320. self.assertTrue(tw.acknowledged)
  321. finally:
  322. mytask.acks_late = False
  323. def test_on_failure_WorkerLostError(self):
  324. def get_ei():
  325. try:
  326. raise WorkerLostError("do re mi")
  327. except WorkerLostError:
  328. return ExceptionInfo(sys.exc_info())
  329. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  330. exc_info = get_ei()
  331. tw.on_failure(exc_info)
  332. self.assertEqual(mytask.backend.get_status(tw.task_id),
  333. states.FAILURE)
  334. mytask.ignore_result = True
  335. try:
  336. exc_info = get_ei()
  337. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  338. tw.on_failure(exc_info)
  339. self.assertEqual(mytask.backend.get_status(tw.task_id),
  340. states.PENDING)
  341. finally:
  342. mytask.ignore_result = False
  343. def test_on_failure_acks_late(self):
  344. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  345. tw.time_start = 1
  346. mytask.acks_late = True
  347. try:
  348. try:
  349. raise KeyError("foo")
  350. except KeyError:
  351. exc_info = ExceptionInfo(sys.exc_info())
  352. tw.on_failure(exc_info)
  353. self.assertTrue(tw.acknowledged)
  354. finally:
  355. mytask.acks_late = False
  356. def test_from_message_invalid_kwargs(self):
  357. body = dict(task=mytask.name, id=1, args=(), kwargs="foo")
  358. with self.assertRaises(InvalidTaskError):
  359. TaskRequest.from_message(None, body)
  360. def test_on_timeout(self):
  361. class MockLogger(object):
  362. def __init__(self):
  363. self.warnings = []
  364. self.errors = []
  365. def warning(self, msg, *args, **kwargs):
  366. self.warnings.append(msg % args)
  367. def error(self, msg, *args, **kwargs):
  368. self.errors.append(msg % args)
  369. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  370. tw.logger = MockLogger()
  371. tw.on_timeout(soft=True, timeout=1337)
  372. self.assertIn("Soft time limit (1337s) exceeded",
  373. tw.logger.warnings[0])
  374. tw.on_timeout(soft=False, timeout=1337)
  375. self.assertIn("Hard time limit (1337s) exceeded", tw.logger.errors[0])
  376. self.assertEqual(mytask.backend.get_status(tw.task_id),
  377. states.FAILURE)
  378. mytask.ignore_result = True
  379. try:
  380. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  381. tw.logger = MockLogger()
  382. finally:
  383. tw.on_timeout(soft=True, timeout=1336)
  384. self.assertEqual(mytask.backend.get_status(tw.task_id),
  385. states.PENDING)
  386. mytask.ignore_result = False
  387. def test_execute_and_trace(self):
  388. res = execute_and_trace(mytask.name, uuid(), [4], {})
  389. self.assertEqual(res, 4 ** 4)
  390. def test_execute_safe_catches_exception(self):
  391. def _error_exec(self, *args, **kwargs):
  392. raise KeyError("baz")
  393. @task_dec
  394. def raising():
  395. raise KeyError("baz")
  396. raising.request = None
  397. with self.assertWarnsRegex(RuntimeWarning,
  398. r'Exception raised outside'):
  399. res = execute_and_trace(raising.name, uuid(),
  400. [], {})
  401. self.assertIsInstance(res, ExceptionInfo)
  402. def create_exception(self, exc):
  403. try:
  404. raise exc
  405. except exc.__class__:
  406. return sys.exc_info()
  407. def test_worker_task_trace_handle_retry(self):
  408. from celery.exceptions import RetryTaskError
  409. tid = uuid()
  410. mytask.request.update({"id": tid})
  411. try:
  412. _, value_, _ = self.create_exception(ValueError("foo"))
  413. einfo = self.create_exception(RetryTaskError(str(value_),
  414. exc=value_))
  415. w = TraceInfo(states.RETRY, einfo[1], einfo)
  416. w.handle_retry(mytask, store_errors=False)
  417. self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
  418. w.handle_retry(mytask, store_errors=True)
  419. self.assertEqual(mytask.backend.get_status(tid), states.RETRY)
  420. finally:
  421. mytask.request.clear()
  422. def test_worker_task_trace_handle_failure(self):
  423. tid = uuid()
  424. mytask.request.update({"id": tid})
  425. try:
  426. einfo = self.create_exception(ValueError("foo"))
  427. w = TraceInfo(states.FAILURE, einfo[1], einfo)
  428. w.handle_failure(mytask, store_errors=False)
  429. self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
  430. w.handle_failure(mytask, store_errors=True)
  431. self.assertEqual(mytask.backend.get_status(tid), states.FAILURE)
  432. finally:
  433. mytask.request.clear()
  434. def test_task_wrapper_mail_attrs(self):
  435. tw = TaskRequest(mytask.name, uuid(), [], {})
  436. x = tw.success_msg % {"name": tw.task_name,
  437. "id": tw.task_id,
  438. "return_value": 10,
  439. "runtime": 0.3641}
  440. self.assertTrue(x)
  441. x = tw.error_msg % {"name": tw.task_name,
  442. "id": tw.task_id,
  443. "exc": "FOOBARBAZ",
  444. "traceback": "foobarbaz"}
  445. self.assertTrue(x)
  446. def test_from_message(self):
  447. us = u"æØåveéðƒeæ"
  448. body = {"task": mytask.name, "id": uuid(),
  449. "args": [2], "kwargs": {us: "bar"}}
  450. m = Message(None, body=anyjson.serialize(body), backend="foo",
  451. content_type="application/json",
  452. content_encoding="utf-8")
  453. tw = TaskRequest.from_message(m, m.decode())
  454. self.assertIsInstance(tw, Request)
  455. self.assertEqual(tw.task_name, body["task"])
  456. self.assertEqual(tw.task_id, body["id"])
  457. self.assertEqual(tw.args, body["args"])
  458. us = from_utf8(us)
  459. if sys.version_info < (2, 6):
  460. self.assertEqual(tw.kwargs.keys()[0], us)
  461. self.assertIsInstance(tw.kwargs.keys()[0], str)
  462. self.assertTrue(tw.logger)
  463. def test_from_message_empty_args(self):
  464. body = {"task": mytask.name, "id": uuid()}
  465. m = Message(None, body=anyjson.serialize(body), backend="foo",
  466. content_type="application/json",
  467. content_encoding="utf-8")
  468. tw = TaskRequest.from_message(m, m.decode())
  469. self.assertIsInstance(tw, Request)
  470. self.assertEquals(tw.args, [])
  471. self.assertEquals(tw.kwargs, {})
  472. def test_from_message_missing_required_fields(self):
  473. body = {}
  474. m = Message(None, body=anyjson.serialize(body), backend="foo",
  475. content_type="application/json",
  476. content_encoding="utf-8")
  477. with self.assertRaises(KeyError):
  478. TaskRequest.from_message(m, m.decode())
  479. def test_from_message_nonexistant_task(self):
  480. body = {"task": "cu.mytask.doesnotexist", "id": uuid(),
  481. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  482. m = Message(None, body=anyjson.serialize(body), backend="foo",
  483. content_type="application/json",
  484. content_encoding="utf-8")
  485. with self.assertRaises(KeyError):
  486. TaskRequest.from_message(m, m.decode())
  487. def test_execute(self):
  488. tid = uuid()
  489. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  490. self.assertEqual(tw.execute(), 256)
  491. meta = mytask.backend.get_task_meta(tid)
  492. self.assertEqual(meta["result"], 256)
  493. self.assertEqual(meta["status"], states.SUCCESS)
  494. def test_execute_success_no_kwargs(self):
  495. tid = uuid()
  496. tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
  497. self.assertEqual(tw.execute(), 256)
  498. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  499. self.assertEqual(meta["result"], 256)
  500. self.assertEqual(meta["status"], states.SUCCESS)
  501. def test_execute_success_some_kwargs(self):
  502. tid = uuid()
  503. tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
  504. self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
  505. meta = mytask_some_kwargs.backend.get_task_meta(tid)
  506. self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
  507. self.assertEqual(meta["result"], 256)
  508. self.assertEqual(meta["status"], states.SUCCESS)
  509. def test_execute_ack(self):
  510. tid = uuid()
  511. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
  512. on_ack=on_ack)
  513. self.assertEqual(tw.execute(), 256)
  514. meta = mytask.backend.get_task_meta(tid)
  515. self.assertTrue(scratch["ACK"])
  516. self.assertEqual(meta["result"], 256)
  517. self.assertEqual(meta["status"], states.SUCCESS)
  518. def test_execute_fail(self):
  519. tid = uuid()
  520. tw = TaskRequest(mytask_raising.name, tid, [4], {"f": "x"})
  521. self.assertIsInstance(tw.execute(), ExceptionInfo)
  522. meta = mytask_raising.backend.get_task_meta(tid)
  523. self.assertEqual(meta["status"], states.FAILURE)
  524. self.assertIsInstance(meta["result"], KeyError)
  525. def test_execute_using_pool(self):
  526. tid = uuid()
  527. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  528. class MockPool(BasePool):
  529. target = None
  530. args = None
  531. kwargs = None
  532. def __init__(self, *args, **kwargs):
  533. pass
  534. def apply_async(self, target, args=None, kwargs=None,
  535. *margs, **mkwargs):
  536. self.target = target
  537. self.args = args
  538. self.kwargs = kwargs
  539. p = MockPool()
  540. tw.execute_using_pool(p)
  541. self.assertTrue(p.target)
  542. self.assertEqual(p.args[0], mytask.name)
  543. self.assertEqual(p.args[1], tid)
  544. self.assertEqual(p.args[2], [4])
  545. self.assertIn("f", p.args[3])
  546. self.assertIn([4], p.args)
  547. def test_default_kwargs(self):
  548. tid = uuid()
  549. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  550. self.assertDictEqual(
  551. tw.extend_with_default_kwargs(10, "some_logfile"), {
  552. "f": "x",
  553. "logfile": "some_logfile",
  554. "loglevel": 10,
  555. "task_id": tw.task_id,
  556. "task_retries": 0,
  557. "task_is_eager": False,
  558. "delivery_info": {"exchange": None, "routing_key": None},
  559. "task_name": tw.task_name})
  560. def _test_on_failure(self, exception):
  561. app = app_or_default()
  562. tid = uuid()
  563. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  564. try:
  565. raise exception
  566. except Exception:
  567. exc_info = ExceptionInfo(sys.exc_info())
  568. logfh = WhateverIO()
  569. tw.logger.handlers = []
  570. tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
  571. root=False)
  572. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  573. tw.on_failure(exc_info)
  574. logvalue = logfh.getvalue()
  575. self.assertIn(mytask.name, logvalue)
  576. self.assertIn(tid, logvalue)
  577. self.assertIn("ERROR", logvalue)
  578. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
  579. def test_on_failure(self):
  580. self._test_on_failure(Exception("Inside unit tests"))
  581. def test_on_failure_unicode_exception(self):
  582. self._test_on_failure(Exception(u"Бобры атакуют"))
  583. def test_on_failure_utf8_exception(self):
  584. self._test_on_failure(Exception(
  585. from_utf8(u"Бобры атакуют")))