|
@@ -1,4 +1,6 @@
|
|
|
# -*- coding: utf-8 -*-
|
|
|
+from __future__ import with_statement
|
|
|
+
|
|
|
import sys
|
|
|
import logging
|
|
|
import unittest
|
|
@@ -156,12 +158,125 @@ class TestJail(unittest.TestCase):
|
|
|
del(cache.parse_backend_uri)
|
|
|
|
|
|
|
|
|
+class MockEventDispatcher(object):
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.sent = []
|
|
|
+
|
|
|
+ def send(self, event):
|
|
|
+ self.sent.append(event)
|
|
|
+
|
|
|
+
|
|
|
class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
def test_task_wrapper_repr(self):
|
|
|
tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
|
|
|
self.assertTrue(repr(tw))
|
|
|
|
|
|
+ def test_send_event(self):
|
|
|
+ tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
|
|
|
+ tw.eventer = MockEventDispatcher()
|
|
|
+ tw.send_event("task-frobulated")
|
|
|
+ self.assertTrue("task-frobulated" in tw.eventer.sent)
|
|
|
+
|
|
|
+ def test_send_email(self):
|
|
|
+ from celery import conf
|
|
|
+ from celery.worker import job
|
|
|
+ old_mail_admins = job.mail_admins
|
|
|
+ old_enable_mails = conf.CELERY_SEND_TASK_ERROR_EMAILS
|
|
|
+ mail_sent = [False]
|
|
|
+
|
|
|
+ def mock_mail_admins(*args, **kwargs):
|
|
|
+ mail_sent[0] = True
|
|
|
+
|
|
|
+ job.mail_admins = mock_mail_admins
|
|
|
+ conf.CELERY_SEND_TASK_ERROR_EMAILS = True
|
|
|
+ try:
|
|
|
+ tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
|
|
|
+ try:
|
|
|
+ raise KeyError("foo")
|
|
|
+ except KeyError, exc:
|
|
|
+ einfo = ExceptionInfo(sys.exc_info())
|
|
|
+
|
|
|
+ tw.on_failure(einfo)
|
|
|
+ self.assertTrue(mail_sent[0])
|
|
|
+
|
|
|
+ mail_sent[0] = False
|
|
|
+ conf.CELERY_SEND_TASK_ERROR_EMAILS = False
|
|
|
+ tw.on_failure(einfo)
|
|
|
+ self.assertFalse(mail_sent[0])
|
|
|
+
|
|
|
+ finally:
|
|
|
+ job.mail_admins = old_mail_admins
|
|
|
+ conf.CELERY_SEND_TASK_ERROR_EMAILS = old_enable_mails
|
|
|
+
|
|
|
+ def test_execute_and_trace(self):
|
|
|
+ from celery.worker.job import execute_and_trace
|
|
|
+ res = execute_and_trace(mytask.name, gen_unique_id(), [4], {})
|
|
|
+ self.assertEquals(res, 4 ** 4)
|
|
|
+
|
|
|
+ def test_execute_safe_catches_exception(self):
|
|
|
+ from celery.worker.job import execute_and_trace, WorkerTaskTrace
|
|
|
+ old_exec = WorkerTaskTrace.execute
|
|
|
+
|
|
|
+ def _error_exec(self, *args, **kwargs):
|
|
|
+ raise KeyError("baz")
|
|
|
+
|
|
|
+ WorkerTaskTrace.execute = _error_exec
|
|
|
+ try:
|
|
|
+ import warnings
|
|
|
+ with warnings.catch_warnings(record=True) as log:
|
|
|
+ res = execute_and_trace(mytask.name, gen_unique_id(),
|
|
|
+ [4], {})
|
|
|
+ self.assertTrue(isinstance(res, ExceptionInfo))
|
|
|
+ self.assertTrue(log)
|
|
|
+ self.assertTrue("Exception outside" in log[0].message.args[0])
|
|
|
+ self.assertTrue("KeyError" in log[0].message.args[0])
|
|
|
+ finally:
|
|
|
+ WorkerTaskTrace.execute = old_exec
|
|
|
+
|
|
|
+ def create_exception(self, exc):
|
|
|
+ try:
|
|
|
+ raise exc
|
|
|
+ except exc.__class__, thrown:
|
|
|
+ return sys.exc_info()
|
|
|
+
|
|
|
+ def test_worker_task_trace_handle_retry(self):
|
|
|
+ from celery.exceptions import RetryTaskError
|
|
|
+ uuid = gen_unique_id()
|
|
|
+ w = WorkerTaskTrace(mytask.name, uuid, [4], {})
|
|
|
+ type_, value_, tb_ = self.create_exception(ValueError("foo"))
|
|
|
+ type_, value_, tb_ = self.create_exception(RetryTaskError(str(value_),
|
|
|
+ exc=value_))
|
|
|
+ w._store_errors = False
|
|
|
+ w.handle_retry(value_, type_, tb_, "")
|
|
|
+ self.assertEquals(mytask.backend.get_status(uuid), "PENDING")
|
|
|
+ w._store_errors = True
|
|
|
+ w.handle_retry(value_, type_, tb_, "")
|
|
|
+ self.assertEquals(mytask.backend.get_status(uuid), "RETRY")
|
|
|
+
|
|
|
+
|
|
|
+ def test_worker_task_trace_handle_failure(self):
|
|
|
+ from celery.worker.job import WorkerTaskTrace
|
|
|
+ uuid = gen_unique_id()
|
|
|
+ w = WorkerTaskTrace(mytask.name, uuid, [4], {})
|
|
|
+ type_, value_, tb_ = self.create_exception(ValueError("foo"))
|
|
|
+ w._store_errors = False
|
|
|
+ w.handle_failure(value_, type_, tb_, "")
|
|
|
+ self.assertEquals(mytask.backend.get_status(uuid), "PENDING")
|
|
|
+ w._store_errors = True
|
|
|
+ w.handle_failure(value_, type_, tb_, "")
|
|
|
+ self.assertEquals(mytask.backend.get_status(uuid), "FAILURE")
|
|
|
+
|
|
|
+
|
|
|
+ def test_executed_bit(self):
|
|
|
+ from celery.worker.job import AlreadyExecutedError
|
|
|
+ tw = TaskWrapper(mytask.name, gen_unique_id(), [], {})
|
|
|
+ self.assertFalse(tw.executed)
|
|
|
+ tw._set_executed_bit()
|
|
|
+ self.assertTrue(tw.executed)
|
|
|
+ self.assertRaises(AlreadyExecutedError, tw._set_executed_bit)
|
|
|
+
|
|
|
def test_task_wrapper_mail_attrs(self):
|
|
|
tw = TaskWrapper(mytask.name, gen_unique_id(), [], {})
|
|
|
x = tw.success_msg % {"name": tw.task_name,
|