python and hadoop

  • erbrose
    New Member
    • Oct 2006
    • 58

    python and hadoop

    Anyone here using Python and Hadoop streaming to process data? I am just dipping my toes into both and struggling. All the examples on the web that I've found are the 'Word Count' examples; I would love to find more. I have a specific example of what I need to achieve and have been able to solve it by writing the Python code, but it currently only works on csv files...
    Thanks ahead of time for anyone interested in helping or pointing me in the right direction.
    Cheers,
    Eric
  • bvdet
    Recognized Expert Specialist
    • Oct 2006
    • 2851

    #2
    I have never heard of hadoop streaming. What kind of data is it, and how do you want to process it? If you could show us an example of the data, maybe we can help.


    • Glenton
      Recognized Expert Contributor
      • Nov 2008
      • 391

      #3
      Yeah, I've also not come across hadoop. But we've often been able to help without knowing about all of it! Give us your code and more description of what you're trying to do, and why it doesn't work.


      • erbrose
        New Member
        • Oct 2006
        • 58

        #4
        Thanks for both of your replies!!!!
        Well here is the deal. I have a couple of terabytes worth of text (csv) data that I need to loop through. The data is currently stored on a Hadoop cluster, which basically breaks the data up into smaller chunks and distributes it across multiple machines, so when you run a Python job via Hadoop streaming, it processes the job on all the different machines, thus (in theory) speeding up the process via a Map and Reduce step. My sample data has a unique identifier and an integer value that will be used to create averages, median values, standard deviations, percentiles, and (eventually) some other important stats. Again, I am just dipping my toes into Python, so you'll have to forgive my lengthy code! I know there are going to be tons of shortcuts that hopefully I will learn! Here is what I have so far.
        Basically I have one text file that contains a unique set of IDs.

        ID
        101
        102

        Then my 'big' data set looks something like this (but it is not sorted like this, i.e. the IDs are all over the place):

        ID----INT
        101---23
        101---25
        101---20
        101---19
        101---30
        101---28
        101---27
        101---27
        101---23
        102---30
        102---28
        102---27
        102---27
        102---23

        Right now my code is NOT using any math or sci modules (which I am sure will be the way to go to smooth this process out); for now I am manually calculating the values just to better understand Python.


        Code:
        import time
        
        unique = open("d:/temp/tmp/ID.txt", 'r')
        writer = open("d:/temp/tmp/ID_out.txt", 'w')
        t1 = time.clock()
        avg_val=float()
        b = int(0)
        c = int(0)
        i = int(0)
        j = int(0)
        Arr = []
        SortArr = []
        k = int(0)
        l = int(0)
        LoopArr = []
        LoopCntr = int(0)
        for ID in unique:
            ID = ID.strip()
            LoopArr.append(ID)    
        unique.close() 
        
        for m in LoopArr:
            reader = open("d:/temp/tmp/sample.txt",'r')
        
            while True:        
                line = reader.readline()
                    
                if len(line) != 0:
                    line = line.strip()
        
                    TmpArr = line.split(',')
        
                    if m == TmpArr[0]:
                        StrTmp=str(TmpArr[0])+','+str(TmpArr[1])+','+str('1')
                        SortArr.append(StrTmp.split(','))
                        i = i + 1 
                else:                   
                    break            
                    
            reader.close()
            SortArr.sort(key=lambda a: a[1])
            median_val=int(0)
            if i == 0:
                median_val=SortArr[i][1]
                avg_val=SortArr[i][1]
        
            elif i == 1:
                
                for a in SortArr:
                    c = c + int(SortArr[b][1])
                    b = b + 1
                median_val = c/i
                avg_val = c/i
                     
            elif i == 2:
                median_val=SortArr[i-1][1]
                for a in SortArr:
                    c = c + int(SortArr[b][1])
                    b = b + 1
                        
                avg_val = c/i
            
            elif i%2==0:
                median_val=(int(SortArr[i/2][1])+int(SortArr[(i/2)+1][1]))/2
                for a in SortArr:
                    c = c + int(SortArr[b][1])
                    b = b + 1
                        
                avg_val = c/i        
            else:
                median_val=int(SortArr[i/2][1])
                for a in SortArr:
                    c = c + int(SortArr[b][1])
                    b = b + 1
        
                avg_val = c/i
            ID = m
            SortArr = []
            TempString = str(ID) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(i) + '\n'
            writer.write(TempString)
            i = int(0)    
            b = 0
            avg_val = 0
            median_val = 0
            c = 0
        writer.close()
        t2 = time.clock() - t1
        print t2
        Sorry my code is not commented very well at the moment, basically because I keep trying new things!
        Here is a link to the basic 'word count' example using Python and Hadoop:

        It's really the only CLEAR example that I've been able to find of someone using Python with Hadoop, although I know there are a lot of folks out there using it too!
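        The gist of that example, as far as I can tell, is just a mapper and a reducer along these lines (typed from memory, so the details may differ from the tutorial):

        Code:
        #!/usr/bin/env python
        # mapper.py - emit "word<TAB>1" for every word read from stdin
        import sys
        
        for line in sys.stdin:
            for word in line.strip().split():
                print '%s\t%s' % (word, 1)
        Code:
        #!/usr/bin/env python
        # reducer.py - input arrives sorted by key, so just total up each word's counts
        import sys
        
        current_word = None
        count = 0
        for line in sys.stdin:
            word, value = line.strip().split('\t', 1)
            if word != current_word:
                if current_word is not None:
                    print '%s\t%s' % (current_word, count)
                current_word = word
                count = 0
            count += int(value)
        if current_word is not None:
            print '%s\t%s' % (current_word, count)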
        Thanks a bunch for helping me in any way shape or form!
        -Eric


        • Glenton
          Recognized Expert Contributor
          • Nov 2008
          • 391

          #5
          Okay, so it's not entirely clear what the problem is, but it doesn't look like it will run very efficiently, if at all, at the moment!

          The first question is how many unique IDs are there?

          If there's a reasonable number then an approach would be to do the following:
          1. set up an array with a sorted list of the IDs
          2. go through the large data set a line at a time ("for line in file:" is the normal way of doing this).
          3. for each line, pull out the ID, use a bisect search to find the appropriate ID in the sorted list, and update each of the calculations for what you want to do.
          4. save the results to a file every so often (rough sketch below)
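
          Something like this, for instance (an untested sketch, assuming comma-separated ID,value lines like your sample; it only keeps running counts and sums, so it covers the averages, and you'd need to hold on to the values themselves for medians and percentiles):

          Code:
          import bisect
          
          # 1. sorted list of the unique IDs (file names taken from your earlier post)
          ids = sorted(line.strip() for line in open("d:/temp/tmp/ID.txt"))
          counts = [0] * len(ids)
          sums = [0] * len(ids)
          
          # 2. and 3. one pass over the big file; bisect finds the ID's slot in the sorted list
          for line in open("d:/temp/tmp/sample.txt"):
              line = line.strip()
              if not line:
                  continue
              ID, val = line.split(',', 1)
              pos = bisect.bisect_left(ids, ID)
              if pos < len(ids) and ids[pos] == ID:
                  counts[pos] += 1
                  sums[pos] += int(val)
          
          # 4. write out ID, average, count (here only once at the end)
          out = open("d:/temp/tmp/ID_out.txt", 'w')
          for pos, ID in enumerate(ids):
              if counts[pos]:
                  out.write("%s,%s,%s\n" % (ID, float(sums[pos]) / counts[pos], counts[pos]))
          out.close()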

          But if there's an unreasonable number of IDs then we'd have to reconsider...

          Let us know.


          • erbrose
            New Member
            • Oct 2006
            • 58

            #6
            Thanks again for your reply!
            Well I did some more research yesterday and it appears that with Hadoop Map Reduce (via Streaming) you can set the Reduce to NONE, so that you only have to execute one script. So on that note... I am playing around with a sample data set of 39 million rows. My sample contains ~20k unique IDs; in the real terabytes worth of data there will be roughly 27 million unique IDs. Yesterday I was able to do a simple process that looped through the sample and put it into an array, sorted that array by the ID and VAL, then printed the sorted array out to a file via this:

            Code:
            TmpArr = []
            for line in sys.stdin:
                line = line.strip()
                TmpArr.append(line.split(','))
            TmpArr.sort(key=lambda a:(a[0],int(a[1])))
            for row_ in TmpArr:
                print (row_)
            That's all good for my first Hadoop streaming job! I then got the idea to try and create two things (a sorted list and a unique dict of IDs):

            Code:
            TmpArr = []
            Unique = {}
            for line in sys.stdin:
                line = line.strip()
                TmpArr.append(line.split(','))
                ID,VAL = line.split(',',1)
                Unique[ID] = Unique.get(ID,0)
            TmpArr.sort(key=lambda a:(a[0],int(a[1])))
            for row_ in TmpArr:
                print (row_)
            Again this worked just great. My next thought was to create two loops: the main one looping through my Unique dict, then for each ID looping through my TmpArr and doing all my math, but for some reason... Hadoop doesn't like that; I kept getting a time-out error.

            Code:
            TmpArr = []
            Unique = {}
            for line in sys.stdin:
                line = line.strip()
                TmpArr.append(line.split(','))
                ID,VAL = line.split(',',1)
                Unique[ID] = Unique.get(ID,0)
            TmpArr.sort(key=lambda a:(a[0],int(a[1])))
            
            for m in Unique:
                try:
                    for n in TmpArr:
                        pass  # do a bunch of math here
                except ValueError:
                    pass
            for row_ in TmpArr:
                print (row_)
            Today I am going to just try and loop through my sorted TmpArr and do some simple if statements, something like: if TmpArr[i]==TmpArr[i+1] then output that value to another temp array. Will let you know (hopefully by the end of the day) how this process goes.


            • erbrose
              New Member
              • Oct 2006
              • 58

              #7
              Hey all!
              Well, I am now able to run this code on my sample csv file (3 million rows) on my desktop. It completes in under 1 minute, which is great! Still having issues on the Hadoop end, but I think that problem is not for this forum. Would still appreciate any suggestions or improvements on the code itself, as I'm still very much a newbie!

              Code:
              import time
              t1 = time.clock()
              TmpArr = []
              Unique = []
              SortArr = []
              OutArr = []
              i = int(0)
              l = int(0)
              b = int(0)
              c = int(0)
              n = int(0)
              reader = open("d:/temp/tmp/Input.csv",'r')
              writer = open("d:/temp/tmp/Out.csv",'w')
              
              
              for line in reader:
                  line = line.strip()
                  TmpArr.append(line.split(','))
              TmpArr.sort(key=lambda a:(a[0],int(a[1])))
              
              reader.close()
              
              for m in TmpArr:
                  
                  StrTemp = str(TmpArr[n][0]) + ',' + str(TmpArr[n][1])
                  SortArr.append(StrTemp.split(','))    
                  #if first row go ahead and put into unique array
                  if i == 0:        
                      Unique.append(StrTemp.split(','))        
                  else:
                      if SortArr[i][0]==SortArr[i-1][0]:
                          StrTemp = str(TmpArr[i][0]) + ',' + str(TmpArr[i][1])
                          Unique.append(StrTemp.split(','))
                      else:
                          try:
                              l = len(Unique)
                              if l == 0:
                                  median_val=Unique[l][1]
                                  avg_val=Unique[l][1]
                              elif l == 1:
                                  for a in Unique:
                                      c = c + int(Unique[b][1])
                                      b = b + 1
                                  median_val = c/l
                                  avg_val = c/l
                              elif l == 2:
                                  median_val=Unique[l-1][1]
                                  for a in Unique:
                                      c = c + int(Unique[b][1])
                                      b = b + 1
                      
                                  avg_val = c/l
                      
                              elif l%2==0:
                                  median_val=(int(Unique[l/2][1])+int(Unique[(l/2)+1][1]))/2
                                  for a in Unique:
                                      c = c + int(Unique[b][1])
                                      b = b + 1
                      
                                  avg_val = c/l
                              else:
                                  median_val=int(Unique[l/2][1])
                                  for a in Unique:
                                      c = c + int(Unique[b][1])
                                      b = b + 1
                      
                                  avg_val = c/l
                                  
                              ID = Unique[0][0]
                              TempString = str(ID) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(l) + '\n'
                              writer.write(TempString)
                              Unique = []
                              SortArr = []
                              SortArr.append(StrTemp.split(','))
                              Unique.append(StrTemp.split(','))
                              OutArr.append(TempString.split(','))
                              i = 0
                              b = int(0)
                              avg_val = int(0)
                              median_val = int(0)
                              c = int(0)    
                          except ValueError:
                              pass
                          
                              
                          
                  i = i + 1
                  n = n + 1        
              
                  
              t2 = time.clock() - t1
              writer.write(str(t2))
              writer.close()


              • bvdet
                Recognized Expert Specialist
                • Oct 2006
                • 2851

                #8
                Good work erbrose! I have a few minor comments.

                There is no need to initialize variables as integers. i = 0 and i = int(0) will give you identical results.

                It looks like you are using the variable n as an index into your list TmpArr. You don't need it. By iterating over the list directly, you can do this:
                Code:
                for item in TmpArr:
                    StrTemp = str(item[0]) + ',' + str(item[1])
                I like to use item for the variable in that situation. If you need an index for another reason:
                Code:
                for j, item in enumerate(TmpArr):
                Augmented assignment:
                Code:
                    i += 1
                Keep up the good work!
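
                Putting those together on a tiny made-up list (just to show the idioms, not your actual calculation):
                Code:
                rows = [['101', '23'], ['101', '25'], ['102', '30']]  # stand-in for TmpArr
                
                total = 0
                for j, item in enumerate(rows):    # j is the index, item is the row itself
                    total += int(item[1])          # augmented assignment
                    print j, item[0] + ',' + item[1]
                
                print 'total:', total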


                • erbrose
                  New Member
                  • Oct 2006
                  • 58

                  #9
                  One last post (on this subject anyhow!): just wanted to let you know that I was able to complete my first Map/Reduce job on Hadoop with Python! Thanks again for all your help!
