Browse Source

Adds memory debug stuff to celery.utils.debug

Ask Solem 13 years ago
parent
commit
293a587008
2 changed files with 50 additions and 36 deletions
  1. 47 0
      celery/utils/debug.py
  2. 3 36
      celery/worker/state.py

+ 47 - 0
celery/utils/debug.py

@@ -0,0 +1,47 @@
+from __future__ import absolute_import
+
+import os
+
+from .compat import format_d
+
+_process = None
+_mem_sample = []
+
+def ps():
+    global _process
+    if _process is None:
+        try:
+            from psutil import Process
+        except ImportError:
+            return None
+        _process = Process(os.getpid())
+    return _process
+
+def mem_rss():
+    p = ps()
+    if p is not None:
+        return "%sMB" % (format_d(p.get_memory_info().rss // 1024), )
+
+def sample(x, n=10, k=0):
+    j = len(x) // n
+    for _ in xrange(n):
+        yield x[k]
+        k += j
+
+
+def memdump():
+    if ps() is None:
+        print("- rss: (psutil not installed).")
+        return
+    if filter(None, _mem_sample):
+        print("- rss (sample):")
+        for mem in sample(_mem_sample):
+            print("-    > %s," % mem)
+        _mem_sample[:] = []
+    import gc
+    gc.collect()
+    print("- rss (end): %s." % (mem_rss()))
+
+
+def sample_mem():
+    _mem_sample.append(mem_rss())

+ 3 - 36
celery/worker/state.py

@@ -73,38 +73,16 @@ if os.environ.get("CELERY_BENCH"):  # pragma: no cover
     from time import time
     from billiard import current_process
     from celery.utils.compat import format_d
+    from celery.utils.debug import memdump, sample_mem
 
     all_count = 0
     bench_first = None
-    bench_mem_sample = []
     bench_start = None
     bench_last = None
     bench_every = int(os.environ.get("CELERY_BENCH_EVERY", 1000))
     bench_sample = []
     __reserved = task_reserved
     __ready = task_ready
-    _process = None
-
-    def ps():
-        global _process
-        if _process is None:
-            try:
-                from psutil import Process
-            except ImportError:
-                return None
-            _process = Process(os.getpid())
-        return _process
-
-    def mem_rss():
-        p = ps()
-        if p is not None:
-            return "%sMB" % (format_d(p.get_memory_info().rss // 1024), )
-
-    def sample(x, n=10, k=0):
-        j = len(x) // n
-        for _ in xrange(n):
-            yield x[k]
-            k += j
 
     if current_process()._name == 'MainProcess':
         @atexit.register
@@ -113,17 +91,7 @@ if os.environ.get("CELERY_BENCH"):  # pragma: no cover
                 print("- Time spent in benchmark: %r" % (
                     bench_last - bench_first))
                 print("- Avg: %s" % (sum(bench_sample) / len(bench_sample)))
-                if filter(None, bench_mem_sample):
-                    print("- rss (sample):")
-                    for mem in sample(bench_mem_sample):
-                        print("-    > %s," % mem)
-                    bench_mem_sample[:] = []
-                    bench_sample[:] = []
-                    import gc
-                    gc.collect()
-                    print("- rss (shutdown): %s." % (mem_rss()))
-                else:
-                    print("- rss: (psutil not installed).")
+                memdump()
 
     def task_reserved(request):  # noqa
         global bench_start
@@ -150,8 +118,7 @@ if os.environ.get("CELERY_BENCH"):  # pragma: no cover
             sys.stdout.flush()
             bench_start = bench_last = now
             bench_sample.append(diff)
-            bench_mem_sample.append(mem_rss())
-
+            sample_mem()
         return __ready(request)