rdb.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. """
  2. celery.contrib.rdb
  3. ==================
  4. Remote debugger for Celery tasks running in multiprocessing pool workers.
  5. Inspired by http://snippets.dzone.com/posts/show/7248
  6. **Usage**
  7. .. code-block:: python
  8. from celery.contrib import rdb
  9. from celery.decorators import task
  10. @task
  11. def add(x, y):
  12. result = x + y
  13. rdb.set_trace()
  14. return result
  15. **Environment Variables**
  16. .. envvar:: CELERY_RDB_HOST
  17. Hostname to bind to. Default is '127.0.01', which means the socket
  18. will only be accessible from the local host.
  19. .. envvar:: CELERY_RDB_PORT
  20. Base port to bind to. Default is 6899.
  21. The debugger will try to find an available port starting from the
  22. base port. The selected port will be logged by celeryd.
  23. :copyright: (c) 2009 - 2011 by Ask Solem.
  24. :license: BSD, see LICENSE for more details.
  25. """
  26. import errno
  27. import os
  28. import socket
  29. import sys
  30. from pdb import Pdb
  31. default_port = 6899
  32. CELERY_RDB_HOST = os.environ.get("CELERY_RDB_HOST") or "127.0.0.1"
  33. CELERY_RDB_PORT = int(os.environ.get("CELERY_RDB_PORT") or default_port)
  34. #: Holds the currently active debugger.
  35. _current = [None]
  36. _frame = getattr(sys, "_getframe")
  37. class Rdb(Pdb):
  38. me = "Remote Debugger"
  39. _prev_outs = None
  40. _sock = None
  41. def __init__(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT,
  42. port_search_limit=100, port_skew=+0):
  43. self.active = True
  44. try:
  45. from multiprocessing import current_process
  46. _, port_skew = current_process().name.split('-')
  47. except (ImportError, ValueError):
  48. pass
  49. port_skew = int(port_skew)
  50. self._prev_handles = sys.stdin, sys.stdout
  51. this_port = None
  52. for i in xrange(port_search_limit):
  53. self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  54. this_port = port + port_skew + i
  55. try:
  56. self._sock.bind((host, this_port))
  57. except socket.error, exc:
  58. if exc.errno in [errno.EADDRINUSE, errno.EINVAL]:
  59. continue
  60. raise
  61. else:
  62. break
  63. else:
  64. raise Exception(
  65. "%s: Could not find available port. Please set using "
  66. "environment variable CELERY_RDB_PORT" % (self.me, ))
  67. self._sock.listen(1)
  68. me = "%s:%s" % (self.me, this_port)
  69. context = self.context = {"me": me, "host": host, "port": this_port}
  70. print("%(me)s: Please telnet %(host)s %(port)s."
  71. " Type `exit` in session to continue." % context)
  72. print("%(me)s: Waiting for client..." % context)
  73. self._client, address = self._sock.accept()
  74. context["remote_addr"] = ":".join(map(str, address))
  75. print("%(me)s: In session with %(remote_addr)s" % context)
  76. self._handle = sys.stdin = sys.stdout = self._client.makefile("rw")
  77. Pdb.__init__(self, completekey="tab",
  78. stdin=self._handle, stdout=self._handle)
  79. def _close_session(self):
  80. self.stdin, self.stdout = sys.stdin, sys.stdout = self._prev_handles
  81. self._handle.close()
  82. self._client.close()
  83. self._sock.close()
  84. self.active = False
  85. print("%(me)s: Session %(remote_addr)s ended." % self.context)
  86. def do_continue(self, arg):
  87. self._close_session()
  88. self.set_continue()
  89. return 1
  90. do_c = do_cont = do_continue
  91. def do_quit(self, arg):
  92. self._close_session()
  93. self.set_quit()
  94. return 1
  95. do_q = do_exit = do_quit
  96. def set_trace(self, frame=None):
  97. if frame is None:
  98. frame = _frame().f_back
  99. try:
  100. Pdb.set_trace(self, frame)
  101. except socket.error, exc:
  102. # connection reset by peer.
  103. if exc.errno != errno.ECONNRESET:
  104. raise
  105. def set_quit(self):
  106. # this raises a BdbQuit exception that we are unable to catch.
  107. sys.settrace(None)
  108. def debugger():
  109. """Returns the current debugger instance (if any),
  110. or creates a new one."""
  111. rdb = _current[0]
  112. if rdb is None or not rdb.active:
  113. rdb = _current[0] = Rdb()
  114. return rdb
  115. def set_trace(frame=None):
  116. """Set breakpoint at current location, or a specified frame"""
  117. if frame is None:
  118. frame = _frame().f_back
  119. return debugger().set_trace(frame)