Browse Source

90% coverage

Ask Solem 14 years ago
parent
commit
a81eb571f3

+ 1 - 1
celery/bin/camqadm.py

@@ -371,5 +371,5 @@ def camqadm(*args, **options):
 def main():
     AMQPAdminCommand().execute_from_commandline()
 
-if __name__ == "__main__":
+if __name__ == "__main__":              # pragma: no cover
     main()

+ 1 - 1
celery/bin/celerybeat.py

@@ -63,5 +63,5 @@ def main():
     beat = BeatCommand()
     beat.execute_from_commandline()
 
-if __name__ == "__main__":
+if __name__ == "__main__":      # pragma: no cover
     main()

+ 1 - 1
celery/bin/celeryctl.py

@@ -298,5 +298,5 @@ def main():
     except KeyboardInterrupt:
         pass
 
-if __name__ == "__main__":
+if __name__ == "__main__":          # pragma: no cover
     main()

+ 1 - 1
celery/bin/celeryd.py

@@ -153,5 +153,5 @@ def main():
     worker = WorkerCommand()
     worker.execute_from_commandline()
 
-if __name__ == "__main__":
+if __name__ == "__main__":          # pragma: no cover
     main()

+ 1 - 1
celery/bin/celeryd_multi.py

@@ -242,5 +242,5 @@ def main():
     MultiTool()(sys.argv[1:])
 
 
-if __name__ == "__main__":
+if __name__ == "__main__":              # pragma: no cover
     main()

+ 1 - 1
celery/bin/celeryev.py

@@ -53,5 +53,5 @@ def main():
     ev = EvCommand()
     ev.execute_from_commandline()
 
-if __name__ == "__main__":
+if __name__ == "__main__":              # pragma: no cover
     main()

+ 4 - 2
celery/concurrency/processes/__init__.py

@@ -87,9 +87,11 @@ class TaskPool(object):
 
         i = 0
         while pids ^ seen:
-            if time() - time_start > timeout:
+            print("%r > %r" % (time() - time_start, timeout))
+            if timeout and time() - time_start > timeout:
+                print("TIMED OUT i==%r" % (i, ))
                 break
-            results[i] = self._pool.apply_async(pingback,
+            x = results[i] = self._pool.apply_async(pingback,
                                                 args=(i, ),
                                                 callback=callback)
             sleep(0.1)

+ 4 - 0
celery/datastructures.py

@@ -241,6 +241,10 @@ class LimitedSet(object):
         self._expire_item()
         self._data[value] = time.time()
 
+    def clear(self):
+        """Remove all members"""
+        self._data.clear()
+
     def pop_value(self, value):
         """Remove membership by finding value."""
         self._data.pop(value, None)

+ 5 - 0
celery/loaders/default.py

@@ -4,6 +4,7 @@ from importlib import import_module
 
 from celery.datastructures import AttributeDict
 from celery.loaders.base import BaseLoader
+from celery.datastructures import AttributeDict
 from celery.exceptions import NotConfigured
 
 DEFAULT_CONFIG_MODULE = "celeryconfig"
@@ -22,7 +23,11 @@ DEFAULT_UNCONFIGURED_SETTINGS = {
 
 
 def wanted_module_item(item):
+<<<<<<< HEAD
     return item[0].isupper() and not item.startswith("_")
+=======
+    return not item.startswith("_")
+>>>>>>> 6a78496... 90% coverage
 
 
 class Loader(BaseLoader):

+ 49 - 0
celery/tests/test_bin/test_base.py

@@ -0,0 +1,49 @@
+import unittest2 as unittest
+
+from celery.bin.base import Command
+from celery.datastructures import AttributeDict
+
+
+class Object(object):
+    pass
+
+
+class MockCommand(Command):
+
+    def parse_options(self, prog_name, arguments):
+        options = Object()
+        options.foo = "bar"
+        options.prog_name = prog_name
+        return options, (10, 20, 30)
+
+    def run(self, *args, **kwargs):
+        return args, kwargs
+
+
+class test_Command(unittest.TestCase):
+
+    def test_defaults(self):
+        cmd1 = Command(defaults=None)
+        self.assertTrue(cmd1.defaults)
+
+        cmd2 = Command(defaults=AttributeDict({"foo": "bar"}))
+        self.assertTrue(cmd2.defaults)
+
+    def test_get_options(self):
+        cmd = Command()
+        cmd.option_list = (1, 2, 3)
+        self.assertTupleEqual(cmd.get_options(), (1, 2, 3))
+
+    def test_run_interface(self):
+        self.assertRaises(NotImplementedError, Command().run)
+
+    def test_execute_from_commandline(self):
+        cmd = MockCommand()
+        args1, kwargs1 = cmd.execute_from_commandline()     # sys.argv
+        self.assertTupleEqual(args1, (10, 20, 30))
+        self.assertDictContainsSubset({"foo": "bar"}, kwargs1)
+        self.assertTrue(kwargs1.get("prog_name"))
+        args2, kwargs2 = cmd.execute_from_commandline(["foo"])   # pass list
+        self.assertTupleEqual(args2, (10, 20, 30))
+        self.assertDictContainsSubset({"foo": "bar", "prog_name": "foo"},
+                                      kwargs2)

+ 95 - 0
celery/tests/test_concurrency_processes.py

@@ -1,8 +1,19 @@
 import sys
 import unittest2 as unittest
 
+from itertools import cycle
+
 from celery.concurrency import processes as mp
 from celery.datastructures import ExceptionInfo
+from celery.utils import noop
+
+from celery.tests.utils import skip_if_quick
+
+
+class Object(object):   # for writeable attributes.
+
+    def __init__(self, **kwargs):
+        [setattr(self, k, v) for k, v in kwargs.items()]
 
 
 def to_excinfo(exc):
@@ -12,6 +23,19 @@ def to_excinfo(exc):
         return ExceptionInfo(sys.exc_info())
 
 
+class MockResult(object):
+
+    def __init__(self, value, pid):
+        self.value = value
+        self.pid = pid
+
+    def worker_pids(self):
+        return [self.pid]
+
+    def get(self):
+        return self.value
+
+
 class MockPool(object):
     started = False
     closed = False
@@ -22,6 +46,9 @@ class MockPool(object):
     def __init__(self, *args, **kwargs):
         self.started = True
         self._state = mp.RUN
+        self.processes = kwargs.get("processes")
+        self._pool = [Object(pid=i) for i in range(self.processes)]
+        self._current_proc = cycle(xrange(self.processes)).next
 
     def close(self):
         self.closed = True
@@ -37,10 +64,23 @@ class MockPool(object):
         pass
 
 
+class ExeMockPool(MockPool):
+
+    def apply_async(self, target, args=(), kwargs={}, callback=noop):
+        from threading import Timer
+        res = target(*args, **kwargs)
+        Timer(0.1, callback, (res, )).start()
+        return MockResult(res, self._current_proc())
+
+
 class TaskPool(mp.TaskPool):
     Pool = MockPool
 
 
+class ExeMockTaskPool(mp.TaskPool):
+    Pool = ExeMockPool
+
+
 class test_TaskPool(unittest.TestCase):
 
     def test_start(self):
@@ -60,6 +100,24 @@ class test_TaskPool(unittest.TestCase):
         pool.terminate()
         self.assertTrue(_pool.terminated)
 
+    def test_pingback(self):
+        for i in xrange(10):
+            self.assertEqual(mp.pingback(i), i)
+
+    def test_on_worker_error(self):
+        scratch = [None]
+
+        def errback(einfo):
+            scratch[0] = einfo
+
+        pool = TaskPool(10)
+        exc = KeyError("foo")
+        pool.on_worker_error([errback], exc)
+
+        self.assertTrue(scratch[0])
+        self.assertIs(scratch[0].exception, exc)
+        self.assertTrue(scratch[0].traceback)
+
     def test_on_ready_exception(self):
         scratch = [None]
 
@@ -71,6 +129,25 @@ class test_TaskPool(unittest.TestCase):
         pool.on_ready([], [errback], exc)
         self.assertEqual(exc, scratch[0])
 
+    def test_safe_apply_callback(self):
+
+        _good_called = [0]
+        _evil_called = [0]
+
+        def good(x):
+            _good_called[0] = 1
+            return x
+
+        def evil(x):
+            _evil_called[0] = 1
+            raise KeyError(x)
+
+        pool = TaskPool(10)
+        self.assertIsNone(pool.safe_apply_callback(good, 10))
+        self.assertIsNone(pool.safe_apply_callback(evil, 10))
+        self.assertTrue(_good_called[0])
+        self.assertTrue(_evil_called[0])
+
     def test_on_ready_value(self):
         scratch = [None]
 
@@ -91,3 +168,21 @@ class test_TaskPool(unittest.TestCase):
         pool = TaskPool(10)
         pool.start()
         pool.apply_async(lambda x: x, (2, ), {})
+
+    def test_info(self):
+        pool = TaskPool(10)
+        procs = [Object(pid=i) for i in range(pool.limit)]
+        pool._pool = Object(_pool=procs)
+        info = pool.info
+        self.assertEqual(info["max-concurrency"], pool.limit)
+        self.assertEqual(len(info["processes"]), pool.limit)
+
+    @skip_if_quick
+    def test_diagnose(self):
+        pool = ExeMockTaskPool(10)
+        pool.start()
+
+        r = pool.diagnose(timeout=None)
+        self.assertEqual(len(r["active"]), pool.limit)
+        self.assertFalse(r["waiting"])
+        self.assertTrue(r["iterations"])

+ 27 - 0
celery/tests/test_loaders.py

@@ -80,3 +80,30 @@ class TestDefaultLoader(unittest.TestCase):
             l.on_worker_init()
         finally:
             sys.modules[configname] = prevconfig
+
+    def test_import_from_cwd(self):
+        l = default.Loader()
+        old_path = list(sys.path)
+        try:
+            sys.path.remove(os.getcwd())
+        except ValueError:
+            pass
+        celery = sys.modules.pop("celery", None)
+        try:
+            self.assertTrue(l.import_from_cwd("celery"))
+            sys.modules.pop("celery", None)
+            sys.path.insert(0, os.getcwd())
+            self.assertTrue(l.import_from_cwd("celery"))
+        finally:
+            sys.path = old_path
+            sys.modules["celery"] = celery
+
+    def test_unconfigured_settings(self):
+
+        class _Loader(default.Loader):
+
+            def import_from_cwd(self, name):
+                raise ImportError(name)
+
+        l = _Loader()
+        self.assertEqual(l.conf.CELERY_RESULT_BACKEND, "amqp")

+ 65 - 0
celery/tests/test_task_control.py

@@ -48,6 +48,71 @@ def with_mock_broadcast(fun):
     return _mocked
 
 
+class test_inspect(unittest.TestCase):
+
+    def setUp(self):
+        self.i = control.inspect()
+
+    def test_prepare_reply(self):
+        self.assertDictEqual(self.i._prepare([{"w1": {"ok": 1}},
+                                              {"w2": {"ok": 1}}]),
+                             {"w1": {"ok": 1}, "w2": {"ok": 1}})
+
+        i = control.inspect(destination="w1")
+        self.assertEqual(i._prepare([{"w1": {"ok": 1}}]),
+                         {"ok": 1})
+
+    @with_mock_broadcast
+    def test_active(self):
+        self.i.active()
+        self.assertIn("dump_active", MockBroadcastPublisher.sent)
+
+    @with_mock_broadcast
+    def test_scheduled(self):
+        self.i.scheduled()
+        self.assertIn("dump_schedule", MockBroadcastPublisher.sent)
+
+    @with_mock_broadcast
+    def test_reserved(self):
+        self.i.reserved()
+        self.assertIn("dump_reserved", MockBroadcastPublisher.sent)
+
+    @with_mock_broadcast
+    def test_stats(self):
+        self.i.stats()
+        self.assertIn("stats", MockBroadcastPublisher.sent)
+
+    @with_mock_broadcast
+    def test_revoked(self):
+        self.i.revoked()
+        self.assertIn("dump_revoked", MockBroadcastPublisher.sent)
+
+    @with_mock_broadcast
+    def test_registered_tasks(self):
+        self.i.registered_tasks()
+        self.assertIn("dump_tasks", MockBroadcastPublisher.sent)
+
+    @with_mock_broadcast
+    def test_enable_events(self):
+        self.i.enable_events()
+        self.assertIn("enable_events", MockBroadcastPublisher.sent)
+
+    @with_mock_broadcast
+    def test_disable_events(self):
+        self.i.disable_events()
+        self.assertIn("disable_events", MockBroadcastPublisher.sent)
+
+    @with_mock_broadcast
+    def test_diagnose(self):
+        self.i.diagnose()
+        self.assertIn("diagnose", MockBroadcastPublisher.sent)
+
+    @with_mock_broadcast
+    def test_ping(self):
+        self.i.ping()
+        self.assertIn("ping", MockBroadcastPublisher.sent)
+
+
 class test_Broadcast(unittest.TestCase):
 
     def test_discard_all(self):

+ 107 - 0
celery/tests/test_worker_state.py

@@ -0,0 +1,107 @@
+import unittest2 as unittest
+
+from celery.datastructures import LimitedSet
+from celery.worker import state
+
+
+class StateResetCase(unittest.TestCase):
+
+    def setUp(self):
+        self.reset_state()
+        self.on_setup()
+
+    def tearDown(self):
+        self.reset_state()
+        self.on_teardown()
+
+    def reset_state(self):
+        state.active_requests.clear()
+        state.revoked.clear()
+        state.total_count.clear()
+
+    def on_setup(self):
+        pass
+
+    def on_teardown(self):
+        pass
+
+
+class MockShelve(dict):
+    filename = None
+    in_sync = False
+    closed = False
+
+    def open(self, filename):
+        self.filename = filename
+        return self
+
+    def sync(self):
+        self.in_sync = True
+
+    def close(self):
+        self.closed = True
+
+
+class MyPersistent(state.Persistent):
+    storage = MockShelve()
+
+
+class test_Persistent(StateResetCase):
+
+    def on_setup(self):
+        self.p = MyPersistent(filename="celery-state")
+
+    def test_constructor(self):
+        self.assertDictEqual(self.p.db, {})
+        self.assertEqual(self.p.db.filename, self.p.filename)
+
+    def test_save(self):
+        self.p.db["foo"] = "bar"
+        self.p.save()
+        self.assertTrue(self.p.db.in_sync)
+        self.assertTrue(self.p.db.closed)
+
+    def add_revoked(self, *ids):
+        map(self.p.db.setdefault("revoked", LimitedSet()).add, ids)
+
+    def test_merge(self, data=["foo", "bar", "baz"]):
+        self.add_revoked(*data)
+        self.p.merge(self.p.db)
+        for item in data:
+            self.assertIn(item, state.revoked)
+
+    def test_sync(self, data1=["foo", "bar", "baz"],
+                        data2=["baz", "ini", "koz"]):
+        self.add_revoked(*data1)
+        map(state.revoked.add, data2)
+        self.p.sync(self.p.db)
+
+        for item in data2:
+            self.assertIn(item, self.p.db["revoked"])
+
+
+class SimpleReq(object):
+
+    def __init__(self, task_name):
+        self.task_name = task_name
+
+
+class test_state(StateResetCase):
+
+    def test_accepted(self, requests=[SimpleReq("foo"),
+                                      SimpleReq("bar"),
+                                      SimpleReq("baz"),
+                                      SimpleReq("baz")]):
+        map(state.task_accepted, requests)
+        for req in requests:
+            self.assertIn(req, state.active_requests)
+        self.assertEqual(state.total_count["foo"], 1)
+        self.assertEqual(state.total_count["bar"], 1)
+        self.assertEqual(state.total_count["baz"], 2)
+
+    def test_ready(self, requests=[SimpleReq("foo"),
+                                   SimpleReq("bar")]):
+        map(state.task_accepted, requests)
+        self.assertEqual(len(state.active_requests), 2)
+        map(state.task_ready, requests)
+        self.assertEqual(len(state.active_requests), 0)

+ 2 - 1
celery/worker/state.py

@@ -41,6 +41,7 @@ def task_ready(request):
 
 
 class Persistent(object):
+    storage = shelve
     _open = None
 
     def __init__(self, filename):
@@ -66,7 +67,7 @@ class Persistent(object):
         return d
 
     def open(self):
-        return shelve.open(self.filename)
+        return self.storage.open(self.filename)
 
     def close(self):
         if self._open: