monitoring.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from carrot.connection import DjangoAMQPConnection
  2. from celery.messaging import StatsPublisher, StatsConsumer
  3. from django.conf import settings
  4. class Statistics(object):
  5. type = None
  6. def __init__(self, **kwargs):
  7. self.enabled = getattr(settings, "CELERY_STATISTICS", False)
  8. if not self.type:
  9. raise NotImplementedError(
  10. "Statistic classes must define their type.")
  11. def publish(self, **data):
  12. if not self.enabled:
  13. return
  14. connection = DjangoAMQPConnection()
  15. publisher = StatsPublisher(connection=connection)
  16. publisher.send({"type": self.type, "data": data})
  17. publisher.close()
  18. connection.close()
  19. @classmethod
  20. def start(cls, *args, **kwargs):
  21. stat = cls()
  22. stat.run()
  23. return stat
  24. def run(self, *args, **kwargs):
  25. if stat.enabled:
  26. stat.on_start(*args, **kwargs)
  27. def stop(self, *args, **kwargs):
  28. if self.enabled:
  29. self.on_finish(*args, **kwargs)
  30. class TimerStats(Statistics):
  31. time_start = None
  32. def on_start(self, task_id, task_name, args, kwargs):
  33. self.task_id = task_id
  34. self.task_name = task_name
  35. self.args = self.args
  36. self.kwargs = self.kwargs
  37. self.time_start = time.time()
  38. def on_finish(self):
  39. nsecs = time.time() - self.time_start
  40. self.publish(task_id=task_id,
  41. task_name=task_name,
  42. args=args,
  43. kwargs=kwargs,
  44. nsecs=nsecs)
  45. class TaskTimerStats(TimerStats):
  46. type = "task_time_running"
  47. class StatsCollector(object):
  48. allowed_types = ["task_time_running"]
  49. total_tasks_processed = 0
  50. total_task_time_running = 0
  51. total_task_time_running_by_type = {}
  52. def collect(self):
  53. connection = DjangoAMQPConnection()
  54. consumer = StatsConsumer(connection=connection)
  55. it = consumer.iterqueue(infinite=False)
  56. for message in it:
  57. stats_entry = message.decode()
  58. stat_type = stats_entry["type"]
  59. if stat_type in self.allowed_types:
  60. handler = getattr(self, stat_type)
  61. handler(**stats_entry["data"])
  62. return self.on_cycle_end()
  63. def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
  64. self.total_task_time_running += nsecs
  65. self.total_task_time_running_by_type[task_name] = \
  66. self.total_task_time_running_by_type.get(task_name, nsecs)
  67. self.total_task_time_running_by_type[task_name] += nsecs
  68. print("Task %s[%s](%s, %s): %d" % (
  69. task_id, task_name, args, kwargs, nsecs))
  70. def on_cycle_end(self):
  71. print("-" * 64)
  72. print("Total processing time by task type:")
  73. for task_name, nsecs in self.total_task_time_running_by_type.items():
  74. print("\t%s: %d" % (task_name, nsecs))
  75. print("Total task processing time: %d" % (
  76. self.total_task_time_running))
  77. print("Total tasks processed: %d" % self.total_tasks_processed)