Просмотр исходного кода

Adds remote control commands memsample + memdump

Ask Solem 12 лет назад
Родитель
Сommit
c137d79242
4 измененных файлов с 53 добавлено и 15 удалено
  1. 6 0
      celery/app/control.py
  2. 3 1
      celery/bin/celery.py
  3. 29 13
      celery/utils/debug.py
  4. 15 1
      celery/worker/control.py

+ 6 - 0
celery/app/control.py

@@ -91,6 +91,12 @@ class Inspect(object):
     def hello(self):
         return self._request('hello')
 
+    def memsample(self):
+        return self._request('memsample')
+
+    def memdump(self, samples=10):
+        return self._request('memdump', samples=samples)
+
 
 class Control(object):
     Mailbox = Mailbox

+ 3 - 1
celery/bin/celery.py

@@ -365,7 +365,9 @@ class inspect(_RemoteControl):
         'ping': (0.2, 'ping worker(s)'),
         'clock': (1.0, 'get value of logical clock'),
         'conf': (1.0, 'dump worker configuration'),
-        'report': (1.0, 'get bugreport info')
+        'report': (1.0, 'get bugreport info'),
+        'memsample': (1.0, 'sample memory (requires psutil)'),
+        'memdump': (1.0, 'dump memory samples (requires psutil)'),
     }
 
     def call(self, method, *args, **options):

+ 29 - 13
celery/utils/debug.py

@@ -6,11 +6,12 @@
     Utilities for debugging memory usage.
 
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 import os
 
 from contextlib import contextmanager
+from functools import partial
 
 from celery.five import format_d, range
 from celery.platforms import signals
@@ -55,10 +56,22 @@ def sample_mem():
     Statistics can then be output by calling :func:`memdump`.
 
     """
-    _mem_sample.append(mem_rss())
+    current_rss = mem_rss()
+    _mem_sample.append(current_rss)
+    return current_rss
 
 
-def memdump(samples=10):
+def _memdump(samples=10):
+    S = _mem_sample
+    prev = list(S) if len(S) <= samples else sample(S, samples)
+    _mem_sample[:] = []
+    import gc
+    gc.collect()
+    after_collect = mem_rss()
+    return prev, after_collect
+
+
+def memdump(samples=10, file=None):
     """Dump memory statistics.
 
     Will print a sample of all RSS memory samples added by
@@ -66,17 +79,16 @@ def memdump(samples=10):
     used RSS memory after :func:`gc.collect`.
 
     """
+    say = partial(print, file=file)
     if ps() is None:
-        print('- rss: (psutil not installed).')
+        say('- rss: (psutil not installed).')
         return
-    if any(_mem_sample):
-        print('- rss (sample):')
-        for mem in sample(_mem_sample, samples):
-            print('-    > {0},'.format(mem))
-        _mem_sample[:] = []
-    import gc
-    gc.collect()
-    print('- rss (end): {0}.'.format(mem_rss()))
+    prev, after_collect = _memdump(samples)
+    if prev:
+        say('- rss (sample):')
+        for mem in prev:
+            say('-    > {0},'.format(mem))
+    say('- rss (end): {0}.'.format(after_collect))
 
 
 def sample(x, n, k=0):
@@ -94,11 +106,15 @@ def sample(x, n, k=0):
         k += j
 
 
+def humanbytes(s):
+    return '{0}MB'.format(format_d(s // 1024))
+
+
 def mem_rss():
     """Returns RSS memory usage as a humanized string."""
     p = ps()
     if p is not None:
-        return '{0}MB'.format(format_d(p.get_memory_info().rss // 1024))
+        return humanbytes(p.get_memory_info().rss)
 
 
 def ps():

+ 15 - 1
celery/worker/control.py

@@ -12,7 +12,7 @@ import logging
 
 from kombu.utils.encoding import safe_repr
 
-from celery.five import UserDict, items
+from celery.five import UserDict, items, StringIO
 from celery.platforms import signals as _signals
 from celery.utils import timeutils
 from celery.utils.log import get_logger
@@ -175,6 +175,20 @@ def stats(panel, **kwargs):
     return panel.consumer.controller.stats()
 
 
+@Panel.register
+def memsample(panel, **kwargs):
+    from celery.utils.debug import sample_mem
+    return sample_mem()
+
+
+@Panel.register
+def memdump(panel, samples=10, **kwargs):
+    from celery.utils.debug import memdump
+    out = StringIO()
+    memdump(file=out)
+    return out.getvalue()
+
+
 @Panel.register
 def clock(panel, **kwargs):
     return {'clock': panel.app.clock.value}