rdb.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # -*- coding: utf-8 -*-
  2. """Remote Debugger.
  3. Introduction
  4. ============
  5. This is a remote debugger for Celery tasks running in multiprocessing
  6. pool workers. Inspired by http://snippets.dzone.com/posts/show/7248
  7. Usage
  8. -----
  9. .. code-block:: python
  10. from celery.contrib import rdb
  11. from celery import task
  12. @task()
  13. def add(x, y):
  14. result = x + y
  15. rdb.set_trace()
  16. return result
  17. Environment Variables
  18. =====================
  19. .. envvar:: CELERY_RDB_HOST
  20. ``CELERY_RDB_HOST``
  21. -------------------
  22. Hostname to bind to. Default is '127.0.0.1' (only accessible from
  23. localhost).
  24. .. envvar:: CELERY_RDB_PORT
  25. ``CELERY_RDB_PORT``
  26. -------------------
  27. Base port to bind to. Default is 6899.
  28. The debugger will try to find an available port starting from the
  29. base port. The selected port will be logged by the worker.
  30. """
  31. from __future__ import absolute_import, print_function, unicode_literals
  32. import errno
  33. import os
  34. import socket
  35. import sys
  36. from pdb import Pdb
  37. from billiard.process import current_process
  38. from celery.five import range
  39. __all__ = [
  40. 'CELERY_RDB_HOST', 'CELERY_RDB_PORT', 'DEFAULT_PORT',
  41. 'Rdb', 'debugger', 'set_trace',
  42. ]
  43. DEFAULT_PORT = 6899
  44. CELERY_RDB_HOST = os.environ.get('CELERY_RDB_HOST') or '127.0.0.1'
  45. CELERY_RDB_PORT = int(os.environ.get('CELERY_RDB_PORT') or DEFAULT_PORT)
  46. #: Holds the currently active debugger.
  47. _current = [None]
  48. _frame = getattr(sys, '_getframe')
  49. NO_AVAILABLE_PORT = """\
  50. {self.ident}: Couldn't find an available port.
  51. Please specify one using the CELERY_RDB_PORT environment variable.
  52. """
  53. BANNER = """\
  54. {self.ident}: Ready to connect: telnet {self.host} {self.port}
  55. Type `exit` in session to continue.
  56. {self.ident}: Waiting for client...
  57. """
  58. SESSION_STARTED = '{self.ident}: Now in session with {self.remote_addr}.'
  59. SESSION_ENDED = '{self.ident}: Session with {self.remote_addr} ended.'
  60. class Rdb(Pdb):
  61. """Remote debugger."""
  62. me = 'Remote Debugger'
  63. _prev_outs = None
  64. _sock = None
  65. def __init__(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT,
  66. port_search_limit=100, port_skew=+0, out=sys.stdout):
  67. self.active = True
  68. self.out = out
  69. self._prev_handles = sys.stdin, sys.stdout
  70. self._sock, this_port = self.get_avail_port(
  71. host, port, port_search_limit, port_skew,
  72. )
  73. self._sock.setblocking(1)
  74. self._sock.listen(1)
  75. self.ident = '{0}:{1}'.format(self.me, this_port)
  76. self.host = host
  77. self.port = this_port
  78. self.say(BANNER.format(self=self))
  79. self._client, address = self._sock.accept()
  80. self._client.setblocking(1)
  81. self.remote_addr = ':'.join(str(v) for v in address)
  82. self.say(SESSION_STARTED.format(self=self))
  83. self._handle = sys.stdin = sys.stdout = self._client.makefile('rw')
  84. Pdb.__init__(self, completekey='tab',
  85. stdin=self._handle, stdout=self._handle)
  86. def get_avail_port(self, host, port, search_limit=100, skew=+0):
  87. try:
  88. _, skew = current_process().name.split('-')
  89. skew = int(skew)
  90. except ValueError:
  91. pass
  92. this_port = None
  93. for i in range(search_limit):
  94. _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  95. this_port = port + skew + i
  96. try:
  97. _sock.bind((host, this_port))
  98. except socket.error as exc:
  99. if exc.errno in [errno.EADDRINUSE, errno.EINVAL]:
  100. continue
  101. raise
  102. else:
  103. return _sock, this_port
  104. else:
  105. raise Exception(NO_AVAILABLE_PORT.format(self=self))
  106. def say(self, m):
  107. print(m, file=self.out)
  108. def __enter__(self):
  109. return self
  110. def __exit__(self, *exc_info):
  111. self._close_session()
  112. def _close_session(self):
  113. self.stdin, self.stdout = sys.stdin, sys.stdout = self._prev_handles
  114. if self.active:
  115. if self._handle is not None:
  116. self._handle.close()
  117. if self._client is not None:
  118. self._client.close()
  119. if self._sock is not None:
  120. self._sock.close()
  121. self.active = False
  122. self.say(SESSION_ENDED.format(self=self))
  123. def do_continue(self, arg):
  124. self._close_session()
  125. self.set_continue()
  126. return 1
  127. do_c = do_cont = do_continue
  128. def do_quit(self, arg):
  129. self._close_session()
  130. self.set_quit()
  131. return 1
  132. do_q = do_exit = do_quit
  133. def set_quit(self):
  134. # this raises a BdbQuit exception that we're unable to catch.
  135. sys.settrace(None)
  136. def debugger():
  137. """Return the current debugger instance, or create if none."""
  138. rdb = _current[0]
  139. if rdb is None or not rdb.active:
  140. rdb = _current[0] = Rdb()
  141. return rdb
  142. def set_trace(frame=None):
  143. """Set break-point at current location, or a specified frame."""
  144. if frame is None:
  145. frame = _frame().f_back
  146. return debugger().set_trace(frame)