parallel csv-file processing

  • Michel Albert

    parallel csv-file processing

    Currently I am faced with a large computation task which works on a
    huge CSV file. As a test I am working on a very small subset which
    already contains 2E6 records. The task itself allows the file to be
    split, however, as each computation only involves one line. The
    application performing the computation exists already, but it was
    never meant to run on such a big dataset.

    One thing that is clear is that it will take a while to compute all
    this, so a distributed approach is probably a good idea. There are a
    couple of options for this:

    Scenario A (file is split manually into smaller parts):
    1) Fire up an openmosix/kerrighed cluster and run one process for
    each file part.

    Scenario B (file is "split" using the application itself):
    2) Again with an openmosix/kerrighed cluster, but only one instance of
    the application is run, using parallelpython.
    3) Using parallelpython without a cluster, but running ppserver.py on
    each node (a minimal setup sketch follows below).
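    For option 3, a minimal setup could look like the following. This is
    only a sketch: the node names are made up, and ppserver.py is assumed
    to be running on each node already.

    import pp

    # connect to the ppserver.py instances on the worker nodes
    # (60000 is parallelpython's default port)
    ppservers = ("node1:60000", "node2:60000")
    job_server = pp.Server(ppservers=ppservers)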

    The second case looks most interesting as it is quite flexible. In
    that case, however, I would need to address subsets of the CSV file,
    and the default csv.reader class does not allow random access to the
    file (or jumping to a specific line).

    What would be the most efficient way to subset a CSV file? For
    example:

    f1 = job_server.submit(calc_scores, datafile[0:1000])
    f2 = job_server.submit(calc_scores, datafile[1000:2000])
    f3 = job_server.submit(calc_scores, datafile[2000:3000])
    ....

    and so on

    Obviously this won't work as you cannot access a slice of a csv-file.
    Would it be possible to subclass the csv.reader class in a way that
    you can somewhat efficiently access a slice? Jumping backwards is not
    really necessary, so it's not really random access.
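    Roughly what I imagine is something like this (untested; the class
    name and file name are made up):

    import csv
    import itertools

    class ForwardSliceReader:
        """Wraps a csv.reader so that rows[a:b] works, as long as
        slices are requested in increasing order."""
        def __init__(self, reader):
            self.reader = reader
            self.pos = 0
        def __getitem__(self, sl):
            if sl.start < self.pos:
                raise ValueError("can only slice forwards")
            skip = sl.start - self.pos       # rows to throw away
            count = sl.stop - sl.start       # rows to keep
            rows = list(itertools.islice(self.reader, skip, skip + count))
            self.pos = sl.stop
            return rows

    datafile = ForwardSliceReader(csv.reader(open('scores.csv', 'rb')))
    first = datafile[0:1000]      # records 0..999
    second = datafile[1000:2000]  # records 1000..1999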

    The obvious way is to do the following:

    buffer = []
    for line in reader:
        buffer.append(line)
        if len(buffer) == 1000:
            f = job_server.submit(calc_scores, buffer)
            buffer = []

    if buffer:  # don't submit an empty final chunk
        f = job_server.submit(calc_scores, buffer)

    but would this not kill my memory if I start loading bigger slices
    into the "buffer" variable?

  • Paul Rubin

    #2
    Re: parallel csv-file processing

    Michel Albert <exhuma@gmail.com> writes:

    > buffer = []
    > for line in reader:
    >     buffer.append(line)
    >     if len(buffer) == 1000:
    >         f = job_server.submit(calc_scores, buffer)
    >         buffer = []
    >
    > if buffer:  # don't submit an empty final chunk
    >     f = job_server.submit(calc_scores, buffer)
    >
    > but would this not kill my memory if I start loading bigger slices
    > into the "buffer" variable?
    Why not pass the disk offsets to the job server (untested):

    n = 1000
    for i, _ in enumerate(reader):
        if i % n == 0:
            # tell() lives on the underlying file object ("datafile"
            # above), not on the csv reader itself
            job_server.submit(calc_scores, datafile.tell(), n)

    The remote process seeks to the appropriate place and processes n
    lines starting from there.
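
    The receiving end could then look something like this (also
    untested; score_row stands in for the real per-record computation,
    and the filename is made up):

    import csv
    import itertools

    def score_row(row):
        return len(row)  # placeholder for the real computation

    def calc_scores(offset, n, filename='scores.csv'):
        # each worker opens its own handle, jumps to the byte offset
        # it was given and parses n records from there
        f = open(filename, 'rb')
        f.seek(offset)
        reader = csv.reader(f)
        return [score_row(row) for row in itertools.islice(reader, n)]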


    • Marc 'BlackJack' Rintsch

      #3
      Re: parallel csv-file processing

      On Fri, 09 Nov 2007 02:51:10 -0800, Michel Albert wrote:

      > Obviously this won't work as you cannot access a slice of a csv-file.
      > Would it be possible to subclass the csv.reader class in a way that
      > you can somewhat efficiently access a slice?

      An arbitrary slice? I guess not, as all the records before it must be
      read, because the lines are not all equally long.
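
      If one initial pass over the file is acceptable, you could of course
      record the byte offset of every line first; after that, any record
      is reachable with a single seek(). A rough sketch (the filename is
      made up):

      # note: this assumes one record per line, i.e. no quoted
      # fields containing embedded newlines
      index = []
      offset = 0
      for line in open('scores.csv', 'rb'):
          index.append(offset)   # byte offset where record i starts
          offset += len(line)

      datafile = open('scores.csv', 'rb')
      datafile.seek(index[2000])  # jump straight to record 2000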
      > The obvious way is to do the following:
      >
      > buffer = []
      > for line in reader:
      >     buffer.append(line)
      >     if len(buffer) == 1000:
      >         f = job_server.submit(calc_scores, buffer)
      >         buffer = []

      With `itertools.islice()` this can be written as:

      import itertools

      while True:
          buffer = list(itertools.islice(reader, 1000))
          if not buffer:
              break
          f = job_server.submit(calc_scores, buffer)
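
      This also keeps at most one 1000-record chunk alive at a time, so
      memory use stays bounded no matter how big the file gets.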


      • Paul Boddie

        #4
        Re: parallel csv-file processing

        On 9 Nov, 12:02, Paul Rubin <http://phr...@NOSPAM.invalid> wrote:
        >
        > Why not pass the disk offsets to the job server (untested):
        >
        > n = 1000
        > for i, _ in enumerate(reader):
        >     if i % n == 0:
        >         job_server.submit(calc_scores, datafile.tell(), n)
        >
        > The remote process seeks to the appropriate place and processes n
        > lines starting from there.
        This is similar to a lot of the smarter solutions for Tim Bray's
        "Wide Finder" problem, which is apparently in the same domain;
        that discussion covers a lot more than just parallel processing/
        programming, too.

        Paul

