parsing a file

  • erbrose
    New Member
    • Oct 2006
    • 58

    Hey all.
    A (hopefully) quick question here. I am processing data using Hadoop Streaming map/reduce. The map.py is straightforward: it takes the input data (as sys.stdin), loads it into a list, sorts that list, then prints it... well, I'm not exactly sure what Hadoop does with that, but I'm pretty sure it creates a temporary file, much like a CSV, in memory.

    Code:
    import sys
    TmpArr = []
    for line in sys.stdin:
        TmpArr.append(line.strip())  # load each input record into a list
    TmpArr.sort()
    for m in TmpArr:
        print(m)
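For reference, a minimal sketch of a streaming mapper, assuming each input line is `ID,VAL` as in the example table below and assuming Hadoop Streaming's default behaviour: the text before the first tab of each output line is the key, the map output is sorted by that key, and all lines with the same key go to the same reducer. Under those assumptions map.py does not need to sort anything itself; it only has to put the ID in front of a tab. `map_lines` is a hypothetical name.

```python
def map_lines(lines):
    """Turn 'ID,VAL' lines into 'ID<TAB>VAL' records for Hadoop Streaming.

    The text before the first tab becomes the key; Hadoop sorts by it
    before the reduce step, so no sorting is done here.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines instead of emitting an empty key
        key, value = line.split(',', 1)
        yield key + '\t' + value


print(list(map_lines(['01,20\n', '\n', '02,10\n'])))
# → ['01\t20', '02\t10']
```

As a streaming script it would be driven with `for record in map_lines(sys.stdin): print(record)` after `import sys`.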

    I then have a reduce.py that takes each line from that mysterious Hadoop temp file as sys.stdin, like this:

    Code:
    import sys
    for line in sys.stdin:
        fields = line.strip().split(',')  # e.g. ['01', '20']
        # <load into temporary list and do some stuff>

    My issue is that as soon as line is empty (nothing), the process ends, even if there is still data to process. Is there an error-checking way to fix this with stdin?
    For example, my table looks like this:
    ID   VAL
    01   20
    01   22
    01   25
    02   10
    02   15
    02   17
    03   5
    03   7

    My output SHOULD look like this:
    ID   AVG    COUNT
    01   22.3   3
    02   14.0   3
    03   6.0    2

    but it's coming out like this:
    ID   AVG    COUNT
    01   22.3   3
    02   14.0   3

    Sorry this is so long-winded, and thanks for any input. Also, I could post my whole code if needed, but it's a bit long-winded too!
    Cheers,
    Eric
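One likely explanation for the missing `03` row (an assumption, since the full reduce.py isn't shown): a reducer that prints a group only when it sees the ID change has no later ID to trigger the last group, so that group must be printed after the loop ends. Iterating over `sys.stdin` does not stop at an empty line; the loop ends only at end of input. A minimal sketch, assuming comma-separated `ID,VAL` lines already sorted by ID; `reduce_lines` and `format_group` are hypothetical names, and the output follows the ID, AVG, COUNT table above:

```python
def format_group(key, values):
    """Format one ID as 'ID,AVG,COUNT', with the average to one decimal place."""
    return '%s,%.1f,%d' % (key, sum(values) / len(values), len(values))


def reduce_lines(lines, sep=','):
    """Yield one 'ID,AVG,COUNT' line per run of consecutive lines sharing an ID.

    Assumes the lines are already sorted by ID, as Hadoop Streaming
    delivers them to the reducer.
    """
    current_id = None
    values = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # a blank line arrives as '\n'; skip it, the loop keeps going
        key, value = line.split(sep, 1)
        if current_id is not None and key != current_id:
            yield format_group(current_id, values)  # ID changed: emit the finished group
            values = []
        current_id = key
        values.append(float(value))
    # The easy-to-miss step: no later ID follows the last group, so it is
    # never emitted inside the loop and has to be flushed here.
    if current_id is not None:
        yield format_group(current_id, values)


print(list(reduce_lines(['01,20', '01,22', '01,25', '02,10', '02,15',
                         '02,17', '03,5', '03,7'])))
# → ['01,22.3,3', '02,14.0,3', '03,6.0,2']
```

In reduce.py this would be driven with `for out in reduce_lines(sys.stdin): print(out)` after `import sys`; pass `sep='\t'` if the map step emits tab-separated records.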
  • erbrose
    New Member
    • Oct 2006
    • 58

    #2
    I am messing around with just running reduce.py on a txt file, and I am able to process the whole file by adding this:
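    Code:
    reader = open("d:/temp/tmp/sample.csv", 'r')
    while True:
        line = reader.readline()
        if len(line) != 0:
            pass  # <my code here>
        else:
            # <repeat my code here> for the last group, then stop at end of file
            break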
    It seems slightly wrong to have to repeat all my code in both the if and the else, but it works. I'm not able to get it to work using sys.stdin, though.
    Thanks again
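The repeated block can live in a function that both branches call. A minimal sketch keeping the same readline() loop, assuming `ID,VAL` lines with no blank lines (as in the map output); `handle_group` and `read_groups` are hypothetical names, and `handle_group` only records each ID and its row count in place of the real calculations:

```python
import io


def handle_group(key, values, out):
    # Hypothetical stand-in for "<my code here>": record the ID and its row count.
    out.append((key, len(values)))


def read_groups(reader):
    out, current_id, values = [], None, []
    while True:
        line = reader.readline()
        if len(line) == 0:
            # End of file: run the shared code once more for the last ID, then stop.
            if current_id is not None:
                handle_group(current_id, values, out)
            break
        key, value = line.strip().split(',', 1)
        if current_id is not None and key != current_id:
            handle_group(current_id, values, out)  # same function, no copied code
            values = []
        current_id = key
        values.append(value)
    return out


print(read_groups(io.StringIO('01,20\n01,22\n02,10\n')))
# → [('01', 2), ('02', 1)]
```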

    • Glenton
      Recognized Expert Contributor
      • Nov 2008
      • 391

      #3
      Okay, that's interesting. In your second example, I assume reader is a file object for a text file (i.e. reader = open("whatever.txt") or something)?
      If so, this should work:

      Code:
      reader = open("whatever.txt")
      for line in reader:
          pass  # <your code here>
      I'm assuming that when the line is empty you just want to carry on to the next line? If so, you could do it like this:

      Code:
      for line in reader:
          if not line.strip():
              continue  # a blank line arrives as '\n', not '', so test the stripped text
          # <your code here>
      I definitely can't see any reason to repeat your code!

      Regarding sys.stdin, I suppose it reads until it hits a blank line and then the iterator stops. I've not used this kind of thing, but I can imagine that would be problematic. I suspect the easier fix is in your reduce.py: perhaps you can make sure it goes through the whole Hadoop file before the iterator ends.
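That guess is easy to check. A small sketch, using `io.StringIO` as a stand-in for `sys.stdin` (both are text streams): a blank line comes through as `'\n'` and iteration carries on past it, ending only at end of input, while `readline()` returns `''` only once the stream is exhausted.

```python
import io

# io.StringIO stands in for sys.stdin here: both are text streams.
stream = io.StringIO('01,20\n\n02,10\n')
lines = [line for line in stream]
print(lines)  # → ['01,20\n', '\n', '02,10\n']

# Once the stream is exhausted, readline() returns '', which is the
# end-of-file signal that a len(line) == 0 test relies on.
print(repr(stream.readline()))  # → ''
```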

      Perhaps you could post reduce.py? I'm guessing that it could be written neatly as a class with an iterator, and that it's not written like that now ;P
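A minimal sketch of what that might look like, assuming comma-separated `ID,VAL` lines already sorted by ID; `GroupedRecords` is a hypothetical name. `itertools.groupby` does the grouping, and because it also yields the final group, the last ID is not lost at end of input:

```python
import io
from itertools import groupby


class GroupedRecords(object):
    """Iterate over (ID, [values]) pairs from a stream of sorted 'ID,VAL' lines."""

    def __init__(self, stream, sep=','):
        self.stream = stream
        self.sep = sep

    def _records(self):
        for line in self.stream:
            line = line.strip()
            if line:  # skip blank lines
                key, value = line.split(self.sep, 1)
                yield key, value

    def __iter__(self):
        # groupby collects consecutive records with the same key and also
        # yields the last run, so nothing is dropped at end of input.
        for key, group in groupby(self._records(), key=lambda kv: kv[0]):
            yield key, [value for _, value in group]


for key, values in GroupedRecords(io.StringIO('01,20\n01,22\n03,5\n03,7\n')):
    print(key, values)
# prints: 01 ['20', '22'] and then 03 ['5', '7']
```

The per-ID calculations then go in the body of that loop, written once.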

      • erbrose
        New Member
        • Oct 2006
        • 58

        #4
        Thanks!
        Alright, well, the only reason I check for len(line) == 0 (or, in this code, line == "") is to detect the actual end of file; there will be no empty lines in the input from the map step. As you can see, I am still having to pretty much duplicate the code. The code is all over the place too, as I'm still in debug mode, but it works properly with a CSV file as the input. I am also calculating the standard deviation, median, min and max values. I will eventually look into NumPy or SciPy, but for now I'm calculating the values the old-fashioned way.

        Code:
        #!/usr/bin/python
        import sys
        import math
        TmpArr = []
        Unique = []
        SortArr = []
        OutArr = []
        tmp_avgspd = float(0)
        sqr_sum = float(0)
        i = int(0)
        l = int(0)
        b = int(0)
        c = int(0)
        n = int(0)
        j = int(0)
        k = float(0)
        a = int(0)
        devsum = float(0)
        deviation = []
        reader = open("d:/temp/tmp/sample.csv",'r')
        
        while True:
            line = reader.readline()
        
            line = line.strip()
            StrTemp = line
            TmpArr.append(line.split(','))
            SortArr.append(line.split(','))    
            #if last line.. finish processing stuff in my Unique List
            if line == "":
                l = len(Unique)
                if l == 0:
                    # empty input: there are no rows in Unique, so there is nothing to report
                    break
                elif l == 1:
                    median_val=Unique[l-1][1]
                    avg_val=Unique[l-1][1]
                    std_dev=0.0
                    min_val = Unique[l-1][1]
                    max_val = Unique[l-1][1]
                elif l == 2:
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    avg_val = c/l
                    median_val = c/l
                    tmp_avgspd = float(c)/float(l)
                    b = 0
                    c = 0
                    for a in Unique:
                        deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
                        b = b + 1
                    b = 0
                    devsum = 0.0  # reset, otherwise the sum carries over from the previous ID
                    for a in deviation:
                        devsum = devsum + float(deviation[b])
                        b = b + 1
                    devsum = devsum/1.0
                    sqr_sum = math.sqrt(devsum)
                    std_dev = round(sqr_sum,3)
                    min_val = Unique[0][1]
                    max_val = Unique[l-1][1]    
                elif l%2==0:
                    median_val=(int(Unique[l//2 - 1][1])+int(Unique[l//2][1]))/2.0  # average of the two middle rows (0-based)
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    avg_val = c/l
                    tmp_avgspd = float(c)/float(l)
                    b = 0
                    c = 0
                    for a in Unique:
                        deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
                        b = b + 1
                    b = 0    
                    devsum = 0.0
                    for a in deviation:
                        devsum = devsum + float(deviation[b])
                        b = b + 1
                    devsum2 = devsum   
                    k = l - 1 
                    devsum = devsum/k
                    sqr_sum = math.sqrt(devsum)
                    std_dev = round(sqr_sum,3)                
                    min_val = Unique[0][1]
                    max_val = Unique[l-1][1] 
                else:
                    median_val=Unique[l//2][1]  # // keeps the index an int in Python 3
                    for a in Unique:
                        d = Unique[b][1]
                        d = int(d)
                        c = c + d
                        b = b + 1
                    avg_val = c/l
                    tmp_avgspd = float(c)/float(l)
                    b = 0
                    c = 0
                    for a in Unique:
                        deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
                        b = b + 1
                    b = 0    
                    devsum = 0.0
                    for a in deviation:
                        devsum = devsum + float(deviation[b])
                        b = b + 1
                    devsum2 = devsum    
                    k = l - 1 
                    devsum = devsum/k
                    sqr_sum = math.sqrt(devsum)
                    std_dev = round(sqr_sum,3)                
                    min_val = Unique[0][1]
                    max_val = Unique[l-1][1] 
                            
                id = Unique[0][0]
                TempString = str(id) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(std_dev) + ',' + str(min_val) + ',' + str(max_val) + ',' + str(l)
                print(TempString)
                break
            else:
                    
            #if first row go ahead and put into unique array
                if i == 0:
                    Unique.append(line.split(','))        
                else:
                    #print Unique
            
                    if SortArr[i][0]==SortArr[i-1][0]:
                        #StrTemp = str(TmpArr[j][0]) + ',' + str(TmpArr[j][1])
                        StrTemp = str(TmpArr[j][0]) + ',' + str(TmpArr[j][1])
                        Unique.append(StrTemp.split(','))
                    else:
                        
                        l = len(Unique)
                        if l == 0:
                            median_val=Unique[l][1]
                            avg_val=Unique[l][1]
                            min_val = Unique[l][1]
                            max_val = Unique[l][1]
                            std_dev=0.0
                        elif l == 1:
                            median_val=Unique[l-1][1]
                            avg_val=Unique[l-1][1]
                            std_dev=0.0
                            min_val = Unique[l-1][1]
                            max_val = Unique[l-1][1]
                        elif l == 2:
                            for a in Unique:
                                c = c + int(Unique[b][1])
                                b = b + 1
                            avg_val = c/l
                            median_val = c/l
                            tmp_avgspd = float(c)/float(l)
                            b = 0
                            c = 0
                            for a in Unique:
                                deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
                                b = b + 1
                            b = 0
                            devsum = 0.0  # reset, otherwise the sum carries over from the previous ID
                            for a in deviation:
                                devsum = devsum + float(deviation[b])
                                b = b + 1
                            devsum = devsum/1.0
                            sqr_sum = math.sqrt(devsum)
                            std_dev = round(sqr_sum,3)
                            min_val = Unique[0][1]
                            max_val = Unique[l-1][1]    
                        elif l%2==0:
                            median_val=(int(Unique[l//2 - 1][1])+int(Unique[l//2][1]))/2.0  # average of the two middle rows (0-based)
                            for a in Unique:
                                c = c + int(Unique[b][1])
                                b = b + 1
                            avg_val = c/l
                            tmp_avgspd = float(c)/float(l)
                            b = 0
                            c = 0
                            for a in Unique:
                                deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
                                b = b + 1
                            b = 0    
                            devsum = 0.0
                            for a in deviation:
                                devsum = devsum + float(deviation[b])
                                b = b + 1
                            devsum2 = devsum   
                            k = l - 1 
                            devsum = devsum/k
                            sqr_sum = math.sqrt(devsum)
                            std_dev = round(sqr_sum,3)                
                            min_val = Unique[0][1]
                            max_val = Unique[l-1][1] 
                        else:
                            median_val=Unique[l//2][1]  # // keeps the index an int in Python 3
                            for a in Unique:
                                d = Unique[b][1]
                                d = int(d)
                                c = c + d
                                b = b + 1
                            avg_val = c/l
                            tmp_avgspd = float(c)/float(l)
                            b = 0
                            c = 0
                            for a in Unique:
                                deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
                                b = b + 1
                            b = 0    
                            devsum = 0.0
                            for a in deviation:
                                devsum = devsum + float(deviation[b])
                                b = b + 1
                            devsum2 = devsum    
                            k = l - 1 
                            devsum = devsum/k
                            sqr_sum = math.sqrt(devsum)
                            std_dev = round(sqr_sum,3)                
                            min_val = Unique[0][1]
                            max_val = Unique[l-1][1] 
                                
                        id = Unique[0][0]
                        TempString = str(id) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(std_dev) + ',' + str(min_val) + ',' + str(max_val) + ',' + str(l)
                        print(TempString)
                        deviation = []
                        a = len(TmpArr)
            
                        Unique = []
                        SortArr = []
                        SortArr.append(StrTemp.split(','))
                        Unique.append(StrTemp.split(','))
                        i = 0
                        b = int(0)
                        avg_val = int(0)
                        median_val = int(0)
                        c = int(0)    
                           
                    
            i = i + 1
            n = n + 1
            j = j + 1
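For comparison, a minimal sketch of the same per-ID statistics in one place, using the standard-library `statistics` module (Python 3.4+) in place of the hand-written loops. `summarize` is a hypothetical name; it uses the sample standard deviation (dividing by n - 1, as the code above does) and reports 0.0 for a single value, as the code above does. Unlike the code above, `statistics.median`, `min` and `max` do not depend on the rows arriving in sorted order.

```python
import statistics


def summarize(key, values):
    """Return 'ID,avg,median,std_dev,min,max,count' for one ID's values."""
    nums = [float(v) for v in values]
    # statistics.stdev is the sample standard deviation (divides by n - 1)
    # and needs at least two points, so a single value is reported as 0.0.
    std_dev = round(statistics.stdev(nums), 3) if len(nums) > 1 else 0.0
    fields = [
        key,
        round(statistics.mean(nums), 1),
        statistics.median(nums),  # sorts numerically; averages the middle pair
        std_dev,
        min(nums),
        max(nums),
        len(nums),
    ]
    return ','.join(str(f) for f in fields)


print(summarize('03', ['5', '7']))
# → 03,6.0,6.0,1.414,5.0,7.0,2
```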

        • Glenton
          Recognized Expert Contributor
          • Nov 2008
          • 391

          #5
          Er...I don't really understand what the question is now. What's the pseudocode for what you're trying to do?

          The tricky part of what you're doing is not the calculation but handling the large dataset, right? If you had a way of mapping your keys uniquely onto 0, ..., n-1, you might find it easier to create an array/list that you just update on the fly.
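A minimal sketch of that idea, assuming `ID,VAL` lines: a dict assigns each new ID the next index 0, 1, ..., n-1, and per-index lists of count, sum and sum of squares are updated as each line arrives, so the input does not need to be grouped first. `running_stats` is a hypothetical name. Running sums give the average and the sample standard deviation, but not the median, which still needs all of an ID's values; the sum-of-squares shortcut can also lose precision when values are large relative to their spread (Welford's method is the usual fix).

```python
import math


def running_stats(lines):
    index = {}  # ID -> position 0..n-1, assigned in first-seen order
    counts, sums, sumsq = [], [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        key, value = line.split(',', 1)
        x = float(value)
        if key not in index:  # new ID: give it the next free slot
            index[key] = len(counts)
            counts.append(0)
            sums.append(0.0)
            sumsq.append(0.0)
        i = index[key]
        counts[i] += 1
        sums[i] += x
        sumsq[i] += x * x
    results = {}
    for key, i in index.items():
        n, mean = counts[i], sums[i] / counts[i]
        # sample variance from running sums: (sum(x^2) - n*mean^2) / (n - 1)
        var = (sumsq[i] - n * mean * mean) / (n - 1) if n > 1 else 0.0
        # max() guards against a tiny negative value from rounding error
        results[key] = (round(mean, 1), round(math.sqrt(max(var, 0.0)), 3), n)
    return results


print(running_stats(['02,10', '01,20', '02,15', '01,22', '02,17', '01,25']))
```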
