123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- # -*- coding: utf-8 -*-
- """Utilities for debugging memory usage, blocking calls, etc."""
- from __future__ import absolute_import, print_function, unicode_literals
- import os
- import sys
- import traceback
- from contextlib import contextmanager
- from functools import partial
- from pprint import pprint
- from celery.five import WhateverIO, items, range
- from celery.platforms import signals
- try:
- from psutil import Process
- except ImportError:
- Process = None # noqa
- __all__ = [
- 'blockdetection', 'sample_mem', 'memdump', 'sample',
- 'humanbytes', 'mem_rss', 'ps', 'cry',
- ]
- UNITS = (
- (2 ** 40.0, 'TB'),
- (2 ** 30.0, 'GB'),
- (2 ** 20.0, 'MB'),
- (2 ** 10.0, 'KB'),
- (0.0, 'b'),
- )
- _process = None
- _mem_sample = []
- def _on_blocking(signum, frame):
- import inspect
- raise RuntimeError(
- 'Blocking detection timed-out at: {0}'.format(
- inspect.getframeinfo(frame)
- )
- )
- @contextmanager
- def blockdetection(timeout):
- """Context that raises an exception if process is blocking.
- Uses ``SIGALRM`` to detect blocking functions.
- """
- if not timeout:
- yield
- else:
- old_handler = signals['ALRM']
- old_handler = None if old_handler == _on_blocking else old_handler
- signals['ALRM'] = _on_blocking
- try:
- yield signals.arm_alarm(timeout)
- finally:
- if old_handler:
- signals['ALRM'] = old_handler
- signals.reset_alarm()
- def sample_mem():
- """Sample RSS memory usage.
- Statistics can then be output by calling :func:`memdump`.
- """
- current_rss = mem_rss()
- _mem_sample.append(current_rss)
- return current_rss
- def _memdump(samples=10): # pragma: no cover
- S = _mem_sample
- prev = list(S) if len(S) <= samples else sample(S, samples)
- _mem_sample[:] = []
- import gc
- gc.collect()
- after_collect = mem_rss()
- return prev, after_collect
- def memdump(samples=10, file=None): # pragma: no cover
- """Dump memory statistics.
- Will print a sample of all RSS memory samples added by
- calling :func:`sample_mem`, and in addition print
- used RSS memory after :func:`gc.collect`.
- """
- say = partial(print, file=file)
- if ps() is None:
- say('- rss: (psutil not installed).')
- return
- prev, after_collect = _memdump(samples)
- if prev:
- say('- rss (sample):')
- for mem in prev:
- say('- > {0},'.format(mem))
- say('- rss (end): {0}.'.format(after_collect))
- def sample(x, n, k=0):
- """Given a list `x` a sample of length ``n`` of that list is returned.
- For example, if `n` is 10, and `x` has 100 items, a list of every tenth.
- item is returned.
- ``k`` can be used as offset.
- """
- j = len(x) // n
- for _ in range(n):
- try:
- yield x[k]
- except IndexError:
- break
- k += j
- def hfloat(f, p=5):
- """Convert float to value suitable for humans.
- Arguments:
- f (float): The floating point number.
- p (int): Floating point precision (default is 5).
- """
- i = int(f)
- return i if i == f else '{0:.{p}}'.format(f, p=p)
- def humanbytes(s):
- """Convert bytes to human-readable form (e.g., KB, MB)."""
- return next(
- '{0}{1}'.format(hfloat(s / div if div else s), unit)
- for div, unit in UNITS if s >= div
- )
- def mem_rss():
- """Return RSS memory usage as a humanized string."""
- p = ps()
- if p is not None:
- return humanbytes(_process_memory_info(p).rss)
- def ps(): # pragma: no cover
- """Return the global :class:`psutil.Process` instance.
- Note:
- Returns :const:`None` if :pypi:`psutil` is not installed.
- """
- global _process
- if _process is None and Process is not None:
- _process = Process(os.getpid())
- return _process
- def _process_memory_info(process):
- try:
- return process.memory_info()
- except AttributeError:
- return process.get_memory_info()
- def cry(out=None, sepchr='=', seplen=49): # pragma: no cover
- """Return stack-trace of all active threads.
- See Also:
- Taken from https://gist.github.com/737056.
- """
- import threading
- out = WhateverIO() if out is None else out
- P = partial(print, file=out)
- # get a map of threads by their ID so we can print their names
- # during the traceback dump
- tmap = {t.ident: t for t in threading.enumerate()}
- sep = sepchr * seplen
- for tid, frame in items(sys._current_frames()):
- thread = tmap.get(tid)
- if not thread:
- # skip old junk (left-overs from a fork)
- continue
- P('{0.name}'.format(thread))
- P(sep)
- traceback.print_stack(frame, file=out)
- P(sep)
- P('LOCAL VARIABLES')
- P(sep)
- pprint(frame.f_locals, stream=out)
- P('\n')
- return out.getvalue()
|