debug.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.debug
  4. ~~~~~~~~~~~~~~~~~~
  5. Utilities for debugging memory usage.
  6. """
  7. from __future__ import absolute_import, print_function
  8. import os
  9. from contextlib import contextmanager
  10. from functools import partial
  11. from celery.five import range
  12. from celery.platforms import signals
  13. try:
  14. from psutil import Process
  15. except ImportError:
  16. Process = None # noqa
  # Cached psutil process handle; created on first use by ps() and left as
  # None when psutil could not be imported.
  _process = None

  # RSS readings appended by sample_mem() and drained by _memdump().
  _mem_sample = []
  def _on_blocking(signum, frame):
      """Signal handler used by :func:`blockdetection`.

      Always raises :exc:`RuntimeError`; the message embeds the
      :func:`inspect.getframeinfo` description of ``frame`` so the
      location that was executing when the signal fired is visible.
      ``signum`` is accepted to satisfy the handler signature and is unused.
      """
      from inspect import getframeinfo

      where = getframeinfo(frame)
      raise RuntimeError('Blocking detection timed-out at: %s' % (where, ))
  @contextmanager
  def blockdetection(timeout):
      """Context manager raising from the ``ALRM`` handler after ``timeout``.

      With a falsy ``timeout`` the body simply runs with no alarm armed.
      Otherwise :func:`_on_blocking` is installed as the ``ALRM`` handler,
      the alarm is armed, and the value returned by ``signals.arm_alarm``
      is what the ``with`` statement binds.  On exit the alarm is always
      reset, and the previous handler is put back when it is truthy and was
      not already :func:`_on_blocking` (i.e. this is not a nested use).
      """
      if not timeout:
          yield
          return

      previous = signals['ALRM']
      if previous == _on_blocking:
          # Nested use: the outer context owns restoring the real handler.
          previous = None

      signals['ALRM'] = _on_blocking
      try:
          yield signals.arm_alarm(timeout)
      finally:
          if previous:
              signals['ALRM'] = previous
          signals.reset_alarm()
  def sample_mem():
      """Record the current RSS memory usage and return it.

      The reading (whatever :func:`mem_rss` returns) is appended to the
      module-level sample list; call :func:`memdump` to print the
      collected readings.
      """
      reading = mem_rss()
      _mem_sample.append(reading)
      return reading
  def _memdump(samples=10):
      """Drain the collected RSS readings and measure RSS after a GC pass.

      :param samples: maximum number of readings to return; when more have
          been collected they are thinned out with :func:`sample`.
      :returns: ``(prev, after_collect)`` where ``prev`` is a list of the
          (possibly thinned) readings and ``after_collect`` is the value of
          :func:`mem_rss` taken after :func:`gc.collect`.

      The module-level sample list is emptied as a side effect.
      """
      import gc

      S = _mem_sample
      # sample() is a generator: it must be consumed *before* the list is
      # cleared below, otherwise it would later run against an empty list.
      prev = list(S) if len(S) <= samples else list(sample(S, samples))
      _mem_sample[:] = []
      gc.collect()
      after_collect = mem_rss()
      return prev, after_collect
  def memdump(samples=10, file=None):
      """Print memory statistics.

      Writes a selection of the RSS readings recorded through
      :func:`sample_mem`, followed by the RSS measured after
      :func:`gc.collect`.  When :func:`ps` returns :const:`None` a single
      line saying psutil is not installed is written instead.

      :param samples: maximum number of recorded readings to print.
      :param file: passed to :func:`print` as its ``file`` argument.
      """
      def say(message):
          print(message, file=file)

      if ps() is None:
          say('- rss: (psutil not installed).')
          return

      readings, rss_after_gc = _memdump(samples)
      if readings:
          say('- rss (sample):')
          for reading in readings:
              say('- > {0},'.format(reading))
      say('- rss (end): {0}.'.format(rss_after_gc))
  def sample(x, n, k=0):
      """Yield ``n`` evenly spaced items from the sequence ``x``.

      E.g. if ``n`` is 10 and ``x`` has 100 items, every 10th item is
      yielded.  This is a generator: nothing is read from ``x`` until it
      is iterated.

      :param x: indexable sequence supporting :func:`len`.
      :param n: number of items to yield; nothing is yielded when
          ``n <= 0`` or ``x`` is empty.
      :param k: index of the first item (offset).

      The step is ``len(x) // n``, so when ``x`` has fewer than ``n``
      items the step is zero and the item at ``k`` is repeated.
      """
      size = len(x)
      if n <= 0 or size == 0:
          # Avoid ZeroDivisionError / IndexError on degenerate input.
          return
      step = size // n
      for _ in range(n):
          yield x[k]
          k += step
  # (divisor, suffix) pairs ordered from largest to smallest; humanbytes()
  # uses the first entry whose divisor does not exceed the value.
  # The suffix is inserted verbatim into the output, so it must be plain
  # text and not a format template.
  UNITS = (
      (2 ** 40.0, 'TB'),
      (2 ** 30.0, 'GB'),
      (2 ** 20.0, 'MB'),
      (2 ** 10.0, 'kB'),
      (0.0, 'b'),
  )
  def hfloat(f, p=5):
      """Return ``f`` in a compact form.

      When ``f`` has no fractional part the result is ``int(f)``;
      otherwise it is ``f`` formatted as a string with precision ``p``.
      """
      whole = int(f)
      if whole == f:
          return whole
      return format(f, '.{0}'.format(p))
  def humanbytes(s):
      """Format the number ``s`` using the first fitting entry of ``UNITS``.

      The first ``(divisor, suffix)`` pair with ``s >= divisor`` is chosen;
      ``s`` is divided by the divisor (unless the divisor is zero), passed
      through :func:`hfloat` and joined with the suffix.  If no entry fits,
      the :exc:`StopIteration` from :func:`next` propagates.
      """
      fitting = ((div, unit) for div, unit in UNITS if s >= div)
      div, unit = next(fitting)
      scaled = s / div if div else s
      return '{0}{1}'.format(hfloat(scaled), unit)
  def mem_rss():
      """Return RSS memory usage as a humanized string.

      :returns: the string produced by :func:`humanbytes` for the current
          process' RSS, or :const:`None` when :func:`ps` returns
          :const:`None`.
      """
      p = ps()
      if p is None:
          return None
      # Newer psutil releases expose ``memory_info()``; only old releases
      # have ``get_memory_info()``, so prefer the former and fall back.
      read_info = getattr(p, 'memory_info', None)
      if read_info is None:
          read_info = p.get_memory_info
      return humanbytes(read_info().rss)
  def ps():
      """Return the shared ``Process`` instance for the current pid.

      The instance is created on the first call and cached in the
      module-level ``_process``.  When ``Process`` is :const:`None`
      (the psutil import failed) nothing is created and the cached value,
      :const:`None`, is returned.
      """
      global _process
      if _process is not None or Process is None:
          return _process
      _process = Process(os.getpid())
      return _process