test_worker_job.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import with_statement
  4. import anyjson
  5. import logging
  6. import os
  7. import sys
  8. import time
  9. from datetime import datetime, timedelta
  10. from kombu.transport.base import Message
  11. from mock import Mock
  12. from nose import SkipTest
  13. from celery import states
  14. from celery.app import app_or_default
  15. from celery.concurrency.base import BasePool
  16. from celery.datastructures import ExceptionInfo
  17. from celery.task import task as task_dec
  18. from celery.exceptions import RetryTaskError, NotRegistered, WorkerLostError
  19. from celery.log import setup_logger
  20. from celery.result import AsyncResult
  21. from celery.task.base import Task
  22. from celery.utils import uuid
  23. from celery.utils.encoding import from_utf8
  24. from celery.worker.job import (WorkerTaskTrace, TaskRequest,
  25. InvalidTaskError, execute_and_trace,
  26. default_encode)
  27. from celery.worker.state import revoked
  28. from celery.tests.compat import catch_warnings
  29. from celery.tests.utils import unittest
  30. from celery.tests.utils import WhateverIO, wrap_logger
  31. scratch = {"ACK": False}
  32. some_kwargs_scratchpad = {}
  33. def jail(task_id, task_name, args, kwargs):
  34. return WorkerTaskTrace(task_name, task_id, args, kwargs)()
  35. def on_ack():
  36. scratch["ACK"] = True
  37. @task_dec(accept_magic_kwargs=True)
  38. def mytask(i, **kwargs):
  39. return i ** i
  40. @task_dec # traverses coverage for decorator without parens
  41. def mytask_no_kwargs(i):
  42. return i ** i
  43. class MyTaskIgnoreResult(Task):
  44. ignore_result = True
  45. def run(self, i):
  46. return i ** i
  47. @task_dec(accept_magic_kwargs=True)
  48. def mytask_some_kwargs(i, logfile):
  49. some_kwargs_scratchpad["logfile"] = logfile
  50. return i ** i
  51. @task_dec(accept_magic_kwargs=True)
  52. def mytask_raising(i, **kwargs):
  53. raise KeyError(i)
  54. class test_default_encode(unittest.TestCase):
  55. def setUp(self):
  56. if sys.version_info >= (3, 0):
  57. raise SkipTest("py3k: not relevant")
  58. def test_jython(self):
  59. prev, sys.platform = sys.platform, "java 1.6.1"
  60. try:
  61. self.assertEqual(default_encode("foo"), "foo")
  62. finally:
  63. sys.platform = prev
  64. def test_cython(self):
  65. prev, sys.platform = sys.platform, "darwin"
  66. gfe, sys.getfilesystemencoding = sys.getfilesystemencoding, \
  67. lambda: "utf-8"
  68. try:
  69. self.assertEqual(default_encode("foo"), "foo")
  70. finally:
  71. sys.platform = prev
  72. sys.getfilesystemencoding = gfe
  73. class test_RetryTaskError(unittest.TestCase):
  74. def test_retry_task_error(self):
  75. try:
  76. raise Exception("foo")
  77. except Exception, exc:
  78. ret = RetryTaskError("Retrying task", exc)
  79. self.assertEqual(ret.exc, exc)
  80. class test_WorkerTaskTrace(unittest.TestCase):
  81. def test_process_cleanup_fails(self):
  82. backend = mytask.backend
  83. mytask.backend = Mock()
  84. mytask.backend.process_cleanup = Mock(side_effect=KeyError())
  85. try:
  86. logger = mytask.app.log.get_default_logger()
  87. with wrap_logger(logger) as sio:
  88. tid = uuid()
  89. ret = jail(tid, mytask.name, [2], {})
  90. self.assertEqual(ret, 4)
  91. mytask.backend.mark_as_done.assert_called_with(tid, 4)
  92. logs = sio.getvalue().strip()
  93. self.assertIn("Process cleanup failed", logs)
  94. finally:
  95. mytask.backend = backend
  96. def test_process_cleanup_BaseException(self):
  97. backend = mytask.backend
  98. mytask.backend = Mock()
  99. mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
  100. try:
  101. with self.assertRaises(SystemExit):
  102. jail(uuid(), mytask.name, [2], {})
  103. finally:
  104. mytask.backend = backend
  105. def test_execute_jail_success(self):
  106. ret = jail(uuid(), mytask.name, [2], {})
  107. self.assertEqual(ret, 4)
  108. def test_marked_as_started(self):
  109. mytask.track_started = True
  110. class Backend(mytask.backend.__class__):
  111. _started = []
  112. def mark_as_started(self, tid, *args, **kwargs):
  113. self._started.append(tid)
  114. prev, mytask.backend = mytask.backend, Backend()
  115. try:
  116. tid = uuid()
  117. jail(tid, mytask.name, [2], {})
  118. self.assertIn(tid, Backend._started)
  119. mytask.ignore_result = True
  120. tid = uuid()
  121. jail(tid, mytask.name, [2], {})
  122. self.assertNotIn(tid, Backend._started)
  123. finally:
  124. mytask.backend = prev
  125. mytask.track_started = False
  126. mytask.ignore_result = False
  127. def test_execute_jail_failure(self):
  128. ret = jail(uuid(), mytask_raising.name,
  129. [4], {})
  130. self.assertIsInstance(ret, ExceptionInfo)
  131. self.assertTupleEqual(ret.exception.args, (4, ))
  132. def test_execute_ignore_result(self):
  133. task_id = uuid()
  134. ret = jail(id, MyTaskIgnoreResult.name,
  135. [4], {})
  136. self.assertEqual(ret, 256)
  137. self.assertFalse(AsyncResult(task_id).ready())
  138. class MockEventDispatcher(object):
  139. def __init__(self):
  140. self.sent = []
  141. def send(self, event, **fields):
  142. self.sent.append(event)
  143. class test_TaskRequest(unittest.TestCase):
  144. def test_task_wrapper_repr(self):
  145. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  146. self.assertTrue(repr(tw))
  147. def test_sets_store_errors(self):
  148. mytask.ignore_result = True
  149. try:
  150. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  151. self.assertFalse(tw._store_errors)
  152. mytask.store_errors_even_if_ignored = True
  153. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  154. self.assertTrue(tw._store_errors)
  155. finally:
  156. mytask.ignore_result = False
  157. mytask.store_errors_even_if_ignored = False
  158. def test_send_event(self):
  159. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  160. tw.eventer = MockEventDispatcher()
  161. tw.send_event("task-frobulated")
  162. self.assertIn("task-frobulated", tw.eventer.sent)
  163. def test_on_retry(self):
  164. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  165. tw.eventer = MockEventDispatcher()
  166. try:
  167. raise RetryTaskError("foo", KeyError("moofoobar"))
  168. except:
  169. einfo = ExceptionInfo(sys.exc_info())
  170. tw.on_failure(einfo)
  171. self.assertIn("task-retried", tw.eventer.sent)
  172. def test_terminate__task_started(self):
  173. pool = Mock()
  174. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  175. tw.time_start = time.time()
  176. tw.worker_pid = 313
  177. tw.terminate(pool, signal="KILL")
  178. pool.terminate_job.assert_called_with(tw.worker_pid, "KILL")
  179. def test_terminate__task_reserved(self):
  180. pool = Mock()
  181. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  182. tw.time_start = None
  183. tw.terminate(pool, signal="KILL")
  184. self.assertFalse(pool.terminate_job.call_count)
  185. self.assertTupleEqual(tw._terminate_on_ack, (True, pool, "KILL"))
  186. tw.terminate(pool, signal="KILL")
  187. def test_revoked_expires_expired(self):
  188. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  189. tw.expires = datetime.now() - timedelta(days=1)
  190. tw.revoked()
  191. self.assertIn(tw.task_id, revoked)
  192. self.assertEqual(mytask.backend.get_status(tw.task_id),
  193. states.REVOKED)
  194. def test_revoked_expires_not_expired(self):
  195. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  196. tw.expires = datetime.now() + timedelta(days=1)
  197. tw.revoked()
  198. self.assertNotIn(tw.task_id, revoked)
  199. self.assertNotEqual(mytask.backend.get_status(tw.task_id),
  200. states.REVOKED)
  201. def test_revoked_expires_ignore_result(self):
  202. mytask.ignore_result = True
  203. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  204. try:
  205. tw.expires = datetime.now() - timedelta(days=1)
  206. tw.revoked()
  207. self.assertIn(tw.task_id, revoked)
  208. self.assertNotEqual(mytask.backend.get_status(tw.task_id),
  209. states.REVOKED)
  210. finally:
  211. mytask.ignore_result = False
  212. def test_send_email(self):
  213. app = app_or_default()
  214. old_mail_admins = app.mail_admins
  215. old_enable_mails = mytask.send_error_emails
  216. mail_sent = [False]
  217. def mock_mail_admins(*args, **kwargs):
  218. mail_sent[0] = True
  219. app.mail_admins = mock_mail_admins
  220. mytask.send_error_emails = True
  221. try:
  222. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  223. try:
  224. raise KeyError("moofoobar")
  225. except:
  226. einfo = ExceptionInfo(sys.exc_info())
  227. tw.on_failure(einfo)
  228. self.assertTrue(mail_sent[0])
  229. mail_sent[0] = False
  230. mytask.send_error_emails = False
  231. tw.on_failure(einfo)
  232. self.assertFalse(mail_sent[0])
  233. mail_sent[0] = False
  234. mytask.send_error_emails = True
  235. mytask.error_whitelist = [KeyError]
  236. tw.on_failure(einfo)
  237. self.assertTrue(mail_sent[0])
  238. mail_sent[0] = False
  239. mytask.send_error_emails = True
  240. mytask.error_whitelist = [SyntaxError]
  241. tw.on_failure(einfo)
  242. self.assertFalse(mail_sent[0])
  243. finally:
  244. app.mail_admins = old_mail_admins
  245. mytask.send_error_emails = old_enable_mails
  246. mytask.error_whitelist = ()
  247. def test_already_revoked(self):
  248. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  249. tw._already_revoked = True
  250. self.assertTrue(tw.revoked())
  251. def test_revoked(self):
  252. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  253. revoked.add(tw.task_id)
  254. self.assertTrue(tw.revoked())
  255. self.assertTrue(tw._already_revoked)
  256. self.assertTrue(tw.acknowledged)
  257. def test_execute_does_not_execute_revoked(self):
  258. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  259. revoked.add(tw.task_id)
  260. tw.execute()
  261. def test_execute_acks_late(self):
  262. mytask_raising.acks_late = True
  263. tw = TaskRequest(mytask_raising.name, uuid(), [1], {"f": "x"})
  264. try:
  265. tw.execute()
  266. self.assertTrue(tw.acknowledged)
  267. finally:
  268. mytask_raising.acks_late = False
  269. def test_execute_using_pool_does_not_execute_revoked(self):
  270. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  271. revoked.add(tw.task_id)
  272. tw.execute_using_pool(None)
  273. def test_on_accepted_acks_early(self):
  274. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  275. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  276. self.assertTrue(tw.acknowledged)
  277. def test_on_accepted_acks_late(self):
  278. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  279. mytask.acks_late = True
  280. try:
  281. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  282. self.assertFalse(tw.acknowledged)
  283. finally:
  284. mytask.acks_late = False
  285. def test_on_accepted_terminates(self):
  286. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  287. pool = Mock()
  288. tw.terminate(pool, signal="KILL")
  289. self.assertFalse(pool.terminate_job.call_count)
  290. tw.on_accepted(pid=314, time_accepted=time.time())
  291. pool.terminate_job.assert_called_with(314, "KILL")
  292. def test_on_success_acks_early(self):
  293. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  294. tw.time_start = 1
  295. tw.on_success(42)
  296. self.assertFalse(tw.acknowledged)
  297. def test_on_success_acks_late(self):
  298. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  299. tw.time_start = 1
  300. mytask.acks_late = True
  301. try:
  302. tw.on_success(42)
  303. self.assertTrue(tw.acknowledged)
  304. finally:
  305. mytask.acks_late = False
  306. def test_on_failure_WorkerLostError(self):
  307. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  308. try:
  309. raise WorkerLostError("do re mi")
  310. except WorkerLostError:
  311. exc_info = ExceptionInfo(sys.exc_info())
  312. tw.on_failure(exc_info)
  313. self.assertEqual(mytask.backend.get_status(tw.task_id),
  314. states.FAILURE)
  315. mytask.ignore_result = True
  316. try:
  317. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  318. tw.on_failure(exc_info)
  319. self.assertEqual(mytask.backend.get_status(tw.task_id),
  320. states.PENDING)
  321. finally:
  322. mytask.ignore_result = False
  323. def test_on_failure_acks_late(self):
  324. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  325. tw.time_start = 1
  326. mytask.acks_late = True
  327. try:
  328. try:
  329. raise KeyError("foo")
  330. except KeyError:
  331. exc_info = ExceptionInfo(sys.exc_info())
  332. tw.on_failure(exc_info)
  333. self.assertTrue(tw.acknowledged)
  334. finally:
  335. mytask.acks_late = False
  336. def test_from_message_invalid_kwargs(self):
  337. body = dict(task="foo", id=1, args=(), kwargs="foo")
  338. with self.assertRaises(InvalidTaskError):
  339. TaskRequest.from_message(None, body)
  340. def test_on_timeout(self):
  341. class MockLogger(object):
  342. def __init__(self):
  343. self.warnings = []
  344. self.errors = []
  345. def warning(self, msg, *args, **kwargs):
  346. self.warnings.append(msg % args)
  347. def error(self, msg, *args, **kwargs):
  348. self.errors.append(msg % args)
  349. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  350. tw.logger = MockLogger()
  351. tw.on_timeout(soft=True, timeout=1337)
  352. self.assertIn("Soft time limit (1337s) exceeded",
  353. tw.logger.warnings[0])
  354. tw.on_timeout(soft=False, timeout=1337)
  355. self.assertIn("Hard time limit (1337s) exceeded", tw.logger.errors[0])
  356. self.assertEqual(mytask.backend.get_status(tw.task_id),
  357. states.FAILURE)
  358. mytask.ignore_result = True
  359. try:
  360. tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
  361. tw.logger = MockLogger()
  362. finally:
  363. mytask.ignore_result = False
  364. tw.on_timeout(soft=True, timeout=1336)
  365. self.assertEqual(mytask.backend.get_status(tw.task_id),
  366. states.PENDING)
  367. def test_execute_and_trace(self):
  368. res = execute_and_trace(mytask.name, uuid(), [4], {})
  369. self.assertEqual(res, 4 ** 4)
  370. def test_execute_safe_catches_exception(self):
  371. old_exec = WorkerTaskTrace.execute
  372. def _error_exec(self, *args, **kwargs):
  373. raise KeyError("baz")
  374. WorkerTaskTrace.execute = _error_exec
  375. try:
  376. with catch_warnings(record=True) as log:
  377. res = execute_and_trace(mytask.name, uuid(),
  378. [4], {})
  379. self.assertIsInstance(res, ExceptionInfo)
  380. self.assertTrue(log)
  381. self.assertIn("Exception outside", log[0].message.args[0])
  382. self.assertIn("KeyError", log[0].message.args[0])
  383. finally:
  384. WorkerTaskTrace.execute = old_exec
  385. def create_exception(self, exc):
  386. try:
  387. raise exc
  388. except exc.__class__:
  389. return sys.exc_info()
  390. def test_worker_task_trace_handle_retry(self):
  391. from celery.exceptions import RetryTaskError
  392. tid = uuid()
  393. w = WorkerTaskTrace(mytask.name, tid, [4], {})
  394. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  395. type_, value_, tb_ = self.create_exception(RetryTaskError(str(value_),
  396. exc=value_))
  397. w._store_errors = False
  398. w.handle_retry(value_, type_, tb_, "")
  399. self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
  400. w._store_errors = True
  401. w.handle_retry(value_, type_, tb_, "")
  402. self.assertEqual(mytask.backend.get_status(tid), states.RETRY)
  403. def test_worker_task_trace_handle_failure(self):
  404. tid = uuid()
  405. w = WorkerTaskTrace(mytask.name, tid, [4], {})
  406. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  407. w._store_errors = False
  408. w.handle_failure(value_, type_, tb_, "")
  409. self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
  410. w._store_errors = True
  411. w.handle_failure(value_, type_, tb_, "")
  412. self.assertEqual(mytask.backend.get_status(tid), states.FAILURE)
  413. def test_task_wrapper_mail_attrs(self):
  414. tw = TaskRequest(mytask.name, uuid(), [], {})
  415. x = tw.success_msg % {"name": tw.task_name,
  416. "id": tw.task_id,
  417. "return_value": 10,
  418. "runtime": 0.3641}
  419. self.assertTrue(x)
  420. x = tw.error_msg % {"name": tw.task_name,
  421. "id": tw.task_id,
  422. "exc": "FOOBARBAZ",
  423. "traceback": "foobarbaz"}
  424. self.assertTrue(x)
  425. def test_from_message(self):
  426. us = u"æØåveéðƒeæ"
  427. body = {"task": mytask.name, "id": uuid(),
  428. "args": [2], "kwargs": {us: "bar"}}
  429. m = Message(None, body=anyjson.serialize(body), backend="foo",
  430. content_type="application/json",
  431. content_encoding="utf-8")
  432. tw = TaskRequest.from_message(m, m.decode())
  433. self.assertIsInstance(tw, TaskRequest)
  434. self.assertEqual(tw.task_name, body["task"])
  435. self.assertEqual(tw.task_id, body["id"])
  436. self.assertEqual(tw.args, body["args"])
  437. us = from_utf8(us)
  438. self.assertEqual(tw.kwargs.keys()[0], us)
  439. self.assertIsInstance(tw.kwargs.keys()[0], str)
  440. self.assertTrue(tw.logger)
  441. def test_from_message_empty_args(self):
  442. body = {"task": mytask.name, "id": uuid()}
  443. m = Message(None, body=anyjson.serialize(body), backend="foo",
  444. content_type="application/json",
  445. content_encoding="utf-8")
  446. tw = TaskRequest.from_message(m, m.decode())
  447. self.assertIsInstance(tw, TaskRequest)
  448. self.assertEquals(tw.args, [])
  449. self.assertEquals(tw.kwargs, {})
  450. def test_from_message_missing_required_fields(self):
  451. body = {}
  452. m = Message(None, body=anyjson.serialize(body), backend="foo",
  453. content_type="application/json",
  454. content_encoding="utf-8")
  455. with self.assertRaises(InvalidTaskError):
  456. TaskRequest.from_message(m, m.decode())
  457. def test_from_message_nonexistant_task(self):
  458. body = {"task": "cu.mytask.doesnotexist", "id": uuid(),
  459. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  460. m = Message(None, body=anyjson.serialize(body), backend="foo",
  461. content_type="application/json",
  462. content_encoding="utf-8")
  463. with self.assertRaises(NotRegistered):
  464. TaskRequest.from_message(m, m.decode())
  465. def test_execute(self):
  466. tid = uuid()
  467. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  468. self.assertEqual(tw.execute(), 256)
  469. meta = mytask.backend.get_task_meta(tid)
  470. self.assertEqual(meta["result"], 256)
  471. self.assertEqual(meta["status"], states.SUCCESS)
  472. def test_execute_success_no_kwargs(self):
  473. tid = uuid()
  474. tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
  475. self.assertEqual(tw.execute(), 256)
  476. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  477. self.assertEqual(meta["result"], 256)
  478. self.assertEqual(meta["status"], states.SUCCESS)
  479. def test_execute_success_some_kwargs(self):
  480. tid = uuid()
  481. tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
  482. self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
  483. meta = mytask_some_kwargs.backend.get_task_meta(tid)
  484. self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
  485. self.assertEqual(meta["result"], 256)
  486. self.assertEqual(meta["status"], states.SUCCESS)
  487. def test_execute_ack(self):
  488. tid = uuid()
  489. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
  490. on_ack=on_ack)
  491. self.assertEqual(tw.execute(), 256)
  492. meta = mytask.backend.get_task_meta(tid)
  493. self.assertTrue(scratch["ACK"])
  494. self.assertEqual(meta["result"], 256)
  495. self.assertEqual(meta["status"], states.SUCCESS)
  496. def test_execute_fail(self):
  497. tid = uuid()
  498. tw = TaskRequest(mytask_raising.name, tid, [4], {"f": "x"})
  499. self.assertIsInstance(tw.execute(), ExceptionInfo)
  500. meta = mytask_raising.backend.get_task_meta(tid)
  501. self.assertEqual(meta["status"], states.FAILURE)
  502. self.assertIsInstance(meta["result"], KeyError)
  503. def test_execute_using_pool(self):
  504. tid = uuid()
  505. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  506. class MockPool(BasePool):
  507. target = None
  508. args = None
  509. kwargs = None
  510. def __init__(self, *args, **kwargs):
  511. pass
  512. def apply_async(self, target, args=None, kwargs=None,
  513. *margs, **mkwargs):
  514. self.target = target
  515. self.args = args
  516. self.kwargs = kwargs
  517. p = MockPool()
  518. tw.execute_using_pool(p)
  519. self.assertTrue(p.target)
  520. self.assertEqual(p.args[0], mytask.name)
  521. self.assertEqual(p.args[1], tid)
  522. self.assertEqual(p.args[2], [4])
  523. self.assertIn("f", p.args[3])
  524. self.assertIn([4], p.args)
  525. def test_default_kwargs(self):
  526. tid = uuid()
  527. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  528. self.assertDictEqual(
  529. tw.extend_with_default_kwargs(10, "some_logfile"), {
  530. "f": "x",
  531. "logfile": "some_logfile",
  532. "loglevel": 10,
  533. "task_id": tw.task_id,
  534. "task_retries": 0,
  535. "task_is_eager": False,
  536. "delivery_info": {},
  537. "task_name": tw.task_name})
  538. def _test_on_failure(self, exception):
  539. app = app_or_default()
  540. tid = uuid()
  541. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  542. try:
  543. raise exception
  544. except Exception:
  545. exc_info = ExceptionInfo(sys.exc_info())
  546. logfh = WhateverIO()
  547. tw.logger.handlers = []
  548. tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
  549. root=False)
  550. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  551. tw.on_failure(exc_info)
  552. logvalue = logfh.getvalue()
  553. self.assertIn(mytask.name, logvalue)
  554. self.assertIn(tid, logvalue)
  555. self.assertIn("ERROR", logvalue)
  556. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
  557. def test_on_failure(self):
  558. self._test_on_failure(Exception("Inside unit tests"))
  559. def test_on_failure_unicode_exception(self):
  560. self._test_on_failure(Exception(u"Бобры атакуют"))
  561. def test_on_failure_utf8_exception(self):
  562. self._test_on_failure(Exception(
  563. from_utf8(u"Бобры атакуют")))