123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- # -*- coding: utf-8 -*-
- """
- celery.contrib.rdb
- ==================
- Remote debugger for Celery tasks running in multiprocessing pool workers.
- Inspired by http://snippets.dzone.com/posts/show/7248
- **Usage**
- .. code-block:: python
- from celery.contrib import rdb
- from celery import task
- @task()
- def add(x, y):
- result = x + y
- rdb.set_trace()
- return result
- **Environment Variables**
- .. envvar:: CELERY_RDB_HOST
- Hostname to bind to. Default is '127.0.0.1', which means the socket
- will only be accessible from the local host.
- .. envvar:: CELERY_RDB_PORT
- Base port to bind to. Default is 6899.
- The debugger will try to find an available port starting from the
- base port. The selected port will be logged by the worker.
- """
- from __future__ import absolute_import
- import errno
- import os
- import socket
- import sys
- from pdb import Pdb
- from billiard import current_process
# Base port used when CELERY_RDB_PORT is unset or empty.
default_port = 6899

# Bind address and base port; an unset *or empty* environment variable
# falls back to the default because of the ``or``.
CELERY_RDB_HOST = os.environ.get('CELERY_RDB_HOST') or '127.0.0.1'
CELERY_RDB_PORT = int(os.environ.get('CELERY_RDB_PORT') or default_port)

#: Holds the currently active debugger.
#: A one-element list so functions can rebind slot 0 without ``global``.
_current = [None]

# Frame accessor used to find the caller's frame when none is supplied.
_frame = getattr(sys, '_getframe')
class Rdb(Pdb):
    """Pdb subclass whose prompt is served over a TCP connection.

    Creating an instance binds a listening socket, announces the selected
    port on ``out``, and blocks until a client connects.  ``sys.stdin`` and
    ``sys.stdout`` are then pointed at that connection until the session is
    ended with ``continue`` or ``quit`` (and their aliases).
    """

    me = 'Remote Debugger'
    _prev_outs = None
    _sock = None

    def __init__(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT,
                 port_search_limit=100, port_skew=+0, out=sys.stdout):
        """Bind, wait for a client and attach the debugger to it.

        :param host: address to bind the listening socket to.
        :param port: base port; the first free port at or after it is used.
        :param port_search_limit: number of consecutive ports to try.
        :param port_skew: offset added to ``port`` (see
            :meth:`get_avail_port` for how it may be overridden).
        :param out: file-like object that status messages are written to.
        """
        self.active = True
        self.out = out

        # Remember the real standard streams so that _close_session()
        # can put them back.
        self._prev_handles = sys.stdin, sys.stdout

        self._sock, this_port = self.get_avail_port(host, port,
                                                    port_search_limit,
                                                    port_skew)
        self._sock.listen(1)
        me = '%s:%s' % (self.me, this_port)
        context = self.context = {'me': me, 'host': host, 'port': this_port}
        self.say('%(me)s: Please telnet %(host)s %(port)s.'
                 ' Type `exit` in session to continue.' % context)
        self.say('%(me)s: Waiting for client...' % context)

        # Blocks until a client connects.
        self._client, address = self._sock.accept()
        context['remote_addr'] = ':'.join(map(str, address))
        self.say('%(me)s: In session with %(remote_addr)s' % context)

        # Route the process-wide standard streams through the connection
        # as well, not only the debugger's own stdin/stdout.
        self._handle = sys.stdin = sys.stdout = self._client.makefile('rw')
        Pdb.__init__(self, completekey='tab',
                     stdin=self._handle, stdout=self._handle)

    def get_avail_port(self, host, port, search_limit=100, skew=+0):
        """Return ``(bound_socket, port)`` for the first port that binds.

        Ports ``port + skew + i`` for ``i`` in ``range(search_limit)`` are
        tried in order.  When the current process name has the form
        ``<prefix>-<integer>``, that integer replaces ``skew``; otherwise
        the ``skew`` argument is used unchanged.

        :raises Exception: if no port in the searched range could be bound.
        :raises socket.error: for bind errors other than
            ``EADDRINUSE``/``EINVAL``.
        """
        try:
            _, suffix = current_process().name.split('-')
            # Convert before assigning, so a non-numeric suffix cannot
            # leave ``skew`` holding a string.
            skew = int(suffix)
        except ValueError:
            pass

        for i in range(search_limit):
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            this_port = port + skew + i
            try:
                sock.bind((host, this_port))
            except socket.error as exc:
                # Do not leak the socket that failed to bind.
                sock.close()
                if exc.errno in (errno.EADDRINUSE, errno.EINVAL):
                    continue
                raise
            else:
                return sock, this_port

        raise Exception(
            '%s: Could not find available port. Please set using '
            'environment variable CELERY_RDB_PORT' % (self.me, ))

    def say(self, m):
        """Write the message ``m`` plus a newline to ``self.out``."""
        self.out.write(m + '\n')

    def _close_session(self):
        """Restore the standard streams and close the remote connection."""
        self.stdin, self.stdout = sys.stdin, sys.stdout = self._prev_handles
        self._handle.close()
        self._client.close()
        self._sock.close()
        # Lets debugger() know that a fresh instance is needed next time.
        self.active = False
        self.say('%(me)s: Session %(remote_addr)s ended.' % self.context)

    def do_continue(self, arg):
        """End the remote session and resume execution."""
        self._close_session()
        self.set_continue()
        return 1
    do_c = do_cont = do_continue

    def do_quit(self, arg):
        """End the remote session and stop tracing."""
        self._close_session()
        self.set_quit()
        return 1
    do_q = do_exit = do_quit

    def set_trace(self, frame=None):
        """Start debugging at ``frame`` (default: the caller's frame).

        A connection reset by the peer is swallowed; any other socket
        error propagates.
        """
        if frame is None:
            frame = _frame().f_back
        try:
            Pdb.set_trace(self, frame)
        except socket.error as exc:
            # connection reset by peer.
            if exc.errno != errno.ECONNRESET:
                raise

    def set_quit(self):
        """Stop tracing without going through the inherited ``set_quit``."""
        # The inherited behaviour raises a BdbQuit exception that we are
        # unable to catch, so just switch tracing off.
        sys.settrace(None)
def debugger():
    """Return the active debugger, creating a new one when needed.

    A new :class:`Rdb` is created (and stored as the current debugger) when
    none exists yet or when the stored one is no longer active.
    """
    existing = _current[0]
    if existing is not None and existing.active:
        return existing

    fresh = Rdb()
    _current[0] = fresh
    return fresh
def set_trace(frame=None):
    """Break at the caller's location, or at ``frame`` when one is given."""
    # _frame() must be called right here: its f_back is then the frame of
    # whoever called set_trace().
    target = _frame().f_back if frame is None else frame
    return debugger().set_trace(target)
|