test_monitoring.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import unittest
  2. import time
  3. from celery.monitoring import TaskTimerStats, Statistics, StatsCollector
  4. from carrot.connection import DjangoAMQPConnection
  5. from celery.messaging import StatsConsumer
  6. class PartialStatistics(Statistics):
  7. type = "c.u.partial"
  8. class TestStatisticsInterface(unittest.TestCase):
  9. def test_must_have_type(self):
  10. self.assertRaises(NotImplementedError, Statistics)
  11. def test_must_have_on_start(self):
  12. self.assertRaises(NotImplementedError, PartialStatistics().on_start)
  13. def test_must_have_on_stop(self):
  14. self.assertRaises(NotImplementedError, PartialStatistics().on_stop)
  15. class TestTaskTimerStats(unittest.TestCase):
  16. def test_time(self):
  17. self.assertTimeElapsed(0.5, 1, 0, "0.5")
  18. self.assertTimeElapsed(0.002, 0.05, 0, "0.0")
  19. self.assertTimeElapsed(0.1, 0.5, 0, "0.1")
  20. def test_not_enabled(self):
  21. t = TaskTimerStats()
  22. t.enabled = False
  23. self.assertFalse(t.publish(isnot="enabled"))
  24. self.assertFalse(getattr(t, "time_start", None))
  25. t.run("foo", "bar", [], {})
  26. t.stop()
  27. def assertTimeElapsed(self, time_sleep, max_appx, min_appx, appx):
  28. t = TaskTimerStats()
  29. t.enabled = True
  30. t.run("foo", "bar", [], {})
  31. self.assertTrue(t.time_start)
  32. time.sleep(time_sleep)
  33. time_stop = t.stop()
  34. self.assertTrue(time_stop)
  35. self.assertFalse(time_stop > max_appx)
  36. self.assertFalse(time_stop <= min_appx)
  37. strstop = str(time_stop)[0:3]
  38. # Time elapsed is approximately 0.1 seconds.
  39. self.assertTrue(strstop == appx)
  40. class TestStatsCollector(unittest.TestCase):
  41. def setUp(self):
  42. conn = DjangoAMQPConnection()
  43. consumer = StatsConsumer(connection=conn)
  44. consumer.discard_all()
  45. conn.close()
  46. consumer.close()
  47. self.s = StatsCollector()
  48. self.assertEquals(self.s.total_tasks_processed, 0)
  49. self.assertEquals(self.s.total_tasks_processed_by_type, {})
  50. self.assertEquals(self.s.total_task_time_running, 0.0)
  51. self.assertEquals(self.s.total_task_time_running_by_type, {})
  52. def test_collect_report_dump(self):
  53. timer1 = TaskTimerStats()
  54. timer1.enabled = True
  55. timer1.run("foo", "bar", [], {})
  56. timer2 = TaskTimerStats()
  57. timer2.enabled = True
  58. timer2.run("foo", "bar", [], {})
  59. timer3 = TaskTimerStats()
  60. timer3.enabled = True
  61. timer3.run("foo", "bar", [], {})
  62. for timer in (timer1, timer2, timer3):
  63. timer.stop()
  64. # Collect
  65. self.s.collect()
  66. self.assertEquals(self.s.total_tasks_processed, 3)
  67. # Report
  68. import sys
  69. from StringIO import StringIO
  70. out = StringIO()
  71. sys.stdout = out
  72. self.s.report()
  73. sys.stdout = sys.__stdout__
  74. output = out.getvalue()
  75. self.assertTrue(
  76. "Total processing time by task type:" in output)
  77. # Dump to cache
  78. self.s.dump_to_cache()