rdb.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.contrib.rdb
  4. ==================
  5. Remote debugger for Celery tasks running in multiprocessing pool workers.
  6. Inspired by http://snippets.dzone.com/posts/show/7248
  7. **Usage**
  8. .. code-block:: python
  9. from celery.contrib import rdb
  10. from celery import task
  11. @task()
  12. def add(x, y):
  13. result = x + y
  14. rdb.set_trace()
  15. return result
  16. **Environment Variables**
  17. .. envvar:: CELERY_RDB_HOST
  18. Hostname to bind to. Default is '127.0.01', which means the socket
  19. will only be accessible from the local host.
  20. .. envvar:: CELERY_RDB_PORT
  21. Base port to bind to. Default is 6899.
  22. The debugger will try to find an available port starting from the
  23. base port. The selected port will be logged by the worker.
  24. """
  25. from __future__ import absolute_import
  26. import errno
  27. import os
  28. import socket
  29. import sys
  30. from pdb import Pdb
  31. from billiard import current_process
  32. default_port = 6899
  33. CELERY_RDB_HOST = os.environ.get('CELERY_RDB_HOST') or '127.0.0.1'
  34. CELERY_RDB_PORT = int(os.environ.get('CELERY_RDB_PORT') or default_port)
  35. #: Holds the currently active debugger.
  36. _current = [None]
  37. _frame = getattr(sys, '_getframe')
  38. class Rdb(Pdb):
  39. me = 'Remote Debugger'
  40. _prev_outs = None
  41. _sock = None
  42. def __init__(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT,
  43. port_search_limit=100, port_skew=+0, out=sys.stdout):
  44. self.active = True
  45. self.out = out
  46. self._prev_handles = sys.stdin, sys.stdout
  47. self._sock, this_port = self.get_avail_port(host, port,
  48. port_search_limit, port_skew)
  49. self._sock.listen(1)
  50. me = '%s:%s' % (self.me, this_port)
  51. context = self.context = {'me': me, 'host': host, 'port': this_port}
  52. self.say('%(me)s: Please telnet %(host)s %(port)s.'
  53. ' Type `exit` in session to continue.' % context)
  54. self.say('%(me)s: Waiting for client...' % context)
  55. self._client, address = self._sock.accept()
  56. context['remote_addr'] = ':'.join(map(str, address))
  57. self.say('%(me)s: In session with %(remote_addr)s' % context)
  58. self._handle = sys.stdin = sys.stdout = self._client.makefile('rw')
  59. Pdb.__init__(self, completekey='tab',
  60. stdin=self._handle, stdout=self._handle)
  61. def get_avail_port(self, host, port, search_limit=100, skew=+0):
  62. try:
  63. _, skew = current_process().name.split('-')
  64. skew = int(skew)
  65. except ValueError:
  66. pass
  67. this_port = None
  68. for i in xrange(search_limit):
  69. _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  70. this_port = port + skew + i
  71. try:
  72. _sock.bind((host, this_port))
  73. except socket.error, exc:
  74. if exc.errno in [errno.EADDRINUSE, errno.EINVAL]:
  75. continue
  76. raise
  77. else:
  78. return _sock, this_port
  79. else:
  80. raise Exception(
  81. '%s: Could not find available port. Please set using '
  82. 'environment variable CELERY_RDB_PORT' % (self.me, ))
  83. def say(self, m):
  84. self.out.write(m + '\n')
  85. def _close_session(self):
  86. self.stdin, self.stdout = sys.stdin, sys.stdout = self._prev_handles
  87. self._handle.close()
  88. self._client.close()
  89. self._sock.close()
  90. self.active = False
  91. self.say('%(me)s: Session %(remote_addr)s ended.' % self.context)
  92. def do_continue(self, arg):
  93. self._close_session()
  94. self.set_continue()
  95. return 1
  96. do_c = do_cont = do_continue
  97. def do_quit(self, arg):
  98. self._close_session()
  99. self.set_quit()
  100. return 1
  101. do_q = do_exit = do_quit
  102. def set_trace(self, frame=None):
  103. if frame is None:
  104. frame = _frame().f_back
  105. try:
  106. Pdb.set_trace(self, frame)
  107. except socket.error, exc:
  108. # connection reset by peer.
  109. if exc.errno != errno.ECONNRESET:
  110. raise
  111. def set_quit(self):
  112. # this raises a BdbQuit exception that we are unable to catch.
  113. sys.settrace(None)
  114. def debugger():
  115. """Returns the current debugger instance (if any),
  116. or creates a new one."""
  117. rdb = _current[0]
  118. if rdb is None or not rdb.active:
  119. rdb = _current[0] = Rdb()
  120. return rdb
  121. def set_trace(frame=None):
  122. """Set breakpoint at current location, or a specified frame"""
  123. if frame is None:
  124. frame = _frame().f_back
  125. return debugger().set_trace(frame)