|
@@ -38,7 +38,7 @@ from celery.worker.state import revoked
|
|
|
|
|
|
from celery.tests.utils import Case
|
|
|
|
|
|
-scratch = {"ACK": False}
|
|
|
+scratch = {'ACK': False}
|
|
|
some_kwargs_scratchpad = {}
|
|
|
|
|
|
|
|
@@ -62,17 +62,17 @@ class test_mro_lookup(Case):
|
|
|
return ()
|
|
|
|
|
|
A.x = 10
|
|
|
- self.assertEqual(mro_lookup(C, "x"), A)
|
|
|
- self.assertIsNone(mro_lookup(C, "x", stop=(A, )))
|
|
|
+ self.assertEqual(mro_lookup(C, 'x'), A)
|
|
|
+ self.assertIsNone(mro_lookup(C, 'x', stop=(A, )))
|
|
|
B.x = 10
|
|
|
- self.assertEqual(mro_lookup(C, "x"), B)
|
|
|
+ self.assertEqual(mro_lookup(C, 'x'), B)
|
|
|
C.x = 10
|
|
|
- self.assertEqual(mro_lookup(C, "x"), C)
|
|
|
- self.assertIsNone(mro_lookup(D, "x"))
|
|
|
+ self.assertEqual(mro_lookup(C, 'x'), C)
|
|
|
+ self.assertIsNone(mro_lookup(D, 'x'))
|
|
|
|
|
|
|
|
|
def jail(task_id, name, args, kwargs):
|
|
|
- request = {"id": task_id}
|
|
|
+ request = {'id': task_id}
|
|
|
task = current_app.tasks[name]
|
|
|
task.__trace__ = None # rebuild
|
|
|
return trace_task(task,
|
|
@@ -80,7 +80,7 @@ def jail(task_id, name, args, kwargs):
|
|
|
|
|
|
|
|
|
def on_ack(*args, **kwargs):
|
|
|
- scratch["ACK"] = True
|
|
|
+ scratch['ACK'] = True
|
|
|
|
|
|
|
|
|
@task_dec(accept_magic_kwargs=False)
|
|
@@ -102,7 +102,7 @@ class MyTaskIgnoreResult(Task):
|
|
|
|
|
|
@task_dec(accept_magic_kwargs=True)
|
|
|
def mytask_some_kwargs(i, task_id):
|
|
|
- some_kwargs_scratchpad["task_id"] = task_id
|
|
|
+ some_kwargs_scratchpad['task_id'] = task_id
|
|
|
return i ** i
|
|
|
|
|
|
|
|
@@ -115,21 +115,21 @@ class test_default_encode(Case):
|
|
|
|
|
|
def setUp(self):
|
|
|
if sys.version_info >= (3, 0):
|
|
|
- raise SkipTest("py3k: not relevant")
|
|
|
+ raise SkipTest('py3k: not relevant')
|
|
|
|
|
|
def test_jython(self):
|
|
|
- prev, sys.platform = sys.platform, "java 1.6.1"
|
|
|
+ prev, sys.platform = sys.platform, 'java 1.6.1'
|
|
|
try:
|
|
|
- self.assertEqual(default_encode("foo"), "foo")
|
|
|
+ self.assertEqual(default_encode('foo'), 'foo')
|
|
|
finally:
|
|
|
sys.platform = prev
|
|
|
|
|
|
def test_cython(self):
|
|
|
- prev, sys.platform = sys.platform, "darwin"
|
|
|
+ prev, sys.platform = sys.platform, 'darwin'
|
|
|
gfe, sys.getfilesystemencoding = sys.getfilesystemencoding, \
|
|
|
- lambda: "utf-8"
|
|
|
+ lambda: 'utf-8'
|
|
|
try:
|
|
|
- self.assertEqual(default_encode("foo"), "foo")
|
|
|
+ self.assertEqual(default_encode('foo'), 'foo')
|
|
|
finally:
|
|
|
sys.platform = prev
|
|
|
sys.getfilesystemencoding = gfe
|
|
@@ -139,15 +139,15 @@ class test_RetryTaskError(Case):
|
|
|
|
|
|
def test_retry_task_error(self):
|
|
|
try:
|
|
|
- raise Exception("foo")
|
|
|
+ raise Exception('foo')
|
|
|
except Exception, exc:
|
|
|
- ret = RetryTaskError("Retrying task", exc)
|
|
|
+ ret = RetryTaskError('Retrying task', exc)
|
|
|
self.assertEqual(ret.exc, exc)
|
|
|
|
|
|
|
|
|
class test_trace_task(Case):
|
|
|
|
|
|
- @patch("celery.task.trace._logger")
|
|
|
+ @patch('celery.task.trace._logger')
|
|
|
def test_process_cleanup_fails(self, _logger):
|
|
|
backend = mytask.backend
|
|
|
mytask.backend = Mock()
|
|
@@ -158,7 +158,7 @@ class test_trace_task(Case):
|
|
|
self.assertEqual(ret, 4)
|
|
|
mytask.backend.store_result.assert_called_with(tid, 4,
|
|
|
states.SUCCESS)
|
|
|
- self.assertIn("Process cleanup failed",
|
|
|
+ self.assertIn('Process cleanup failed',
|
|
|
_logger.error.call_args[0][0])
|
|
|
finally:
|
|
|
mytask.backend = backend
|
|
@@ -229,15 +229,15 @@ class MockEventDispatcher(object):
|
|
|
class test_TaskRequest(Case):
|
|
|
|
|
|
def test_task_wrapper_repr(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
self.assertTrue(repr(tw))
|
|
|
|
|
|
- @patch("celery.worker.job.kwdict")
|
|
|
+ @patch('celery.worker.job.kwdict')
|
|
|
def test_kwdict(self, kwdict):
|
|
|
|
|
|
prev, module.NEEDS_KWDICT = module.NEEDS_KWDICT, True
|
|
|
try:
|
|
|
- TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
self.assertTrue(kwdict.called)
|
|
|
finally:
|
|
|
module.NEEDS_KWDICT = prev
|
|
@@ -245,30 +245,30 @@ class test_TaskRequest(Case):
|
|
|
def test_sets_store_errors(self):
|
|
|
mytask.ignore_result = True
|
|
|
try:
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
self.assertFalse(tw.store_errors)
|
|
|
mytask.store_errors_even_if_ignored = True
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
self.assertTrue(tw.store_errors)
|
|
|
finally:
|
|
|
mytask.ignore_result = False
|
|
|
mytask.store_errors_even_if_ignored = False
|
|
|
|
|
|
def test_send_event(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.eventer = MockEventDispatcher()
|
|
|
- tw.send_event("task-frobulated")
|
|
|
- self.assertIn("task-frobulated", tw.eventer.sent)
|
|
|
+ tw.send_event('task-frobulated')
|
|
|
+ self.assertIn('task-frobulated', tw.eventer.sent)
|
|
|
|
|
|
def test_on_retry(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.eventer = MockEventDispatcher()
|
|
|
try:
|
|
|
- raise RetryTaskError("foo", KeyError("moofoobar"))
|
|
|
+ raise RetryTaskError('foo', KeyError('moofoobar'))
|
|
|
except:
|
|
|
einfo = ExceptionInfo()
|
|
|
tw.on_failure(einfo)
|
|
|
- self.assertIn("task-retried", tw.eventer.sent)
|
|
|
+ self.assertIn('task-retried', tw.eventer.sent)
|
|
|
prev, module._does_info = module._does_info, False
|
|
|
try:
|
|
|
tw.on_failure(einfo)
|
|
@@ -278,33 +278,33 @@ class test_TaskRequest(Case):
|
|
|
tw.on_failure(einfo)
|
|
|
|
|
|
def test_compat_properties(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
self.assertEqual(tw.task_id, tw.id)
|
|
|
self.assertEqual(tw.task_name, tw.name)
|
|
|
- tw.task_id = "ID"
|
|
|
- self.assertEqual(tw.id, "ID")
|
|
|
- tw.task_name = "NAME"
|
|
|
- self.assertEqual(tw.name, "NAME")
|
|
|
+ tw.task_id = 'ID'
|
|
|
+ self.assertEqual(tw.id, 'ID')
|
|
|
+ tw.task_name = 'NAME'
|
|
|
+ self.assertEqual(tw.name, 'NAME')
|
|
|
|
|
|
def test_terminate__task_started(self):
|
|
|
pool = Mock()
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.time_start = time.time()
|
|
|
tw.worker_pid = 313
|
|
|
- tw.terminate(pool, signal="KILL")
|
|
|
- pool.terminate_job.assert_called_with(tw.worker_pid, "KILL")
|
|
|
+ tw.terminate(pool, signal='KILL')
|
|
|
+ pool.terminate_job.assert_called_with(tw.worker_pid, 'KILL')
|
|
|
|
|
|
def test_terminate__task_reserved(self):
|
|
|
pool = Mock()
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.time_start = None
|
|
|
- tw.terminate(pool, signal="KILL")
|
|
|
+ tw.terminate(pool, signal='KILL')
|
|
|
self.assertFalse(pool.terminate_job.call_count)
|
|
|
- self.assertTupleEqual(tw._terminate_on_ack, (True, pool, "KILL"))
|
|
|
- tw.terminate(pool, signal="KILL")
|
|
|
+ self.assertTupleEqual(tw._terminate_on_ack, (True, pool, 'KILL'))
|
|
|
+ tw.terminate(pool, signal='KILL')
|
|
|
|
|
|
def test_revoked_expires_expired(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'},
|
|
|
expires=datetime.utcnow() - timedelta(days=1))
|
|
|
tw.revoked()
|
|
|
self.assertIn(tw.id, revoked)
|
|
@@ -312,7 +312,7 @@ class test_TaskRequest(Case):
|
|
|
states.REVOKED)
|
|
|
|
|
|
def test_revoked_expires_not_expired(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'},
|
|
|
expires=datetime.utcnow() + timedelta(days=1))
|
|
|
tw.revoked()
|
|
|
self.assertNotIn(tw.id, revoked)
|
|
@@ -321,7 +321,7 @@ class test_TaskRequest(Case):
|
|
|
|
|
|
def test_revoked_expires_ignore_result(self):
|
|
|
mytask.ignore_result = True
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'},
|
|
|
expires=datetime.utcnow() - timedelta(days=1))
|
|
|
try:
|
|
|
tw.revoked()
|
|
@@ -343,14 +343,14 @@ class test_TaskRequest(Case):
|
|
|
|
|
|
def get_ei():
|
|
|
try:
|
|
|
- raise KeyError("moofoobar")
|
|
|
+ raise KeyError('moofoobar')
|
|
|
except:
|
|
|
return ExceptionInfo()
|
|
|
|
|
|
app.mail_admins = mock_mail_admins
|
|
|
mytask.send_error_emails = True
|
|
|
try:
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
|
|
|
einfo = get_ei()
|
|
|
tw.on_failure(einfo)
|
|
@@ -382,19 +382,19 @@ class test_TaskRequest(Case):
|
|
|
mytask.error_whitelist = ()
|
|
|
|
|
|
def test_already_revoked(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw._already_revoked = True
|
|
|
self.assertTrue(tw.revoked())
|
|
|
|
|
|
def test_revoked(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
revoked.add(tw.id)
|
|
|
self.assertTrue(tw.revoked())
|
|
|
self.assertTrue(tw._already_revoked)
|
|
|
self.assertTrue(tw.acknowledged)
|
|
|
|
|
|
def test_execute_does_not_execute_revoked(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
revoked.add(tw.id)
|
|
|
tw.execute()
|
|
|
|
|
@@ -410,12 +410,12 @@ class test_TaskRequest(Case):
|
|
|
mytask_raising.acks_late = False
|
|
|
|
|
|
def test_execute_using_pool_does_not_execute_revoked(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
revoked.add(tw.id)
|
|
|
tw.execute_using_pool(None)
|
|
|
|
|
|
def test_on_accepted_acks_early(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
|
|
|
self.assertTrue(tw.acknowledged)
|
|
|
prev, module._does_debug = module._does_debug, False
|
|
@@ -425,7 +425,7 @@ class test_TaskRequest(Case):
|
|
|
module._does_debug = prev
|
|
|
|
|
|
def test_on_accepted_acks_late(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
mytask.acks_late = True
|
|
|
try:
|
|
|
tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
|
|
@@ -434,15 +434,15 @@ class test_TaskRequest(Case):
|
|
|
mytask.acks_late = False
|
|
|
|
|
|
def test_on_accepted_terminates(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
pool = Mock()
|
|
|
- tw.terminate(pool, signal="KILL")
|
|
|
+ tw.terminate(pool, signal='KILL')
|
|
|
self.assertFalse(pool.terminate_job.call_count)
|
|
|
tw.on_accepted(pid=314, time_accepted=time.time())
|
|
|
- pool.terminate_job.assert_called_with(314, "KILL")
|
|
|
+ pool.terminate_job.assert_called_with(314, 'KILL')
|
|
|
|
|
|
def test_on_success_acks_early(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.time_start = 1
|
|
|
tw.on_success(42)
|
|
|
prev, module._does_info = module._does_info, False
|
|
@@ -453,7 +453,7 @@ class test_TaskRequest(Case):
|
|
|
module._does_info = prev
|
|
|
|
|
|
def test_on_success_BaseException(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.time_start = 1
|
|
|
with self.assertRaises(SystemExit):
|
|
|
try:
|
|
@@ -464,7 +464,7 @@ class test_TaskRequest(Case):
|
|
|
assert False
|
|
|
|
|
|
def test_on_success_eventer(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.time_start = 1
|
|
|
tw.eventer = Mock()
|
|
|
tw.send_event = Mock()
|
|
@@ -472,17 +472,17 @@ class test_TaskRequest(Case):
|
|
|
self.assertTrue(tw.send_event.called)
|
|
|
|
|
|
def test_on_success_when_failure(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.time_start = 1
|
|
|
tw.on_failure = Mock()
|
|
|
try:
|
|
|
- raise KeyError("foo")
|
|
|
+ raise KeyError('foo')
|
|
|
except Exception:
|
|
|
tw.on_success(ExceptionInfo())
|
|
|
self.assertTrue(tw.on_failure.called)
|
|
|
|
|
|
def test_on_success_acks_late(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.time_start = 1
|
|
|
mytask.acks_late = True
|
|
|
try:
|
|
@@ -495,11 +495,11 @@ class test_TaskRequest(Case):
|
|
|
|
|
|
def get_ei():
|
|
|
try:
|
|
|
- raise WorkerLostError("do re mi")
|
|
|
+ raise WorkerLostError('do re mi')
|
|
|
except WorkerLostError:
|
|
|
return ExceptionInfo()
|
|
|
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
exc_info = get_ei()
|
|
|
tw.on_failure(exc_info)
|
|
|
self.assertEqual(mytask.backend.get_status(tw.id),
|
|
@@ -508,7 +508,7 @@ class test_TaskRequest(Case):
|
|
|
mytask.ignore_result = True
|
|
|
try:
|
|
|
exc_info = get_ei()
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.on_failure(exc_info)
|
|
|
self.assertEqual(mytask.backend.get_status(tw.id),
|
|
|
states.PENDING)
|
|
@@ -516,12 +516,12 @@ class test_TaskRequest(Case):
|
|
|
mytask.ignore_result = False
|
|
|
|
|
|
def test_on_failure_acks_late(self):
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.time_start = 1
|
|
|
mytask.acks_late = True
|
|
|
try:
|
|
|
try:
|
|
|
- raise KeyError("foo")
|
|
|
+ raise KeyError('foo')
|
|
|
except KeyError:
|
|
|
exc_info = ExceptionInfo()
|
|
|
tw.on_failure(exc_info)
|
|
@@ -530,25 +530,25 @@ class test_TaskRequest(Case):
|
|
|
mytask.acks_late = False
|
|
|
|
|
|
def test_from_message_invalid_kwargs(self):
|
|
|
- body = dict(task=mytask.name, id=1, args=(), kwargs="foo")
|
|
|
+ body = dict(task=mytask.name, id=1, args=(), kwargs='foo')
|
|
|
with self.assertRaises(InvalidTaskError):
|
|
|
TaskRequest.from_message(None, body)
|
|
|
|
|
|
- @patch("celery.worker.job.error")
|
|
|
- @patch("celery.worker.job.warn")
|
|
|
+ @patch('celery.worker.job.error')
|
|
|
+ @patch('celery.worker.job.warn')
|
|
|
def test_on_timeout(self, warn, error):
|
|
|
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.on_timeout(soft=True, timeout=1337)
|
|
|
- self.assertIn("Soft time limit", warn.call_args[0][0])
|
|
|
+ self.assertIn('Soft time limit', warn.call_args[0][0])
|
|
|
tw.on_timeout(soft=False, timeout=1337)
|
|
|
- self.assertIn("Hard time limit", error.call_args[0][0])
|
|
|
+ self.assertIn('Hard time limit', error.call_args[0][0])
|
|
|
self.assertEqual(mytask.backend.get_status(tw.id),
|
|
|
states.FAILURE)
|
|
|
|
|
|
mytask.ignore_result = True
|
|
|
try:
|
|
|
- tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, uuid(), [1], {'f': 'x'})
|
|
|
tw.on_timeout(soft=True, timeout=1336)
|
|
|
self.assertEqual(mytask.backend.get_status(tw.id),
|
|
|
states.PENDING)
|
|
@@ -557,18 +557,18 @@ class test_TaskRequest(Case):
|
|
|
|
|
|
def test_trace_task_ret(self):
|
|
|
mytask.__trace__ = build_tracer(mytask.name, mytask,
|
|
|
- current_app.loader, "test")
|
|
|
+ current_app.loader, 'test')
|
|
|
res = trace_task_ret(mytask.name, uuid(), [4], {})
|
|
|
self.assertEqual(res, 4 ** 4)
|
|
|
|
|
|
def test_execute_safe_catches_exception(self):
|
|
|
|
|
|
def _error_exec(self, *args, **kwargs):
|
|
|
- raise KeyError("baz")
|
|
|
+ raise KeyError('baz')
|
|
|
|
|
|
@task_dec(request=None)
|
|
|
def raising():
|
|
|
- raise KeyError("baz")
|
|
|
+ raise KeyError('baz')
|
|
|
|
|
|
with self.assertWarnsRegex(RuntimeWarning,
|
|
|
r'Exception raised outside'):
|
|
@@ -578,9 +578,9 @@ class test_TaskRequest(Case):
|
|
|
def test_worker_task_trace_handle_retry(self):
|
|
|
from celery.exceptions import RetryTaskError
|
|
|
tid = uuid()
|
|
|
- mytask.request.update({"id": tid})
|
|
|
+ mytask.request.update({'id': tid})
|
|
|
try:
|
|
|
- raise ValueError("foo")
|
|
|
+ raise ValueError('foo')
|
|
|
except Exception, exc:
|
|
|
try:
|
|
|
raise RetryTaskError(str(exc), exc=exc)
|
|
@@ -597,10 +597,10 @@ class test_TaskRequest(Case):
|
|
|
|
|
|
def test_worker_task_trace_handle_failure(self):
|
|
|
tid = uuid()
|
|
|
- mytask.request.update({"id": tid})
|
|
|
+ mytask.request.update({'id': tid})
|
|
|
try:
|
|
|
try:
|
|
|
- raise ValueError("foo")
|
|
|
+ raise ValueError('foo')
|
|
|
except Exception, exc:
|
|
|
w = TraceInfo(states.FAILURE, exc)
|
|
|
w.handle_failure(mytask, store_errors=False)
|
|
@@ -614,39 +614,39 @@ class test_TaskRequest(Case):
|
|
|
|
|
|
def test_task_wrapper_mail_attrs(self):
|
|
|
tw = TaskRequest(mytask.name, uuid(), [], {})
|
|
|
- x = tw.success_msg % {"name": tw.name,
|
|
|
- "id": tw.id,
|
|
|
- "return_value": 10,
|
|
|
- "runtime": 0.3641}
|
|
|
+ x = tw.success_msg % {'name': tw.name,
|
|
|
+ 'id': tw.id,
|
|
|
+ 'return_value': 10,
|
|
|
+ 'runtime': 0.3641}
|
|
|
self.assertTrue(x)
|
|
|
- x = tw.error_msg % {"name": tw.name,
|
|
|
- "id": tw.id,
|
|
|
- "exc": "FOOBARBAZ",
|
|
|
- "traceback": "foobarbaz"}
|
|
|
+ x = tw.error_msg % {'name': tw.name,
|
|
|
+ 'id': tw.id,
|
|
|
+ 'exc': 'FOOBARBAZ',
|
|
|
+ 'traceback': 'foobarbaz'}
|
|
|
self.assertTrue(x)
|
|
|
|
|
|
def test_from_message(self):
|
|
|
- us = u"æØåveéðƒeæ"
|
|
|
- body = {"task": mytask.name, "id": uuid(),
|
|
|
- "args": [2], "kwargs": {us: "bar"}}
|
|
|
- m = Message(None, body=anyjson.dumps(body), backend="foo",
|
|
|
- content_type="application/json",
|
|
|
- content_encoding="utf-8")
|
|
|
+ us = u'æØåveéðƒeæ'
|
|
|
+ body = {'task': mytask.name, 'id': uuid(),
|
|
|
+ 'args': [2], 'kwargs': {us: 'bar'}}
|
|
|
+ m = Message(None, body=anyjson.dumps(body), backend='foo',
|
|
|
+ content_type='application/json',
|
|
|
+ content_encoding='utf-8')
|
|
|
tw = TaskRequest.from_message(m, m.decode())
|
|
|
self.assertIsInstance(tw, Request)
|
|
|
- self.assertEqual(tw.name, body["task"])
|
|
|
- self.assertEqual(tw.id, body["id"])
|
|
|
- self.assertEqual(tw.args, body["args"])
|
|
|
+ self.assertEqual(tw.name, body['task'])
|
|
|
+ self.assertEqual(tw.id, body['id'])
|
|
|
+ self.assertEqual(tw.args, body['args'])
|
|
|
us = from_utf8(us)
|
|
|
if sys.version_info < (2, 6):
|
|
|
self.assertEqual(tw.kwargs.keys()[0], us)
|
|
|
self.assertIsInstance(tw.kwargs.keys()[0], str)
|
|
|
|
|
|
def test_from_message_empty_args(self):
|
|
|
- body = {"task": mytask.name, "id": uuid()}
|
|
|
- m = Message(None, body=anyjson.dumps(body), backend="foo",
|
|
|
- content_type="application/json",
|
|
|
- content_encoding="utf-8")
|
|
|
+ body = {'task': mytask.name, 'id': uuid()}
|
|
|
+ m = Message(None, body=anyjson.dumps(body), backend='foo',
|
|
|
+ content_type='application/json',
|
|
|
+ content_encoding='utf-8')
|
|
|
tw = TaskRequest.from_message(m, m.decode())
|
|
|
self.assertIsInstance(tw, Request)
|
|
|
self.assertEquals(tw.args, [])
|
|
@@ -654,67 +654,67 @@ class test_TaskRequest(Case):
|
|
|
|
|
|
def test_from_message_missing_required_fields(self):
|
|
|
body = {}
|
|
|
- m = Message(None, body=anyjson.dumps(body), backend="foo",
|
|
|
- content_type="application/json",
|
|
|
- content_encoding="utf-8")
|
|
|
+ m = Message(None, body=anyjson.dumps(body), backend='foo',
|
|
|
+ content_type='application/json',
|
|
|
+ content_encoding='utf-8')
|
|
|
with self.assertRaises(KeyError):
|
|
|
TaskRequest.from_message(m, m.decode())
|
|
|
|
|
|
def test_from_message_nonexistant_task(self):
|
|
|
- body = {"task": "cu.mytask.doesnotexist", "id": uuid(),
|
|
|
- "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
|
|
|
- m = Message(None, body=anyjson.dumps(body), backend="foo",
|
|
|
- content_type="application/json",
|
|
|
- content_encoding="utf-8")
|
|
|
+ body = {'task': 'cu.mytask.doesnotexist', 'id': uuid(),
|
|
|
+ 'args': [2], 'kwargs': {u'æØåveéðƒeæ': 'bar'}}
|
|
|
+ m = Message(None, body=anyjson.dumps(body), backend='foo',
|
|
|
+ content_type='application/json',
|
|
|
+ content_encoding='utf-8')
|
|
|
with self.assertRaises(KeyError):
|
|
|
TaskRequest.from_message(m, m.decode())
|
|
|
|
|
|
def test_execute(self):
|
|
|
tid = uuid()
|
|
|
- tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'})
|
|
|
self.assertEqual(tw.execute(), 256)
|
|
|
meta = mytask.backend.get_task_meta(tid)
|
|
|
- self.assertEqual(meta["result"], 256)
|
|
|
- self.assertEqual(meta["status"], states.SUCCESS)
|
|
|
+ self.assertEqual(meta['result'], 256)
|
|
|
+ self.assertEqual(meta['status'], states.SUCCESS)
|
|
|
|
|
|
def test_execute_success_no_kwargs(self):
|
|
|
tid = uuid()
|
|
|
tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
|
|
|
self.assertEqual(tw.execute(), 256)
|
|
|
meta = mytask_no_kwargs.backend.get_task_meta(tid)
|
|
|
- self.assertEqual(meta["result"], 256)
|
|
|
- self.assertEqual(meta["status"], states.SUCCESS)
|
|
|
+ self.assertEqual(meta['result'], 256)
|
|
|
+ self.assertEqual(meta['status'], states.SUCCESS)
|
|
|
|
|
|
def test_execute_success_some_kwargs(self):
|
|
|
tid = uuid()
|
|
|
tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
|
|
|
self.assertEqual(tw.execute(), 256)
|
|
|
meta = mytask_some_kwargs.backend.get_task_meta(tid)
|
|
|
- self.assertEqual(some_kwargs_scratchpad.get("task_id"), tid)
|
|
|
- self.assertEqual(meta["result"], 256)
|
|
|
- self.assertEqual(meta["status"], states.SUCCESS)
|
|
|
+ self.assertEqual(some_kwargs_scratchpad.get('task_id'), tid)
|
|
|
+ self.assertEqual(meta['result'], 256)
|
|
|
+ self.assertEqual(meta['status'], states.SUCCESS)
|
|
|
|
|
|
def test_execute_ack(self):
|
|
|
tid = uuid()
|
|
|
- tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
|
|
|
+ tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'},
|
|
|
on_ack=on_ack)
|
|
|
self.assertEqual(tw.execute(), 256)
|
|
|
meta = mytask.backend.get_task_meta(tid)
|
|
|
- self.assertTrue(scratch["ACK"])
|
|
|
- self.assertEqual(meta["result"], 256)
|
|
|
- self.assertEqual(meta["status"], states.SUCCESS)
|
|
|
+ self.assertTrue(scratch['ACK'])
|
|
|
+ self.assertEqual(meta['result'], 256)
|
|
|
+ self.assertEqual(meta['status'], states.SUCCESS)
|
|
|
|
|
|
def test_execute_fail(self):
|
|
|
tid = uuid()
|
|
|
tw = TaskRequest(mytask_raising.name, tid, [4])
|
|
|
self.assertIsInstance(tw.execute(), ExceptionInfo)
|
|
|
meta = mytask_raising.backend.get_task_meta(tid)
|
|
|
- self.assertEqual(meta["status"], states.FAILURE)
|
|
|
- self.assertIsInstance(meta["result"], KeyError)
|
|
|
+ self.assertEqual(meta['status'], states.FAILURE)
|
|
|
+ self.assertIsInstance(meta['result'], KeyError)
|
|
|
|
|
|
def test_execute_using_pool(self):
|
|
|
tid = uuid()
|
|
|
- tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'})
|
|
|
|
|
|
class MockPool(BasePool):
|
|
|
target = None
|
|
@@ -736,7 +736,7 @@ class test_TaskRequest(Case):
|
|
|
self.assertEqual(p.args[0], mytask.name)
|
|
|
self.assertEqual(p.args[1], tid)
|
|
|
self.assertEqual(p.args[2], [4])
|
|
|
- self.assertIn("f", p.args[3])
|
|
|
+ self.assertIn('f', p.args[3])
|
|
|
self.assertIn([4], p.args)
|
|
|
|
|
|
tw.task.accept_magic_kwargs = False
|
|
@@ -744,23 +744,23 @@ class test_TaskRequest(Case):
|
|
|
|
|
|
def test_default_kwargs(self):
|
|
|
tid = uuid()
|
|
|
- tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'})
|
|
|
self.assertDictEqual(
|
|
|
tw.extend_with_default_kwargs(), {
|
|
|
- "f": "x",
|
|
|
- "logfile": None,
|
|
|
- "loglevel": None,
|
|
|
- "task_id": tw.id,
|
|
|
- "task_retries": 0,
|
|
|
- "task_is_eager": False,
|
|
|
- "delivery_info": {"exchange": None, "routing_key": None},
|
|
|
- "task_name": tw.name})
|
|
|
-
|
|
|
- @patch("celery.worker.job.logger")
|
|
|
+ 'f': 'x',
|
|
|
+ 'logfile': None,
|
|
|
+ 'loglevel': None,
|
|
|
+ 'task_id': tw.id,
|
|
|
+ 'task_retries': 0,
|
|
|
+ 'task_is_eager': False,
|
|
|
+ 'delivery_info': {'exchange': None, 'routing_key': None},
|
|
|
+ 'task_name': tw.name})
|
|
|
+
|
|
|
+ @patch('celery.worker.job.logger')
|
|
|
def _test_on_failure(self, exception, logger):
|
|
|
app = app_or_default()
|
|
|
tid = uuid()
|
|
|
- tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
|
|
|
+ tw = TaskRequest(mytask.name, tid, [4], {'f': 'x'})
|
|
|
try:
|
|
|
raise exception
|
|
|
except Exception:
|
|
@@ -770,17 +770,17 @@ class test_TaskRequest(Case):
|
|
|
tw.on_failure(exc_info)
|
|
|
self.assertTrue(logger.log.called)
|
|
|
context = logger.log.call_args[0][2]
|
|
|
- self.assertEqual(mytask.name, context["name"])
|
|
|
- self.assertIn(tid, context["id"])
|
|
|
+ self.assertEqual(mytask.name, context['name'])
|
|
|
+ self.assertIn(tid, context['id'])
|
|
|
finally:
|
|
|
app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
|
|
|
|
|
|
def test_on_failure(self):
|
|
|
- self._test_on_failure(Exception("Inside unit tests"))
|
|
|
+ self._test_on_failure(Exception('Inside unit tests'))
|
|
|
|
|
|
def test_on_failure_unicode_exception(self):
|
|
|
- self._test_on_failure(Exception(u"Бобры атакуют"))
|
|
|
+ self._test_on_failure(Exception(u'Бобры атакуют'))
|
|
|
|
|
|
def test_on_failure_utf8_exception(self):
|
|
|
self._test_on_failure(Exception(
|
|
|
- from_utf8(u"Бобры атакуют")))
|
|
|
+ from_utf8(u'Бобры атакуют')))
|