# monitoring.py (1.5 KB)
  1. from carrot.connection import DjangoAMQPConnection
  2. from celery.messaging import StatsPublisher, StatsConsumer
  3. from django.conf import settings
  4. def send_statistics(stat_name, **data):
  5. send_stats = getattr(settings, "CELERY_STATISTICS", False)
  6. if send_stats:
  7. connection = DjangoAMQPConnection()
  8. publisher = StatsPublisher(connection=connection)
  9. publisher.send({"stat_name": stat_name, "data": data})
  10. publisher.close()
  11. connection.close()
  12. class Statistics(object):
  13. def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
  14. send_statistics("task_time_running",
  15. task_id=task_id,
  16. task_name=task_name,
  17. args=args,
  18. kwargs=kwargs,
  19. nsecs=nsecs)
  20. class StatsCollector(object):
  21. allowed_stats = ["task_time_running"]
  22. def run(self):
  23. connection = DjangoAMQPConnection()
  24. consumer = StatsConsumer(connection=connection)
  25. it = consumer.iterqueue(infinite=False)
  26. total = 0
  27. for message in it:
  28. data = message.decode()
  29. stat_name = data.get("stat_name")
  30. if stat_name in self.allowed_stats:
  31. handler = getattr(self, stat_name)
  32. handler(**data["data"])
  33. total += 1
  34. return total
  35. def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
  36. print("Task %s[%s](%s, %s): %d" % (
  37. task_id, task_name, args, kwargs, nsecs))