|
@@ -1,6 +1,8 @@
|
|
|
import unittest
|
|
|
import time
|
|
|
from celery.monitoring import TaskTimerStats, Statistics, StatsCollector
|
|
|
+from carrot.connection import DjangoAMQPConnection
|
|
|
+from celery.messaging import StatsConsumer
|
|
|
|
|
|
|
|
|
class PartialStatistics(Statistics):
|
|
@@ -21,6 +23,7 @@ class TestStatisticsInterface(unittest.TestCase):
|
|
|
|
|
|
class TestTaskTimerStats(unittest.TestCase):
|
|
|
|
|
|
+
|
|
|
def test_time(self):
|
|
|
self.assertTimeElapsed(0.5, 1, 0, "0.5")
|
|
|
self.assertTimeElapsed(0.002, 0.05, 0, "0.0")
|
|
@@ -51,15 +54,48 @@ class TestTaskTimerStats(unittest.TestCase):
|
|
|
|
|
|
|
|
|
class TestStatsCollector(unittest.TestCase):
|
|
|
-
|
|
|
- def test_attrs(self):
|
|
|
- s = StatsCollector()
|
|
|
- self.assertEquals(s.total_tasks_processed, 0)
|
|
|
- self.assertEquals(s.total_tasks_processed_by_type, {})
|
|
|
- self.assertEquals(s.total_task_time_running, 0.0)
|
|
|
- self.assertEquals(s.total_task_time_running_by_type, {})
|
|
|
-
|
|
|
- def test_collect(self):
|
|
|
- s = StatsCollector()
|
|
|
- s.collect()
|
|
|
- self.assertEquals(s.total_tasks_processed, 0)
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ conn = DjangoAMQPConnection()
|
|
|
+ consumer = StatsConsumer(connection=conn)
|
|
|
+ consumer.discard_all()
|
|
|
+ conn.close()
|
|
|
+ consumer.close()
|
|
|
+ self.s = StatsCollector()
|
|
|
+ self.assertEquals(self.s.total_tasks_processed, 0)
|
|
|
+ self.assertEquals(self.s.total_tasks_processed_by_type, {})
|
|
|
+ self.assertEquals(self.s.total_task_time_running, 0.0)
|
|
|
+ self.assertEquals(self.s.total_task_time_running_by_type, {})
|
|
|
+
|
|
|
+ def test_collect_report_dump(self):
|
|
|
+ timer1 = TaskTimerStats()
|
|
|
+ timer1.enabled = True
|
|
|
+ timer1.run("foo", "bar", [], {})
|
|
|
+ timer2 = TaskTimerStats()
|
|
|
+ timer2.enabled = True
|
|
|
+ timer2.run("foo", "bar", [], {})
|
|
|
+ timer3 = TaskTimerStats()
|
|
|
+ timer3.enabled = True
|
|
|
+ timer3.run("foo", "bar", [], {})
|
|
|
+ for timer in (timer1, timer2, timer3):
|
|
|
+ timer.stop()
|
|
|
+
|
|
|
+
|
|
|
+ # Collect
|
|
|
+ self.s.collect()
|
|
|
+ self.assertEquals(self.s.total_tasks_processed, 3)
|
|
|
+
|
|
|
+ # Report
|
|
|
+ import sys
|
|
|
+ from StringIO import StringIO
|
|
|
+ out = StringIO()
|
|
|
+ sys.stdout = out
|
|
|
+ self.s.report()
|
|
|
+ sys.stdout = sys.__stdout__
|
|
|
+
|
|
|
+ output = out.getvalue()
|
|
|
+ self.assertTrue(
|
|
|
+ "Total processing time by task type:" in output)
|
|
|
+
|
|
|
+ # Dump to cache
|
|
|
+ self.s.dump_to_cache()
|