Browse Source

91% coverage

Ask Solem 14 years ago
parent
commit
67c54fcbba
3 changed files with 111 additions and 2 deletions
  1. 40 0
      celery/tests/test_task.py
  2. 69 0
      celery/tests/test_timer2.py
  3. 2 2
      celery/utils/timer2.py

+ 40 - 0
celery/tests/test_task.py

@@ -7,6 +7,12 @@ from pyparsing import ParseException
 
 from celery import task
 from celery.app import app_or_default
+from celery import messaging
+from celery.backends import default_backend
+from celery.decorators import task as task_dec
+from celery.exceptions import RetryTaskError
+from celery.execute import send_task
+from celery.result import EagerResult
 from celery.task.schedules import crontab, crontab_parser
 from celery.utils import timeutils
 from celery.utils import gen_unique_id
@@ -212,6 +218,15 @@ class MockPublisher(object):
 
 class TestCeleryTasks(unittest.TestCase):
 
+    def test_unpickle_task(self):
+        import pickle
+
+        @task_dec
+        def xxx():
+            pass
+
+        self.assertIs(pickle.loads(pickle.dumps(xxx)), xxx)
+
     def createTaskCls(self, cls_name, task_name=None):
         attrs = {"__module__": self.__module__}
         if task_name:
@@ -345,10 +360,35 @@ class TestCeleryTasks(unittest.TestCase):
             p = IncrementCounterTask.get_publisher(exchange="foo",
                                                    connection="bar")
             self.assertEqual(p.kwargs["exchange"], "foo")
+<<<<<<< HEAD
             self.assertTrue(p._declared)
+=======
+            p = IncrementCounterTask.get_publisher(exchange_type="fanout",
+                                                   connection="bar")
+            self.assertEqual(p.kwargs["exchange_type"], "fanout")
+>>>>>>> 503136a... 91% coverage
         finally:
             amqp.TaskPublisher = old_pub
 
+    def test_update_state(self):
+
+        @task_dec
+        def yyy():
+            pass
+
+        tid = gen_unique_id()
+        yyy.update_state(tid, "FROBULATING", {"fooz": "baaz"})
+        self.assertEqual(yyy.AsyncResult(tid).status, "FROBULATING")
+        self.assertDictEqual(yyy.AsyncResult(tid).result, {"fooz": "baaz"})
+
+    def test_has___name__(self):
+
+        @task_dec
+        def yyy2():
+            pass
+
+        self.assertTrue(yyy2.__name__)
+
     def test_get_logger(self):
         T1 = self.createTaskCls("T1", "c.unittest.t.t1")
         t1 = T1()

+ 69 - 0
celery/tests/test_timer2.py

@@ -0,0 +1,69 @@
+import time
+import unittest2 as unittest
+import celery.utils.timer2 as timer2
+
+from celery.tests.utils import skip_if_quick
+
+
+class test_Entry(unittest.TestCase):
+
+    def test_call(self):
+        scratch = [None]
+
+        def timed(x, y, moo="foo"):
+            scratch[0] = (x, y, moo)
+
+        tref = timer2.Entry(timed, (4, 4), {"moo": "baz"})
+        tref()
+
+        self.assertTupleEqual(scratch[0], (4, 4, "baz"))
+
+    def test_cancel(self):
+        tref = timer2.Entry(lambda x: x, (1, ), {})
+        tref.cancel()
+        self.assertTrue(tref.cancelled)
+
+
+class test_Schedule(unittest.TestCase):
+
+    def test_handle_error(self):
+        import time
+        from datetime import datetime
+        mktime = timer2.mktime
+        scratch = [None]
+
+        def _overflow(x):
+            raise OverflowError(x)
+
+        def on_error(exc_info):
+            scratch[0] = exc_info
+
+        s = timer2.Schedule(on_error=on_error)
+
+        timer2.mktime = _overflow
+        try:
+            s.enter(timer2.Entry(lambda: None, (), {}),
+                    eta=datetime.now())
+        finally:
+            timer2.mktime = mktime
+
+        _, exc, _ = scratch[0]
+        self.assertIsInstance(exc, OverflowError)
+
+
+class test_Timer(unittest.TestCase):
+
+    @skip_if_quick
+    def test_enter_after(self):
+        t = timer2.Timer()
+        done = [False]
+
+        def set_done():
+            done[0] = True
+
+        try:
+            t.apply_after(300, set_done)
+            while not done[0]:
+                time.sleep(0.1)
+        finally:
+            t.stop()

+ 2 - 2
celery/utils/timer2.py

@@ -31,7 +31,7 @@ class TimedFunctionFailed(UserWarning):
 class Entry(object):
     cancelled = False
 
-    def __init__(self, fun, args, kwargs):
+    def __init__(self, fun, args=None, kwargs=None):
         self.fun = fun
         self.args = args or []
         self.kwargs = kwargs or {}
@@ -157,7 +157,7 @@ class Timer(Thread):
             sleep(delay)
         try:
             self._stopped.set()
-        except TypeError:
+        except TypeError:           # pragma: no cover
             # we lost the race at interpreter shutdown,
             # so gc collected built-in modules.
             pass