Browse Source

Started writing unit tests (conf, log, models, registry so far).

Ask Solem 16 years ago
parent
commit
7b99900ad9
4 changed files with 240 additions and 0 deletions
  1. 46 0
      celery/tests/test_conf.py
  2. 56 0
      celery/tests/test_log.py
  3. 62 0
      celery/tests/test_models.py
  4. 76 0
      celery/tests/test_registry.py

+ 46 - 0
celery/tests/test_conf.py

@@ -0,0 +1,46 @@
+import unittest
+from celery import conf
+from django.conf import settings
+
+SETTING_VARS = (
+    ("CELERY_AMQP_CONSUMER_QUEUE", "AMQP_CONSUMER_QUEUE",
+        "DEFAULT_AMQP_CONSUMER_QUEUE"),
+    ("CELERY_AMQP_ROUTING_KEY", "AMQP_ROUTING_KEY",
+        "DEFAULT_AMQP_ROUTING_KEY"),
+    ("CELERY_AMQP_EXCHANGE", "AMQP_EXCHANGE",
+        "DEFAULT_AMQP_EXCHANGE"),
+    ("CELERYD_CONCURRENCY", "DAEMON_CONCURRENCY",
+        "DEFAULT_DAEMON_CONCURRENCY"),
+    ("CELERYD_PID_FILE", "DAEMON_PID_FILE",
+        "DEFAULT_DAEMON_PID_FILE"),
+    ("CELERYD_EMPTY_MSG_EMIT_EVERY", "DEFAULT_EMPTY_MSG_EMIT_EVERY",
+        "EMPTY_MSG_EMIT_EVERY"),
+    ("CELERYD_QUEUE_WAKEUP_AFTER", "DEFAULT_QUEUE_WAKEUP_AFTER",
+        "QUEUE_WAKEUP_AFTER"),
+    ("CELERYD_LOG_FILE", "DEFAULT_DAEMON_LOG_FILE",
+        "DAEMON_LOG_FILE"),
+    ("CELERYD_DAEMON_LOG_FORMAT", "DEFAULT_LOG_FMT",
+        "LOG_FORMAT"),
+    ("CELERY_TASK_META_USE_DB", "DEFAULT_TASK_META_USE_DB",
+        "TASK_META_USE_DB"),
+)
+
+
+class TestConf(unittest.TestCase):
+
+    def assertDefaultSetting(self, setting_name, result_var, default_var):
+        if hasattr(settings, setting_name):
+            self.assertEquals(getattr(conf, result_var),
+                              getattr(settings, setting_name),
+                              "Overwritten setting %s is written to %s" % (
+                                  setting_name, result_var))
+        else:
+            self.assertEqual(getattr(conf, default_var),
+                             getattr(conf, result_var),
+                             "Default setting %s is written to %s" % (
+                                 default_var, result_var))
+            
+    def test_configuration_cls(self):
+        for setting_name, result_var, default_var in SETTING_VARS:
+            self.assertDefaultSetting(setting_name, result_var, default_var)
+        self.assertTrue(isinstance(conf.DAEMON_LOG_LEVEL, int))

+ 56 - 0
celery/tests/test_log.py

@@ -0,0 +1,56 @@
+import unittest
+
+import sys
+import logging
+import multiprocessing
+from StringIO import StringIO
+from celery.log import setup_logger, emergency_error
+
+
+class TestLog(unittest.TestCase):
+
+    def _assertLog(self, logger, logmsg, loglevel=logging.ERROR):
+        # Save old handlers
+        old_handler = logger.handlers[0]
+        logger.removeHandler(old_handler)
+        sio = StringIO()
+        siohandler = logging.StreamHandler(sio)
+        logger.addHandler(siohandler)
+        logger.log(loglevel, logmsg)
+        logger.removeHandler(siohandler)
+        # Reset original handlers
+        logger.addHandler(old_handler)
+        return sio.getvalue().strip()
+
+    def assertDidLogTrue(self, logger, logmsg, reason, loglevel=None):
+        val = self._assertLog(logger, logmsg, loglevel=loglevel)
+        return self.assertEqual(val, logmsg, reason)
+
+    def assertDidLogFalse(self, logger, logmsg, reason, loglevel=None):
+        val = self._assertLog(logger, logmsg, loglevel=loglevel)
+        return self.assertFalse(val, reason)
+
+    def test_setup_logger(self):
+        logger = setup_logger(loglevel=logging.ERROR, logfile=None)
+        self.assertTrue(logger.handlers[0].stream is sys.stderr,
+                "setup_logger logs to stderr without logfile argument.")
+        self.assertTrue(logger._process_aware,
+                "setup_logger() returns process aware logger.")
+        self.assertDidLogTrue(logger, "Logging something",
+                "Logger logs error when loglevel is ERROR",
+                loglevel=logging.ERROR)
+        self.assertDidLogFalse(logger, "Logging something",
+                "Logger doesn't info when loglevel is ERROR",
+                loglevel=logging.INFO)
+
+    def test_emergency_error(self):
+        sio = StringIO()
+        emergency_error(sio, "Testing emergency error facility")
+        self.assertEquals(sio.getvalue().rpartition(":")[2].strip(),
+                             "Testing emergency error facility")
+
+
+
+
+
+

+ 62 - 0
celery/tests/test_models.py

@@ -0,0 +1,62 @@
+import unittest
+import uuid
+from datetime import datetime, timedelta
+from celery.models import TaskMeta, PeriodicTaskMeta
+from celery.task import PeriodicTask
+from celery.registry import tasks
+
+
+class TestPeriodicTask(PeriodicTask):
+    name = "celery.unittest.test_models.test_periodic_task"
+    run_every = timedelta(days=1)
+
+
+class TestModels(unittest.TestCase):
+
+    def createTaskMeta(self):
+        id = str(uuid.uuid4())
+        taskmeta, created = TaskMeta.objects.get_or_create(task_id=id)
+        return taskmeta
+
+    def createPeriodicTaskMeta(self, name):
+        ptaskmeta, created = PeriodicTaskMeta.objects.get_or_create(name=name)
+        return ptaskmeta
+
+    def test_taskmeta(self):
+        m1 = self.createTaskMeta()
+        m2 = self.createTaskMeta()
+        m3 = self.createTaskMeta()
+        self.assertTrue(m1.task_id)
+        self.assertTrue(isinstance(m1.date_done, datetime))
+
+        self.assertEquals(TaskMeta.objects.get_task(m1.task_id).task_id,
+                m1.task_id)
+        self.assertFalse(TaskMeta.objects.is_done(m1.task_id))
+        TaskMeta.objects.mark_as_done(m1.task_id)
+        TaskMeta.objects.mark_as_done(m2.task_id)
+        self.assertTrue(TaskMeta.objects.is_done(m1.task_id))
+        self.assertTrue(TaskMeta.objects.is_done(m2.task_id))
+
+        # Have to avoid save() because it applies the auto_now=True.
+        TaskMeta.objects.filter(task_id=m1.task_id).update(
+                date_done=datetime.now() - timedelta(days=10))
+        
+        expired = TaskMeta.objects.get_all_expired()
+        self.assertTrue(m1 in expired)
+        self.assertFalse(m2 in expired)
+        self.assertFalse(m3 in expired)
+
+        TaskMeta.objects.delete_expired()
+        self.assertFalse(m1 in TaskMeta.objects.all())
+       
+    def test_periodic_taskmeta(self):
+        tasks.register(TestPeriodicTask)
+        p = self.createPeriodicTaskMeta(TestPeriodicTask.name)
+        self.assertFalse(p in PeriodicTaskMeta.objects.get_waiting_tasks())
+        # Have to avoid save() because it applies the auto_now=True.
+        PeriodicTaskMeta.objects.filter(name=p.name).update(
+                last_run_at=datetime.now() - TestPeriodicTask.run_every)
+        self.assertTrue(p in PeriodicTaskMeta.objects.get_waiting_tasks())
+        self.assertTrue(isinstance(p.task, TestPeriodicTask))
+        
+

+ 76 - 0
celery/tests/test_registry.py

@@ -0,0 +1,76 @@
+import unittest
+from celery import registry
+from celery.task import Task, PeriodicTask
+
+FUNC_TASK_NAME = "celery.unittest.func_task"
+
+
+class TestTask(Task):
+    name = "celery.unittest.test_task"
+
+    def run(self, **kwargs):
+        return True
+
+
+class TestPeriodicTask(PeriodicTask):
+    name = "celery.unittest.test_periodic_task"
+    run_every = 10
+
+    def run(self, **kwargs):
+        return True
+
+
+def func_task(**kwargs):
+    return True
+
+
+class TestTaskRegistry(unittest.TestCase):
+
+    def assertRegisterUnregisterCls(self, r, task):
+        self.assertRaises(r.NotRegistered, r.unregister, task)
+        r.register(task)
+        self.assertTrue(task.name in r)
+        self.assertRaises(r.AlreadyRegistered, r.register, task)
+    
+    def assertRegisterUnregisterFunc(self, r, task, task_name):
+        self.assertRaises(r.NotRegistered, r.unregister, task_name)
+        r.register(task, task_name)
+        self.assertTrue(task_name in r)
+        self.assertRaises(r.AlreadyRegistered, r.register, task, task_name)
+
+    def test_task_registry(self):
+        r = registry.TaskRegistry()
+        self.assertTrue(isinstance(r.data, dict),
+                "TaskRegistry has composited dict")
+
+        self.assertRegisterUnregisterCls(r, TestTask)
+        self.assertRegisterUnregisterFunc(r, func_task, FUNC_TASK_NAME)
+        self.assertRegisterUnregisterCls(r, TestPeriodicTask)
+
+        tasks = r.get_all()
+        self.assertTrue(isinstance(tasks.get(TestTask.name), TestTask))
+        self.assertTrue(isinstance(tasks.get(TestPeriodicTask.name),
+                                   TestPeriodicTask))
+        self.assertEquals(tasks.get(FUNC_TASK_NAME), func_task)
+
+        regular = r.get_all_regular()
+        self.assertTrue(TestTask.name in regular)
+        self.assertFalse(TestPeriodicTask.name in regular)
+        self.assertTrue(FUNC_TASK_NAME in regular)
+
+        periodic = r.get_all_periodic()
+        self.assertFalse(TestTask.name in periodic)
+        self.assertTrue(TestPeriodicTask.name in periodic)
+        self.assertFalse(FUNC_TASK_NAME in periodic)
+
+        self.assertTrue(isinstance(r.get_task(TestTask.name), TestTask))
+        self.assertTrue(isinstance(r.get_task(TestPeriodicTask.name),
+                                   TestPeriodicTask))
+        self.assertEquals(r.get_task(FUNC_TASK_NAME), func_task)
+
+        r.unregister(TestTask)
+        self.assertFalse(TestTask.name in r)
+        r.unregister(TestPeriodicTask)
+        self.assertFalse(TestPeriodicTask.name in r)
+        r.unregister(FUNC_TASK_NAME)
+        self.assertFalse(FUNC_TASK_NAME in r)