
More statistics

Ask Solem 16 years ago
parent
commit
b9d223cdc9
3 changed files with 60 additions and 26 deletions
  1. celery/bin/celeryd.py (+23 -3)
  2. celery/monitoring.py (+36 -22)
  3. celery/worker.py (+1 -1)

celery/bin/celeryd.py (+23 -3)

@@ -20,6 +20,11 @@

    Path to pidfile.

+.. cmdoption:: -s, --statistics
+
+    Turn on reporting of statistics (remember to flush the statistics message
+    queue from time to time).
+
.. cmdoption:: -w, --wakeup-after

    If the queue is empty, this is the time *in seconds* the

@@ -78,6 +83,9 @@ import atexit
from daemon import DaemonContext
from daemon.pidlockfile import PIDLockFile

+USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
+settings.CELERY_STATISTICS = USE_STATISTICS
+

def acquire_pidlock(pidfile):
    """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for

@@ -113,8 +121,14 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
        loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
        pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER,
        umask=0, uid=None, gid=None, working_directory=None, chroot=None,
-        **kwargs):
+        statistics=None, **kwargs):
    """Run the celery daemon."""
+
+    print(">>> Launching celery, please hold on to something...")
+
+    if statistics:
+        settings.CELERY_STATISTICS = statistics
+
    if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
        import warnings
        warnings.warn("The sqlite3 database engine doesn't support "

@@ -130,8 +144,10 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
        what = "message"
        if discarded_count > 1:
            what = "messages"
-        sys.stderr.write("Discard: Erased %d %s from the queue.\n" % (
+        sys.stderr.write("* Discard: Erased %d %s from the queue.\n" % (
            discarded_count, what))
+    print("* Reporting of statistics is %s..." % (
+        settings.CELERY_STATISTICS and "ON" or "OFF"))
    if daemon:
        # Since without stderr any errors will be silently suppressed,
        # we need to know that we have access to the logfile

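The ON/OFF message above uses the old `cond and a or b` idiom, which predates Python 2.5's conditional expression. It only behaves like a ternary because `"ON"` is truthy; with a falsy middle value it silently picks the wrong branch::

    enabled = True
    print(enabled and "ON" or "OFF")   # ON  -- correct, "ON" is truthy
    print(enabled and "" or "OFF")     # OFF -- wrong branch: "" is falsy,
                                       # so the expression falls through to "OFF"
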
@@ -143,7 +159,7 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
        uid = uid and int(uid) or os.geteuid()
        gid = gid and int(gid) or os.getegid()
        working_directory = working_directory or os.getcwd()
-        sys.stderr.write("Launching celeryd in the background...\n")
+        sys.stderr.write("* Launching celeryd in the background...\n")
        context = DaemonContext(chroot_directory=chroot,
                                working_directory=working_directory,
                                umask=umask,

@@ -185,6 +201,10 @@ OPTION_LIST = (
    optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
            action="store", dest="pidfile",
            help="Path to pidfile."),
+    optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
+            action="store_true", dest="statistics",
+            help="Turn on reporting of statistics (remember to flush the "
+                 "statistics message queue from time to time)."),
    optparse.make_option('-w', '--wakeup-after', default=QUEUE_WAKEUP_AFTER,
            action="store", type="float", dest="queue_wakeup_after",
            help="If the queue is empty, this is the time *in seconds* the "


celery/monitoring.py (+36 -22)

@@ -7,7 +7,7 @@ class Statistics(object):
    type = None

    def __init__(self, **kwargs):
-        self.enabled = getattr(settings, "CELERY_STATISTICS", False))
+        self.enabled = getattr(settings, "CELERY_STATISTICS", False)
        if not self.type:
            raise NotImplementedError(
                "Statistic classes must define their type.")

@@ -21,19 +21,19 @@ class Statistics(object):
        publisher.close()
        connection.close()

-        @classmethod
-        def start(cls, *args, **kwargs):
-            stat = cls()
-            stat.run()
-            return stat
+    @classmethod
+    def start(cls, *args, **kwargs):
+        stat = cls()
+        stat.run()
+        return stat

-        def run(self, *args, **kwargs):
-            if stat.enabled:
-                stat.on_start(*args, **kwargs)
+    def run(self, *args, **kwargs):
+        if self.enabled:
+            self.on_start(*args, **kwargs)

-        def stop(self, *args, **kwargs):
-            if self.enabled:
-                self.on_finish(*args, **kwargs)
+    def stop(self, *args, **kwargs):
+        if self.enabled:
+            self.on_finish(*args, **kwargs)


class TimerStats(Statistics):

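With the re-indent above, `start()`, `run()` and `stop()` become real methods of `Statistics` instead of functions accidentally nested one level too deep, giving every subclass a start/stop lifecycle that is a no-op unless the `enabled` flag is set. A self-contained sketch of how a timer-style subclass plugs into that lifecycle (publishing is stubbed out; only the `on_start`/`on_finish` hook names follow the calls in the diff)::

    import time

    class Statistics(object):
        type = None
        enabled = True          # normally read from settings.CELERY_STATISTICS

        @classmethod
        def start(cls, *args, **kwargs):
            stat = cls()
            stat.run(*args, **kwargs)
            return stat

        def run(self, *args, **kwargs):
            if self.enabled:
                self.on_start(*args, **kwargs)

        def stop(self, *args, **kwargs):
            if self.enabled:
                self.on_finish(*args, **kwargs)

    class TimerStats(Statistics):
        type = "timer"

        def on_start(self):
            self.time_start = time.time()

        def on_finish(self):
            # The real subclass would publish this measurement to the
            # statistics queue instead of just storing it.
            self.nsecs = time.time() - self.time_start

    stat = TimerStats.start()   # records the start time
    stat.stop()                 # records the elapsed time
    print("elapsed: %f" % stat.nsecs)
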
@@ -60,22 +60,36 @@ class TaskTimerStats(TimerStats):


class StatsCollector(object):
-    allowed_stats = ["task_time_running"]
+    allowed_types = ["task_time_running"]
+    total_tasks_processed = 0
+    total_task_time_running = 0
+    total_task_time_running_by_type = {}

-    def run(self):
+    def collect(self):
        connection = DjangoAMQPConnection()
        consumer = StatsConsumer(connection=connection)
        it = consumer.iterqueue(infinite=False)
-        total = 0
        for message in it:
-            data = message.decode()
-            stat_name = data.get("type")
-            if stat_name in self.allowed_stats:
-                handler = getattr(self, stat_name)
-                handler(**data["data"])
-                total += 1
-        return total
+            stats_entry = message.decode()
+            stat_type = stats_entry["type"]
+            if stat_type in self.allowed_types:
+                handler = getattr(self, stat_type)
+                handler(**stats_entry["data"])
+        return self.on_cycle_end()

    def task_time_running(self, task_id, task_name, args, kwargs, nsecs):
+        self.total_task_time_running += nsecs
+        self.total_task_time_running_by_type[task_name] = \
+                self.total_task_time_running_by_type.get(task_name, 0)
+        self.total_task_time_running_by_type[task_name] += nsecs
        print("Task %s[%s](%s, %s): %d" % (
                task_id, task_name, args, kwargs, nsecs))
+
+    def on_cycle_end(self):
+        print("-" * 64)
+        print("Total processing time by task type:")
+        for task_name, nsecs in self.total_task_time_running_by_type.items():
+            print("\t%s: %d" % (task_name, nsecs))
+        print("Total task processing time: %d" % (
+            self.total_task_time_running))
+        print("Total tasks processed: %d" % self.total_tasks_processed)


celery/worker.py (+1 -1)

@@ -87,7 +87,7 @@ def jail(task_id, task_name, func, args, kwargs):
    else:
        # Django <= 1.0.2
        cache_scheme = cache_backend.split(":", 1)[0]
-    if "memcached" in scheme:
+    if "memcached" in cache_scheme:
        cache.cache.close()

    # Backend process cleanup

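The one-line worker.py fix cures a NameError: the memcached check referenced an undefined `scheme` instead of the `cache_scheme` computed just above it. Under Django <= 1.0.2 the cache backend is configured as a URI string, so a single split on `":"` recovers the scheme; the backend value below is only an example::

    cache_backend = "memcached://127.0.0.1:11211/"   # example CACHE_BACKEND

    # Django <= 1.0.2: the scheme is everything before the first ":".
    cache_scheme = cache_backend.split(":", 1)[0]
    assert cache_scheme == "memcached"

    if "memcached" in cache_scheme:
        # The real code calls cache.cache.close() here, so each forked
        # worker process opens its own memcached connection.
        pass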