2013-03-04 3 views
1

HAR 아카이브를 생성하고이 아카이브에서 데이터를 읽어야하는 MR 작업을 실행하는 워크 플로우를 작성했습니다. 1. 아카이브가 생성됩니다. 2. 작업이 실행될 때 매퍼는 분산 캐시에 아카이브를 표시합니다. 3. 3. ??? 어떻게이 arhive를 읽을 수 있습니까? 이 아카이브에서 한 줄씩 데이터를 읽는 API는 무엇입니까? (my har는 여러 행으로 분리 된 텍스트 파일의 묶음입니다.) NB : DistirubtedCache에 저장된 일반 파일 (HAR 아카이브 아님)로 작업 할 때 완벽하게 작동합니다. HAR에서 데이터를 읽으려고 할 때 문제가 있습니다. 당신이 볼 수 있듯이mapreduce의 DistributedCache에서 HAR 파일 읽기

InputStream inputStream; 
    String cachedDatafileName = System.getProperty(DIST_CACHE_FILE_NAME); 
    LOG.info(String.format("Looking for[%s]=[%s] in DistributedCache",DIST_CACHE_FILE_NAME, cachedDatafileName)); 

    URI[] uris = DistributedCache.getCacheArchives(getContext().getConfiguration()); 
    URI uriToCachedDatafile = null; 
    for(URI uri : uris){ 
     if(uri.toString().endsWith(cachedDatafileName)){ 
      uriToCachedDatafile = uri; 
      break; 
     } 
    } 
    if(uriToCachedDatafile == null){ 
     throw new RuntimeConfigurationException(String.format("Looking for[%s]=[%s] in DistributedCache failed. There is no such file", 
       DIST_CACHE_FILE_NAME, cachedDatafileName)); 
    } 

    Path pathToFile = new Path(uriToCachedDatafile); 
    LOG.info(String.format("[%s] has been found. Uri is: [%s]. The path is:[%s]",cachedDatafileName, uriToCachedDatafile, pathToFile)); 

    FileSystem fileSystem = pathToFile.getFileSystem(getContext().getConfiguration()); 
    HarFileSystem harFileSystem = new HarFileSystem(fileSystem); 
    inputStream = harFileSystem.open(pathToFile); //NULL POINTER EXCEPTION IS HERE! 
    return inputStream; 

답변

0
protected InputStream getInputStreamToDistCacheFile() throws IOException{ 
     InputStream inputStream; 
     String cachedDatafileName = System.getProperty(DIST_CACHE_FILE_NAME); 
     LOG.info(String.format("Looking for[%s]=[%s] in DistributedCache",DIST_CACHE_FILE_NAME, cachedDatafileName)); 

     URI[] uris = DistributedCache.getCacheArchives(getContext().getConfiguration()); 
     URI uriToCachedDatafile = null; 
     for(URI uri : uris){ 
      if(uri.toString().endsWith(cachedDatafileName)){ 
       uriToCachedDatafile = uri; 
       break; 
      } 
     } 
     if(uriToCachedDatafile == null){ 
      throw new RuntimeConfigurationException(String.format("Looking for[%s]=[%s] in DistributedCache failed. There is no such file", 
        DIST_CACHE_FILE_NAME, cachedDatafileName)); 
     } 

     //Path pathToFile = new Path(uriToCachedDatafile +"/stf/db_bts_stf.txt"); 
     Path pathToFile = new Path("har:///"+"home/ssa/devel/megalabs/kyc-solution/kyc-mrjob/target/test-classes/GSMCellSubscriberHomeIntersectionJobDescriptionClusterMRTest/in/gsm_cell_location_stf.har" +"/stf/db_bts_stf.txt"); 
     //Path pathToFile = new Path(("har://home/ssa/devel/megalabs/kyc-solution/kyc-mrjob/target/test-classes/GSMCellSubscriberHomeIntersectionJobDescriptionClusterMRTest/in/gsm_cell_location_stf.har")); 

     LOG.info(String.format("[%s] has been found. Uri is: [%s]. The path is:[%s]",cachedDatafileName, uriToCachedDatafile, pathToFile)); 
     FileSystem harFileSystem = pathToFile.getFileSystem(context.getConfiguration()); 
     FSDataInputStream fin = harFileSystem.open(pathToFile); 
     LOG.info("fin: " + fin); 
//  FileSystem fileSystem = pathToFile.getFileSystem(getContext().getConfiguration()); 
//  HarFileSystem harFileSystem = new HarFileSystem(fileSystem); 
//  harFileSystem.exists(new Path("har://home/ssa/devel/mycompany/my-solution/my-mrjob/target/test-classes/HomeJobDescriptionClusterMRTest/in/locations.har")); 
//  LOG.info("harFileSystem.exists(pathToFile):"+ harFileSystem.exists(pathToFile)); 
//  harFileSystem.initialize(uriToCachedDatafile, context.getConfiguration()); 



     FileStatus[] statuses = harFileSystem.listStatus(new Path("har:///"+"har://home/ssa/devel/mycompany/my-solution/my-mrjob/target/test-classes/HomeJobDescriptionClusterMRTest/in/locations.har")); 
     for(FileStatus fileStatus : statuses){ 
      LOG.info("fileStatus isDir"+fileStatus.isDirectory() +" len:" + fileStatus.getLen()); 
     } 

//  String tmpPathToFile = "har:///"+pathToFile.toString(); //+"/stf/db_bts_stf.txt"; 
//  Path tmpPath = new Path(tmpPathToFile); 
//  LOG.info("KILL ME PATH TO FILE IN ARCHIVE: " +tmpPath); 
//  inputStream = harFileSystem.open(tmpPath); 
//  return inputStream; 
     return fin; 
    } 

, 그것은 끔찍 : 여기

는 코드입니다. 아카이브 내부에 저장된 인덱스 파일을 수동으로 읽고 인덱스 파일 메타 데이터를 사용하여 경로를 재구성했습니다. 아카이브에 저장된 파일의 정확한 이름을 아는 경우 (예와 같이) 수동으로 경로를 구성 할 수 있습니다.

그것은 convinient가 아닙니다. Zip-> zipEntry와 같은 것을 기대했습니다. 구조를 알지 못해도 아카이브 항목을 반복 할 수 있습니다.