Python 3.0rc1 on Windows XP.
In lib\asynchat.py:
def handle_write(self):
    """Handle a "socket is writable" event by trying to send queued output.

    Delegates entirely to ``self.initiate_send()``.
    """
    self.initiate_send()
def push(self, data):
    """Queue ``data`` for output and immediately try to send it.

    ``data`` longer than ``self.ac_out_buffer_size`` is split into slices of
    at most that size; each slice is appended to ``self.producer_fifo``.
    Shorter data is appended as a single item.  ``self.initiate_send()`` is
    called once afterwards.
    """
    sabs = self.ac_out_buffer_size
    if len(data) > sabs:
        for i in range(0, len(data), sabs):
            self.producer_fifo.append(data[i:i + sabs])
    else:
        self.producer_fifo.append(data)
    self.initiate_send()
When there is only a single connection, the object works just fine. But
problems appear when the client disconnects and reconnects
to the server. It seems there are two paths that call initiate_send():
one is from push(), which I call in my program, and the other is from
handle_write(), which is called automatically by asyncore.loop(). I just
can't understand why a single connection works fine but connecting
multiple times goes wrong.
I printed the traceback. I found that with the first connection,
handle_write() always stays silent, but on the second connection it gets
called and starts calling initiate_send() at the same time as push() is
called. So confusing....
So I tried removing the initiate_send() call from push(), and the code
magically works fine for me.
The main program is listed below.
Since it needs a Flash client, I attached a web page to reproduce the problem:
clicking the connect button multiple times and then clicking the send
button will produce an error.
import asyncore, asynchat
import os, socket, string
from multiprocessing import Process,Manager
import pickle
import _thread
import threading
# TCP port the server listens on.
PORT = 80
# Message a client sends to ask for the policy file; compared against the
# buffered data after the channel has stripped the NUL terminator.
policyRequest = b"<policy-file-request/>"
# Reply to a policy request: a complete, well-formed XML document followed by
# the NUL byte that terminates a message on this protocol.
policyReturn = b"""<cross-domain-policy>
<allow-access-from domain="*" to-ports="*" />
</cross-domain-policy>\x00"""
def handler(taskList, msgList):
    """Worker loop: process pickled tasks from ``taskList`` into ``msgList``.

    Each task is a pickled dict with a bytes ``'msg'`` entry.  The marker
    ``b' hanlded done'`` is appended to the message and the dict is pickled
    again and put on ``msgList``.  The loop never returns on its own; it ends
    only when ``taskList.get()`` or ``msgList.put()`` raises.
    """
    while 1:
        print('getting task')
        item = pickle.loads(taskList.get())
        print('item before handle ', item)
        # Placeholder for the real work on the message.
        item['msg'] += b' hanlded done'
        msgList.put(pickle.dumps(item))
def findClient(id):
    """Return the channel in ``clients`` whose ``idx`` equals ``id``.

    Returns ``None`` when no registered channel has that id.
    """
    for item in clients:
        if item.idx == id:
            return item
    return None
def pushData(ch, data):
    """Call ``ch.push(data)`` while holding the module-level ``pushLock``.

    The lock is released even if ``push`` raises.
    """
    with pushLock:
        ch.push(data)
def sender():
    """Deliver processed messages from ``msgList`` to their client channels.

    Each queue item is a pickled dict with ``'cid'`` (channel id) and
    ``'msg'`` (bytes).  The loop never returns on its own.
    """
    print('thread started')
    while 1:
        item = pickle.loads(msgList.get())
        c = findClient(item['cid'])
        if c is None:
            # The client is no longer registered (findClient found nothing);
            # drop the reply instead of crashing this thread on None.push().
            print('client gone, msg dropped ', item['msg'])
            continue
        # Go through pushData rather than calling c.push() directly, so the
        # push happens under pushLock.
        pushData(c, item['msg'])
        print('msg sent ', item['msg'])
class HTTPChannel(asynchat.async_chat):
    """Channel for one accepted client connection.

    Incoming bytes are buffered until a NUL (``b"\\x00"``) terminator.  A
    policy-file request is answered directly and the channel is closed once
    the reply is sent; any other message is pickled together with this
    channel's ``idx`` and put on ``taskList``.
    """

    def __init__(self, server, sock, addr):
        """Wrap ``sock``, take the next client id and register in ``clients``.

        ``server`` and ``addr`` are accepted but not used.
        """
        global cid
        asynchat.async_chat.__init__(self, sock)
        self.set_terminator(b"\x00")
        self.data = b""
        cid += 1
        self.idx = cid
        if self not in clients:
            print('add to clients:', self)
            clients.append(self)

    def collect_incoming_data(self, data):
        """Append a received chunk to the current message buffer."""
        self.data = self.data + data
        print(data)

    def found_terminator(self):
        """Dispatch the buffered message, then reset the buffer."""
        print("found", self.data)
        if self.data == policyRequest:
            pushData(self, policyReturn)
            self.close_when_done()
        else:
            d = {'cid': self.idx, 'msg': self.data}
            taskList.put(pickle.dumps(d))
        self.data = b""

    def handle_close(self):
        """Unregister the channel from ``clients`` and close its socket."""
        if self in clients:
            print('remove from clients:', self)
            clients.remove(self)
        # This override replaces the inherited handle_close(), so the socket
        # has to be closed here; otherwise the dead connection stays
        # registered with the asyncore loop.
        self.close()
class HTTPServer(asyncore.dispatcher):
    """Listening socket that creates an ``HTTPChannel`` per connection."""

    def __init__(self, port):
        """Create a TCP socket bound to all interfaces on ``port`` and listen."""
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(("", port))
        self.listen(5)

    def handle_accept(self):
        """Accept a pending connection and hand it to a new ``HTTPChannel``."""
        pair = self.accept()
        if pair is None:
            # Newer asyncore versions return None when the connection did
            # not actually take place; there is nothing to serve then.
            return
        conn, addr = pair
        print('a new customer!')
        HTTPChannel(self, conn, addr)
#
# try it out
if __name__ == "__main__":
    s = HTTPServer(PORT)
    print("serving at port", PORT, "...")
    # Lock used by pushData() to serialize calls to channel.push().
    pushLock = threading.Lock()
    # Connected channels and the last client id handed out.
    clients = []
    cid = 0
    # Queues shared with the worker process: raw tasks in, processed
    # messages out.
    manager = Manager()
    taskList = manager.Queue()
    msgList = manager.Queue()
    h = Process(target=handler, args=(taskList, msgList))
    # handler() loops forever; a daemonic process is terminated when the
    # main process exits instead of keeping the interpreter from exiting.
    h.daemon = True
    h.start()
    # Background thread that pushes processed messages back to clients.
    _thread.start_new_thread(sender, ())
    print('entering loop')
    asyncore.loop()
In lib\asynchat.py:
def handle_write(self):
    """Handle a "socket is writable" event by trying to send queued output.

    Delegates entirely to ``self.initiate_send()``.
    """
    self.initiate_send()
def push(self, data):
    """Queue ``data`` for output and immediately try to send it.

    ``data`` longer than ``self.ac_out_buffer_size`` is split into slices of
    at most that size; each slice is appended to ``self.producer_fifo``.
    Shorter data is appended as a single item.  ``self.initiate_send()`` is
    called once afterwards.
    """
    sabs = self.ac_out_buffer_size
    if len(data) > sabs:
        for i in range(0, len(data), sabs):
            self.producer_fifo.append(data[i:i + sabs])
    else:
        self.producer_fifo.append(data)
    self.initiate_send()
When there is only a single connection, the object works just fine. But
problems appear when the client disconnects and reconnects
to the server. It seems there are two paths that call initiate_send():
one is from push(), which I call in my program, and the other is from
handle_write(), which is called automatically by asyncore.loop(). I just
can't understand why a single connection works fine but connecting
multiple times goes wrong.
I printed the traceback. I found that with the first connection,
handle_write() always stays silent, but on the second connection it gets
called and starts calling initiate_send() at the same time as push() is
called. So confusing....
So I tried removing the initiate_send() call from push(), and the code
magically works fine for me.
The main program is listed below.
Since it needs a Flash client, I attached a web page to reproduce the problem:
clicking the connect button multiple times and then clicking the send
button will produce an error.
import asyncore, asynchat
import os, socket, string
from multiprocessing import Process,Manager
import pickle
import _thread
import threading
# TCP port the server listens on.
PORT = 80
# Message a client sends to ask for the policy file; compared against the
# buffered data after the channel has stripped the NUL terminator.
policyRequest = b"<policy-file-request/>"
# Reply to a policy request: a complete, well-formed XML document followed by
# the NUL byte that terminates a message on this protocol.
policyReturn = b"""<cross-domain-policy>
<allow-access-from domain="*" to-ports="*" />
</cross-domain-policy>\x00"""
def handler(taskList, msgList):
    """Worker loop: process pickled tasks from ``taskList`` into ``msgList``.

    Each task is a pickled dict with a bytes ``'msg'`` entry.  The marker
    ``b' hanlded done'`` is appended to the message and the dict is pickled
    again and put on ``msgList``.  The loop never returns on its own; it ends
    only when ``taskList.get()`` or ``msgList.put()`` raises.
    """
    while 1:
        print('getting task')
        item = pickle.loads(taskList.get())
        print('item before handle ', item)
        # Placeholder for the real work on the message.
        item['msg'] += b' hanlded done'
        msgList.put(pickle.dumps(item))
def findClient(id):
    """Return the channel in ``clients`` whose ``idx`` equals ``id``.

    Returns ``None`` when no registered channel has that id.
    """
    for item in clients:
        if item.idx == id:
            return item
    return None
def pushData(ch, data):
    """Call ``ch.push(data)`` while holding the module-level ``pushLock``.

    The lock is released even if ``push`` raises.
    """
    with pushLock:
        ch.push(data)
def sender():
    """Deliver processed messages from ``msgList`` to their client channels.

    Each queue item is a pickled dict with ``'cid'`` (channel id) and
    ``'msg'`` (bytes).  The loop never returns on its own.
    """
    print('thread started')
    while 1:
        item = pickle.loads(msgList.get())
        c = findClient(item['cid'])
        if c is None:
            # The client is no longer registered (findClient found nothing);
            # drop the reply instead of crashing this thread on None.push().
            print('client gone, msg dropped ', item['msg'])
            continue
        # Go through pushData rather than calling c.push() directly, so the
        # push happens under pushLock.
        pushData(c, item['msg'])
        print('msg sent ', item['msg'])
class HTTPChannel(asynchat.async_chat):
    """Channel for one accepted client connection.

    Incoming bytes are buffered until a NUL (``b"\\x00"``) terminator.  A
    policy-file request is answered directly and the channel is closed once
    the reply is sent; any other message is pickled together with this
    channel's ``idx`` and put on ``taskList``.
    """

    def __init__(self, server, sock, addr):
        """Wrap ``sock``, take the next client id and register in ``clients``.

        ``server`` and ``addr`` are accepted but not used.
        """
        global cid
        asynchat.async_chat.__init__(self, sock)
        self.set_terminator(b"\x00")
        self.data = b""
        cid += 1
        self.idx = cid
        if self not in clients:
            print('add to clients:', self)
            clients.append(self)

    def collect_incoming_data(self, data):
        """Append a received chunk to the current message buffer."""
        self.data = self.data + data
        print(data)

    def found_terminator(self):
        """Dispatch the buffered message, then reset the buffer."""
        print("found", self.data)
        if self.data == policyRequest:
            pushData(self, policyReturn)
            self.close_when_done()
        else:
            d = {'cid': self.idx, 'msg': self.data}
            taskList.put(pickle.dumps(d))
        self.data = b""

    def handle_close(self):
        """Unregister the channel from ``clients`` and close its socket."""
        if self in clients:
            print('remove from clients:', self)
            clients.remove(self)
        # This override replaces the inherited handle_close(), so the socket
        # has to be closed here; otherwise the dead connection stays
        # registered with the asyncore loop.
        self.close()
class HTTPServer(asyncore.dispatcher):
    """Listening socket that creates an ``HTTPChannel`` per connection."""

    def __init__(self, port):
        """Create a TCP socket bound to all interfaces on ``port`` and listen."""
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(("", port))
        self.listen(5)

    def handle_accept(self):
        """Accept a pending connection and hand it to a new ``HTTPChannel``."""
        pair = self.accept()
        if pair is None:
            # Newer asyncore versions return None when the connection did
            # not actually take place; there is nothing to serve then.
            return
        conn, addr = pair
        print('a new customer!')
        HTTPChannel(self, conn, addr)
#
# try it out
if __name__ == "__main__":
    s = HTTPServer(PORT)
    print("serving at port", PORT, "...")
    # Lock used by pushData() to serialize calls to channel.push().
    pushLock = threading.Lock()
    # Connected channels and the last client id handed out.
    clients = []
    cid = 0
    # Queues shared with the worker process: raw tasks in, processed
    # messages out.
    manager = Manager()
    taskList = manager.Queue()
    msgList = manager.Queue()
    h = Process(target=handler, args=(taskList, msgList))
    # handler() loops forever; a daemonic process is terminated when the
    # main process exits instead of keeping the interpreter from exiting.
    h.daemon = True
    h.start()
    # Background thread that pushes processed messages back to clients.
    _thread.start_new_thread(sender, ())
    print('entering loop')
    asyncore.loop()