Bladeren bron

Fix and document the statistics (monitoring) module.

Ask Solem 16 jaren geleden
bovenliggende
commit
908fe7e24f
2 gewijzigde bestanden met toevoegingen van 146 en 24 verwijderingen
  1. 3 1
      celery/bin/celeryd.py
  2. 143 23
      celery/monitoring.py

+ 3 - 1
celery/bin/celeryd.py

@@ -148,6 +148,7 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
             discarded_count, what))
     print("* Reporting of statistics is %s..." % (
         settings.CELERY_STATISTICS and "ON" or "OFF"))
+    context = None
     if daemon:
         # Since without stderr any errors will be silently suppressed,
         # we need to know that we have access to the logfile
@@ -182,7 +183,8 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
         emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))
     except:
-        context.close()
+        if context:
+            context.close()
         raise
 
 

+ 143 - 23
celery/monitoring.py

@@ -1,9 +1,24 @@
+"""
+
+    Publishing Statistics and Monitoring Celery.
+
+"""
 from carrot.connection import DjangoAMQPConnection
 from celery.messaging import StatsPublisher, StatsConsumer
 from django.conf import settings
+import time
 
 
 class Statistics(object):
+    """Base class for classes publishing celery statistics.
+    
+    .. attribute:: type
+
+        **REQUIRED** The type of statistics this class handles.
+
+    **Required handlers**
+
+    """
     type = None
 
     def __init__(self, **kwargs):
@@ -13,6 +28,13 @@ class Statistics(object):
                 "Statistic classes must define their type.")
 
     def publish(self, **data):
+        """Publish statistics to be collected later by
+        :class:`StatsCollector`.
+
+        :param data: An arbitrary Python object containing the statistics
+            to be published.
+
+        """
         if not self.enabled:
             return
         connection = DjangoAMQPConnection()
@@ -23,49 +45,117 @@ class Statistics(object):
 
     @classmethod
     def start(cls, *args, **kwargs):
+        """Convenience method instantiating and running :meth:`run` in
+        one swoop."""
         stat = cls()
-        stat.run()
+        stat.run(*args, **kwargs)
         return stat
 
     def run(self, *args, **kwargs):
-        if stat.enabled:
-            stat.on_start(*args, **kwargs)
+        """Start producing statistics."""
+        if self.enabled:
+            return self.on_start(*args, **kwargs)
 
     def stop(self, *args, **kwargs):
+        """Stop producing and publish statistics."""
         if self.enabled:
-            self.on_finish(*args, **kwargs)
+            return self.on_finish(*args, **kwargs)
+
+    def on_start(self, *args, **kwargs):
+        """What to do when the :meth:`run` method is called."""
+        raise NotImplementedError(
+                "Statistics classes must define a on_start handler.")
 
+    def on_stop(self, *args, **kwargs):
+        """What to do when the :meth:`stop` method is called."""
+        raise NotImplementedError(
+                "Statistics classes must define a on_stop handler.")
 
 class TimerStats(Statistics):
+    """A generic timer producing ``celery`` statistics.
+
+    .. attribute:: time_start
+
+        The time when this class was instantiated (in :func:`time.time`
+        format.)
+
+    """
     time_start = None
 
     def on_start(self, task_id, task_name, args, kwargs):
+        """What to do when the timers :meth:`run` method is called."""
         self.task_id = task_id
         self.task_name = task_name
-        self.args = self.args
-        self.kwargs = self.kwargs
+        self.args = args
+        self.kwargs = kwargs
         self.time_start = time.time()
     
     def on_finish(self):
+        """What to do when the timers :meth:`stop` method is called.
+
+        :returns: the time in seconds it took between calling :meth:`start` on
+            this class and :meth:`stop`.
+        """
         nsecs = time.time() - self.time_start
-        self.publish(task_id=task_id,
-                     task_name=task_name,
-                     args=args,
-                     kwargs=kwargs,
-                     nsecs=nsecs)
+        self.publish(task_id=self.task_id,
+                     task_name=self.task_name,
+                     args=self.args,
+                     kwargs=self.kwargs,
+                     nsecs=str(nsecs))
+        return nsecs
 
 
 class TaskTimerStats(TimerStats):
+    """Time a running :class:`celery.task.Task`."""
     type = "task_time_running"
 
 
 class StatsCollector(object):
+    """Collect and report Celery statistics.
+    
+    **NOTE**: Please run only one collector at any time, or your stats
+        will be skewed.
+
+    .. attribute:: total_tasks_processed
+
+        The number of tasks executed in total since the first time
+        :meth:`collect` was executed on this class instance.
+
+    .. attribute:: total_tasks_processed_by_type
+
+        A dictionary of task names and how many times they have been
+        executed in total since the first time :meth:`collect` was executed
+        on this class instance.
+
+    .. attribute:: total_task_time_running
+
+        The total time, in seconds, it took to process all the tasks executed
+        since the first time :meth:`collect` was executed on this class
+        instance.
+
+    .. attribute:: total_task_time_running_by_type
+        
+        A dictionary of task names and their total running time in seconds,
+        counting all the tasks that has been run since the first time
+        :meth:`collect` was executed on this class instance.
+
+    **NOTE**: You have to run :meth:`collect` for these attributes
+        to be filled.
+        
+
+    """
+
     allowed_types = ["task_time_running"]
-    total_tasks_processed = 0
-    total_task_time_running = 0
-    total_task_time_running_by_type = {}
+
+    def __init__(self):
+        self.total_tasks_processed = 0
+        self.total_tasks_processed_by_type = {}
+        self.total_task_time_running = 0.0
+        self.total_task_time_running_by_type = {}
 
     def collect(self):
+        """Collect any new statistics available since the last time
+        :methd:`collect` was executed."""
         connection = DjangoAMQPConnection()
         consumer = StatsConsumer(connection=connection)
         it = consumer.iterqueue(infinite=False)
@@ -75,21 +165,51 @@ class StatsCollector(object):
             if stat_type in self.allowed_types:
                 handler = getattr(self, stat_type)
                 handler(**stats_entry["data"])
-        return self.on_cycle_end()
 
     def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
+        """Process statistics regarding how long a task has been running
+        (the :class:TaskTimerStats` class is responsible for sending these).
+
+        :param task_id: The UUID of the task.
+        :param task_name: The name of task.
+        :param args: The tasks positional arguments.
+        :param kwargs: The tasks keyword arguments.
+        :param nsecs: The number of seconds (in :func:`time.time` format)
+            it took to execute the task.
+
+        """
+        nsecs = float(nsecs)
+        self.total_tasks_processed += 1
         self.total_task_time_running += nsecs
-        self.total_task_time_running_by_type[task_name] = \
-                self.total_task_time_running_by_type.get(task_name, nsecs)
-        self.total_task_time_running_by_type[task_name] += nsecs
-        print("Task %s[%s](%s, %s): %d" % (
-                task_id, task_name, args, kwargs, nsecs))
+        if task_name not in self.total_task_time_running_by_type:
+            self.total_task_time_running_by_type[task_name] = nsecs
+        else:
+            self.total_task_time_running_by_type[task_name] += nsecs
+        if task_name not in self.total_tasks_processed_by_type:
+            self.total_tasks_processed_by_type[task_name] = 1
+        else:
+            self.total_tasks_processed_by_type[task_name] += 1
+
+    def report(self):
+        """Dump a nice statistics report from the data collected since
+        the first time :meth:`collect` was executed on this instance.
+
+        It outputs the following information:
+
+            * Total processing time by task type and how many times each
+                task has been excuted.
+
+            * Total task processing time.
 
-    def on_cycle_end(self):
+            * Total number of tasks executed.
+        
+        """
         print("-" * 64)
         print("Total processing time by task type:")
         for task_name, nsecs in self.total_task_time_running_by_type.items():
-            print("\t%s: %d" % (task_name, nsecs))
-        print("Total task processing time: %d" % (
+            print("\t%s: %s secs. (for a total of %d executed.)" % (
+                    task_name, nsecs,
+                    self.total_tasks_processed_by_type.get(task_name)))
+        print("Total task processing time: %s secs." % (
             self.total_task_time_running))
         print("Total tasks processed: %d" % self.total_tasks_processed)