|
@@ -5,7 +5,6 @@ import unittest2 as unittest
|
|
|
import simplejson
|
|
|
from StringIO import StringIO
|
|
|
|
|
|
-from django.core import cache
|
|
|
from carrot.backends.base import BaseMessage
|
|
|
|
|
|
from celery import states
|
|
@@ -15,6 +14,7 @@ from celery.utils import gen_unique_id
|
|
|
from celery.result import AsyncResult
|
|
|
from celery.worker.job import WorkerTaskTrace, TaskWrapper
|
|
|
from celery.worker.pool import TaskPool
|
|
|
+from celery.backends import default_backend
|
|
|
from celery.exceptions import RetryTaskError, NotRegistered
|
|
|
from celery.decorators import task as task_dec
|
|
|
from celery.datastructures import ExceptionInfo
|
|
@@ -22,8 +22,6 @@ from celery.datastructures import ExceptionInfo
|
|
|
from celery.tests.utils import execute_context
|
|
|
from celery.tests.compat import catch_warnings
|
|
|
|
|
|
-from djcelery.models import TaskMeta
|
|
|
-
|
|
|
scratch = {"ACK": False}
|
|
|
some_kwargs_scratchpad = {}
|
|
|
|
|
@@ -64,13 +62,6 @@ def mytask_raising(i, **kwargs):
|
|
|
raise KeyError(i)
|
|
|
|
|
|
|
|
|
-@task_dec()
|
|
|
-def get_db_connection(i, **kwargs):
|
|
|
- from django.db import connection
|
|
|
- return id(connection)
|
|
|
-get_db_connection.ignore_result = True
|
|
|
-
|
|
|
-
|
|
|
class TestRetryTaskError(unittest.TestCase):
|
|
|
|
|
|
def test_retry_task_error(self):
|
|
@@ -101,65 +92,6 @@ class TestJail(unittest.TestCase):
|
|
|
self.assertEqual(ret, 256)
|
|
|
self.assertFalse(AsyncResult(task_id).ready())
|
|
|
|
|
|
- def test_django_db_connection_is_closed(self):
|
|
|
- from django.db import connection
|
|
|
- connection._was_closed = False
|
|
|
- old_connection_close = connection.close
|
|
|
-
|
|
|
- def monkeypatched_connection_close(*args, **kwargs):
|
|
|
- connection._was_closed = True
|
|
|
- return old_connection_close(*args, **kwargs)
|
|
|
-
|
|
|
- connection.close = monkeypatched_connection_close
|
|
|
- try:
|
|
|
- jail(gen_unique_id(), get_db_connection.name, [2], {})
|
|
|
- self.assertTrue(connection._was_closed)
|
|
|
- finally:
|
|
|
- connection.close = old_connection_close
|
|
|
-
|
|
|
- def test_django_cache_connection_is_closed(self):
|
|
|
- old_cache_close = getattr(cache.cache, "close", None)
|
|
|
- old_backend = cache.settings.CACHE_BACKEND
|
|
|
- cache.settings.CACHE_BACKEND = "libmemcached"
|
|
|
- cache._was_closed = False
|
|
|
- old_cache_parse_backend = getattr(cache, "parse_backend_uri", None)
|
|
|
- if old_cache_parse_backend: # checks to make sure attr exists
|
|
|
- delattr(cache, 'parse_backend_uri')
|
|
|
-
|
|
|
- def monkeypatched_cache_close(*args, **kwargs):
|
|
|
- cache._was_closed = True
|
|
|
-
|
|
|
- cache.cache.close = monkeypatched_cache_close
|
|
|
-
|
|
|
- jail(gen_unique_id(), mytask.name, [4], {})
|
|
|
- self.assertTrue(cache._was_closed)
|
|
|
- cache.cache.close = old_cache_close
|
|
|
- cache.settings.CACHE_BACKEND = old_backend
|
|
|
- if old_cache_parse_backend:
|
|
|
- cache.parse_backend_uri = old_cache_parse_backend
|
|
|
-
|
|
|
- def test_django_cache_connection_is_closed_django_1_1(self):
|
|
|
- old_cache_close = getattr(cache.cache, "close", None)
|
|
|
- old_backend = cache.settings.CACHE_BACKEND
|
|
|
- cache.settings.CACHE_BACKEND = "libmemcached"
|
|
|
- cache._was_closed = False
|
|
|
- old_cache_parse_backend = getattr(cache, "parse_backend_uri", None)
|
|
|
- cache.parse_backend_uri = lambda uri: ["libmemcached", "1", "2"]
|
|
|
-
|
|
|
- def monkeypatched_cache_close(*args, **kwargs):
|
|
|
- cache._was_closed = True
|
|
|
-
|
|
|
- cache.cache.close = monkeypatched_cache_close
|
|
|
-
|
|
|
- jail(gen_unique_id(), mytask.name, [4], {})
|
|
|
- self.assertTrue(cache._was_closed)
|
|
|
- cache.cache.close = old_cache_close
|
|
|
- cache.settings.CACHE_BACKEND = old_backend
|
|
|
- if old_cache_parse_backend:
|
|
|
- cache.parse_backend_uri = old_cache_parse_backend
|
|
|
- else:
|
|
|
- del(cache.parse_backend_uri)
|
|
|
-
|
|
|
|
|
|
class MockEventDispatcher(object):
|
|
|
|
|
@@ -326,44 +258,44 @@ class TestTaskWrapper(unittest.TestCase):
|
|
|
tid = gen_unique_id()
|
|
|
tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
|
|
|
self.assertEqual(tw.execute(), 256)
|
|
|
- meta = TaskMeta.objects.get(task_id=tid)
|
|
|
- self.assertEqual(meta.result, 256)
|
|
|
- self.assertEqual(meta.status, states.SUCCESS)
|
|
|
+ meta = default_backend._get_task_meta_for(tid)
|
|
|
+ self.assertEqual(meta["result"], 256)
|
|
|
+ self.assertEqual(meta["status"], states.SUCCESS)
|
|
|
|
|
|
def test_execute_success_no_kwargs(self):
|
|
|
tid = gen_unique_id()
|
|
|
tw = TaskWrapper(mytask_no_kwargs.name, tid, [4], {})
|
|
|
self.assertEqual(tw.execute(), 256)
|
|
|
- meta = TaskMeta.objects.get(task_id=tid)
|
|
|
- self.assertEqual(meta.result, 256)
|
|
|
- self.assertEqual(meta.status, states.SUCCESS)
|
|
|
+ meta = default_backend._get_task_meta_for(tid)
|
|
|
+ self.assertEqual(meta["result"], 256)
|
|
|
+ self.assertEqual(meta["status"], states.SUCCESS)
|
|
|
|
|
|
def test_execute_success_some_kwargs(self):
|
|
|
tid = gen_unique_id()
|
|
|
tw = TaskWrapper(mytask_some_kwargs.name, tid, [4], {})
|
|
|
self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
|
|
|
- meta = TaskMeta.objects.get(task_id=tid)
|
|
|
+ meta = default_backend._get_task_meta_for(tid)
|
|
|
self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
|
|
|
- self.assertEqual(meta.result, 256)
|
|
|
- self.assertEqual(meta.status, states.SUCCESS)
|
|
|
+ self.assertEqual(meta["result"], 256)
|
|
|
+ self.assertEqual(meta["status"], states.SUCCESS)
|
|
|
|
|
|
def test_execute_ack(self):
|
|
|
tid = gen_unique_id()
|
|
|
tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"},
|
|
|
on_ack=on_ack)
|
|
|
self.assertEqual(tw.execute(), 256)
|
|
|
- meta = TaskMeta.objects.get(task_id=tid)
|
|
|
+ meta = default_backend._get_task_meta_for(tid)
|
|
|
self.assertTrue(scratch["ACK"])
|
|
|
- self.assertEqual(meta.result, 256)
|
|
|
- self.assertEqual(meta.status, states.SUCCESS)
|
|
|
+ self.assertEqual(meta["result"], 256)
|
|
|
+ self.assertEqual(meta["status"], states.SUCCESS)
|
|
|
|
|
|
def test_execute_fail(self):
|
|
|
tid = gen_unique_id()
|
|
|
tw = TaskWrapper(mytask_raising.name, tid, [4], {"f": "x"})
|
|
|
self.assertIsInstance(tw.execute(), ExceptionInfo)
|
|
|
- meta = TaskMeta.objects.get(task_id=tid)
|
|
|
- self.assertEqual(meta.status, states.FAILURE)
|
|
|
- self.assertIsInstance(meta.result, KeyError)
|
|
|
+ meta = default_backend._get_task_meta_for(tid)
|
|
|
+ self.assertEqual(meta["status"], states.FAILURE)
|
|
|
+ self.assertIsInstance(meta["result"], KeyError)
|
|
|
|
|
|
def test_execute_using_pool(self):
|
|
|
tid = gen_unique_id()
|