2013-07-08 3 views
0

고급 자습서의 innerjoin-example을 수정하여 mapreduce (Ullman에 의해 설명 됨)를 사용하여 희소 행렬 곱셈을 만듭니다. 따라서 결과 행렬에서 동일한 위치의 값을 합산하는 두 번째 map-reduce 단계가 필요합니다.Disch (MapReduce)의 체인 작업

저는 불행히도 CsvInnerJoin 클래스의 첫 번째 reduce 함수의 출력을 SumJob의 맵 함수로 가져올 수 없습니다.

import sys 
sys.path.append("/home/damian/disco/lib/") 
from disco.core import Job, result_iterator 
from disco.worker.classic.func import chain_reader 
import csv, sys 


class SumJob(Job): 
    map_reader = staticmethod(chain_reader) 

    @staticmethod 
    def map(self,key_value, params): 
     print "KEY::::::",str(key_value[0]) 
     print "VAL::::::",str(key_value[1]) 
     yield key_value[0], key_value[1] 
    @staticmethod  
    def reduce(self,key_value,out, params): 
     Summe = sum(key_value[1]) 
     out.add(key_value[0],Summe) 

문제는 난을 변경하는 방법을 알고 (**)가 없다는 것입니다 :

import sys 
sys.path.append("/home/damian/disco/lib/") 
from disco.core import Job, result_iterator 
from disco.worker.classic.func import chain_reader 
import csv, sys 
class CsvInnerJoiner(Job): 
    partitions = 2 
    sort = True 

    def map(self, row, params): 
     yield row[0], row[1:] 

    @staticmethod 
    def map_reader(fd, size, url, params): 
     reader = csv.reader(fd, delimiter=',') 
     for row in reader: 
      yield row 

    #@staticmethod 
def reduce(self, rows_iter, out, params): 
    from disco.util import kvgroup 
    from itertools import chain 
    #for url_key, descriptors in kvgroup(sorted(rows_iter)): 
    for url_key, descriptors in kvgroup(rows_iter): 
     merged_descriptors = list(chain.from_iterable(descriptors)) 
     print url_key,"_______",merged_descriptors 
     if len(merged_descriptors) > 3: 
      Alist = merged_descriptors[:merged_descriptors.index("B")] 
      Blist = merged_descriptors[merged_descriptors.index("B"):] 
      Alistlength = len(Alist)/3 
      Blistlength = len(Blist)/3 
      for i in range(Alistlength): 
       for j in range(Blistlength): 
        container = int(Alist[3*i+2])*int(Blist[3*j+2]) 
        yield [Alist[3*i+1],Blist[3*j+1]],container 
        #out.add(Alist[3*i+1],[Blist[3*j+1],container])   

SumJob.py이 있습니다 :

import sys 
sys.path.append("/home/damian/disco/lib/") 
from disco.core import Job, result_iterator 
from disco.worker.classic.func import chain_reader 
import csv, sys 


if __name__ == '__main__': 
    input_filename = "input.csv" 
    output_filename = "output.csv" 
    if len(sys.argv) > 1: 
     input_filename = sys.argv[1] 
     if len(sys.argv) > 2: 
      output_filename = sys.argv[2] 

    from CsvInnerJoiner import CsvInnerJoiner 
    from SumJob import SumJob 

    job = CsvInnerJoiner().run(input=[input_filename]) 
    job = SumJob().run() (******************) 

    with open(output_filename, 'w') as fp: 
     writer = csv.writer(fp) 
     for url_key, descriptors in result_iterator(job.wait(show=True)): 
      writer.writerow([url_key] + descriptors) 

CsvInnerJoiner.py이 파일입니다 상기 제 1 감축 단계의 상기 제 2 출력이 상기 제 2 맵 기능에 의해 입력으로 취해 지도록 상기 라인을 선택하는 단계를 더 포함하는 방법.

도움을 주셔서 감사합니다. Damian

답변

0

map/reduce 단계의 출력을 다른 출력의 입력으로 사용할 수 있습니다 (job.wait()에서 반환). 나는 코드의 덩어리로 전문가가 아니에요

job1 = SumJob().run(input=[...]) 
job2 = SumJob().run(input=[...]) 

output = SomeOtherJob.run(input=[job1.wait(), job2.wait()]).wait(show=True) 
for key, value in result_iterator(output): 
    print key, value 

나를 위해 작동 (나는 많은 단계를 여러 번 반복이있는 pagerank 알고리즘을 구현).