瀏覽代碼

Added test for Task.AsyncResult

Ask Solem 15 年之前
父節點
當前提交
ca0f8c210c
共有 1 個文件被更改,包括 7 次插入0 次删除
  1. 7 0
      celery/tests/test_task.py

+ 7 - 0
celery/tests/test_task.py

@@ -4,6 +4,7 @@ from datetime import datetime, timedelta
 
 
 from celery import task
 from celery import task
 from celery import messaging
 from celery import messaging
+from celery.utils import gen_unique_id
 from celery.result import EagerResult
 from celery.result import EagerResult
 from celery.execute import send_task
 from celery.execute import send_task
 from celery.backends import default_backend
 from celery.backends import default_backend
@@ -202,6 +203,12 @@ class TestCeleryTasks(unittest.TestCase):
         cls.run = return_True
         cls.run = return_True
         return cls
         return cls
 
 
+    def test_AsyncResult(self):
+        task_id = gen_unique_id()
+        result = RetryTask.AsyncResult(task_id)
+        self.assertEqual(result.backend, RetryTask.backend)
+        self.assertEqual(result.task_id, task_id)
+
     def test_ping(self):
     def test_ping(self):
         from celery import conf
         from celery import conf
         conf.ALWAYS_EAGER = True
         conf.ALWAYS_EAGER = True