Running a python farm

  • Ian McConnell

    Running a python farm

    What's the pythonic way of sending out a set of requests in parallel?

    My program throws an image at the server and then waits for the result. I'm
    currently using this bit of socket code to send an image to a server on
    another machine.


    import socket
    import SocketServer
    import cPickle

    def oneclient(host, port, array):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))
        sent = sock.sendall(cPickle.dumps(array, 0))
        sock.shutdown(1)
        rfile = sock.makefile('rb')
        return cPickle.loads(rfile.read())


    Processing the image can take several minutes and I have loads of images to
    process, but I also have several servers available, so I'd like to save some
    time by distributing the images around the servers. So for 'n' servers,
    throw 'n' images at them. Then as each server finishes, give it another
    image to work on until I've done all the images.

    What should I be looking for, for a pythonic solution? I'm stuck for
    terminology: threads? Load balancing? Processor farms? Any pointers or
    suggestions welcome.



    I don't have to do this too often, so I would prefer a simple solution over
    performance.


    Thanks.


    --
    "Thinks: I can't think of a thinks. End of thinks routine": Blue Bottle

    ** Aunty Spam says: Remove the trailing x from the To: field to reply **
  • Andrew Bennetts

    #2
    Re: Running a python farm

    On Mon, Oct 27, 2003 at 02:28:05PM +0000, Ian McConnell wrote:
    > What's the pythonic way of sending out a set of requests in parallel?
    >
    > My program throws an image at the server and then waits for the result. I'm
    > currently using this bit of socket code to send an image to server on
    > another machine.
    >
    >
    > import socket
    > import SocketServer
    > import cPickle
    >
    > def oneclient(host, port, array):
    >     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    >     sock.connect((host, port))
    >     sent = sock.sendall(cPickle.dumps(array, 0))
    >     sock.shutdown(1)
    >     rfile = sock.makefile('rb')
    >     return cPickle.loads(rfile.read())
    >
    >
    > Processing the image can take several minutes and I have loads of images to
    > process, but I also have several servers available, so I'd like save some
    > time by distributing the images around the servers. So for 'n' servers,
    > throw 'n' images at them. Then as each server finishes, give it another
    > image to work on until I've done all the images.

    Probably the simplest way is to have n worker threads, and a main thread
    that pushes the images to be processed onto a Queue.Queue instance.

    So, your worker thread function might look something like this (using your
    function above):

    def worker(host, port, queue):
        while 1:
            array = queue.get()
            if array is None:
                return
            result = oneclient(host, port, array)
            # do something with the result, perhaps store in a shared
            # list or dictionary?

    Then you just need a main function like:

    import Queue, threading

    servers = [('host1', 1111), ('host2', 2222)] # etc...

    def main():
        q = Queue.Queue()
        workers = []
        for host, port in servers:
            t = threading.Thread(target=worker, args=(host, port, q))
            t.start()
            workers.append(t)

        # assume we get arrays to be processed from somewhere
        for array in arrays:
            q.put(array)

        # Tell the worker threads to stop when there's nothing left to do
        for i in range(len(workers)):
            q.put(None)

        # Wait for all the threads to finish
        for t in workers:
            t.join()

        # Do something with the results gathered by the workers
        pass

    -Andrew.
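For comparison, a minimal Python 3 sketch of the same worker-and-queue pattern (in Python 3 the `Queue` module is named `queue`). The name `farm` and the `send(host, port, item)` callable are hypothetical: `send` stands in for a blocking function such as `oneclient()`. Tagging each job with its index lets the workers fill one shared results list, which is one possible answer to the "do something with the result" comment above.

```python
import queue
import threading


def farm(servers, items, send):
    """Spread `items` over `servers`, one worker thread per server.

    `send(host, port, item)` is a hypothetical stand-in for a blocking call
    like oneclient(). Results come back in the same order as `items`.
    """
    jobs = queue.Queue()
    results = [None] * len(items)

    def worker(host, port):
        while True:
            job = jobs.get()
            if job is None:              # sentinel: nothing left to do
                return
            index, item = job
            # Each index is written by exactly one worker, so workers never
            # race on the same slot.
            results[index] = send(host, port, item)

    threads = [threading.Thread(target=worker, args=server) for server in servers]
    for t in threads:
        t.start()
    for job in enumerate(items):
        jobs.put(job)
    for _ in threads:                    # one sentinel per worker
        jobs.put(None)
    for t in threads:
        t.join()
    return results
```

There is no error handling here: if `send` raises, that worker thread dies and its item's slot stays `None`.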



    • John Roth

      #3
      Re: Running a python farm


      "Ian McConnell" <ian@emit.demon.co.ukx> wrote in message
      news:877k2qrc0a.fsf@emit.demon.co.uk...
      > What's the pythonic way of sending out a set of requests in parallel?
      >
      > My program throws an image at the server and then waits for the result. I'm
      > currently using this bit of socket code to send an image to server on
      > another machine.
      >
      >
      > import socket
      > import SocketServer
      > import cPickle
      >
      > def oneclient(host, port, array):
      >     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      >     sock.connect((host, port))
      >     sent = sock.sendall(cPickle.dumps(array, 0))
      >     sock.shutdown(1)
      >     rfile = sock.makefile('rb')
      >     return cPickle.loads(rfile.read())
      >
      >
      > Processing the image can take several minutes and I have loads of images to
      > process, but I also have several servers available, so I'd like save some
      > time by distributing the images around the servers. So for 'n' servers,
      > throw 'n' images at them. Then as each server finishes, give it another
      > image to work on until I've done all the images.
      >
      > What should I be looking for, for a pythonic solution? I'm stuck for
      > terminology? Threads? Load balancing? Processors farms? Any pointers or
      > suggestions welcome.
      >
      >
      >
      > I don't have to do this too often, so I would prefer a simple solution over
      > performance.

      All the solutions I know of have adequate performance. Andrew's
      suggestion of using a thread to handle coordination with each of the
      server processors is the conventional textbook approach; there's
      much to recommend doing it the way everyone else does.

      I've frequently found it simpler to avoid thread coordination
      problems (which can get really nasty) by using the select or poll
      services from the select module (7.3 in the 2.2.3 documentation).
      This leads to an event-driven style that is closer to GUI programming
      than the procedure-oriented style typical of threads.
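A sketch of that event-driven style in modern Python 3, assuming the protocol from the original post (each server writes its whole reply and then closes the connection). A single `select.select()` call watches every outstanding connection, and a socket that reads back empty has finished. The `collect` function and its timeout behaviour are illustrative assumptions, not part of any library.

```python
import select
import socket


def collect(socks, timeout=None):
    """Wait on several sockets at once and read each one until EOF.

    Returns a dict mapping each socket to the bytes it delivered. Raises
    TimeoutError if none of the pending sockets becomes readable in time.
    """
    buffers = {s: bytearray() for s in socks}
    pending = set(socks)
    while pending:
        readable, _, _ = select.select(list(pending), [], [], timeout)
        if not readable:
            raise TimeoutError("no reply within %r seconds" % (timeout,))
        for s in readable:
            chunk = s.recv(65536)
            if chunk:
                buffers[s] += chunk
            else:                        # empty read: server closed, reply complete
                pending.discard(s)
    return {s: bytes(buf) for s, buf in buffers.items()}
```

Each complete reply could then be passed to `pickle.loads()`. Because the loop only reads what has already arrived, it never blocks on one slow server while replies from the others are waiting.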

      Any way you do it, it's going to complicate your program.

      John Roth



      • Brian Kelley

        #4
        Re: Running a python farm

        Ian McConnell wrote:
        > What's the pythonic way of sending out a set of requests in parallel?
        >
        > My program throws an image at the server and then waits for the result. I'm
        > currently using this bit of socket code to send an image to server on
        > another machine.
        >
        >
        > import socket
        > import SocketServer
        > import cPickle
        >
        > def oneclient(host, port, array):
        >     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        >     sock.connect((host, port))
        >     sent = sock.sendall(cPickle.dumps(array, 0))
        >     sock.shutdown(1)
        >     rfile = sock.makefile('rb')
        >     return cPickle.loads(rfile.read())
        >

        You don't need threads here. We can leverage the fact that
        select.select can poll to see if results are ready. How about something
        like:

        import socket, select, cPickle

        class Client:
            def __init__(self, host, port, array):
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock = sock
                self.host = host
                self.port = port
                sock.connect((host, port))
                sent = sock.sendall(cPickle.dumps(array, 0))
                sock.shutdown(1)
                self.rfile = sock.makefile('rb')

            def result(self):
                # wait up to 1 second for the server to start replying
                r, w, e = select.select([self.rfile], [], [], 1)
                if not r:
                    return None
                else:
                    return cPickle.loads(self.rfile.read())

        all_images = [...]
        hosts = [(host,port), (host1,port1)]
        client_list = []
        # assign images to hosts (hosts beyond the number of images stay unused)
        for host, port in hosts:
            if all_images:
                client_list.append(Client(host, port, all_images.pop()))

        # process
        while client_list:
            next_client_list = []
            for o in client_list:
                res = o.result()
                if res is not None:
                    # do something with image result
                    if all_images: # if we have images assign to completed host
                        next_client_list.append(Client(o.host, o.port,
                                                       all_images.pop()))
                else:
                    # not finished yet: keep polling this client
                    next_client_list.append(o)
            client_list = next_client_list


        • Hung Jung Lu

          #5
          Re: Running a python farm

          Ian McConnell <ian@emit.demon.co.ukx> wrote in message news:<877k2qrc0a.fsf@emit.demon.co.uk>...
          > Processing the image can take several minutes and I have loads of images to
          > process, but I also have several servers available, so I'd like save some
          > time by distributing the images around the servers. So for 'n' servers,
          > throw 'n' images at them. Then as each server finishes, give it another
          > image to work on until I've done all the images.

          That's all fine. In the short term, that's the way to get things done.
          You may also want to look at xmlrpclib.
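In Python 3, `xmlrpclib` became `xmlrpc.client` and the server side lives in `xmlrpc.server`. A minimal sketch of the suggestion, assuming each image has already been serialized to bytes (XML-RPC has no native type for image arrays, so the bytes travel as a `Binary` value). `process_image`, `start_server` and `remote_process` are hypothetical names, and reversing the bytes is only a placeholder for real image processing.

```python
import threading
from xmlrpc.client import Binary, ServerProxy
from xmlrpc.server import SimpleXMLRPCServer


def process_image(data):
    """Hypothetical server-side job; here it just reverses the bytes."""
    return Binary(data.data[::-1])


def start_server(host="127.0.0.1", port=0):
    """Serve process_image() from a background thread (port 0 = any free port)."""
    server = SimpleXMLRPCServer((host, port), logRequests=False)
    server.register_function(process_image)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def remote_process(host, port, payload):
    """Client side: send raw bytes to one server and return its raw bytes."""
    with ServerProxy("http://%s:%d/" % (host, port)) as proxy:
        return proxy.process_image(Binary(payload)).data
```

`remote_process()` blocks just as `oneclient()` does, so it could take that function's place in a thread-per-server design.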

          For the longer term, you may want to do extra decoupling, to make things
          more scalable/maintainable/fault-tolerant. A few ideas I've had recently:
          (1) prepare tasks well enough that they can run "off-line" in a pool;
          you may use GUIDs (globally unique IDs) to tag the tasks. (2) Do
          remote-to-local calls instead of local-to-remote calls; this means you
          need broadcast capability. (3) Write your own task manager, installed
          on all machines (local and remote), so that you can dispatch programs
          and data files more easily, and start and stop remote services. For
          instance, you may now have a remote server to do image processing, but
          what if in the future you want servers to do sound file processing?
          What if you need to upgrade your remote programs?
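Idea (1) might look like this in Python 3: each task gets a GUID from `uuid.uuid4()`, so replies can be matched up whatever order or machine they come back from, and tasks that never got a reply can be resubmitted. The helpers `make_tasks` and `reconcile`, and the `(task_id, result)` reply format, are assumptions for illustration.

```python
import uuid


def make_tasks(items):
    """Tag every work item with a GUID, independent of which host runs it."""
    return {str(uuid.uuid4()): item for item in items}


def reconcile(tasks, replies):
    """Match (task_id, result) replies, in any order, back to their tasks.

    Returns (results, missing): results maps task_id -> result, and missing
    lists the task ids with no reply yet (e.g. to resubmit after a server dies).
    Replies carrying an unknown task id are ignored.
    """
    results = {tid: value for tid, value in replies if tid in tasks}
    missing = [tid for tid in tasks if tid not in results]
    return results, missing
```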

          regards,

          Hung Jung


          • Ian McConnell

            #6
            Re: Running a python farm

            Ian McConnell <ian@emit.demon.co.ukx> writes:

            > What's the pythonic way of sending out a set of requests in parallel?
            >
            > My program throws an image at the server and then waits for the result. I'm
            > currently using this bit of socket code to send an image to server on
            > another machine.

            Once again, thanks to all those who replied.

            In the end I went with the select-and-poll method, as I only have a small
            number of machines to run my code on and this method fitted in most easily
            with my existing program.

            Actually, it also turns out that my code isn't quite as parallelised as I
            thought it was, and having results come back in a different order confused
            some of the later bookkeeping, so I've also done a version (pclient2) that
            maintains the image order. Even with this limitation, I do get a good
            speed-up.

            I'm sure this code can be made more efficient and is probably fairly easy to
            deadlock, but it works for me.




            import sys
            import socket
            import cPickle
            import select
            import string

            class Machine:
                def __init__(self, hostname):
                    colon = string.rfind(hostname, ':')
                    if colon >= 0:
                        self.host = hostname[0:colon]
                        self.port = int(hostname[colon+1:])
                    else:
                        self.host = hostname
                        self.port = 10000 # default port
                    # Should check for host being alive and possible endian issues
                def connect(self):
                    return (self.host, self.port)
                def __repr__(self):
                    return "%s:%d" % (self.host, self.port)

            class Client:
                def __init__(self, host, array, opts):
                    print 'NEW CLIENT', host, array.shape
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    self.host = host
                    sock.connect(host.connect())
                    sent = sock.sendall(cPickle.dumps((array, opts), 0))
                    sock.shutdown(1)
                    self.rfile = sock.makefile('rb')

                def result(self):
                    r,w,e = select.select([self.rfile], [], [], 1)
                    if not r:
                        return None
                    else:
                        return cPickle.loads(self.rfile.read())
                def __repr__(self):
                    return "%s" % self.host
                def name(self):
                    return self.host


            def pclient(hostnames, all_images, opts):
                """
                hostnames is a list of machine names (as host:port) that are willing
                to process data
                all_images is a list of images
                opts is a dictionary of processing options.
                """

                client_list = []
                result = []
                # assign images to hosts
                hosts = []
                while len(hostnames) > 0:
                    host = Machine(hostnames.pop())
                    if host is not None:
                        hosts.append(host)
                for host in hosts:
                    if all_images:   # fewer images than hosts: leave the rest unused
                        client_list.append(Client(host, all_images.pop(), opts))

                # process
                while client_list:
                    print len(client_list), 'clients busy'
                    next_client_list = []
                    for o in client_list:
                        res = o.result()
                        if res is not None:
                            result.append(res)
                            if all_images: # if we have images assign to completed host
                                next_client_list.append(Client(o.host, all_images.pop(),
                                                               opts))
                        else:
                            next_client_list.append(o)
                    client_list = next_client_list
                return result



            #
            #
            #
            # Extra code to ensure results return in same order as submitted.
            #
            #
            #


            class fifo:
                """Simple implementation of a First-In-First-Out structure."""
                def __init__(self):
                    self.in_stack = []
                def push(self, obj):
                    self.in_stack.append(obj)
                    # print 'fifo push', self.in_stack
                def pop(self):
                    return self.in_stack.pop(0)
                def __repr__(self):
                    return str(self.in_stack)
                def __len__(self):
                    return len(self.in_stack)
                def head(self):
                    return self.in_stack[0]

            class Workers:
                def __init__(self, hosts, opts):
                    self.idle = hosts
                    self.busy = fifo()
                    self.opts = opts

                def free(self):
                    return len(self.idle)

                def newjob(self, array):
                    if array is None:
                        # Nothing to send: delete idle list
                        self.idle = []
                        return
                    host = self.idle.pop()
                    self.busy.push(Client(host, array, self.opts))

                def poll(self):
                    if len(self.busy) == 0:
                        return None
                    host = self.busy.head()
                    return host.result()

                def done(self):
                    # Wait for the oldest job, so results come back in submission order
                    if len(self.busy) == 0:
                        return None
                    res = None
                    while res is None:
                        res = self.poll()
                    host = self.busy.pop()
                    self.idle.append(host.name())
                    print ' idle:', self.idle, ' busy:', self.busy
                    return res

            def pclient2(hostnames, all_images, opts):
                """
                hostnames is a list of machine names (as host:port) that are willing
                to process data
                all_images is a list of images
                opts is a dictionary of processing options.
                Returns results in same order as input.
                """
                result = []
                hosts = []
                while len(hostnames) > 0:
                    host = Machine(hostnames.pop())
                    if host is not None:
                        hosts.append(host)
                workers = Workers(hosts, opts)

                # Take images from the front of the list so results match input order
                while workers.free() > 0 and all_images:
                    workers.newjob(all_images.pop(0))

                print 'PCLIENT2 idle:', workers.idle, ' busy:', workers.busy
                while 1:
                    res = workers.done()
                    if res is None:
                        break
                    result.append(res)
                    # Queue up another job if there are images left to process
                    if all_images:
                        workers.newjob(all_images.pop(0))
                return result
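The `fifo` class above shifts the whole list on every `pop(0)`. With only a few busy servers that cost is negligible, but Python 2.4 and later provide `collections.deque`, whose `popleft()` runs in constant time. A sketch of an equivalent class in Python 3 (the name `Fifo` is illustrative), keeping the same `push`/`pop`/`head`/`len` interface that `Workers` relies on:

```python
from collections import deque


class Fifo:
    """First-in-first-out structure backed by collections.deque.

    deque.popleft() is O(1); list.pop(0) has to shift every remaining item.
    """

    def __init__(self):
        self._items = deque()

    def push(self, obj):
        self._items.append(obj)

    def pop(self):
        return self._items.popleft()     # raises IndexError when empty, like list.pop

    def head(self):
        return self._items[0]

    def __len__(self):
        return len(self._items)

    def __repr__(self):
        return repr(list(self._items))
```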


            --
            "Thinks: I can't think of a thinks. End of thinks routine": Blue Bottle

            ** Aunty Spam says: Remove the trailing x from the To: field to reply **
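A Python 3 sketch of an order-preserving farm using `concurrent.futures`: `Executor.map()` yields results in submission order even when jobs finish out of order. Unlike `pclient2`, which only hands a server new work once the oldest outstanding job has returned, each worker thread here picks up the next image as soon as its server is free. `ordered_farm` and `send(host, port, item)` are hypothetical names; `send` stands in for a blocking call such as `oneclient()`.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def ordered_farm(servers, items, send):
    """Process `items` on `servers` and return the results in input order."""
    idle = queue.Queue()
    for server in servers:
        idle.put(server)

    def run(item):
        host, port = idle.get()          # borrow a free server (never more threads than servers)
        try:
            return send(host, port, item)
        finally:
            idle.put((host, port))       # hand the server back for the next job

    with ThreadPoolExecutor(max_workers=len(servers)) as pool:
        return list(pool.map(run, items))
```

An exception raised by `send` is re-raised when `list()` reaches that item, rather than being silently lost.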

