debug.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.debug
  4. ~~~~~~~~~~~~~~~~~~
  5. Utilities for debugging memory usage.
  6. """
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. import os
  9. from contextlib import contextmanager
  10. from functools import partial
  11. from celery.five import range
  12. from celery.platforms import signals
  13. try:
  14. from psutil import Process
  15. except ImportError:
  16. Process = None # noqa
  17. __all__ = [
  18. 'blockdetection', 'sample_mem', 'memdump', 'sample',
  19. 'humanbytes', 'mem_rss', 'ps',
  20. ]
  21. UNITS = (
  22. (2 ** 40.0, 'TB'),
  23. (2 ** 30.0, 'GB'),
  24. (2 ** 20.0, 'MB'),
  25. (2 ** 10.0, 'kB'),
  26. (0.0, '{0!d}b'),
  27. )
  28. _process = None
  29. _mem_sample = []
  30. def _on_blocking(signum, frame):
  31. import inspect
  32. raise RuntimeError(
  33. 'Blocking detection timed-out at: {0}'.format(
  34. inspect.getframeinfo(frame)
  35. )
  36. )
  37. @contextmanager
  38. def blockdetection(timeout):
  39. """A timeout context using ``SIGALRM`` that can be used to detect blocking
  40. functions."""
  41. if not timeout:
  42. yield
  43. else:
  44. old_handler = signals['ALRM']
  45. old_handler = None if old_handler == _on_blocking else old_handler
  46. signals['ALRM'] = _on_blocking
  47. try:
  48. yield signals.arm_alarm(timeout)
  49. finally:
  50. if old_handler:
  51. signals['ALRM'] = old_handler
  52. signals.reset_alarm()
  53. def sample_mem():
  54. """Sample RSS memory usage.
  55. Statistics can then be output by calling :func:`memdump`.
  56. """
  57. current_rss = mem_rss()
  58. _mem_sample.append(current_rss)
  59. return current_rss
  60. def _memdump(samples=10):
  61. S = _mem_sample
  62. prev = list(S) if len(S) <= samples else sample(S, samples)
  63. _mem_sample[:] = []
  64. import gc
  65. gc.collect()
  66. after_collect = mem_rss()
  67. return prev, after_collect
  68. def memdump(samples=10, file=None):
  69. """Dump memory statistics.
  70. Will print a sample of all RSS memory samples added by
  71. calling :func:`sample_mem`, and in addition print
  72. used RSS memory after :func:`gc.collect`.
  73. """
  74. say = partial(print, file=file)
  75. if ps() is None:
  76. say('- rss: (psutil not installed).')
  77. return
  78. prev, after_collect = _memdump(samples)
  79. if prev:
  80. say('- rss (sample):')
  81. for mem in prev:
  82. say('- > {0},'.format(mem))
  83. say('- rss (end): {0}.'.format(after_collect))
  84. def sample(x, n, k=0):
  85. """Given a list `x` a sample of length ``n`` of that list is returned.
  86. E.g. if `n` is 10, and `x` has 100 items, a list of every 10th
  87. item is returned.
  88. ``k`` can be used as offset.
  89. """
  90. j = len(x) // n
  91. for _ in range(n):
  92. try:
  93. yield x[k]
  94. except IndexError:
  95. break
  96. k += j
  97. def hfloat(f, p=5):
  98. """Convert float to value suitable for humans.
  99. :keyword p: Float precision.
  100. """
  101. i = int(f)
  102. return i if i == f else '{0:.{p}}'.format(f, p=p)
  103. def humanbytes(s):
  104. """Convert bytes to human-readable form (e.g. kB, MB)."""
  105. return next(
  106. '{0}{1}'.format(hfloat(s / div if div else s), unit)
  107. for div, unit in UNITS if s >= div
  108. )
  109. def mem_rss():
  110. """Return RSS memory usage as a humanized string."""
  111. p = ps()
  112. if p is not None:
  113. return humanbytes(p.get_memory_info().rss)
  114. def ps():
  115. """Return the global :class:`psutil.Process` instance,
  116. or :const:`None` if :mod:`psutil` is not installed."""
  117. global _process
  118. if _process is None and Process is not None:
  119. _process = Process(os.getpid())
  120. return _process