test_worker_job.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. # -*- coding: utf-8 -*-
  2. import logging
  3. import simplejson
  4. import sys
  5. import unittest2 as unittest
  6. from StringIO import StringIO
  7. from carrot.backends.base import BaseMessage
  8. from celery import states
  9. from celery.app import app_or_default
  10. from celery.datastructures import ExceptionInfo
  11. from celery.decorators import task as task_dec
  12. from celery.exceptions import RetryTaskError, NotRegistered
  13. from celery.log import setup_logger
  14. from celery.result import AsyncResult
  15. from celery.task.base import Task
  16. from celery.utils import gen_unique_id
  17. from celery.worker.job import WorkerTaskTrace, TaskRequest
  18. from celery.worker.job import execute_and_trace, AlreadyExecutedError
  19. from celery.worker.job import InvalidTaskError
  20. from celery.worker.state import revoked
  21. from celery.tests.compat import catch_warnings
  22. from celery.tests.utils import execute_context
  23. scratch = {"ACK": False}
  24. some_kwargs_scratchpad = {}
  25. def jail(task_id, task_name, args, kwargs):
  26. return WorkerTaskTrace(task_name, task_id, args, kwargs)()
  27. def on_ack():
  28. scratch["ACK"] = True
  29. @task_dec()
  30. def mytask(i, **kwargs):
  31. return i ** i
  32. @task_dec # traverses coverage for decorator without parens
  33. def mytask_no_kwargs(i):
  34. return i ** i
  35. class MyTaskIgnoreResult(Task):
  36. ignore_result = True
  37. def run(self, i):
  38. return i ** i
  39. @task_dec()
  40. def mytask_some_kwargs(i, logfile):
  41. some_kwargs_scratchpad["logfile"] = logfile
  42. return i ** i
  43. @task_dec()
  44. def mytask_raising(i, **kwargs):
  45. raise KeyError(i)
  46. class test_RetryTaskError(unittest.TestCase):
  47. def test_retry_task_error(self):
  48. try:
  49. raise Exception("foo")
  50. except Exception, exc:
  51. ret = RetryTaskError("Retrying task", exc)
  52. self.assertEqual(ret.exc, exc)
  53. class test_WorkerTaskTrace(unittest.TestCase):
  54. def test_execute_jail_success(self):
  55. ret = jail(gen_unique_id(), mytask.name, [2], {})
  56. self.assertEqual(ret, 4)
  57. def test_marked_as_started(self):
  58. mytask.track_started = True
  59. try:
  60. jail(gen_unique_id(), mytask.name, [2], {})
  61. finally:
  62. mytask.track_started = False
  63. def test_execute_jail_failure(self):
  64. ret = jail(gen_unique_id(), mytask_raising.name,
  65. [4], {})
  66. self.assertIsInstance(ret, ExceptionInfo)
  67. self.assertTupleEqual(ret.exception.args, (4, ))
  68. def test_execute_ignore_result(self):
  69. task_id = gen_unique_id()
  70. ret = jail(id, MyTaskIgnoreResult.name,
  71. [4], {})
  72. self.assertEqual(ret, 256)
  73. self.assertFalse(AsyncResult(task_id).ready())
  74. class MockEventDispatcher(object):
  75. def __init__(self):
  76. self.sent = []
  77. def send(self, event):
  78. self.sent.append(event)
  79. class test_TaskRequest(unittest.TestCase):
  80. def test_task_wrapper_repr(self):
  81. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  82. self.assertTrue(repr(tw))
  83. def test_send_event(self):
  84. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  85. tw.eventer = MockEventDispatcher()
  86. tw.send_event("task-frobulated")
  87. self.assertIn("task-frobulated", tw.eventer.sent)
  88. def test_send_email(self):
  89. from celery.worker import job
  90. app = app_or_default()
  91. old_mail_admins = app.mail_admins
  92. old_enable_mails = mytask.send_error_emails
  93. mail_sent = [False]
  94. def mock_mail_admins(*args, **kwargs):
  95. mail_sent[0] = True
  96. app.mail_admins = mock_mail_admins
  97. mytask.send_error_emails = True
  98. try:
  99. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  100. try:
  101. raise KeyError("moofoobar")
  102. except:
  103. einfo = ExceptionInfo(sys.exc_info())
  104. tw.on_failure(einfo)
  105. self.assertTrue(mail_sent[0])
  106. mail_sent[0] = False
  107. mytask.send_error_emails = False
  108. tw.on_failure(einfo)
  109. self.assertFalse(mail_sent[0])
  110. finally:
  111. app.mail_admins = old_mail_admins
  112. mytask.send_error_emails = old_enable_mails
  113. def test_already_revoked(self):
  114. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  115. tw._already_revoked = True
  116. self.assertTrue(tw.revoked())
  117. def test_revoked(self):
  118. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  119. revoked.add(tw.task_id)
  120. self.assertTrue(tw.revoked())
  121. self.assertTrue(tw._already_revoked)
  122. self.assertTrue(tw.acknowledged)
  123. def test_execute_does_not_execute_revoked(self):
  124. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  125. revoked.add(tw.task_id)
  126. tw.execute()
  127. def test_execute_acks_late(self):
  128. mytask_raising.acks_late = True
  129. tw = TaskRequest(mytask_raising.name, gen_unique_id(), [1], {"f": "x"})
  130. try:
  131. tw.execute()
  132. self.assertTrue(tw.acknowledged)
  133. finally:
  134. mytask_raising.acks_late = False
  135. def test_execute_using_pool_does_not_execute_revoked(self):
  136. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  137. revoked.add(tw.task_id)
  138. tw.execute_using_pool(None)
  139. def test_on_accepted_acks_early(self):
  140. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  141. tw.on_accepted()
  142. self.assertTrue(tw.acknowledged)
  143. def test_on_accepted_acks_late(self):
  144. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  145. mytask.acks_late = True
  146. try:
  147. tw.on_accepted()
  148. self.assertFalse(tw.acknowledged)
  149. finally:
  150. mytask.acks_late = False
  151. def test_on_success_acks_early(self):
  152. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  153. tw.time_start = 1
  154. tw.on_success(42)
  155. self.assertFalse(tw.acknowledged)
  156. def test_on_success_acks_late(self):
  157. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  158. tw.time_start = 1
  159. mytask.acks_late = True
  160. try:
  161. tw.on_success(42)
  162. self.assertTrue(tw.acknowledged)
  163. finally:
  164. mytask.acks_late = False
  165. def test_on_failure_acks_late(self):
  166. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  167. tw.time_start = 1
  168. mytask.acks_late = True
  169. try:
  170. try:
  171. raise KeyError("foo")
  172. except KeyError:
  173. exc_info = ExceptionInfo(sys.exc_info())
  174. tw.on_failure(exc_info)
  175. self.assertTrue(tw.acknowledged)
  176. finally:
  177. mytask.acks_late = False
  178. def test_from_message_invalid_kwargs(self):
  179. message_data = dict(task="foo", id=1, args=(), kwargs="foo")
  180. self.assertRaises(InvalidTaskError, TaskRequest.from_message, None,
  181. message_data)
  182. def test_on_timeout(self):
  183. class MockLogger(object):
  184. def __init__(self):
  185. self.warnings = []
  186. self.errors = []
  187. def warning(self, msg, *args, **kwargs):
  188. self.warnings.append(msg)
  189. def error(self, msg, *args, **kwargs):
  190. self.errors.append(msg)
  191. tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
  192. tw.logger = MockLogger()
  193. tw.on_timeout(soft=True)
  194. self.assertIn("Soft time limit exceeded", tw.logger.warnings[0])
  195. tw.on_timeout(soft=False)
  196. self.assertIn("Hard time limit exceeded", tw.logger.errors[0])
  197. def test_execute_and_trace(self):
  198. res = execute_and_trace(mytask.name, gen_unique_id(), [4], {})
  199. self.assertEqual(res, 4 ** 4)
  200. def test_execute_safe_catches_exception(self):
  201. old_exec = WorkerTaskTrace.execute
  202. def _error_exec(self, *args, **kwargs):
  203. raise KeyError("baz")
  204. WorkerTaskTrace.execute = _error_exec
  205. try:
  206. def with_catch_warnings(log):
  207. res = execute_and_trace(mytask.name, gen_unique_id(),
  208. [4], {})
  209. self.assertIsInstance(res, ExceptionInfo)
  210. self.assertTrue(log)
  211. self.assertIn("Exception outside", log[0].message.args[0])
  212. self.assertIn("KeyError", log[0].message.args[0])
  213. context = catch_warnings(record=True)
  214. execute_context(context, with_catch_warnings)
  215. finally:
  216. WorkerTaskTrace.execute = old_exec
  217. def create_exception(self, exc):
  218. try:
  219. raise exc
  220. except exc.__class__:
  221. return sys.exc_info()
  222. def test_worker_task_trace_handle_retry(self):
  223. from celery.exceptions import RetryTaskError
  224. uuid = gen_unique_id()
  225. w = WorkerTaskTrace(mytask.name, uuid, [4], {})
  226. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  227. type_, value_, tb_ = self.create_exception(RetryTaskError(str(value_),
  228. exc=value_))
  229. w._store_errors = False
  230. w.handle_retry(value_, type_, tb_, "")
  231. self.assertEqual(mytask.backend.get_status(uuid), states.PENDING)
  232. w._store_errors = True
  233. w.handle_retry(value_, type_, tb_, "")
  234. self.assertEqual(mytask.backend.get_status(uuid), states.RETRY)
  235. def test_worker_task_trace_handle_failure(self):
  236. uuid = gen_unique_id()
  237. w = WorkerTaskTrace(mytask.name, uuid, [4], {})
  238. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  239. w._store_errors = False
  240. w.handle_failure(value_, type_, tb_, "")
  241. self.assertEqual(mytask.backend.get_status(uuid), states.PENDING)
  242. w._store_errors = True
  243. w.handle_failure(value_, type_, tb_, "")
  244. self.assertEqual(mytask.backend.get_status(uuid), states.FAILURE)
  245. def test_executed_bit(self):
  246. tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
  247. self.assertFalse(tw.executed)
  248. tw._set_executed_bit()
  249. self.assertTrue(tw.executed)
  250. self.assertRaises(AlreadyExecutedError, tw._set_executed_bit)
  251. def test_task_wrapper_mail_attrs(self):
  252. tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
  253. x = tw.success_msg % {"name": tw.task_name,
  254. "id": tw.task_id,
  255. "return_value": 10}
  256. self.assertTrue(x)
  257. x = tw.error_msg % {"name": tw.task_name,
  258. "id": tw.task_id,
  259. "exc": "FOOBARBAZ",
  260. "traceback": "foobarbaz"}
  261. self.assertTrue(x)
  262. x = tw.email_subject % {"name": tw.task_name,
  263. "id": tw.task_id,
  264. "exc": "FOOBARBAZ",
  265. "hostname": "lana"}
  266. self.assertTrue(x)
  267. def test_from_message(self):
  268. body = {"task": mytask.name, "id": gen_unique_id(),
  269. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  270. m = BaseMessage(body=simplejson.dumps(body), backend="foo",
  271. content_type="application/json",
  272. content_encoding="utf-8")
  273. tw = TaskRequest.from_message(m, m.decode())
  274. self.assertIsInstance(tw, TaskRequest)
  275. self.assertEqual(tw.task_name, body["task"])
  276. self.assertEqual(tw.task_id, body["id"])
  277. self.assertEqual(tw.args, body["args"])
  278. self.assertEqual(tw.kwargs.keys()[0],
  279. u"æØåveéðƒeæ".encode("utf-8"))
  280. self.assertNotIsInstance(tw.kwargs.keys()[0], unicode)
  281. self.assertTrue(tw.logger)
  282. def test_from_message_nonexistant_task(self):
  283. body = {"task": "cu.mytask.doesnotexist", "id": gen_unique_id(),
  284. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  285. m = BaseMessage(body=simplejson.dumps(body), backend="foo",
  286. content_type="application/json",
  287. content_encoding="utf-8")
  288. self.assertRaises(NotRegistered, TaskRequest.from_message,
  289. m, m.decode())
  290. def test_execute(self):
  291. tid = gen_unique_id()
  292. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  293. self.assertEqual(tw.execute(), 256)
  294. meta = mytask.backend.get_task_meta(tid)
  295. self.assertEqual(meta["result"], 256)
  296. self.assertEqual(meta["status"], states.SUCCESS)
  297. def test_execute_success_no_kwargs(self):
  298. tid = gen_unique_id()
  299. tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
  300. self.assertEqual(tw.execute(), 256)
  301. meta = mytask_no_kwargs.backend.get_task_meta(tid)
  302. self.assertEqual(meta["result"], 256)
  303. self.assertEqual(meta["status"], states.SUCCESS)
  304. def test_execute_success_some_kwargs(self):
  305. tid = gen_unique_id()
  306. tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
  307. self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
  308. meta = mytask_some_kwargs.backend.get_task_meta(tid)
  309. self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
  310. self.assertEqual(meta["result"], 256)
  311. self.assertEqual(meta["status"], states.SUCCESS)
  312. def test_execute_ack(self):
  313. tid = gen_unique_id()
  314. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
  315. on_ack=on_ack)
  316. self.assertEqual(tw.execute(), 256)
  317. meta = mytask.backend.get_task_meta(tid)
  318. self.assertTrue(scratch["ACK"])
  319. self.assertEqual(meta["result"], 256)
  320. self.assertEqual(meta["status"], states.SUCCESS)
  321. def test_execute_fail(self):
  322. tid = gen_unique_id()
  323. tw = TaskRequest(mytask_raising.name, tid, [4], {"f": "x"})
  324. self.assertIsInstance(tw.execute(), ExceptionInfo)
  325. meta = mytask_raising.backend.get_task_meta(tid)
  326. self.assertEqual(meta["status"], states.FAILURE)
  327. self.assertIsInstance(meta["result"], KeyError)
  328. def test_execute_using_pool(self):
  329. tid = gen_unique_id()
  330. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  331. class MockPool(object):
  332. target = None
  333. args = None
  334. kwargs = None
  335. def __init__(self, *args, **kwargs):
  336. pass
  337. def apply_async(self, target, args=None, kwargs=None,
  338. *margs, **mkwargs):
  339. self.target = target
  340. self.args = args
  341. self.kwargs = kwargs
  342. p = MockPool()
  343. tw.execute_using_pool(p)
  344. self.assertTrue(p.target)
  345. self.assertEqual(p.args[0], mytask.name)
  346. self.assertEqual(p.args[1], tid)
  347. self.assertEqual(p.args[2], [4])
  348. self.assertIn("f", p.args[3])
  349. self.assertIn([4], p.args)
  350. def test_default_kwargs(self):
  351. tid = gen_unique_id()
  352. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  353. self.assertDictEqual(
  354. tw.extend_with_default_kwargs(10, "some_logfile"), {
  355. "f": "x",
  356. "logfile": "some_logfile",
  357. "loglevel": 10,
  358. "task_id": tw.task_id,
  359. "task_retries": 0,
  360. "task_is_eager": False,
  361. "delivery_info": {},
  362. "task_name": tw.task_name})
  363. def _test_on_failure(self, exception):
  364. app = app_or_default()
  365. tid = gen_unique_id()
  366. tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
  367. try:
  368. raise exception
  369. except Exception:
  370. exc_info = ExceptionInfo(sys.exc_info())
  371. logfh = StringIO()
  372. tw.logger.handlers = []
  373. tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
  374. root=False)
  375. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  376. tw.on_failure(exc_info)
  377. logvalue = logfh.getvalue()
  378. self.assertIn(mytask.name, logvalue)
  379. self.assertIn(tid, logvalue)
  380. self.assertIn("ERROR", logvalue)
  381. app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
  382. def test_on_failure(self):
  383. self._test_on_failure(Exception("Inside unit tests"))
  384. def test_on_failure_unicode_exception(self):
  385. self._test_on_failure(Exception(u"Бобры атакуют"))
  386. def test_on_failure_utf8_exception(self):
  387. self._test_on_failure(Exception(
  388. u"Бобры атакуют".encode('utf8')))