I've written a port forwarding wrapper with paramiko for an app we use
and it works fine except that it consumes way too much processor.
(i.e. 25% +) I'm trying to write a stackless-based server class to
replace the threading one but I can't get the tunnel the wrapper
creates to work for more than one instance of the app. I based my
code on the example HTTP server on the stackless site. Relevant code
follows. The glovia variable is the result of subprocess.Popen().
<pre>
class ForwardServer (SocketServer.T CPServer):
allow_reuse_add ress = True
def __init__(self, *args, **kwargs):
SocketServer.TC PServer.__init_ _(self, *args, **kwargs)
self.socket.set blocking(0)
def serve_forever(s elf, glovia):
while glovia.poll() is None:
verbose("Handle request")
self.handle_req uest()
def handle_request( self):
try:
request, client_address = self.get_reques t()
except socket.error:
return
verbose("Adding handler tasklet")
stackless.taskl et(self.handle_ request_tasklet )(request,
client_address)
def handle_request_ tasklet(self, request, client_address) :
if self.verify_req uest(request, client_address) :
try:
self.process_re quest(request, client_address)
except:
self.handle_err or(request, client_address)
self.close_requ est(request)
class Handler (SocketServer.B aseRequestHandl er):
def handle(self):
verbose("Enteri ng Handler.handle" )
peername = self.request.ge tpeername()
try:
chan = self.ssh_transp ort.open_channe l('direct-tcpip',
(self.chain_hos t,
self.chain_port ), peername)
except Exception, e:
verbose('Incomi ng request to %s:%d failed: %s' %
(self.chain_hos t, self.chain_port , repr(e)))
return
if chan is None:
verbose('Incomi ng request to %s:%d was rejected by the SSH
server.' % (self.chain_hos t, self.chain_port ))
return
verbose('Connec ted! Tunnel open %r -%r -%r' % (peername,
chan.getpeernam e(), (self.chain_hos t, self.chain_port )))
while True:
r, w, x = select.select([self.request, chan], [], [])
if self.request in r:
data = self.request.re cv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
self.request.se nd(data)
verbose("Curren t Task: %s" %
(str(stackless. getcurrent()),) )
verbose("Schedu ling Tasks: %s" %
(str(stackless. getruncount()), ))
stackless.sched ule()
chan.close()
self.request.cl ose()
verbose("Exitin g Handler.handle" )
verbose('Tunnel closed from %r' % (peername,))
def forward_tunnel( glovia, local_port, remote_host, remote_port,
transport):
class SubHandler (Handler):
chain_host = remote_host
chain_port = remote_port
ssh_transport = transport
ForwardServer(( '', local_port), SubHandler).ser ve_forever(glov ia)
stackless.taskl et(forward_tunn el)(glovia, LOCAL_PORT, HOST,
REMOTE_PORT, client.get_tran sport())
stackless.run()
</pre>
and it works fine except that it consumes way too much processor.
(i.e. 25% +) I'm trying to write a stackless-based server class to
replace the threading one but I can't get the tunnel the wrapper
creates to work for more than one instance of the app. I based my
code on the example HTTP server on the stackless site. Relevant code
follows. The glovia variable is the result of subprocess.Popen().
<pre>
class ForwardServer (SocketServer.T CPServer):
allow_reuse_add ress = True
def __init__(self, *args, **kwargs):
SocketServer.TC PServer.__init_ _(self, *args, **kwargs)
self.socket.set blocking(0)
def serve_forever(s elf, glovia):
while glovia.poll() is None:
verbose("Handle request")
self.handle_req uest()
def handle_request( self):
try:
request, client_address = self.get_reques t()
except socket.error:
return
verbose("Adding handler tasklet")
stackless.taskl et(self.handle_ request_tasklet )(request,
client_address)
def handle_request_ tasklet(self, request, client_address) :
if self.verify_req uest(request, client_address) :
try:
self.process_re quest(request, client_address)
except:
self.handle_err or(request, client_address)
self.close_requ est(request)
class Handler (SocketServer.B aseRequestHandl er):
def handle(self):
verbose("Enteri ng Handler.handle" )
peername = self.request.ge tpeername()
try:
chan = self.ssh_transp ort.open_channe l('direct-tcpip',
(self.chain_hos t,
self.chain_port ), peername)
except Exception, e:
verbose('Incomi ng request to %s:%d failed: %s' %
(self.chain_hos t, self.chain_port , repr(e)))
return
if chan is None:
verbose('Incomi ng request to %s:%d was rejected by the SSH
server.' % (self.chain_hos t, self.chain_port ))
return
verbose('Connec ted! Tunnel open %r -%r -%r' % (peername,
chan.getpeernam e(), (self.chain_hos t, self.chain_port )))
while True:
r, w, x = select.select([self.request, chan], [], [])
if self.request in r:
data = self.request.re cv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
self.request.se nd(data)
verbose("Curren t Task: %s" %
(str(stackless. getcurrent()),) )
verbose("Schedu ling Tasks: %s" %
(str(stackless. getruncount()), ))
stackless.sched ule()
chan.close()
self.request.cl ose()
verbose("Exitin g Handler.handle" )
verbose('Tunnel closed from %r' % (peername,))
def forward_tunnel( glovia, local_port, remote_host, remote_port,
transport):
class SubHandler (Handler):
chain_host = remote_host
chain_port = remote_port
ssh_transport = transport
ForwardServer(( '', local_port), SubHandler).ser ve_forever(glov ia)
stackless.taskl et(forward_tunn el)(glovia, LOCAL_PORT, HOST,
REMOTE_PORT, client.get_tran sport())
stackless.run()
</pre>