test_worker_job.py 14 KB


  1. # -*- coding: utf-8 -*-
  2. import sys
  3. import logging
  4. import unittest
  5. import simplejson
  6. from StringIO import StringIO
  7. from django.core import cache
  8. from carrot.backends.base import BaseMessage
  9. from celery import states
  10. from celery.log import setup_logger
  11. from celery.task.base import Task
  12. from celery.utils import gen_unique_id
  13. from celery.models import TaskMeta
  14. from celery.result import AsyncResult
  15. from celery.worker.job import WorkerTaskTrace, TaskWrapper
  16. from celery.worker.pool import TaskPool
  17. from celery.exceptions import RetryTaskError, NotRegistered
  18. from celery.decorators import task as task_dec
  19. from celery.datastructures import ExceptionInfo
  20. from testunits.utils import execute_context
  21. from testunits.compat import catch_warnings
  22. scratch = {"ACK": False}
  23. some_kwargs_scratchpad = {}
  24. def jail(task_id, task_name, args, kwargs):
  25. return WorkerTaskTrace(task_name, task_id, args, kwargs)()
  26. def on_ack():
  27. scratch["ACK"] = True
  28. @task_dec()
  29. def mytask(i, **kwargs):
  30. return i ** i
  31. @task_dec()
  32. def mytask_no_kwargs(i):
  33. return i ** i
  34. class MyTaskIgnoreResult(Task):
  35. ignore_result = True
  36. def run(self, i):
  37. return i ** i
  38. @task_dec()
  39. def mytask_some_kwargs(i, logfile):
  40. some_kwargs_scratchpad["logfile"] = logfile
  41. return i ** i
  42. @task_dec()
  43. def mytask_raising(i, **kwargs):
  44. raise KeyError(i)
  45. @task_dec()
  46. def get_db_connection(i, **kwargs):
  47. from django.db import connection
  48. return id(connection)
  49. get_db_connection.ignore_result = True
  50. class TestRetryTaskError(unittest.TestCase):
  51. def test_retry_task_error(self):
  52. try:
  53. raise Exception("foo")
  54. except Exception, exc:
  55. ret = RetryTaskError("Retrying task", exc)
  56. self.assertEquals(ret.exc, exc)
  57. class TestJail(unittest.TestCase):
  58. def test_execute_jail_success(self):
  59. ret = jail(gen_unique_id(), mytask.name, [2], {})
  60. self.assertEquals(ret, 4)
  61. def test_execute_jail_failure(self):
  62. ret = jail(gen_unique_id(), mytask_raising.name,
  63. [4], {})
  64. self.assertTrue(isinstance(ret, ExceptionInfo))
  65. self.assertEquals(ret.exception.args, (4, ))
  66. def test_execute_ignore_result(self):
  67. task_id = gen_unique_id()
  68. ret = jail(id, MyTaskIgnoreResult.name,
  69. [4], {})
  70. self.assertTrue(ret, 8)
  71. self.assertFalse(AsyncResult(task_id).ready())
  72. def test_django_db_connection_is_closed(self):
  73. from django.db import connection
  74. connection._was_closed = False
  75. old_connection_close = connection.close
  76. def monkeypatched_connection_close(*args, **kwargs):
  77. connection._was_closed = True
  78. return old_connection_close(*args, **kwargs)
  79. connection.close = monkeypatched_connection_close
  80. try:
  81. jail(gen_unique_id(), get_db_connection.name, [2], {})
  82. self.assertTrue(connection._was_closed)
  83. finally:
  84. connection.close = old_connection_close
  85. def test_django_cache_connection_is_closed(self):
  86. old_cache_close = getattr(cache.cache, "close", None)
  87. old_backend = cache.settings.CACHE_BACKEND
  88. cache.settings.CACHE_BACKEND = "libmemcached"
  89. cache._was_closed = False
  90. old_cache_parse_backend = getattr(cache, "parse_backend_uri", None)
  91. if old_cache_parse_backend: # checks to make sure attr exists
  92. delattr(cache, 'parse_backend_uri')
  93. def monkeypatched_cache_close(*args, **kwargs):
  94. cache._was_closed = True
  95. cache.cache.close = monkeypatched_cache_close
  96. jail(gen_unique_id(), mytask.name, [4], {})
  97. self.assertTrue(cache._was_closed)
  98. cache.cache.close = old_cache_close
  99. cache.settings.CACHE_BACKEND = old_backend
  100. if old_cache_parse_backend:
  101. cache.parse_backend_uri = old_cache_parse_backend
  102. def test_django_cache_connection_is_closed_django_1_1(self):
  103. old_cache_close = getattr(cache.cache, "close", None)
  104. old_backend = cache.settings.CACHE_BACKEND
  105. cache.settings.CACHE_BACKEND = "libmemcached"
  106. cache._was_closed = False
  107. old_cache_parse_backend = getattr(cache, "parse_backend_uri", None)
  108. cache.parse_backend_uri = lambda uri: ["libmemcached", "1", "2"]
  109. def monkeypatched_cache_close(*args, **kwargs):
  110. cache._was_closed = True
  111. cache.cache.close = monkeypatched_cache_close
  112. jail(gen_unique_id(), mytask.name, [4], {})
  113. self.assertTrue(cache._was_closed)
  114. cache.cache.close = old_cache_close
  115. cache.settings.CACHE_BACKEND = old_backend
  116. if old_cache_parse_backend:
  117. cache.parse_backend_uri = old_cache_parse_backend
  118. else:
  119. del(cache.parse_backend_uri)
  120. class MockEventDispatcher(object):
  121. def __init__(self):
  122. self.sent = []
  123. def send(self, event):
  124. self.sent.append(event)
  125. class TestTaskWrapper(unittest.TestCase):
  126. def test_task_wrapper_repr(self):
  127. tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
  128. self.assertTrue(repr(tw))
  129. def test_send_event(self):
  130. tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
  131. tw.eventer = MockEventDispatcher()
  132. tw.send_event("task-frobulated")
  133. self.assertTrue("task-frobulated" in tw.eventer.sent)
  134. def test_send_email(self):
  135. from celery import conf
  136. from celery.worker import job
  137. old_mail_admins = job.mail_admins
  138. old_enable_mails = conf.CELERY_SEND_TASK_ERROR_EMAILS
  139. mail_sent = [False]
  140. def mock_mail_admins(*args, **kwargs):
  141. mail_sent[0] = True
  142. job.mail_admins = mock_mail_admins
  143. conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  144. try:
  145. tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
  146. try:
  147. raise KeyError("foo")
  148. except KeyError, exc:
  149. einfo = ExceptionInfo(sys.exc_info())
  150. tw.on_failure(einfo)
  151. self.assertTrue(mail_sent[0])
  152. mail_sent[0] = False
  153. conf.CELERY_SEND_TASK_ERROR_EMAILS = False
  154. tw.on_failure(einfo)
  155. self.assertFalse(mail_sent[0])
  156. finally:
  157. job.mail_admins = old_mail_admins
  158. conf.CELERY_SEND_TASK_ERROR_EMAILS = old_enable_mails
  159. def test_execute_and_trace(self):
  160. from celery.worker.job import execute_and_trace
  161. res = execute_and_trace(mytask.name, gen_unique_id(), [4], {})
  162. self.assertEquals(res, 4 ** 4)
  163. def test_execute_safe_catches_exception(self):
  164. from celery.worker.job import execute_and_trace, WorkerTaskTrace
  165. old_exec = WorkerTaskTrace.execute
  166. def _error_exec(self, *args, **kwargs):
  167. raise KeyError("baz")
  168. WorkerTaskTrace.execute = _error_exec
  169. try:
  170. def with_catch_warnings(log):
  171. res = execute_and_trace(mytask.name, gen_unique_id(),
  172. [4], {})
  173. self.assertTrue(isinstance(res, ExceptionInfo))
  174. self.assertTrue(log)
  175. self.assertTrue("Exception outside" in log[0].message.args[0])
  176. self.assertTrue("KeyError" in log[0].message.args[0])
  177. context = catch_warnings(record=True)
  178. execute_context(context, with_catch_warnings)
  179. finally:
  180. WorkerTaskTrace.execute = old_exec
  181. def create_exception(self, exc):
  182. try:
  183. raise exc
  184. except exc.__class__, thrown:
  185. return sys.exc_info()
  186. def test_worker_task_trace_handle_retry(self):
  187. from celery.exceptions import RetryTaskError
  188. uuid = gen_unique_id()
  189. w = WorkerTaskTrace(mytask.name, uuid, [4], {})
  190. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  191. type_, value_, tb_ = self.create_exception(RetryTaskError(str(value_),
  192. exc=value_))
  193. w._store_errors = False
  194. w.handle_retry(value_, type_, tb_, "")
  195. self.assertEquals(mytask.backend.get_status(uuid), states.PENDING)
  196. w._store_errors = True
  197. w.handle_retry(value_, type_, tb_, "")
  198. self.assertEquals(mytask.backend.get_status(uuid), states.RETRY)
  199. def test_worker_task_trace_handle_failure(self):
  200. from celery.worker.job import WorkerTaskTrace
  201. uuid = gen_unique_id()
  202. w = WorkerTaskTrace(mytask.name, uuid, [4], {})
  203. type_, value_, tb_ = self.create_exception(ValueError("foo"))
  204. w._store_errors = False
  205. w.handle_failure(value_, type_, tb_, "")
  206. self.assertEquals(mytask.backend.get_status(uuid), states.PENDING)
  207. w._store_errors = True
  208. w.handle_failure(value_, type_, tb_, "")
  209. self.assertEquals(mytask.backend.get_status(uuid), states.FAILURE)
  210. def test_executed_bit(self):
  211. from celery.worker.job import AlreadyExecutedError
  212. tw = TaskWrapper(mytask.name, gen_unique_id(), [], {})
  213. self.assertFalse(tw.executed)
  214. tw._set_executed_bit()
  215. self.assertTrue(tw.executed)
  216. self.assertRaises(AlreadyExecutedError, tw._set_executed_bit)
  217. def test_task_wrapper_mail_attrs(self):
  218. tw = TaskWrapper(mytask.name, gen_unique_id(), [], {})
  219. x = tw.success_msg % {"name": tw.task_name,
  220. "id": tw.task_id,
  221. "return_value": 10}
  222. self.assertTrue(x)
  223. x = tw.fail_msg % {"name": tw.task_name,
  224. "id": tw.task_id,
  225. "exc": "FOOBARBAZ",
  226. "traceback": "foobarbaz"}
  227. self.assertTrue(x)
  228. x = tw.fail_email_subject % {"name": tw.task_name,
  229. "id": tw.task_id,
  230. "exc": "FOOBARBAZ",
  231. "hostname": "lana"}
  232. self.assertTrue(x)
  233. def test_from_message(self):
  234. body = {"task": mytask.name, "id": gen_unique_id(),
  235. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  236. m = BaseMessage(body=simplejson.dumps(body), backend="foo",
  237. content_type="application/json",
  238. content_encoding="utf-8")
  239. tw = TaskWrapper.from_message(m, m.decode())
  240. self.assertTrue(isinstance(tw, TaskWrapper))
  241. self.assertEquals(tw.task_name, body["task"])
  242. self.assertEquals(tw.task_id, body["id"])
  243. self.assertEquals(tw.args, body["args"])
  244. self.assertEquals(tw.kwargs.keys()[0],
  245. u"æØåveéðƒeæ".encode("utf-8"))
  246. self.assertFalse(isinstance(tw.kwargs.keys()[0], unicode))
  247. self.assertTrue(tw.logger)
  248. def test_from_message_nonexistant_task(self):
  249. body = {"task": "cu.mytask.doesnotexist", "id": gen_unique_id(),
  250. "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
  251. m = BaseMessage(body=simplejson.dumps(body), backend="foo",
  252. content_type="application/json",
  253. content_encoding="utf-8")
  254. self.assertRaises(NotRegistered, TaskWrapper.from_message,
  255. m, m.decode())
  256. def test_execute(self):
  257. tid = gen_unique_id()
  258. tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
  259. self.assertEquals(tw.execute(), 256)
  260. meta = TaskMeta.objects.get(task_id=tid)
  261. self.assertEquals(meta.result, 256)
  262. self.assertEquals(meta.status, states.SUCCESS)
  263. def test_execute_success_no_kwargs(self):
  264. tid = gen_unique_id()
  265. tw = TaskWrapper(mytask_no_kwargs.name, tid, [4], {})
  266. self.assertEquals(tw.execute(), 256)
  267. meta = TaskMeta.objects.get(task_id=tid)
  268. self.assertEquals(meta.result, 256)
  269. self.assertEquals(meta.status, states.SUCCESS)
  270. def test_execute_success_some_kwargs(self):
  271. tid = gen_unique_id()
  272. tw = TaskWrapper(mytask_some_kwargs.name, tid, [4], {})
  273. self.assertEquals(tw.execute(logfile="foobaz.log"), 256)
  274. meta = TaskMeta.objects.get(task_id=tid)
  275. self.assertEquals(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
  276. self.assertEquals(meta.result, 256)
  277. self.assertEquals(meta.status, states.SUCCESS)
  278. def test_execute_ack(self):
  279. tid = gen_unique_id()
  280. tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"},
  281. on_ack=on_ack)
  282. self.assertEquals(tw.execute(), 256)
  283. meta = TaskMeta.objects.get(task_id=tid)
  284. self.assertTrue(scratch["ACK"])
  285. self.assertEquals(meta.result, 256)
  286. self.assertEquals(meta.status, states.SUCCESS)
  287. def test_execute_fail(self):
  288. tid = gen_unique_id()
  289. tw = TaskWrapper(mytask_raising.name, tid, [4], {"f": "x"})
  290. self.assertTrue(isinstance(tw.execute(), ExceptionInfo))
  291. meta = TaskMeta.objects.get(task_id=tid)
  292. self.assertEquals(meta.status, states.FAILURE)
  293. self.assertTrue(isinstance(meta.result, KeyError))
  294. def test_execute_using_pool(self):
  295. tid = gen_unique_id()
  296. tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
  297. p = TaskPool(2)
  298. p.start()
  299. asyncres = tw.execute_using_pool(p)
  300. self.assertTrue(asyncres.get(), 256)
  301. p.stop()
  302. def test_default_kwargs(self):
  303. tid = gen_unique_id()
  304. tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
  305. self.assertEquals(tw.extend_with_default_kwargs(10, "some_logfile"), {
  306. "f": "x",
  307. "logfile": "some_logfile",
  308. "loglevel": 10,
  309. "task_id": tw.task_id,
  310. "task_retries": 0,
  311. "task_is_eager": False,
  312. "delivery_info": {},
  313. "task_name": tw.task_name})
  314. def test_on_failure(self):
  315. tid = gen_unique_id()
  316. tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
  317. try:
  318. raise Exception("Inside unit tests")
  319. except Exception:
  320. exc_info = ExceptionInfo(sys.exc_info())
  321. logfh = StringIO()
  322. tw.logger.handlers = []
  323. tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO)
  324. from celery import conf
  325. conf.CELERY_SEND_TASK_ERROR_EMAILS = True
  326. tw.on_failure(exc_info)
  327. logvalue = logfh.getvalue()
  328. self.assertTrue(mytask.name in logvalue)
  329. self.assertTrue(tid in logvalue)
  330. self.assertTrue("ERROR" in logvalue)
  331. conf.CELERY_SEND_TASK_ERROR_EMAILS = False