|
@@ -12,6 +12,7 @@ from carrot.backends.base import BaseMessage
|
|
|
from StringIO import StringIO
|
|
|
from celery.log import setup_logger
|
|
|
from django.core import cache
|
|
|
+from celery.decorators import task as task_dec
|
|
|
import simplejson
|
|
|
import logging
|
|
|
|
|
@@ -19,35 +20,37 @@ scratch = {"ACK": False}
|
|
|
some_kwargs_scratchpad = {}
|
|
|
|
|
|
|
|
|
-def jail(task_id, task_name, fun, args, kwargs):
|
|
|
- return ExecuteWrapper(fun, task_id, task_name, args, kwargs)()
|
|
|
+def jail(task_id, task_name, args, kwargs):
|
|
|
+ return ExecuteWrapper(task_name, task_id, args, kwargs)()
|
|
|
|
|
|
|
|
|
def on_ack():
|
|
|
scratch["ACK"] = True
|
|
|
|
|
|
|
|
|
+@task_dec()
|
|
|
def mytask(i, **kwargs):
|
|
|
return i ** i
|
|
|
-tasks.register(mytask, name="cu.mytask")
|
|
|
|
|
|
|
|
|
+@task_dec()
|
|
|
def mytask_no_kwargs(i):
|
|
|
return i ** i
|
|
|
-tasks.register(mytask_no_kwargs, name="mytask_no_kwargs")
|
|
|
|
|
|
|
|
|
+
|
|
|
+@task_dec()
|
|
|
def mytask_some_kwargs(i, logfile):
|
|
|
some_kwargs_scratchpad["logfile"] = logfile
|
|
|
return i ** i
|
|
|
-tasks.register(mytask_some_kwargs, name="mytask_some_kwargs")
|
|
|
|
|
|
|
|
|
+@task_dec()
|
|
|
def mytask_raising(i, **kwargs):
|
|
|
raise KeyError(i)
|
|
|
-tasks.register(mytask_raising, name="cu.mytask-raising")
|
|
|
|
|
|
|
|
|
+@task_dec()
|
|
|
def get_db_connection(i, **kwargs):
|
|
|
from django.db import connection
|
|
|
return id(connection)
|
|
@@ -57,11 +60,12 @@ get_db_connection.ignore_result = True
|
|
|
class TestJail(unittest.TestCase):
|
|
|
|
|
|
def test_execute_jail_success(self):
|
|
|
- ret = jail(gen_unique_id(), gen_unique_id(), mytask, [2], {})
|
|
|
+ ret = jail(gen_unique_id(), mytask.name, [2], {})
|
|
|
self.assertEquals(ret, 4)
|
|
|
|
|
|
def test_execute_jail_failure(self):
|
|
|
- ret = jail(gen_unique_id(), gen_unique_id(), mytask_raising, [4], {})
|
|
|
+ ret = jail(gen_unique_id(), mytask_raising.name,
|
|
|
+ [4], {})
|
|
|
self.assertTrue(isinstance(ret, ExceptionInfo))
|
|
|
self.assertEquals(ret.exception.args, (4, ))
|
|
|
|
|
@@ -76,8 +80,8 @@ class TestJail(unittest.TestCase):
|
|
|
|
|
|
connection.close = monkeypatched_connection_close
|
|
|
|
|
|
- ret = jail(gen_unique_id(), gen_unique_id(),
|
|
|
- get_db_connection, [2], {})
|
|
|
+ ret = jail(gen_unique_id(),
|
|
|
+ get_db_connection.name, [2], {})
|
|
|
self.assertTrue(connection._was_closed)
|
|
|
|
|
|
connection.close = old_connection_close
|
|
@@ -96,7 +100,7 @@ class TestJail(unittest.TestCase):
|
|
|
|
|
|
cache.cache.close = monkeypatched_cache_close
|
|
|
|
|
|
- jail(gen_unique_id(), gen_unique_id(), mytask, [4], {})
|
|
|
+ jail(gen_unique_id(), mytask.name, [4], {})
|
|
|
self.assertTrue(cache._was_closed)
|
|
|
cache.cache.close = old_cache_close
|
|
|
cache.settings.CACHE_BACKEND = old_backend
|
|
@@ -116,7 +120,7 @@ class TestJail(unittest.TestCase):
|
|
|
|
|
|
cache.cache.close = monkeypatched_cache_close
|
|
|
|
|
|
- jail(gen_unique_id(), gen_unique_id(), mytask, [4], {})
|
|
|
+ jail(gen_unique_id(), mytask.name, [4], {})
|
|
|
self.assertTrue(cache._was_closed)
|
|
|
cache.cache.close = old_cache_close
|
|
|
cache.settings.CACHE_BACKEND = old_backend
|
|
@@ -128,19 +132,12 @@ class TestJail(unittest.TestCase):
|
|
|
|
|
|
class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
- def test_task_wrapper_attrs(self):
|
|
|
- tw = TaskWrapper(gen_unique_id(), gen_unique_id(),
|
|
|
- mytask, [1], {"f": "x"})
|
|
|
- for attr in ("task_name", "task_id", "args", "kwargs", "logger"):
|
|
|
- self.assertTrue(getattr(tw, attr, None))
|
|
|
-
|
|
|
def test_task_wrapper_repr(self):
|
|
|
- tw = TaskWrapper(gen_unique_id(), gen_unique_id(),
|
|
|
- mytask, [1], {"f": "x"})
|
|
|
+ tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
|
|
|
self.assertTrue(repr(tw))
|
|
|
|
|
|
def test_task_wrapper_mail_attrs(self):
|
|
|
- tw = TaskWrapper(gen_unique_id(), gen_unique_id(), mytask, [], {})
|
|
|
+ tw = TaskWrapper(mytask.name, gen_unique_id(), [], {})
|
|
|
x = tw.success_msg % {"name": tw.task_name,
|
|
|
"id": tw.task_id,
|
|
|
"return_value": 10}
|
|
@@ -157,7 +154,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
self.assertTrue(x)
|
|
|
|
|
|
def test_from_message(self):
|
|
|
- body = {"task": "cu.mytask", "id": gen_unique_id(),
|
|
|
+ body = {"task": mytask.name, "id": gen_unique_id(),
|
|
|
"args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
|
|
|
m = BaseMessage(body=simplejson.dumps(body), backend="foo",
|
|
|
content_type="application/json",
|
|
@@ -170,7 +167,6 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
self.assertEquals(tw.kwargs.keys()[0],
|
|
|
u"æØåveéðƒeæ".encode("utf-8"))
|
|
|
self.assertFalse(isinstance(tw.kwargs.keys()[0], unicode))
|
|
|
- self.assertEquals(id(mytask), id(tw.task_func))
|
|
|
self.assertTrue(tw.logger)
|
|
|
|
|
|
def test_from_message_nonexistant_task(self):
|
|
@@ -184,7 +180,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
def test_execute(self):
|
|
|
tid = gen_unique_id()
|
|
|
- tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
|
|
|
+ tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
|
|
|
self.assertEquals(tw.execute(), 256)
|
|
|
meta = TaskMeta.objects.get(task_id=tid)
|
|
|
self.assertEquals(meta.result, 256)
|
|
@@ -192,8 +188,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
def test_execute_success_no_kwargs(self):
|
|
|
tid = gen_unique_id()
|
|
|
- tw = TaskWrapper("cu.mytask_no_kwargs", tid, mytask_no_kwargs,
|
|
|
- [4], {})
|
|
|
+ tw = TaskWrapper(mytask_no_kwargs.name, tid, [4], {})
|
|
|
self.assertEquals(tw.execute(), 256)
|
|
|
meta = TaskMeta.objects.get(task_id=tid)
|
|
|
self.assertEquals(meta.result, 256)
|
|
@@ -201,8 +196,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
def test_execute_success_some_kwargs(self):
|
|
|
tid = gen_unique_id()
|
|
|
- tw = TaskWrapper("cu.mytask_some_kwargs", tid, mytask_some_kwargs,
|
|
|
- [4], {})
|
|
|
+ tw = TaskWrapper(mytask_some_kwargs.name, tid, [4], {})
|
|
|
self.assertEquals(tw.execute(logfile="foobaz.log"), 256)
|
|
|
meta = TaskMeta.objects.get(task_id=tid)
|
|
|
self.assertEquals(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
|
|
@@ -211,7 +205,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
def test_execute_ack(self):
|
|
|
tid = gen_unique_id()
|
|
|
- tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"},
|
|
|
+ tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"},
|
|
|
on_ack=on_ack)
|
|
|
self.assertEquals(tw.execute(), 256)
|
|
|
meta = TaskMeta.objects.get(task_id=tid)
|
|
@@ -221,8 +215,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
def test_execute_fail(self):
|
|
|
tid = gen_unique_id()
|
|
|
- tw = TaskWrapper("cu.mytask-raising", tid, mytask_raising, [4],
|
|
|
- {"f": "x"})
|
|
|
+ tw = TaskWrapper(mytask_raising.name, tid, [4], {"f": "x"})
|
|
|
self.assertTrue(isinstance(tw.execute(), ExceptionInfo))
|
|
|
meta = TaskMeta.objects.get(task_id=tid)
|
|
|
self.assertEquals(meta.status, "FAILURE")
|
|
@@ -230,7 +223,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
def test_execute_using_pool(self):
|
|
|
tid = gen_unique_id()
|
|
|
- tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
|
|
|
+ tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
|
|
|
p = TaskPool(2)
|
|
|
p.start()
|
|
|
asyncres = tw.execute_using_pool(p)
|
|
@@ -239,7 +232,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
def test_default_kwargs(self):
|
|
|
tid = gen_unique_id()
|
|
|
- tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
|
|
|
+ tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
|
|
|
self.assertEquals(tw.extend_with_default_kwargs(10, "some_logfile"), {
|
|
|
"f": "x",
|
|
|
"logfile": "some_logfile",
|
|
@@ -250,7 +243,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
def test_on_failure(self):
|
|
|
tid = gen_unique_id()
|
|
|
- tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
|
|
|
+ tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
|
|
|
try:
|
|
|
raise Exception("Inside unit tests")
|
|
|
except Exception:
|
|
@@ -265,7 +258,7 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
|
|
|
tw.on_failure(exc_info)
|
|
|
logvalue = logfh.getvalue()
|
|
|
- self.assertTrue("cu.mytask" in logvalue)
|
|
|
+ self.assertTrue(mytask.name in logvalue)
|
|
|
self.assertTrue(tid in logvalue)
|
|
|
self.assertTrue("ERROR" in logvalue)
|
|
|
|