monitoring.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
"""
Publishing statistics and monitoring Celery.

:class:`Statistics` subclasses (such as :class:`TaskTimerStats`) publish
measurements through the broker via :class:`StatsPublisher`;
:class:`StatsCollector` consumes them, aggregates running totals, and can
print a report or store the totals in the Django cache.
"""
  4. from carrot.connection import DjangoBrokerConnection
  5. from celery.messaging import StatsPublisher, StatsConsumer
  6. from celery.loaders import settings
  7. from django.core.cache import cache
  8. import time
  9. DEFAULT_CACHE_KEY_PREFIX = "celery-statistics"
  10. class Statistics(object):
  11. """Base class for classes publishing celery statistics.
  12. .. attribute:: type
  13. **REQUIRED** The type of statistics this class handles.
  14. **Required handlers**
  15. * on_start()
  16. * on_stop()
  17. """
  18. type = None
  19. def __init__(self, **kwargs):
  20. self.enabled = getattr(settings, "CELERY_STATISTICS", False)
  21. if not self.type:
  22. raise NotImplementedError(
  23. "Statistic classes must define their type.")
  24. def publish(self, **data):
  25. """Publish statistics to be collected later by
  26. :class:`StatsCollector`.
  27. :param data: An arbitrary Python object containing the statistics
  28. to be published.
  29. """
  30. if not self.enabled:
  31. return
  32. connection = DjangoBrokerConnection()
  33. publisher = StatsPublisher(connection=connection)
  34. publisher.send({"type": self.type, "data": data})
  35. publisher.close()
  36. connection.close()
  37. @classmethod
  38. def start(cls, *args, **kwargs):
  39. """Convenience method instantiating and running :meth:`run` in
  40. one swoop."""
  41. stat = cls()
  42. stat.run(*args, **kwargs)
  43. return stat
  44. def run(self, *args, **kwargs):
  45. """Start producing statistics."""
  46. if self.enabled:
  47. return self.on_start(*args, **kwargs)
  48. def stop(self, *args, **kwargs):
  49. """Stop producing and publish statistics."""
  50. if self.enabled:
  51. return self.on_finish(*args, **kwargs)
  52. def on_start(self, *args, **kwargs):
  53. """What to do when the :meth:`run` method is called."""
  54. raise NotImplementedError(
  55. "Statistics classes must define a on_start handler.")
  56. def on_stop(self, *args, **kwargs):
  57. """What to do when the :meth:`stop` method is called."""
  58. raise NotImplementedError(
  59. "Statistics classes must define a on_stop handler.")
  60. class TimerStats(Statistics):
  61. """A generic timer producing ``celery`` statistics.
  62. .. attribute:: time_start
  63. The time when this class was instantiated (in :func:`time.time`
  64. format.)
  65. """
  66. time_start = None
  67. def on_start(self, task_id, task_name, args, kwargs):
  68. """What to do when the timers :meth:`run` method is called."""
  69. self.task_id = task_id
  70. self.task_name = task_name
  71. self.args = args
  72. self.kwargs = kwargs
  73. self.time_start = time.time()
  74. def on_finish(self):
  75. """What to do when the timers :meth:`stop` method is called.
  76. :returns: the time in seconds it took between calling :meth:`start` on
  77. this class and :meth:`stop`.
  78. """
  79. nsecs = time.time() - self.time_start
  80. self.publish(task_id=self.task_id,
  81. task_name=self.task_name,
  82. args=self.args,
  83. kwargs=self.kwargs,
  84. nsecs=str(nsecs))
  85. return nsecs
  86. class TaskTimerStats(TimerStats):
  87. """Time a running :class:`celery.task.Task`."""
  88. type = "task_time_running"
  89. class StatsCollector(object):
  90. """Collect and report Celery statistics.
  91. **NOTE**: Please run only one collector at any time, or your stats
  92. will be skewed.
  93. .. attribute:: total_tasks_processed
  94. The number of tasks executed in total since the first time
  95. :meth:`collect` was executed on this class instance.
  96. .. attribute:: total_tasks_processed_by_type
  97. A dictionary of task names and how many times they have been
  98. executed in total since the first time :meth:`collect` was executed
  99. on this class instance.
  100. .. attribute:: total_task_time_running
  101. The total time, in seconds, it took to process all the tasks executed
  102. since the first time :meth:`collect` was executed on this class
  103. instance.
  104. .. attribute:: total_task_time_running_by_type
  105. A dictionary of task names and their total running time in seconds,
  106. counting all the tasks that has been run since the first time
  107. :meth:`collect` was executed on this class instance.
  108. **NOTE**: You have to run :meth:`collect` for these attributes
  109. to be filled.
  110. """
  111. allowed_types = ["task_time_running"]
  112. def __init__(self):
  113. self.total_tasks_processed = 0
  114. self.total_tasks_processed_by_type = {}
  115. self.total_task_time_running = 0.0
  116. self.total_task_time_running_by_type = {}
  117. def collect(self):
  118. """Collect any new statistics available since the last time
  119. :meth:`collect` was executed."""
  120. connection = DjangoBrokerConnection()
  121. consumer = StatsConsumer(connection=connection)
  122. it = consumer.iterqueue(infinite=False)
  123. for message in it:
  124. stats_entry = message.decode()
  125. stat_type = stats_entry["type"]
  126. if stat_type in self.allowed_types:
  127. # Decode keys to unicode for use as kwargs.
  128. data = dict((key.encode("utf-8"), value)
  129. for key, value in stats_entry["data"].items())
  130. handler = getattr(self, stat_type)
  131. handler(**data)
  132. def dump_to_cache(self, cache_key_prefix=DEFAULT_CACHE_KEY_PREFIX):
  133. """Store collected statistics in the cache."""
  134. cache.set("%s-total_tasks_processed" % cache_key_prefix,
  135. self.total_tasks_processed)
  136. cache.set("%s-total_tasks_processed_by_type" % cache_key_prefix,
  137. self.total_tasks_processed_by_type)
  138. cache.set("%s-total_task_time_running" % cache_key_prefix,
  139. self.total_task_time_running)
  140. cache.set("%s-total_task_time_running_by_type" % cache_key_prefix,
  141. self.total_task_time_running_by_type)
  142. def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
  143. """Process statistics regarding how long a task has been running
  144. (the :class:TaskTimerStats` class is responsible for sending these).
  145. :param task_id: The UUID of the task.
  146. :param task_name: The name of task.
  147. :param args: The tasks positional arguments.
  148. :param kwargs: The tasks keyword arguments.
  149. :param nsecs: The number of seconds (in :func:`time.time` format)
  150. it took to execute the task.
  151. """
  152. nsecs = float(nsecs)
  153. self.total_tasks_processed += 1
  154. self.total_task_time_running += nsecs
  155. if task_name not in self.total_task_time_running_by_type:
  156. self.total_task_time_running_by_type[task_name] = nsecs
  157. else:
  158. self.total_task_time_running_by_type[task_name] += nsecs
  159. if task_name not in self.total_tasks_processed_by_type:
  160. self.total_tasks_processed_by_type[task_name] = 1
  161. else:
  162. self.total_tasks_processed_by_type[task_name] += 1
  163. def report(self):
  164. """Dump a nice statistics report from the data collected since
  165. the first time :meth:`collect` was executed on this instance.
  166. It outputs the following information:
  167. * Total processing time by task type and how many times each
  168. task has been excuted.
  169. * Total task processing time.
  170. * Total number of tasks executed
  171. """
  172. print("Total processing time by task type:")
  173. for task_name, nsecs in self.total_task_time_running_by_type.items():
  174. print("\t%s: %s secs. (for a total of %d executed.)" % (
  175. task_name, nsecs,
  176. self.total_tasks_processed_by_type.get(task_name)))
  177. print("Total task processing time: %s secs." % (
  178. self.total_task_time_running))
  179. print("Total tasks processed: %d" % self.total_tasks_processed)