
Started on Statistics/Monitoring.

Ask Solem 16 years ago
parent
commit
a6428a9eb6
3 changed files with 76 additions and 5 deletions
  1. celery/messaging.py  +18 -0
  2. celery/monitoring.py  +45 -0
  3. celery/worker.py  +13 -5

celery/messaging.py  +18 -0

@@ -71,3 +71,21 @@ class TaskConsumer(Consumer):
     exchange_type = conf.AMQP_EXCHANGE_TYPE
     auto_ack = True
     decoder = pickle.loads
+
+
+class StatsPublisher(Publisher):
+    exchange = "celerygraph"
+    routing_key = "stats"
+    encoder = pickle.dumps
+
+
+class StatsConsumer(Consumer):
+    queue = "celerygraph"
+    exchange = "celerygraph"
+    routing_key = "stats"
+    exchange_type = "direct"
+    auto_ack = True
+    decoder = pickle.loads
+
+    def receive(self, message_data, message):
+        pass
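
For orientation, the publisher/consumer pair added above is meant to be used roughly as follows; a minimal sketch that assumes carrot's DjangoAMQPConnection and relies only on calls already used elsewhere in this commit (send, close, iterqueue, decode). The example payload is made up:

from carrot.connection import DjangoAMQPConnection
from celery.messaging import StatsPublisher, StatsConsumer

# Publish a single statistics message (same shape as send_statistics() produces).
connection = DjangoAMQPConnection()
publisher = StatsPublisher(connection=connection)
publisher.send({"stat_name": "task_time_running", "data": {"nsecs": 0.42}})
publisher.close()

# Drain whatever is currently waiting on the "celerygraph" queue.
consumer = StatsConsumer(connection=connection)
for message in consumer.iterqueue(infinite=False):
    print(message.decode())
connection.close()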

celery/monitoring.py  +45 -0

@@ -0,0 +1,45 @@
+from carrot.connection import DjangoAMQPConnection
+from celery.messaging import StatsPublisher, StatsConsumer
+from django.conf import settings
+
+def send_statistics(stat_name, **data):
+    send_stats = getattr(settings, "CELERY_STATISTICS", False)
+    if send_stats:
+        connection = DjangoAMQPConnection()
+        publisher = StatsPublisher(connection=connection)
+        publisher.send({"stat_name": stat_name, "data": data})
+        publisher.close()
+        connection.close()
+
+
+class Statistics(object):
+
+    def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
+        send_statistics("task_time_running",
+                        task_id=task_id,
+                        task_name=task_name,
+                        args=args,
+                        kwargs=kwargs,
+                        nsecs=nsecs)
+
+
+class StatsCollector(object):
+    allowed_stats = ["task_time_running"]
+
+    def run(self):
+        connection = DjangoAMQPConnection()
+        consumer = StatsConsumer(connection=connection)
+        it = consumer.iterqueue(infinite=False)
+        total = 0
+        for message in it:
+            data = message.decode()
+            stat_name = data.get("stat_name")
+            if stat_name in self.allowed_stats:
+                handler = getattr(self, stat_name)
+                handler(**data["data"])
+                total += 1
+        return total
+
+    def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
+        print("Task %s[%s](%s, %s): %d" % (
+                task_id, task_name, args, kwargs, nsecs))
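
Note that send_statistics() is a no-op unless the CELERY_STATISTICS setting is enabled, and the collected data only becomes visible once StatsCollector drains the queue. A rough sketch of both sides, with the settings placement being illustrative:

# settings.py
CELERY_STATISTICS = True

# e.g. from a shell or a management command:
from celery.monitoring import StatsCollector

collector = StatsCollector()
handled = collector.run()   # consumes the "celerygraph" queue until it is empty
print("Handled %d statistics message(s)" % handled)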

celery/worker.py  +13 -5

@@ -11,6 +11,7 @@ from celery.datastructures import ExceptionInfo
 from celery.backends import default_backend, default_periodic_status_backend
 from celery.timer import EventTimer
 from django.core.mail import mail_admins
+from celery.monitoring import Statistics
 import multiprocessing
 import traceback
 import threading
@@ -45,7 +46,7 @@ class UnknownTask(Exception):
     ignored."""


-def jail(task_id, func, args, kwargs):
+def jail(task_id, task_name, func, args, kwargs):
     """Wraps the task in a jail, which catches all exceptions, and
     """Wraps the task in a jail, which catches all exceptions, and
     saves the status and result of the task execution to the task
     saves the status and result of the task execution to the task
     meta backend.
     meta backend.
@@ -65,6 +66,7 @@ def jail(task_id, func, args, kwargs):
         the exception instance on failure.

     """
+    time_start = time.time()

     # See: http://groups.google.com/group/django-users/browse_thread/
     #       thread/78200863d0c07c6d/38402e76cf3233e8?hl=en&lnk=gst&
@@ -86,10 +88,16 @@ def jail(task_id, func, args, kwargs):
         result = func(*args, **kwargs)
     except Exception, exc:
         default_backend.mark_as_failure(task_id, exc)
-        return ExceptionInfo(sys.exc_info())
+        retval = ExceptionInfo(sys.exc_info())
     else:
         default_backend.mark_as_done(task_id, result)
-        return result
+        retval = result
+
+    time_finished = time.time() - time_start
+    Statistics().task_time_running(task_id, task_name, args, kwargs,
+                                   time_finished)
+    
+    return retval



@@ -199,7 +207,7 @@ class TaskWrapper(object):

         """
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
-        return jail(self.task_id, [
+        return jail(self.task_id, self.task_name, [
                        self.task_func, self.args, task_func_kwargs])

     def on_success(self, ret_value, meta):
@@ -244,7 +252,7 @@ class TaskWrapper(object):

         """
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
-        jail_args = [self.task_id, self.task_func,
+        jail_args = [self.task_id, self.task_name, self.task_func,
                     self.args, task_func_kwargs]
         return pool.apply_async(jail, args=jail_args,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
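
Taken together, the worker change makes every executed task report its wall-clock runtime via Statistics. Stripped of the result-backend bookkeeping, the timing logic added to jail() boils down to the following simplified sketch (not the full function, and the helper name is made up):

import time

from celery.monitoring import Statistics

def timed_call(task_id, task_name, func, args, kwargs):
    # Time the task body and publish a "task_time_running" statistic.
    time_start = time.time()
    try:
        retval = func(*args, **kwargs)
    except Exception as exc:
        retval = exc  # jail() wraps this in ExceptionInfo instead
    Statistics().task_time_running(task_id, task_name, args, kwargs,
                                   time.time() - time_start)
    return retval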