Browse Source

Looks good...

Ask Solem 16 years ago
parent
commit
7fb67e0577
2 changed files with 53 additions and 19 deletions
  1. 49 13
      celery/monitoring.py
  2. 4 6
      celery/worker.py

+ 49 - 13
celery/monitoring.py

@@ -2,25 +2,61 @@ from carrot.connection import DjangoAMQPConnection
 from celery.messaging import StatsPublisher, StatsConsumer
 from django.conf import settings
 
-def send_statistics(stat_name, **data):
-    send_stats = getattr(settings, "CELERY_STATISTICS", False)
-    if send_stats:
+
+class Statistics(object):
+    type = None
+
+    def __init__(self, **kwargs):
+        self.enabled = getattr(settings, "CELERY_STATISTICS", False))
+        if not self.type:
+            raise NotImplementedError(
+                "Statistic classes must define their type.")
+
+    def publish(self, **data):
+        if not self.enabled:
+            return
         connection = DjangoAMQPConnection()
         publisher = StatsPublisher(connection=connection)
-        publisher.send({"stat_name": stat_name, "data": data})
+        publisher.send({"type": self.type, "data": data})
         publisher.close()
         connection.close()
 
+        @classmethod
+        def start(cls, *args, **kwargs):
+            stat = cls()
+            stat.run()
+            return stat
 
-class Statistics(object):
+        def run(self, *args, **kwargs):
+            if stat.enabled:
+                stat.on_start(*args, **kwargs)
 
-    def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
-        send_statistics("task_time_running",
-                        task_id=task_id,
-                        task_name=task_name,
-                        args=args,
-                        kwargs=kwargs,
-                        nsecs=nsecs)
+        def stop(self, *args, **kwargs):
+            if self.enabled:
+                self.on_finish(*args, **kwargs)
+
+
+class TimerStats(Statistics):
+    time_start = None
+
+    def on_start(self, task_id, task_name, args, kwargs):
+        self.task_id = task_id
+        self.task_name = task_name
+        self.args = self.args
+        self.kwargs = self.kwargs
+        self.time_start = time.time()
+    
+    def on_finish(self):
+        nsecs = time.time() - self.time_start
+        self.publish(task_id=task_id,
+                     task_name=task_name,
+                     args=args,
+                     kwargs=kwargs,
+                     nsecs=nsecs)
+
+
+class TaskTimerStats(TimerStats):
+    type = "task_time_running"
 
 
 class StatsCollector(object):
@@ -33,7 +69,7 @@ class StatsCollector(object):
         total = 0
         for message in it:
             data = message.decode()
-            stat_name = data.get("stat_name")
+            stat_name = data.get("type")
             if stat_name in self.allowed_stats:
                 handler = getattr(self, stat_name)
                 handler(**data["data"])

+ 4 - 6
celery/worker.py

@@ -11,7 +11,7 @@ from celery.datastructures import ExceptionInfo
 from celery.backends import default_backend, default_periodic_status_backend
 from celery.timer import EventTimer
 from django.core.mail import mail_admins
-from celery.monitoring import Statistics
+from celery.monitoring import TaskTimerStats
 import multiprocessing
 import traceback
 import threading
@@ -58,6 +58,7 @@ def jail(task_id, task_name, func, args, kwargs):
     result, and sets the task status to ``"FAILURE"``.
 
     :param task_id: The id of the task.
+    :param task_name: The name of the task.
     :param func: Callable object to execute.
     :param args: List of positional args to pass on to the function.
     :param kwargs: Keyword arguments mapping to pass on to the function.
@@ -66,7 +67,7 @@ def jail(task_id, task_name, func, args, kwargs):
         the exception instance on failure.
 
     """
-    time_start = time.time()
+    timer_stat = TaskTimerStats.start(task_id, task_name, args, kwargs)
 
     # See: http://groups.google.com/group/django-users/browse_thread/
     #       thread/78200863d0c07c6d/38402e76cf3233e8?hl=en&lnk=gst&
@@ -93,10 +94,7 @@ def jail(task_id, task_name, func, args, kwargs):
         default_backend.mark_as_done(task_id, result)
         retval = result
 
-    time_finished = time.time() - time_start
-    Statistics().task_time_running(task_id, task_name, args, kwargs,
-                                   time_finished)
-    
+    timer_stat.stop()
     return retval