test_worker_job.py 22 KB


  1. # -*- coding: utf-8 -*-
  2. import anyjson
  3. import logging
  4. import os
  5. import sys
  6. import time
  7. from datetime import datetime, timedelta
  8. from kombu.transport.base import Message
  9. from mock import Mock
  10. from celery import states
  11. from celery.app import app_or_default
  12. from celery.concurrency.base import BasePool
  13. from celery.datastructures import ExceptionInfo
  14. from celery.task import task as task_dec
  15. from celery.exceptions import RetryTaskError, NotRegistered, WorkerLostError
  16. from celery.log import setup_logger
  17. from celery.result import AsyncResult
  18. from celery.task.base import Task
  19. from celery.utils import gen_unique_id
  20. from celery.worker.job import WorkerTaskTrace, TaskRequest
  21. from celery.worker.job import execute_and_trace, AlreadyExecutedError
  22. from celery.worker.job import InvalidTaskError
  23. from celery.worker.state import revoked
  24. from celery.tests.compat import catch_warnings
  25. from celery.tests.utils import unittest
  26. from celery.tests.utils import execute_context, StringIO, wrap_logger
  27. scratch = {"ACK": False}
  28. some_kwargs_scratchpad = {}
  29. def jail(task_id, task_name, args, kwargs):
  30. return WorkerTaskTrace(task_name, task_id, args, kwargs)()
  31. def on_ack():
  32. scratch["ACK"] = True
  33. @task_dec(accept_magic_kwargs=True)
  34. def mytask(i, **kwargs):
  35. return i ** i
  36. @task_dec # traverses coverage for decorator without parens
  37. def mytask_no_kwargs(i):
  38. return i ** i
  39. class MyTaskIgnoreResult(Task):
  40. ignore_result = True
  41. def run(self, i):
  42. return i ** i
  43. @task_dec(accept_magic_kwargs=True)
  44. def mytask_some_kwargs(i, logfile):
  45. some_kwargs_scratchpad["logfile"] = logfile
  46. return i ** i
  47. @task_dec(accept_magic_kwargs=True)
  48. def mytask_raising(i, **kwargs):
  49. raise KeyError(i)
  50. class test_RetryTaskError(unittest.TestCase):
  51. def test_retry_task_error(self):
  52. try:
  53. raise Exception("foo")
  54. except Exception, exc:
  55. ret = RetryTaskError("Retrying task", exc)
  56. self.assertEqual(ret.exc, exc)
  57. class test_WorkerTaskTrace(unittest.TestCase):
  58. def test_process_cleanup_fails(self):
  59. backend = mytask.backend
  60. mytask.backend = Mock()
  61. mytask.backend.process_cleanup = Mock(side_effect=KeyError())
  62. try:
  63. def with_wrap_logger(sio):
  64. uuid = gen_unique_id()
  65. ret = jail(uuid, mytask.name, [2], {})
  66. self.assertEqual(ret, 4)
  67. mytask.backend.mark_as_done.assert_called_with(uuid, 4)
  68. logs = sio.getvalue().strip()
  69. self.assertIn("Process cleanup failed", logs)
  70. return 1234
  71. logger = mytask.app.log.get_default_logger()
  72. self.assertEqual(execute_context(
  73. wrap_logger(logger), with_wrap_logger), 1234)
  74. finally:
  75. mytask.backend = backend
  76. def test_process_cleanup_BaseException(self):
  77. backend = mytask.backend
  78. mytask.backend = Mock()
  79. mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
  80. try:
  81. self.assertRaises(SystemExit,
  82. jail, gen_unique_id(), mytask.name, [2], {})
  83. finally:
  84. mytask.backend = backend
  85. def test_execute_jail_success(self):
  86. ret = jail(gen_unique_id(), mytask.name, [2], {})
  87. self.assertEqual(ret, 4)
  88. def test_marked_as_started(self):
  89. mytask.track_started = True
  90. class Backend(mytask.backend.__class__):
  91. _started = []
  92. def mark_as_started(self, uuid, *args, **kwargs):
  93. self._started.append(uuid)
  94. prev, mytask.backend = mytask.backend, Backend()
  95. try:
  96. uuid = gen_unique_id()
  97. jail(uuid, mytask.name, [2], {})
  98. self.assertIn(uuid, Backend._started)
  99. mytask.ignore_result = True
  100. uuid = gen_unique_id()
  101. jail(uuid, mytask.name, [2], {})
  102. self.assertNotIn(uuid, Backend._started)
  103. finally:
  104. mytask.backend = prev
  105. mytask.track_started = False
  106. mytask.ignore_result = False
  107. def test_execute_jail_failure(self):
  108. ret = jail(gen_unique_id(), mytask_raising.name,
  109. [4], {})
  110. self.assertIsInstance(ret, ExceptionInfo)
  111. self.assertTupleEqual(ret.exception.args, (4, ))
  112. def test_execute_ignore_result(self):
  113. task_id = gen_unique_id()
  114. ret = jail(id, MyTaskIgnoreResult.name,
  115. [4], {})
  116. self.assertEqual(ret, 256)
  117. self.assertFalse(AsyncResult(task_id).ready())
  118. class MockEventDispatcher(object):
  119. def __init__(self):
  120. self.sent = []
  121. def send(self, event, **fields):
  122. self.sent.append(event)
  123. class test_TaskRequest(unittest.TestCase):
  124. def test_task_wrapper_repr(self):
  125. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  126. self.assertTrue(repr(tw))
  127. def test_sets_store_errors(self):
  128. mytask.ignore_result = True
  129. try:
  130. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  131. self.assertFalse(tw._store_errors)
  132. mytask.store_errors_even_if_ignored = True
  133. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  134. self.assertTrue(tw._store_errors)
  135. finally:
  136. mytask.ignore_result = False
  137. mytask.store_errors_even_if_ignored = False
  138. def test_send_event(self):
  139. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  140. tw.eventer = MockEventDispatcher()
  141. tw.send_event("task-frobulated")
  142. self.assertIn("task-frobulated", tw.eventer.sent)
  143. def test_on_retry(self):
  144. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  145. tw.eventer = MockEventDispatcher()
  146. try:
  147. raise RetryTaskError("foo", KeyError("moofoobar"))
  148. except:
  149. einfo = ExceptionInfo(sys.exc_info())
  150. tw.on_failure(einfo)
  151. self.assertIn("task-retried", tw.eventer.sent)
  152. def test_revoked_expires_expired(self):
  153. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  154. tw.expires = datetime.now() - timedelta(days=1)
  155. tw.revoked()
  156. self.assertIn(tw.task_id, revoked)
  157. self.assertEqual(mytask.backend.get_status(tw.task_id),
  158. states.REVOKED)
  159. def test_revoked_expires_not_expired(self):
  160. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  161. tw.expires = datetime.now() + timedelta(days=1)
  162. tw.revoked()
  163. self.assertNotIn(tw.task_id, revoked)
  164. self.assertNotEqual(mytask.backend.get_status(tw.task_id),
  165. states.REVOKED)
  166. def test_revoked_expires_ignore_result(self):
  167. mytask.ignore_result = True
  168. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  169. try:
  170. tw.expires = datetime.now() - timedelta(days=1)
  171. tw.revoked()
  172. self.assertIn(tw.task_id, revoked)
  173. self.assertNotEqual(mytask.backend.get_status(tw.task_id),
  174. states.REVOKED)
  175. finally:
  176. mytask.ignore_result = False
  177. def test_send_email(self):
  178. app = app_or_default()
  179. old_mail_admins = app.mail_admins
  180. old_enable_mails = mytask.send_error_emails
  181. mail_sent = [False]
  182. def mock_mail_admins(*args, **kwargs):
  183. mail_sent[0] = True
  184. app.mail_admins = mock_mail_admins
  185. mytask.send_error_emails = True
  186. try:
  187. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  188. try:
  189. raise KeyError("moofoobar")
  190. except:
  191. einfo = ExceptionInfo(sys.exc_info())
  192. tw.on_failure(einfo)
  193. self.assertTrue(mail_sent[0])
  194. mail_sent[0] = False
  195. mytask.send_error_emails = False
  196. tw.on_failure(einfo)
  197. self.assertFalse(mail_sent[0])
  198. mail_sent[0] = False
  199. mytask.send_error_emails = True
  200. mytask.error_whitelist = [KeyError]
  201. tw.on_failure(einfo)
  202. self.assertTrue(mail_sent[0])
  203. mail_sent[0] = False
  204. mytask.send_error_emails = True
  205. mytask.error_whitelist = [SyntaxError]
  206. tw.on_failure(einfo)
  207. self.assertFalse(mail_sent[0])
  208. finally:
  209. app.mail_admins = old_mail_admins
  210. mytask.send_error_emails = old_enable_mails
  211. mytask.error_whitelist = ()
  212. def test_already_revoked(self):
  213. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  214. tw._already_revoked = True
  215. self.assertTrue(tw.revoked())
  216. def test_revoked(self):
  217. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  218. revoked.add(tw.task_id)
  219. self.assertTrue(tw.revoked())
  220. self.assertTrue(tw._already_revoked)
  221. self.assertTrue(tw.acknowledged)
  222. def test_execute_does_not_execute_revoked(self):
  223. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  224. revoked.add(tw.task_id)
  225. tw.execute()
  226. def test_execute_acks_late(self):
  227. mytask_raising.acks_late = True
  228. tw = TaskRequest(mytask_raising.name, gen_unique_id(), [1], {"f": "x"})
  229. try:
  230. tw.execute()
  231. self.assertTrue(tw.acknowledged)
  232. finally:
  233. mytask_raising.acks_late = False
  234. def test_execute_using_pool_does_not_execute_revoked(self):
  235. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  236. revoked.add(tw.task_id)
  237. tw.execute_using_pool(None)
  238. def test_on_accepted_acks_early(self):
  239. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  240. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  241. self.assertTrue(tw.acknowledged)
  242. def test_on_accepted_acks_late(self):
  243. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  244. mytask.acks_late = True
  245. try:
  246. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  247. self.assertFalse(tw.acknowledged)
  248. finally:
  249. mytask.acks_late = False
  250. def test_on_success_acks_early(self):
  251. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  252. tw.time_start = 1
  253. tw.on_success(42)
  254. self.assertFalse(tw.acknowledged)
  255. def test_on_success_acks_late(self):
  256. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  257. tw.time_start = 1
  258. mytask.acks_late = True
  259. try:
  260. tw.on_success(42)
  261. self.assertTrue(tw.acknowledged)
  262. finally:
  263. mytask.acks_late = False
  264. def test_on_failure_WorkerLostError(self):
  265. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  266. try:
  267. raise WorkerLostError("do re mi")
  268. except WorkerLostError:
  269. exc_info = ExceptionInfo(sys.exc_info())
  270. tw.on_failure(exc_info)
  271. self.assertEqual(mytask.backend.get_status(tw.task_id),
  272. states.FAILURE)
  273. mytask.ignore_result = True
  274. try:
  275. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  276. tw.on_failure(exc_info)
  277. self.assertEqual(mytask.backend.get_status(tw.task_id),
  278. states.PENDING)
  279. finally:
  280. mytask.ignore_result = False
  281. def test_on_failure_acks_late(self):
  282. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  283. tw.time_start = 1
  284. mytask.acks_late = True
  285. try:
  286. try:
  287. raise KeyError("foo")
  288. except KeyError:
  289. exc_info = ExceptionInfo(sys.exc_info())
  290. tw.on_failure(exc_info)
  291. self.assertTrue(tw.acknowledged)
  292. finally:
  293. mytask.acks_late = False
  294. def test_from_message_invalid_kwargs(self):
  295. body = dict(task="foo", id=1, args=(), kwargs="foo")
  296. self.assertRaises(InvalidTaskError,
  297. TaskRequest.from_message, None, body)
  298. def test_on_timeout(self):
  299. class MockLogger(object):
  300. def __init__(self):
  301. self.warnings = []
  302. self.errors = []
  303. def warning(self, msg, *args, **kwargs):
  304. self.warnings.append(msg)
  305. def error(self, msg, *args, **kwargs):
  306. self.errors.append(msg)
  307. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  308. tw.logger = MockLogger()
  309. tw.on_timeout(soft=True)
  310. self.assertIn("Soft time limit exceeded", tw.logger.warnings[0])
  311. tw.on_timeout(soft=False)
  312. self.assertIn("Hard time limit exceeded", tw.logger.errors[0])
  313. self.assertEqual(mytask.backend.get_status(tw.task_id),
  314. states.FAILURE)
  315. mytask.ignore_result = True
  316. try:
  317. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  318. tw.logger = MockLogger()
  319. finally:
  320. mytask.ignore_result = False
  321. tw.on_timeout(soft=True)
  322. self.assertEqual(mytask.backend.get_status(tw.task_id),
  323. states.PENDING)
  324. def test_execute_and_trace(self):
  325. res = execute_and_trace(mytask.name, gen_unique_id(), [4], {})
  326. self.assertEqual(res, 4 ** 4)
  327. def test_execute_safe_catches_exception(self):
  328. old_exec = WorkerTaskTrace.execute
  329. def _error_exec(self, *args, **kwargs):
  330. raise KeyError("baz")
  331. WorkerTaskTrace.execute = _error_exec
  332. try:
  333. def with_catch_warnings(log):
  334. res = execute_and_trace(mytask.name, gen_unique_id(),
  335. [4], {})
  336. self.assertIsInstance(res, ExceptionInfo)
  337. self.assertTrue(log)
  338. self.assertIn("Exception outside", log[0].message.args[0])
  339. self.assertIn("KeyError", log[0].message.args[0])
  340. context = catch_warnings(record=True)
  341. execute_context(context, with_catch_warnings)
  342. finally:
  343. WorkerTaskTrace.execute = old_exec
  344. def create_exception(self, exc):
  345. try:
  346. raise exc
  347. except exc.__class__:
  348. return sys.exc_info()
  349. def test_worker_task_trace_handle_retry(self):
  350. from celery.exceptions import RetryTaskError
  351. uuid = gen_unique_id()
  352. w = WorkerTaskTrace(mytask.name, uuid, [4], {})
  353. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  354. type_, value_, tb_ = self.create_exception(RetryTaskError(str(value_),
  355. exc=value_))
  356. w._store_errors = False
  357. w.handle_retry(value_, type_, tb_, "")
  358. self.assertEqual(mytask.backend.get_status(uuid), states.PENDING)
  359. w._store_errors = True
  360. w.handle_retry(value_, type_, tb_, "")
  361. self.assertEqual(mytask.backend.get_status(uuid), states.RETRY)
  362. def test_worker_task_trace_handle_failure(self):
  363. uuid = gen_unique_id()
  364. w = WorkerTaskTrace(mytask.name, uuid, [4], {})
  365. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  366. w._store_errors = False
  367. w.handle_failure(value_, type_, tb_, "")
  368. self.assertEqual(mytask.backend.get_status(uuid), states.PENDING)
  369. w._store_errors = True
  370. w.handle_failure(value_, type_, tb_, "")
  371. self.assertEqual(mytask.backend.get_status(uuid), states.FAILURE)
  372. def test_executed_bit(self):
  373. tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
  374. self.assertFalse(tw.executed)
  375. tw._set_executed_bit()
  376. self.assertTrue(tw.executed)
  377. self.assertRaises(AlreadyExecutedError, tw._set_executed_bit)
  378. def test_task_wrapper_mail_attrs(self):
  379. tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
  380. x = tw.success_msg % {"name": tw.task_name,
  381. "id": tw.task_id,
  382. "return_value": 10,
  383. "runtime": 0.3641}
  384. self.assertTrue(x)
  385. x = tw.error_msg % {"name": tw.task_name,
  386. "id": tw.task_id,
  387. "exc": "FOOBARBAZ",
  388. "traceback": "foobarbaz"}
  389. self.assertTrue(x)
  390. x = tw.email_subject % {"name": tw.task_name,
  391. "id": tw.task_id,
  392. "exc": "FOOBARBAZ",
  393. "hostname": "lana"}
  394. self.assertTrue(x)
  395. def test_from_message(self):
  396. body = {"task": mytask.name, "id": gen_unique_id(),
  397. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  398. m = Message(None, body=anyjson.serialize(body), backend="foo",
  399. content_type="application/json",
  400. content_encoding="utf-8")
  401. tw = TaskRequest.from_message(m, m.decode())
  402. self.assertIsInstance(tw, TaskRequest)
  403. self.assertEqual(tw.task_name, body["task"])
  404. self.assertEqual(tw.task_id, body["id"])
  405. self.assertEqual(tw.args, body["args"])
  406. self.assertEqual(tw.kwargs.keys()[0],
  407. u"æØåveéðƒeæ".encode("utf-8"))
  408. self.assertNotIsInstance(tw.kwargs.keys()[0], unicode)
  409. self.assertTrue(tw.logger)
  410. def test_from_message_nonexistant_task(self):
  411. body = {"task": "cu.mytask.doesnotexist", "id": gen_unique_id(),
  412. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  413. m = Message(None, body=anyjson.serialize(body), backend="foo",
  414. content_type="application/json",
  415. content_encoding="utf-8")
  416. self.assertRaises(NotRegistered, TaskRequest.from_message,
  417. m, m.decode())
  418. def test_execute(self):
  419. tid = gen_unique_id()
  420. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  421. self.assertEqual(tw.execute(), 256)
  422. meta = mytask.backend.get_task_meta(tid)
  423. self.assertEqual(meta["result"], 256)
  424. self.assertEqual(meta["status"], states.SUCCESS)
  425. def test_execute_success_no_kwargs(self):
  426. tid = gen_unique_id()
  427. tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
  428. self.assertEqual(tw.execute(), 256)
  429. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  430. self.assertEqual(meta["result"], 256)
  431. self.assertEqual(meta["status"], states.SUCCESS)
  432. def test_execute_success_some_kwargs(self):
  433. tid = gen_unique_id()
  434. tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
  435. self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
  436. meta = mytask_some_kwargs.backend.get_task_meta(tid)
  437. self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
  438. self.assertEqual(meta["result"], 256)
  439. self.assertEqual(meta["status"], states.SUCCESS)
  440. def test_execute_ack(self):
  441. tid = gen_unique_id()
  442. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
  443. on_ack=on_ack)
  444. self.assertEqual(tw.execute(), 256)
  445. meta = mytask.backend.get_task_meta(tid)
  446. self.assertTrue(scratch["ACK"])
  447. self.assertEqual(meta["result"], 256)
  448. self.assertEqual(meta["status"], states.SUCCESS)
  449. def test_execute_fail(self):
  450. tid = gen_unique_id()
  451. tw = TaskRequest(mytask_raising.name, tid, [4], {"f": "x"})
  452. self.assertIsInstance(tw.execute(), ExceptionInfo)
  453. meta = mytask_raising.backend.get_task_meta(tid)
  454. self.assertEqual(meta["status"], states.FAILURE)
  455. self.assertIsInstance(meta["result"], KeyError)
  456. def test_execute_using_pool(self):
  457. tid = gen_unique_id()
  458. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  459. class MockPool(BasePool):
  460. target = None
  461. args = None
  462. kwargs = None
  463. def __init__(self, *args, **kwargs):
  464. pass
  465. def apply_async(self, target, args=None, kwargs=None,
  466. *margs, **mkwargs):
  467. self.target = target
  468. self.args = args
  469. self.kwargs = kwargs
  470. p = MockPool()
  471. tw.execute_using_pool(p)
  472. self.assertTrue(p.target)
  473. self.assertEqual(p.args[0], mytask.name)
  474. self.assertEqual(p.args[1], tid)
  475. self.assertEqual(p.args[2], [4])
  476. self.assertIn("f", p.args[3])
  477. self.assertIn([4], p.args)
  478. def test_default_kwargs(self):
  479. tid = gen_unique_id()
  480. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  481. self.assertDictEqual(
  482. tw.extend_with_default_kwargs(10, "some_logfile"), {
  483. "f": "x",
  484. "logfile": "some_logfile",
  485. "loglevel": 10,
  486. "task_id": tw.task_id,
  487. "task_retries": 0,
  488. "task_is_eager": False,
  489. "delivery_info": {},
  490. "task_name": tw.task_name})
  491. def _test_on_failure(self, exception):
  492. app = app_or_default()
  493. tid = gen_unique_id()
  494. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  495. try:
  496. raise exception
  497. except Exception:
  498. exc_info = ExceptionInfo(sys.exc_info())
  499. logfh = StringIO()
  500. tw.logger.handlers = []
  501. tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
  502. root=False)
  503. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  504. tw.on_failure(exc_info)
  505. logvalue = logfh.getvalue()
  506. self.assertIn(mytask.name, logvalue)
  507. self.assertIn(tid, logvalue)
  508. self.assertIn("ERROR", logvalue)
  509. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
  510. def test_on_failure(self):
  511. self._test_on_failure(Exception("Inside unit tests"))
  512. def test_on_failure_unicode_exception(self):
  513. self._test_on_failure(Exception(u"Бобры атакуют"))
  514. def test_on_failure_utf8_exception(self):
  515. self._test_on_failure(Exception(
  516. u"Бобры атакуют".encode('utf8')))