Ver Fonte

Tests for all of celery.views

Ask Solem há 15 anos atrás
pai
commit
3e57f99bcb
5 ficheiros alterados com 132 adições e 57 exclusões
  1. 24 23
      celery/datastructures.py
  2. 89 12
      celery/tests/test_views.py
  3. 14 19
      celery/views.py
  4. 0 2
      testproj/settings.py
  5. 5 1
      testproj/urls.py

+ 24 - 23
celery/datastructures.py

@@ -3,6 +3,7 @@
 Custom Datastructures
 
 """
+import sys
 import time
 import traceback
 from UserList import UserList
@@ -79,29 +80,6 @@ class ExceptionInfo(object):
                 str(self.exception))
 
 
-def consume_queue(queue):
-    """Iterator yielding all immediately available items in a
-    :class:`Queue.Queue`.
-
-    The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
-
-    Example
-
-        >>> q = Queue()
-        >>> map(q.put, range(4))
-        >>> list(consume_queue(q))
-        [0, 1, 2, 3]
-        >>> list(consume_queue(q))
-        []
-
-    """
-    while 1:
-        try:
-            yield queue.get_nowait()
-        except QueueEmpty:
-            break
-
-
 class SharedCounter(object):
     """Thread-safe counter.
 
@@ -221,3 +199,26 @@ class LimitedSet(object):
     def first(self):
         """Get the oldest member."""
         return self.chronologically[0]
+
+
+def consume_queue(queue):
+    """Iterator yielding all immediately available items in a
+    :class:`Queue.Queue`.
+
+    The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
+
+    Example
+
+        >>> q = Queue()
+        >>> map(q.put, range(4))
+        >>> list(consume_queue(q))
+        [0, 1, 2, 3]
+        >>> list(consume_queue(q))
+        []
+
+    """
+    while 1:
+        try:
+            yield queue.get_nowait()
+        except QueueEmpty:
+            break

+ 89 - 12
celery/tests/test_views.py

@@ -1,3 +1,4 @@
+import sys
 import unittest
 
 from django.test.client import Client
@@ -8,33 +9,109 @@ from django.http import HttpResponse
 from anyjson import deserialize as JSON_load
 from billiard.utils.functional import curry
 
-from celery.utils import gen_unique_id
+from celery import conf
+from celery.utils import gen_unique_id, get_full_cls_name
 from celery.backends import default_backend
+from celery.exceptions import RetryTaskError
+from celery.decorators import task
+from celery.datastructures import ExceptionInfo
 
 def reversestar(name, **kwargs):
     return reverse(name, kwargs=kwargs)
 
-is_successful = curry(reversestar, "celery-is_task_successful")
+task_is_successful = curry(reversestar, "celery-is_task_successful")
+task_status = curry(reversestar, "celery-task_status")
+task_apply = curry(reverse, "celery-apply")
+
+scratch = {}
+@task()
+def mytask(x, y):
+    ret = scratch["result"] = int(x) * int(y)
+    return ret
+
+
+def create_exception(name, base=Exception):
+    return type(name, (base, ), {})
+
+
+def catch_exception(exception):
+    try:
+        raise exception
+    except exception.__class__, exc:
+        exc = default_backend.prepare_exception(exc)
+        return exc, ExceptionInfo(sys.exc_info()).traceback
 
 
 class ViewTestCase(DjangoTestCase):
 
     def assertJSONEquals(self, json, py):
         json = isinstance(json, HttpResponse) and json.content or json
-        self.assertEquals(JSON_load(json), py)
+        try:
+            self.assertEquals(JSON_load(json), py)
+        except TypeError, exc:
+            raise TypeError("%s: %s" % (exc, json))
+
+
+class TestTaskApply(ViewTestCase):
+
+    def test_apply(self):
+        conf.ALWAYS_EAGER = True
+        try:
+            ret = self.client.get(task_apply(kwargs={"task_name": mytask.name,
+                                                     "args": "4/4"}))
+            self.assertEquals(scratch["result"], 16)
+        finally:
+            conf.ALWAYS_EAGER = False
+
+
+class TestTaskStatus(ViewTestCase):
+
+    def assertStatusForIs(self, status, res, traceback=None):
+        uuid = gen_unique_id()
+        default_backend.store_result(uuid, res, status,
+                                     traceback=traceback)
+        json = self.client.get(task_status(task_id=uuid))
+        expect = dict(id=uuid, status=status, result=res)
+        if status in default_backend.EXCEPTION_STATES:
+            instore = default_backend.get_result(uuid)
+            self.assertEquals(str(instore.args), str(res.args))
+            expect["result"] = str(res.args[0])
+            expect["exc"] = get_full_cls_name(res.__class__)
+            expect["traceback"] = traceback
+
+        self.assertJSONEquals(json, dict(task=expect))
+
+    def test_task_status_success(self):
+        self.assertStatusForIs("SUCCESS", "The quick brown fox")
+
+    def test_task_status_failure(self):
+        exc, tb = catch_exception(KeyError("foo"))
+        self.assertStatusForIs("FAILURE", exc, tb)
+
+    def test_task_status_retry(self):
+        oexc, _ = catch_exception(KeyError("Resource not available"))
+        exc, tb = catch_exception(RetryTaskError(str(oexc), oexc))
+        self.assertStatusForIs("RETRY", exc, tb)
 
 
 class TestTaskIsSuccessful(ViewTestCase):
 
-    def test_is_successful_success(self):
+    def assertStatusForIs(self, status, outcome):
         uuid = gen_unique_id()
-        default_backend.mark_as_done(uuid, "Quick")
-        json = self.client.get(is_successful(task_id=uuid))
+        result = gen_unique_id()
+        default_backend.store_result(uuid, result, status)
+        json = self.client.get(task_is_successful(task_id=uuid))
         self.assertJSONEquals(json, {"task": {"id": uuid,
-                                              "executed": True}})
+                                              "executed": outcome}})
+
+    def test_is_successful_success(self):
+        self.assertStatusForIs("SUCCESS", True)
+
     def test_is_successful_pending(self):
-        uuid = gen_unique_id()
-        default_backend.store_result(uuid, "Quick", "PENDING")
-        json = self.client.get(is_successful(task_id=uuid))
-        self.assertJSONEquals(json, {"task": {"id": uuid,
-                                             "executed": False}})
+        self.assertStatusForIs("PENDING", False)
+ 
+    def test_is_successful_failure(self):
+        self.assertStatusForIs("FAILURE", False)
+
+    def test_is_successful_retry(self):
+        self.assertStatusForIs("RETRY", False)

+ 14 - 19
celery/views.py

@@ -3,12 +3,13 @@ from django.http import HttpResponse, Http404
 
 from anyjson import serialize as JSON_dump
 
+from celery.utils import get_full_cls_name
 from celery.result import AsyncResult
-from celery.execute import apply_async
 from celery.registry import tasks
+from celery.backends import default_backend
 
 
-def apply(request, task_name, *args):
+def apply(request, task_name, args):
     """View applying a task.
 
     Example:
@@ -18,6 +19,7 @@ def apply(request, task_name, *args):
     without ensuring your code is safe!
 
     """
+    args = args.split("/")
     kwargs = request.method == "POST" and \
             request.POST.copy() or request.GET.copy()
     kwargs = dict((key.encode("utf-8"), value)
@@ -26,7 +28,7 @@ def apply(request, task_name, *args):
         raise Http404("apply: no such task")
 
     task = tasks[task_name]
-    result = apply_async(task, args=args, kwargs=kwargs)
+    result = task.apply_async(args=args, kwargs=kwargs)
     response_data = {"ok": "true", "task_id": result.task_id}
     return HttpResponse(JSON_dump(response_data), mimetype="application/json")
 
@@ -41,21 +43,14 @@ is_task_done = is_task_successful # Backward compatible
 
 def task_status(request, task_id):
     """Returns task status and result in JSON format."""
-    async_result = AsyncResult(task_id)
-    status = async_result.status
-    result = async_result.result
-    if status in ("FAILURE", "RETRY"):
-        response_data = {
-            "id": task_id,
-            "status": status,
-            "result": result.args[0],
-            "traceback": result.traceback,
-        }
-    else:
-        response_data = {
-            "id": task_id,
-            "status": status,
-            "result": result,
-        }
+    status = default_backend.get_status(task_id)
+    res = default_backend.get_result(task_id)
+    response_data = dict(id=task_id, status=status, result=res)
+    if status in default_backend.EXCEPTION_STATES:
+        traceback = default_backend.get_traceback(task_id)
+        response_data.update({"result": str(res.args[0]),
+                              "exc": get_full_cls_name(res.__class__),
+                              "traceback": traceback})
+
     return HttpResponse(JSON_dump({"task": response_data}),
             mimetype="application/json")

+ 0 - 2
testproj/settings.py

@@ -25,8 +25,6 @@ COVERAGE_EXCLUDE_MODULES = ("celery.tests.*",
                             "celery.contrib.*",
                             "celery.bin.*",
                             "celery.utils.patch",
-                            "celery.urls",
-                            "celery.views",
                             "celery.task.strategy")
 COVERAGE_HTML_REPORT = True
 COVERAGE_BRANCH_COVERAGE = True

+ 5 - 1
testproj/urls.py

@@ -1,4 +1,5 @@
-from django.conf.urls.defaults import patterns, url, include
+from django.conf.urls.defaults import patterns, url, include, handler500
+from celery.views import apply
 
 # Uncomment the next two lines to enable the admin:
 # from django.contrib import admin
@@ -14,5 +15,8 @@ urlpatterns = patterns('',
 
     # Uncomment the next line to enable the admin:
     # (r'^admin/(.*)', admin.site.root),
+    url(r"^apply/(?P<task_name>.+?)/(?P<args>.+)", apply,
+        name="celery-apply"),
     url(r"^celery/", include("celery.urls")),
+
 )