123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- # -*- coding: utf-8 -*-
- """
- celery.utils.debug
- ~~~~~~~~~~~~~~~~~~
- Utilities for debugging memory usage.
- """
- from __future__ import absolute_import, print_function
- import os
- from contextlib import contextmanager
- from functools import partial
- from celery.five import range
- from celery.platforms import signals
- try:
- from psutil import Process
- except ImportError:
- Process = None # noqa
- __all__ = [
- 'blockdetection', 'sample_mem', 'memdump', 'sample',
- 'humanbytes', 'mem_rss', 'ps',
- ]
- UNITS = (
- (2 ** 40.0, 'TB'),
- (2 ** 30.0, 'GB'),
- (2 ** 20.0, 'MB'),
- (2 ** 10.0, 'kB'),
- (0.0, '{0!d}b'),
- )
- _process = None
- _mem_sample = []
- def _on_blocking(signum, frame):
- import inspect
- raise RuntimeError(
- 'Blocking detection timed-out at: %s' % (
- inspect.getframeinfo(frame), ))
- @contextmanager
- def blockdetection(timeout):
- """A timeout context using ``SIGALRM`` that can be used to detect blocking
- functions."""
- if not timeout:
- yield
- else:
- old_handler = signals['ALRM']
- old_handler = None if old_handler == _on_blocking else old_handler
- signals['ALRM'] = _on_blocking
- try:
- yield signals.arm_alarm(timeout)
- finally:
- if old_handler:
- signals['ALRM'] = old_handler
- signals.reset_alarm()
- def sample_mem():
- """Sample RSS memory usage.
- Statistics can then be output by calling :func:`memdump`.
- """
- current_rss = mem_rss()
- _mem_sample.append(current_rss)
- return current_rss
- def _memdump(samples=10):
- S = _mem_sample
- prev = list(S) if len(S) <= samples else sample(S, samples)
- _mem_sample[:] = []
- import gc
- gc.collect()
- after_collect = mem_rss()
- return prev, after_collect
- def memdump(samples=10, file=None):
- """Dump memory statistics.
- Will print a sample of all RSS memory samples added by
- calling :func:`sample_mem`, and in addition print
- used RSS memory after :func:`gc.collect`.
- """
- say = partial(print, file=file)
- if ps() is None:
- say('- rss: (psutil not installed).')
- return
- prev, after_collect = _memdump(samples)
- if prev:
- say('- rss (sample):')
- for mem in prev:
- say('- > {0},'.format(mem))
- say('- rss (end): {0}.'.format(after_collect))
- def sample(x, n, k=0):
- """Given a list `x` a sample of length ``n`` of that list is returned.
- E.g. if `n` is 10, and `x` has 100 items, a list of every 10th
- item is returned.
- ``k`` can be used as offset.
- """
- j = len(x) // n
- for _ in range(n):
- try:
- yield x[k]
- except IndexError:
- break
- k += j
- def hfloat(f, p=5):
- """Convert float to value suitable for humans.
- :keyword p: Float precision.
- """
- i = int(f)
- return i if i == f else '{0:.{p}}'.format(f, p=p)
- def humanbytes(s):
- """Convert bytes to human-readable form (e.g. kB, MB)."""
- return next(
- '{0}{1}'.format(hfloat(s / div if div else s), unit)
- for div, unit in UNITS if s >= div
- )
- def mem_rss():
- """Return RSS memory usage as a humanized string."""
- p = ps()
- if p is not None:
- return humanbytes(p.get_memory_info().rss)
- def ps():
- """Return the global :class:`psutil.Process` instance,
- or :const:`None` if :mod:`psutil` is not installed."""
- global _process
- if _process is None and Process is not None:
- _process = Process(os.getpid())
- return _process
|