test_worker_job.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. # -*- coding: utf-8 -*-
  2. import anyjson
  3. import logging
  4. import os
  5. import sys
  6. import time
  7. from datetime import datetime, timedelta
  8. from kombu.transport.base import Message
  9. from celery import states
  10. from celery.app import app_or_default
  11. from celery.concurrency.base import BasePool
  12. from celery.datastructures import ExceptionInfo
  13. from celery.task import task as task_dec
  14. from celery.exceptions import RetryTaskError, NotRegistered, WorkerLostError
  15. from celery.log import setup_logger
  16. from celery.result import AsyncResult
  17. from celery.task.base import Task
  18. from celery.utils import gen_unique_id
  19. from celery.worker.job import WorkerTaskTrace, TaskRequest
  20. from celery.worker.job import execute_and_trace, AlreadyExecutedError
  21. from celery.worker.job import InvalidTaskError
  22. from celery.worker.state import revoked
  23. from celery.tests.compat import catch_warnings
  24. from celery.tests.utils import unittest
  25. from celery.tests.utils import execute_context, StringIO
  26. scratch = {"ACK": False}
  27. some_kwargs_scratchpad = {}
  28. def jail(task_id, task_name, args, kwargs):
  29. return WorkerTaskTrace(task_name, task_id, args, kwargs)()
  30. def on_ack():
  31. scratch["ACK"] = True
  32. @task_dec(accept_magic_kwargs=True)
  33. def mytask(i, **kwargs):
  34. return i ** i
  35. @task_dec # traverses coverage for decorator without parens
  36. def mytask_no_kwargs(i):
  37. return i ** i
  38. class MyTaskIgnoreResult(Task):
  39. ignore_result = True
  40. def run(self, i):
  41. return i ** i
  42. @task_dec(accept_magic_kwargs=True)
  43. def mytask_some_kwargs(i, logfile):
  44. some_kwargs_scratchpad["logfile"] = logfile
  45. return i ** i
  46. @task_dec(accept_magic_kwargs=True)
  47. def mytask_raising(i, **kwargs):
  48. raise KeyError(i)
  49. class test_RetryTaskError(unittest.TestCase):
  50. def test_retry_task_error(self):
  51. try:
  52. raise Exception("foo")
  53. except Exception, exc:
  54. ret = RetryTaskError("Retrying task", exc)
  55. self.assertEqual(ret.exc, exc)
  56. class test_WorkerTaskTrace(unittest.TestCase):
  57. def test_execute_jail_success(self):
  58. ret = jail(gen_unique_id(), mytask.name, [2], {})
  59. self.assertEqual(ret, 4)
  60. def test_marked_as_started(self):
  61. mytask.track_started = True
  62. class Backend(mytask.backend.__class__):
  63. _started = []
  64. def mark_as_started(self, uuid, *args, **kwargs):
  65. self._started.append(uuid)
  66. prev, mytask.backend = mytask.backend, Backend()
  67. try:
  68. uuid = gen_unique_id()
  69. jail(uuid, mytask.name, [2], {})
  70. self.assertIn(uuid, Backend._started)
  71. mytask.ignore_result = True
  72. uuid = gen_unique_id()
  73. jail(uuid, mytask.name, [2], {})
  74. self.assertNotIn(uuid, Backend._started)
  75. finally:
  76. mytask.backend = prev
  77. mytask.track_started = False
  78. mytask.ignore_result = False
  79. def test_execute_jail_failure(self):
  80. ret = jail(gen_unique_id(), mytask_raising.name,
  81. [4], {})
  82. self.assertIsInstance(ret, ExceptionInfo)
  83. self.assertTupleEqual(ret.exception.args, (4, ))
  84. def test_execute_ignore_result(self):
  85. task_id = gen_unique_id()
  86. ret = jail(id, MyTaskIgnoreResult.name,
  87. [4], {})
  88. self.assertEqual(ret, 256)
  89. self.assertFalse(AsyncResult(task_id).ready())
  90. class MockEventDispatcher(object):
  91. def __init__(self):
  92. self.sent = []
  93. def send(self, event, **fields):
  94. self.sent.append(event)
  95. class test_TaskRequest(unittest.TestCase):
  96. def test_task_wrapper_repr(self):
  97. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  98. self.assertTrue(repr(tw))
  99. def test_sets_store_errors(self):
  100. mytask.ignore_result = True
  101. try:
  102. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  103. self.assertFalse(tw._store_errors)
  104. mytask.store_errors_even_if_ignored = True
  105. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  106. self.assertTrue(tw._store_errors)
  107. finally:
  108. mytask.ignore_result = False
  109. mytask.store_errors_even_if_ignored = False
  110. def test_send_event(self):
  111. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  112. tw.eventer = MockEventDispatcher()
  113. tw.send_event("task-frobulated")
  114. self.assertIn("task-frobulated", tw.eventer.sent)
  115. def test_on_retry(self):
  116. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  117. tw.eventer = MockEventDispatcher()
  118. try:
  119. raise RetryTaskError("foo", KeyError("moofoobar"))
  120. except:
  121. einfo = ExceptionInfo(sys.exc_info())
  122. tw.on_failure(einfo)
  123. self.assertIn("task-retried", tw.eventer.sent)
  124. def test_revoked_expires_expired(self):
  125. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  126. tw.expires = datetime.now() - timedelta(days=1)
  127. tw.revoked()
  128. self.assertIn(tw.task_id, revoked)
  129. self.assertEqual(mytask.backend.get_status(tw.task_id),
  130. states.REVOKED)
  131. def test_revoked_expires_not_expired(self):
  132. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  133. tw.expires = datetime.now() + timedelta(days=1)
  134. tw.revoked()
  135. self.assertNotIn(tw.task_id, revoked)
  136. self.assertNotEqual(mytask.backend.get_status(tw.task_id),
  137. states.REVOKED)
  138. def test_revoked_expires_ignore_result(self):
  139. mytask.ignore_result = True
  140. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  141. try:
  142. tw.expires = datetime.now() - timedelta(days=1)
  143. tw.revoked()
  144. self.assertIn(tw.task_id, revoked)
  145. self.assertNotEqual(mytask.backend.get_status(tw.task_id),
  146. states.REVOKED)
  147. finally:
  148. mytask.ignore_result = False
  149. def test_send_email(self):
  150. app = app_or_default()
  151. old_mail_admins = app.mail_admins
  152. old_enable_mails = mytask.send_error_emails
  153. mail_sent = [False]
  154. def mock_mail_admins(*args, **kwargs):
  155. mail_sent[0] = True
  156. app.mail_admins = mock_mail_admins
  157. mytask.send_error_emails = True
  158. try:
  159. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  160. try:
  161. raise KeyError("moofoobar")
  162. except:
  163. einfo = ExceptionInfo(sys.exc_info())
  164. tw.on_failure(einfo)
  165. self.assertTrue(mail_sent[0])
  166. mail_sent[0] = False
  167. mytask.send_error_emails = False
  168. tw.on_failure(einfo)
  169. self.assertFalse(mail_sent[0])
  170. mail_sent[0] = False
  171. mytask.send_error_emails = True
  172. mytask.error_whitelist = [KeyError]
  173. tw.on_failure(einfo)
  174. self.assertTrue(mail_sent[0])
  175. mail_sent[0] = False
  176. mytask.send_error_emails = True
  177. mytask.error_whitelist = [SyntaxError]
  178. tw.on_failure(einfo)
  179. self.assertFalse(mail_sent[0])
  180. finally:
  181. app.mail_admins = old_mail_admins
  182. mytask.send_error_emails = old_enable_mails
  183. mytask.error_whitelist = ()
  184. def test_already_revoked(self):
  185. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  186. tw._already_revoked = True
  187. self.assertTrue(tw.revoked())
  188. def test_revoked(self):
  189. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  190. revoked.add(tw.task_id)
  191. self.assertTrue(tw.revoked())
  192. self.assertTrue(tw._already_revoked)
  193. self.assertTrue(tw.acknowledged)
  194. def test_execute_does_not_execute_revoked(self):
  195. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  196. revoked.add(tw.task_id)
  197. tw.execute()
  198. def test_execute_acks_late(self):
  199. mytask_raising.acks_late = True
  200. tw = TaskRequest(mytask_raising.name, gen_unique_id(), [1], {"f": "x"})
  201. try:
  202. tw.execute()
  203. self.assertTrue(tw.acknowledged)
  204. finally:
  205. mytask_raising.acks_late = False
  206. def test_execute_using_pool_does_not_execute_revoked(self):
  207. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  208. revoked.add(tw.task_id)
  209. tw.execute_using_pool(None)
  210. def test_on_accepted_acks_early(self):
  211. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  212. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  213. self.assertTrue(tw.acknowledged)
  214. def test_on_accepted_acks_late(self):
  215. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  216. mytask.acks_late = True
  217. try:
  218. tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
  219. self.assertFalse(tw.acknowledged)
  220. finally:
  221. mytask.acks_late = False
  222. def test_on_success_acks_early(self):
  223. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  224. tw.time_start = 1
  225. tw.on_success(42)
  226. self.assertFalse(tw.acknowledged)
  227. def test_on_success_acks_late(self):
  228. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  229. tw.time_start = 1
  230. mytask.acks_late = True
  231. try:
  232. tw.on_success(42)
  233. self.assertTrue(tw.acknowledged)
  234. finally:
  235. mytask.acks_late = False
  236. def test_on_failure_WorkerLostError(self):
  237. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  238. try:
  239. raise WorkerLostError("do re mi")
  240. except WorkerLostError:
  241. exc_info = ExceptionInfo(sys.exc_info())
  242. tw.on_failure(exc_info)
  243. self.assertEqual(mytask.backend.get_status(tw.task_id),
  244. states.FAILURE)
  245. mytask.ignore_result = True
  246. try:
  247. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  248. tw.on_failure(exc_info)
  249. self.assertEqual(mytask.backend.get_status(tw.task_id),
  250. states.PENDING)
  251. finally:
  252. mytask.ignore_result = False
  253. def test_on_failure_acks_late(self):
  254. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  255. tw.time_start = 1
  256. mytask.acks_late = True
  257. try:
  258. try:
  259. raise KeyError("foo")
  260. except KeyError:
  261. exc_info = ExceptionInfo(sys.exc_info())
  262. tw.on_failure(exc_info)
  263. self.assertTrue(tw.acknowledged)
  264. finally:
  265. mytask.acks_late = False
  266. def test_from_message_invalid_kwargs(self):
  267. body = dict(task="foo", id=1, args=(), kwargs="foo")
  268. self.assertRaises(InvalidTaskError,
  269. TaskRequest.from_message, None, body)
  270. def test_on_timeout(self):
  271. class MockLogger(object):
  272. def __init__(self):
  273. self.warnings = []
  274. self.errors = []
  275. def warning(self, msg, *args, **kwargs):
  276. self.warnings.append(msg)
  277. def error(self, msg, *args, **kwargs):
  278. self.errors.append(msg)
  279. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  280. tw.logger = MockLogger()
  281. tw.on_timeout(soft=True)
  282. self.assertIn("Soft time limit exceeded", tw.logger.warnings[0])
  283. tw.on_timeout(soft=False)
  284. self.assertIn("Hard time limit exceeded", tw.logger.errors[0])
  285. self.assertEqual(mytask.backend.get_status(tw.task_id),
  286. states.FAILURE)
  287. mytask.ignore_result = True
  288. try:
  289. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  290. tw.logger = MockLogger()
  291. finally:
  292. mytask.ignore_result = False
  293. tw.on_timeout(soft=True)
  294. self.assertEqual(mytask.backend.get_status(tw.task_id),
  295. states.PENDING)
  296. def test_execute_and_trace(self):
  297. res = execute_and_trace(mytask.name, gen_unique_id(), [4], {})
  298. self.assertEqual(res, 4 ** 4)
  299. def test_execute_safe_catches_exception(self):
  300. old_exec = WorkerTaskTrace.execute
  301. def _error_exec(self, *args, **kwargs):
  302. raise KeyError("baz")
  303. WorkerTaskTrace.execute = _error_exec
  304. try:
  305. def with_catch_warnings(log):
  306. res = execute_and_trace(mytask.name, gen_unique_id(),
  307. [4], {})
  308. self.assertIsInstance(res, ExceptionInfo)
  309. self.assertTrue(log)
  310. self.assertIn("Exception outside", log[0].message.args[0])
  311. self.assertIn("KeyError", log[0].message.args[0])
  312. context = catch_warnings(record=True)
  313. execute_context(context, with_catch_warnings)
  314. finally:
  315. WorkerTaskTrace.execute = old_exec
  316. def create_exception(self, exc):
  317. try:
  318. raise exc
  319. except exc.__class__:
  320. return sys.exc_info()
  321. def test_worker_task_trace_handle_retry(self):
  322. from celery.exceptions import RetryTaskError
  323. uuid = gen_unique_id()
  324. w = WorkerTaskTrace(mytask.name, uuid, [4], {})
  325. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  326. type_, value_, tb_ = self.create_exception(RetryTaskError(str(value_),
  327. exc=value_))
  328. w._store_errors = False
  329. w.handle_retry(value_, type_, tb_, "")
  330. self.assertEqual(mytask.backend.get_status(uuid), states.PENDING)
  331. w._store_errors = True
  332. w.handle_retry(value_, type_, tb_, "")
  333. self.assertEqual(mytask.backend.get_status(uuid), states.RETRY)
  334. def test_worker_task_trace_handle_failure(self):
  335. uuid = gen_unique_id()
  336. w = WorkerTaskTrace(mytask.name, uuid, [4], {})
  337. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  338. w._store_errors = False
  339. w.handle_failure(value_, type_, tb_, "")
  340. self.assertEqual(mytask.backend.get_status(uuid), states.PENDING)
  341. w._store_errors = True
  342. w.handle_failure(value_, type_, tb_, "")
  343. self.assertEqual(mytask.backend.get_status(uuid), states.FAILURE)
  344. def test_executed_bit(self):
  345. tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
  346. self.assertFalse(tw.executed)
  347. tw._set_executed_bit()
  348. self.assertTrue(tw.executed)
  349. self.assertRaises(AlreadyExecutedError, tw._set_executed_bit)
  350. def test_task_wrapper_mail_attrs(self):
  351. tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
  352. x = tw.success_msg % {"name": tw.task_name,
  353. "id": tw.task_id,
  354. "return_value": 10,
  355. "runtime": 0.3641}
  356. self.assertTrue(x)
  357. x = tw.error_msg % {"name": tw.task_name,
  358. "id": tw.task_id,
  359. "exc": "FOOBARBAZ",
  360. "traceback": "foobarbaz"}
  361. self.assertTrue(x)
  362. x = tw.email_subject % {"name": tw.task_name,
  363. "id": tw.task_id,
  364. "exc": "FOOBARBAZ",
  365. "hostname": "lana"}
  366. self.assertTrue(x)
  367. def test_from_message(self):
  368. body = {"task": mytask.name, "id": gen_unique_id(),
  369. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  370. m = Message(None, body=anyjson.serialize(body), backend="foo",
  371. content_type="application/json",
  372. content_encoding="utf-8")
  373. tw = TaskRequest.from_message(m, m.decode())
  374. self.assertIsInstance(tw, TaskRequest)
  375. self.assertEqual(tw.task_name, body["task"])
  376. self.assertEqual(tw.task_id, body["id"])
  377. self.assertEqual(tw.args, body["args"])
  378. self.assertEqual(tw.kwargs.keys()[0],
  379. u"æØåveéðƒeæ".encode("utf-8"))
  380. self.assertNotIsInstance(tw.kwargs.keys()[0], unicode)
  381. self.assertTrue(tw.logger)
  382. def test_from_message_nonexistant_task(self):
  383. body = {"task": "cu.mytask.doesnotexist", "id": gen_unique_id(),
  384. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  385. m = Message(None, body=anyjson.serialize(body), backend="foo",
  386. content_type="application/json",
  387. content_encoding="utf-8")
  388. self.assertRaises(NotRegistered, TaskRequest.from_message,
  389. m, m.decode())
  390. def test_execute(self):
  391. tid = gen_unique_id()
  392. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  393. self.assertEqual(tw.execute(), 256)
  394. meta = mytask.backend.get_task_meta(tid)
  395. self.assertEqual(meta["result"], 256)
  396. self.assertEqual(meta["status"], states.SUCCESS)
  397. def test_execute_success_no_kwargs(self):
  398. tid = gen_unique_id()
  399. tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
  400. self.assertEqual(tw.execute(), 256)
  401. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  402. self.assertEqual(meta["result"], 256)
  403. self.assertEqual(meta["status"], states.SUCCESS)
  404. def test_execute_success_some_kwargs(self):
  405. tid = gen_unique_id()
  406. tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
  407. self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
  408. meta = mytask_some_kwargs.backend.get_task_meta(tid)
  409. self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
  410. self.assertEqual(meta["result"], 256)
  411. self.assertEqual(meta["status"], states.SUCCESS)
  412. def test_execute_ack(self):
  413. tid = gen_unique_id()
  414. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
  415. on_ack=on_ack)
  416. self.assertEqual(tw.execute(), 256)
  417. meta = mytask.backend.get_task_meta(tid)
  418. self.assertTrue(scratch["ACK"])
  419. self.assertEqual(meta["result"], 256)
  420. self.assertEqual(meta["status"], states.SUCCESS)
  421. def test_execute_fail(self):
  422. tid = gen_unique_id()
  423. tw = TaskRequest(mytask_raising.name, tid, [4], {"f": "x"})
  424. self.assertIsInstance(tw.execute(), ExceptionInfo)
  425. meta = mytask_raising.backend.get_task_meta(tid)
  426. self.assertEqual(meta["status"], states.FAILURE)
  427. self.assertIsInstance(meta["result"], KeyError)
  428. def test_execute_using_pool(self):
  429. tid = gen_unique_id()
  430. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  431. class MockPool(BasePool):
  432. target = None
  433. args = None
  434. kwargs = None
  435. def __init__(self, *args, **kwargs):
  436. pass
  437. def apply_async(self, target, args=None, kwargs=None,
  438. *margs, **mkwargs):
  439. self.target = target
  440. self.args = args
  441. self.kwargs = kwargs
  442. p = MockPool()
  443. tw.execute_using_pool(p)
  444. self.assertTrue(p.target)
  445. self.assertEqual(p.args[0], mytask.name)
  446. self.assertEqual(p.args[1], tid)
  447. self.assertEqual(p.args[2], [4])
  448. self.assertIn("f", p.args[3])
  449. self.assertIn([4], p.args)
  450. def test_default_kwargs(self):
  451. tid = gen_unique_id()
  452. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  453. self.assertDictEqual(
  454. tw.extend_with_default_kwargs(10, "some_logfile"), {
  455. "f": "x",
  456. "logfile": "some_logfile",
  457. "loglevel": 10,
  458. "task_id": tw.task_id,
  459. "task_retries": 0,
  460. "task_is_eager": False,
  461. "delivery_info": {},
  462. "task_name": tw.task_name})
  463. def _test_on_failure(self, exception):
  464. app = app_or_default()
  465. tid = gen_unique_id()
  466. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  467. try:
  468. raise exception
  469. except Exception:
  470. exc_info = ExceptionInfo(sys.exc_info())
  471. logfh = StringIO()
  472. tw.logger.handlers = []
  473. tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
  474. root=False)
  475. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  476. tw.on_failure(exc_info)
  477. logvalue = logfh.getvalue()
  478. self.assertIn(mytask.name, logvalue)
  479. self.assertIn(tid, logvalue)
  480. self.assertIn("ERROR", logvalue)
  481. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
  482. def test_on_failure(self):
  483. self._test_on_failure(Exception("Inside unit tests"))
  484. def test_on_failure_unicode_exception(self):
  485. self._test_on_failure(Exception(u"Бобры атакуют"))
  486. def test_on_failure_utf8_exception(self):
  487. self._test_on_failure(Exception(
  488. u"Бобры атакуют".encode('utf8')))