ThreadPoolingMixIn

  • pavel.uvarov@gmail.com

    ThreadPoolingMixIn

    Hi, everybody!

    I wrote a useful class ThreadPoolingMixIn which can be used to create
    fast thread-based servers. This mix-in works much faster than
    ThreadingMixIn because it doesn't create a new thread on each request.

    Is it worth including in SocketServer.py?


    from __future__ import with_statement
    from SocketServer import ThreadingMixIn
    import threading
    import Queue

    class ThreadPoolingMixIn(ThreadingMixIn):
        """Mix-in class to handle requests in a thread pool.

        The pool grows and shrinks depending on load.

        For instance, a threading UDP server class is created as follows:

        class ThreadPoolingUDPServer(ThreadPoolingMixIn, UDPServer): pass

        """
        __author__ = 'Pavel Uvarov <pavel.uvarov@gmail.com>'

        def init_thread_pool(self, min_workers = 5,
                             max_workers = 100, min_spare_workers = 5):
            """Initialize thread pool."""
            self.q = Queue.Queue()
            self.min_workers = min_workers
            self.max_workers = max_workers
            self.min_spare_workers = min_spare_workers
            self.num_workers = 0
            self.num_busy_workers = 0
            self.workers_mutex = threading.Lock()
            self.start_workers(self.min_workers)

        def start_workers(self, n):
            """Start n workers."""
            for i in xrange(n):
                t = threading.Thread(target = self.worker)
                t.setDaemon(True)
                t.start()

        def worker(self):
            """A function of a working thread.

            It gets a request from the queue (blocking if there
            are no requests) and processes it.

            After processing it checks how many spare workers
            there are now, and if this value is greater than
            self.min_spare_workers then the worker exits.
            Otherwise it loops infinitely.

            """
            with self.workers_mutex:
                self.num_workers += 1
            while True:
                (request, client_address) = self.q.get()
                with self.workers_mutex:
                    self.num_busy_workers += 1
                self.process_request_thread(request, client_address)
                self.q.task_done()
                with self.workers_mutex:
                    self.num_busy_workers -= 1
                    if self.num_workers - self.num_busy_workers > \
                            self.min_spare_workers:
                        self.num_workers -= 1
                        return

        def process_request(self, request, client_address):
            """Puts a request into the queue.

            If the queue size is too large, it adds an extra worker.

            """
            self.q.put((request, client_address))
            with self.workers_mutex:
                if self.q.qsize() > 3 and self.num_workers < self.max_workers:
                    self.start_workers(1)

        def join(self):
            """Wait for all busy threads."""
            self.q.join()
  • Rhamphoryncus

    #2
    Re: ThreadPoolingMixIn

    On May 30, 2:40 pm, pavel.uva...@gmail.com wrote:
    > Hi, everybody!
    >
    > I wrote a useful class ThreadPoolingMixIn which can be used to create
    > fast thread-based servers. This mix-in works much faster than
    > ThreadingMixIn because it doesn't create a new thread on each request.

    Do you have any benchmarks demonstrating the performance difference?

    > t.setDaemon(True)
    Unfortunately, shutdown with daemon threads is fairly buggy. :/
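
    The daemon-thread problem can be avoided entirely by making workers
    non-daemon and telling them to exit explicitly. A minimal sketch of that
    pattern (my illustration, not part of Pavel's patch; `SentinelPool` and
    `_SENTINEL` are hypothetical names):

```python
import threading
try:
    import queue            # Python 3 name
except ImportError:
    import Queue as queue   # Python 2 name, as in the posted code

_SENTINEL = object()  # unique marker that tells a worker to exit

class SentinelPool(object):
    """Pool whose workers are non-daemon and shut down cleanly."""
    def __init__(self, num_workers):
        self.q = queue.Queue()
        self.threads = []
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker)  # note: no setDaemon(True)
            t.start()
            self.threads.append(t)

    def _worker(self):
        while True:
            item = self.q.get()
            if item is _SENTINEL:
                return          # orderly exit instead of dying at interpreter shutdown
            item()              # stand-in for process_request_thread()

    def shutdown(self):
        for _ in self.threads:
            self.q.put(_SENTINEL)   # one sentinel per worker; FIFO order
        for t in self.threads:      # means all real work drains first
            t.join()

results = []
pool = SentinelPool(3)
for i in range(10):
    pool.q.put(lambda i=i: results.append(i * i))
pool.shutdown()
print(sorted(results))  # → [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

    Since the sentinels are enqueued after all real work, the queue's FIFO
    order guarantees nothing is dropped on shutdown.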


    • Giampaolo Rodola'

      #3
      Re: ThreadPoolingMixIn

      On 30 Mag, 22:40, pavel.uva...@gmail.com wrote:
      > Hi, everybody!
      >
      > I wrote a useful class ThreadPoolingMixIn which can be used to create
      > fast thread-based servers. This mix-in works much faster than
      > ThreadingMixIn because it doesn't create a new thread on each request.
      >
      > Is it worth including in SocketServer.py?
      >
      > [code snipped]
      This is not the right place to discuss such a thing.
      Post this same message on python-dev ml or, even better, open a new
      ticket on the bug tracker attaching the patch and, most important, a
      benchmark demonstrating the speed improvement.

      --- Giampaolo


      • Rhamphoryncus

        #4
        Re: ThreadPoolingMixIn

        On May 31, 1:40 pm, "Giampaolo Rodola'" <gne...@gmail.com> wrote:
        > On 30 Mag, 22:40, pavel.uva...@gmail.com wrote:
        > > [original post and code snipped]
        >
        > This is not the right place to discuss such a thing.
        > Post this same message on python-dev ml or, even better, open a new
        > ticket on the bug tracker attaching the patch and, most important, a
        > benchmark demonstrating the speed improvement.
        It's perfectly acceptable to discuss ideas here before bringing them
        up on python-ideas, and then python-dev.


        • pavel.uvarov@gmail.com

          #5
          Re: ThreadPoolingMixIn

          On May 31, 9:13 pm, Rhamphoryncus <rha...@gmail.com> wrote:
          > On May 30, 2:40 pm, pavel.uva...@gmail.com wrote:
          > > I wrote a useful class ThreadPoolingMixIn which can be used to create
          > > fast thread-based servers. This mix-in works much faster than
          > > ThreadingMixIn because it doesn't create a new thread on each request.
          >
          > Do you have any benchmarks demonstrating the performance difference?

          To benchmark this I used a simple tcp server which writes a small (16k)
          string to the client and closes the connection.

          I started 100 remote clients and got 500 replies/s for ThreadingMixIn
          and more than 1500 replies/s for ThreadPoolingMixIn. I tested it on
          FreeBSD 6.2 amd64.

          I'm very curious about the exactness of the number 500 for
          ThreadingMixIn. It seems to be the same for various packet sizes.
          I suspect there is some OS limit on thread creating rate.
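
          That suspected OS limit on thread-creation rate is easy to probe in
          isolation; a rough sketch (my own, not part of the benchmark utility;
          the number it prints is machine-dependent) that times raw
          spawn-and-join throughput of no-op threads:

```python
import threading
import time

def thread_spawn_rate(n=200):
    """Create, start and join n no-op threads; return threads/second."""
    start = time.time()
    threads = [threading.Thread(target=lambda: None) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return n / (time.time() - start)

rate = thread_spawn_rate()
print('%.0f threads/sec' % rate)
```

          If this number sits near the observed 500 replies/s ceiling, thread
          startup, not the network path, is the bottleneck.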

          Below I include a bugfixed ThreadPoolingMixIn and the benchmarking
          utility. The utility can be used to start clients on localhost, though
          the reply rate will be slower (around 1000 replies/s).

          To start benchmarking server with localhost clients use:
          python ./TestServer.py --server=threading --n-clients=100
          or
          python ./TestServer.py --server=threadpooling --n-clients=100


          #------- ThreadPoolingMixIn.py
          from __future__ import with_statement
          from SocketServer import ThreadingMixIn
          import threading
          import Queue

          class ThreadPoolingMixIn(ThreadingMixIn):
              """Mix-in class to handle requests in a thread pool.

              The pool grows and shrinks depending on load.

              For instance, a threadpooling TCP server class is created as
              follows:

              class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer): pass

              """
              __author__ = 'Pavel Uvarov <pavel.uvarov@gmail.com>'

              def init_thread_pool(self, min_workers = 5,
                                   max_workers = 100, min_spare_workers = 5):
                  """Initialize thread pool."""
                  self.q = Queue.Queue()
                  self.min_workers = min_workers
                  self.max_workers = max_workers
                  self.min_spare_workers = min_spare_workers
                  self.num_workers = 0
                  self.num_busy_workers = 0
                  self.workers_mutex = threading.Lock()
                  self.start_workers(self.min_workers)

              def start_workers(self, n):
                  """Start n workers."""
                  for i in xrange(n):
                      t = threading.Thread(target = self.worker)
                      t.setDaemon(True)
                      t.start()

              def worker(self):
                  """A function of a working thread.

                  It gets a request from the queue (blocking if there
                  are no requests) and processes it.

                  After processing it checks how many spare workers
                  there are now, and if this value is greater than
                  self.min_spare_workers then the worker exits.
                  Otherwise it loops infinitely.

                  """
                  with self.workers_mutex:
                      self.num_workers += 1
                  while True:
                      (request, client_address) = self.q.get()
                      with self.workers_mutex:
                          self.num_busy_workers += 1
                      self.process_request_thread(request, client_address)
                      self.q.task_done()
                      with self.workers_mutex:
                          self.num_busy_workers -= 1
                          if (self.num_workers > self.min_workers and
                                  self.num_workers - self.num_busy_workers >
                                  self.min_spare_workers):
                              self.num_workers -= 1
                              return

              def process_request(self, request, client_address):
                  """Puts a request into the queue.

                  If the queue size is too large, it adds an extra worker.

                  """
                  self.q.put((request, client_address))
                  with self.workers_mutex:
                      if self.q.qsize() > 3 and self.num_workers < self.max_workers:
                          self.start_workers(1)

              def join(self):
                  """Wait for all busy threads."""
                  self.q.join()

          #------- TestServer.py
          from __future__ import with_statement
          from SocketServer import *
          import socket
          import sys
          import threading
          import time
          import os
          from ThreadPoolingMixIn import *

          class ThreadPoolingTCPServer(ThreadPoolingMixIn, TCPServer): pass

          class TestServer(ThreadingTCPServer):

              allow_reuse_address = True
              request_queue_size = 128

              def __init__(self, server_address, RequestHandlerClass,
                           packet_size):
                  TCPServer.__init__(self, server_address, RequestHandlerClass)
                  self.packet_size = packet_size
                  self.sum_t = 0
                  self.total_num_requests = 0
                  self.num_requests = 0
                  self.t0 = time.time()
                  self.lock = threading.Lock()

              def reset_stats(self):
                  with self.lock:
                      self.total_num_requests += self.num_requests
                      self.num_requests = 0
                      self.sum_t = 0
                      self.t0 = time.time()

              def update_stats(self, t0, t1):
                  with self.lock:
                      self.num_requests += 1
                      self.sum_t += t1 - t0
                      n = self.num_requests
                      sum_t = self.sum_t
                      avg_t = sum_t / n
                      rate = n / (t1 - self.t0)
                      return (n, avg_t, rate)

              def handle_request(self):
                  """Handle one request, possibly blocking."""
                  try:
                      request, client_address = self.get_request()
                  except KeyboardInterrupt:
                      raise
                  except socket.error:
                      return
                  if self.verify_request(request, client_address):
                      try:
                          self.process_request(request, client_address)
                      except KeyboardInterrupt:
                          raise
                      except:
                          self.handle_error(request, client_address)
                          self.close_request(request)

          class TestServerThreadPool(ThreadPoolingMixIn, TestServer):
              def __init__(self, server_address, RequestHandlerClass,
                           packet_size):
                  TestServer.__init__(self, server_address, RequestHandlerClass,
                                      packet_size)
                  self.init_thread_pool(2, 200, 20)

          class TestRequestHandler(StreamRequestHandler):

              def __init__(self, request, client_address, server):
                  self.t0 = time.time()
                  StreamRequestHandler.__init__(self, request,
                                                client_address, server)

              def handle(self):
                  self.wfile.write('a'*(self.server.packet_size))

                  t1 = time.time()
                  (n, avg_t, rate) = self.server.update_stats(self.t0, t1)
                  if n % 10000 == 0:
                      print('rate=%.2f ' % rate)
                      self.server.reset_stats()

          from optparse import OptionParser

          def server(o):
              HandlerClass = TestRequestHandler

              if o.server == "threading":
                  ServerClass = TestServer
              elif o.server == "threadpooling":
                  ServerClass = TestServerThreadPool
              else:
                  return

              server_address = ('', o.port)
              try:
                  srv = ServerClass(server_address, HandlerClass,
                                    o.packet_size)

                  sa = srv.socket.getsockname()
                  print "Serving on", sa[0], "port", sa[1], "..."
                  srv.serve_forever()
              except Exception, val:
                  print "Exception: %s" % str(val)
                  raise

          def client(o):
              for f in xrange(0, o.n_clients):
                  if os.fork():
                      while True:
                          try:
                              sock = socket.socket(socket.AF_INET,
                                                   socket.SOCK_STREAM)
                              sock.connect(("localhost", o.port))
                              while len(sock.recv(4096)):
                                  pass
                              sock.close()
                          except Exception, val:
                              print val
                              time.sleep(1)

          if __name__ == '__main__':
              args = sys.argv[1:]
              usage = "usage: %prog [options]"
              parser = OptionParser(usage)
              parser.add_option("-p", "--port", help="Server port",
                                type="int", default=8123)
              parser.add_option("", "--n-clients", help="Number of client forks",
                                type="int", default=0)
              parser.add_option("", "--server",
                                help="Type of the server (threading or threadpooling)",
                                type="string", default="")
              parser.add_option("", "--packet-size", help="Packet size",
                                type="int", default=16*1024)
              (o, a) = parser.parse_args(args)

              if os.fork() == 0:
                  server(o)
              else:
                  client(o)


          • pavel.uvarov@gmail.com

            #6
            Re: ThreadPoolingMixIn

            On Jun 2, 7:09 pm, pavel.uva...@gmail.com wrote:
            > To benchmark this I used a simple tcp server which writes a small (16k)
            > string to the client and closes the connection.
            >
            > I started 100 remote clients and got 500 replies/s for ThreadingMixIn
            > and more than 1500 replies/s for ThreadPoolingMixIn. I tested it on
            > FreeBSD 6.2 amd64.
            >
            > [rest of the post and code snipped]
            I've just tested it on a linux box and got 240 replies/s vs 2000
            replies/s, that is an 8x performance improvement.


            • Michael Ströder

              #7
              Re: ThreadPoolingMixIn

              pavel.uvarov@gmail.com wrote:
              > To benchmark this I used a simple tcp server which writes a small (16k)
              > string to the client and closes the connection.

              Just a general note: When benchmarking such a network service it would
              be valuable to see benchmark results for several data sizes. I'd expect
              better numbers for a ThreadPoolingMixIn when there are more requests
              with smaller data size.

              Ciao, Michael.


              • pavel.uvarov@gmail.com

                #8
                Re: ThreadPoolingMixIn

                On Jun 2, 7:15 pm, Michael Ströder <mich...@stroeder.com> wrote:
                > pavel.uva...@gmail.com wrote:
                > > To benchmark this I used a simple tcp server which writes a small (16k)
                > > string to the client and closes the connection.
                >
                > Just a general note: When benchmarking such a network service it would
                > be valuable to see benchmark results for several data sizes. I'd expect
                > better numbers for a ThreadPoolingMixIn when there are more requests
                > with smaller data size.
                >
                > Ciao, Michael.
                Here are benchmarks for FreeBSD 6.2, amd64

                packet_size         x         y
                          0    499.57   1114.54
                       1024    499.29   1130.02
                       3072    500.09   1119.14
                       7168    498.20   1111.76
                      15360    499.29   1086.73
                      31744    500.04   1036.46
                      64512    499.43    939.60
                     130048    499.28    737.44
                     261120    498.04    499.03
                     523264    307.54    312.04
                    1047552    173.57    185.32
                    2096128     93.61     94.39

                x = ThreadingMixIn replies/s
                y = ThreadPoolingMixIn replies/s
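
                Dividing the two columns turns the table into speedup factors,
                which shows where the pool stops helping; a quick sketch over a
                few rows copied from the table above:

```python
# (packet_size, ThreadingMixIn replies/s, ThreadPoolingMixIn replies/s)
rows = [
    (0,       499.57, 1114.54),
    (1024,    499.29, 1130.02),
    (64512,   499.43,  939.60),
    (261120,  498.04,  499.03),
    (2096128,  93.61,   94.39),
]
speedups = [(size, pooling / threading_) for size, threading_, pooling in rows]
for size, s in speedups:
    print('%8d  %.2fx' % (size, s))
```

                The pool gives roughly 2.2x for small packets, and the gap
                closes once transferring the data, rather than thread startup,
                dominates each request.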


                • miller.paul.w@gmail.com

                  #9
                  Re: ThreadPoolingMixIn

                  On Jun 2, 12:41 pm, pavel.uva...@gmail.com wrote:
                  > Here are benchmarks for FreeBSD 6.2, amd64
                  >
                  > [table snipped]
                  >
                  > x = ThreadingMixIn replies/s
                  > y = ThreadPoolingMixIn replies/s
                  Well, I'd say you've got yourself a winner. Performance (at least on
                  FreeBSD) seems as good or better for your ThreadPoolingMixIn than
                  ThreadingMixIn. Is this with the default values of min=5 and max=5
                  worker threads?


                  • pavel.uvarov@gmail.com

                    #10
                    Re: ThreadPoolingMixIn

                    On Jun 3, 1:19 am, miller.pau...@gmail.com wrote:
                    > Well, I'd say you've got yourself a winner. Performance (at least on
                    > FreeBSD) seems as good or better for your ThreadPoolingMixIn than
                    > ThreadingMixIn. Is this with the default values of min=5 and max=5
                    > worker threads?
                    No, I initialized the thread pool with min_threads=2, max_threads=200
                    and min_spare_threads=20.

                    For Linux (2.6.22, amd64) I got even more dramatic improvement:

                    packet_size         x         y
                              0    249.97   2014.63
                           1024    249.98   1782.83
                           3072    240.09   1859.00
                           7168    249.98   1900.61
                          15360    249.98   1787.30
                          31744    238.96   1808.17
                          64512    249.85   1561.47
                         130048    237.26   1213.26
                         261120    249.98    841.96
                         523264    249.97    595.40
                        1047552    236.40    351.96
                        2096128    216.26    218.15

                    x = ThreadingMixIn replies/s
                    y = ThreadPoolingMixIn replies/s

