Hi, everybody!
I wrote a useful class ThreadPoolingMi xIn which can be used to create
fast thread-based servers. This mix-in works much faster than
ThreadingMixIn because it doesn't create a new thread on each request.
Is it worth including in SocketServer.py ?
from __future__ import with_statement
from SocketServer import ThreadingMixIn
import threading
import Queue
class ThreadPoolingMi xIn(ThreadingMi xIn):
"""Mix-in class to handle requests in a thread
pool.
The pool grows and thrinks depending on
load.
For instance, a threading UDP server class is created as
follows:
class ThreadPoolingUD PServer(ThreadP oolingMixIn, UDPServer):
pass
"""
__author__ = 'Pavel Uvarov <pavel.uvarov@g mail.com>'
def init_thread_poo l(self, min_workers = 5,
max_workers = 100, min_spare_worke rs = 5):
"""Initiali ze thread pool."""
self.q = Queue.Queue()
self.min_worker s = min_workers
self.max_worker s = max_workers
self.min_spare_ workers = min_spare_worke rs
self.num_worker s = 0
self.num_busy_w orkers = 0
self.workers_mu tex = threading.Lock( )
self.start_work ers(self.min_wo rkers)
def start_workers(s elf, n):
"""Start n workers."""
for i in xrange(n):
t = threading.Threa d(target = self.worker)
t.setDaemon(Tru e)
t.start()
def worker(self):
"""A function of a working
thread.
It gets a request from queue (blocking if
there
are no requests) and processes
it.
After processing it checks how many spare
workers
are there now and if this value is greater
than
self.min_spare_ workers then the worker
exits.
Otherwise it loops
infinitely.
"""
with self.workers_mu tex:
self.num_worker s += 1
while True:
(request, client_address) = self.q.get()
with self.workers_mu tex:
self.num_busy_w orkers += 1
self.process_re quest_thread(re quest, client_address)
self.q.task_don e()
with self.workers_mu tex:
self.num_busy_w orkers -= 1
if self.num_worker s - self.num_busy_w orkers \
self.min_spare_ workers:
self.num_worker s -= 1
return
def process_request (self, request, client_address) :
"""Puts a request into
queue.
If the queue size is too large, it adds extra
worker.
"""
self.q.put((req uest, client_address) )
with self.workers_mu tex:
if self.q.qsize() 3 and self.num_worker s <
self.max_worker s:
self.start_work ers(1)
def join(self):
"""Wait for all busy threads"""
self.q.join()
I wrote a useful class ThreadPoolingMi xIn which can be used to create
fast thread-based servers. This mix-in works much faster than
ThreadingMixIn because it doesn't create a new thread on each request.
Is it worth including in SocketServer.py ?
from __future__ import with_statement
from SocketServer import ThreadingMixIn
import threading
import Queue
class ThreadPoolingMi xIn(ThreadingMi xIn):
"""Mix-in class to handle requests in a thread
pool.
The pool grows and thrinks depending on
load.
For instance, a threading UDP server class is created as
follows:
class ThreadPoolingUD PServer(ThreadP oolingMixIn, UDPServer):
pass
"""
__author__ = 'Pavel Uvarov <pavel.uvarov@g mail.com>'
def init_thread_poo l(self, min_workers = 5,
max_workers = 100, min_spare_worke rs = 5):
"""Initiali ze thread pool."""
self.q = Queue.Queue()
self.min_worker s = min_workers
self.max_worker s = max_workers
self.min_spare_ workers = min_spare_worke rs
self.num_worker s = 0
self.num_busy_w orkers = 0
self.workers_mu tex = threading.Lock( )
self.start_work ers(self.min_wo rkers)
def start_workers(s elf, n):
"""Start n workers."""
for i in xrange(n):
t = threading.Threa d(target = self.worker)
t.setDaemon(Tru e)
t.start()
def worker(self):
"""A function of a working
thread.
It gets a request from queue (blocking if
there
are no requests) and processes
it.
After processing it checks how many spare
workers
are there now and if this value is greater
than
self.min_spare_ workers then the worker
exits.
Otherwise it loops
infinitely.
"""
with self.workers_mu tex:
self.num_worker s += 1
while True:
(request, client_address) = self.q.get()
with self.workers_mu tex:
self.num_busy_w orkers += 1
self.process_re quest_thread(re quest, client_address)
self.q.task_don e()
with self.workers_mu tex:
self.num_busy_w orkers -= 1
if self.num_worker s - self.num_busy_w orkers \
self.min_spare_ workers:
self.num_worker s -= 1
return
def process_request (self, request, client_address) :
"""Puts a request into
queue.
If the queue size is too large, it adds extra
worker.
"""
self.q.put((req uest, client_address) )
with self.workers_mu tex:
if self.q.qsize() 3 and self.num_worker s <
self.max_worker s:
self.start_work ers(1)
def join(self):
"""Wait for all busy threads"""
self.q.join()
Comment