2017-04-20 4 views
0

hadoop 아카이브 .har 형식의 거대한 데이터가 있습니다. har에는 압축이 포함되어 있지 않으므로 추가로 gzip으로 압축하여 HDFS에 저장하려고합니다. 내가 실수없이 일할 수있는 유일한 것은 :Spark를 사용하여 HDFS에서 Harz 파일을 압축하는 중

harFile.coalesce(1, "true") 
.saveAsTextFile("hdfs://namenode/archive/GzipOutput", classOf[org.apache.hadoop.io.compress.GzipCodec]) 
//`coalesce` because Gzip isn't splittable. 

그러나 이것은 올바른 결과를주지 못합니다. Gzipped 파일이 생성되었지만 잘못된 출력 (rdd 유형을 말하는 한 줄 등)

도움이 되겠습니다. 저는 다른 접근법에 대해서도 열려 있습니다.

감사합니다.

+0

CSV, JSON, 구조화되지 않은 텍스트 (예 : 로그), 바이너리 등 어떤 종류의 콘텐츠가 HAR 보관 파일에 있습니까? 각 HAR을 아카이브 해제하고, 내부의 각 파일을 압축하고, 다시 아카이브하는 것을 고려 했습니까? 바이너리가 아닌 경우 각 HAR (또는 여러 HAR)의 내용을 MR 또는 스파크 작업이있는 단일 GZipped (또는 BZipped) 파일로 병합하는 것이 좋습니다. 구조화되어있는 경우 각 HAR (또는 여러 HAR)의 내용을 Parquet ou ORC와 같은 기둥 형식으로 GZip 압축으로 병합하는 것이 좋습니다. –

+0

@SamsonScharfrichter har에는 평면 텍스트 파일 또는 쪽매 파일이 포함됩니다. xmls와 같은 것은 없지만 데이터를 분할하고 싶지는 않습니다. har에는 350 개 이상의 디렉토리가 있고 각 디렉토리에는 파일이 있기 때문에 각 파일을 gzip하는 것은 문제가됩니다. 어떻게해야할지 모르겠다. 나는 PIG를 사용하여 GZip 압축을 사용하여 그 단일 har 파일을 압축하려고했습니다. 그것은 압축에 성공했지만 GZip이 분할 가능하지 않기 때문에 다시는 바람직하지 않은 부분 파일을 생성했습니다. 마지막으로, 각 har을 gzip으로 분리해야하기 때문에 여러 HAR을 병합 할 수 없습니다. – philantrovert

답변

1

기존 HDFS 파일의 압축 버전을 만드는 Java 코드 스 니펫.

서둘러, 텍스트 편집기에서 Java App의 비트와 조각을 사용하여 작성했습니다. 이전에 작성 했으므로 테스트하지 않았습니다. 일부 오타 및 예상되는 간격.

// HDFS API 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.security.UserGroupInformation; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.fs.FileStatus; 
// native Hadoop compression libraries 
import org.apache.hadoop.io.compress.CompressionCodecFactory; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.Compressor; 
import org.apache.hadoop.io.compress.GzipCodec; 
import org.apache.hadoop.io.compress.BZip2Codec; 
import org.apache.hadoop.io.compress.SnappyCodec; 
import org.apache.hadoop.io.compress.Lz4Codec; 

.............. 

    // Hadoop "Configuration" (and its derivatives for HDFS, HBase etc.) constructors try to auto-magically 
    // find their config files by searching CLASSPATH for directories, and searching each dir for hard-coded 
    // name "core-site.xml", plus "hdfs-site.xml" and/or "hbase-site.xml" etc. 
    // WARNING - if these config files are not found, the "Configuration" reverts to hard-coded defaults without 
    // any warning, resulting in bizarre error messages later > let's run some explicit controls here 
    Configuration cnfHadoop = new Configuration() ; 
    String propDefaultFs =cnfHadoop.get("fs.defaultFS") ; 
    if (propDefaultFs ==null || ! propDefaultFs.startsWith("hdfs://")) 
    { throw new IllegalArgumentException(
       "HDFS configuration is missing - no proper \"core-site.xml\" found, please add\n" 
       +"directory /etc/hadoop/conf/ (or custom dir with custom XML conf files) in CLASSPATH" 
       ) ; 
    } 
/* 
    // for a Kerberised cluster, either you already have a valid TGT in the default 
    // ticket cache (via "kinit"), or you have to authenticate by code 
    UserGroupInformation.setConfiguration(cnfHadoop) ; 
    UserGroupInformation.loginUserFromKeytab("[email protected]", "/some/path/to/user.keytab") ; 
*/ 
    FileSystem fsCluster =FileSystem.get(cnfHadoop) ; 
    Path source = new Path("/some/hdfs/path/to/XXX.har") ; 
    Path target = new Path("/some/hdfs/path/to/XXX.har.gz") ; 

    // alternative: "BZip2Codec" for better compression (but higher CPU cost) 
    // alternative: "SnappyCodec" or "Lz4Codec" for lower compression (but much lower CPU cost) 
    CompressionCodecFactory codecBootstrap = new CompressionCodecFactory(cnfHadoop) ; 
    CompressionCodec codecHadoop =codecBootstrap.getCodecByClassName(GzipCodec.class.getName()) ; 
    Compressor compressorHadoop =codecHadoop.createCompressor() ; 

    byte[] buffer = new byte[16*1024*1024] ; 
    int bufUsedCapacity ; 
    InputStream sourceStream =fsCluster.open(source) ; 
    OutputStream targetStream =codecHadoop.createOutputStream(fsCluster.create(target, true), compressorHadoop) ; 
    while ((bufUsedCapacity =sourceStream.read(buffer)) >0) 
    { targetStream.write(buffer, 0, bufUsedCapacity) ; } 
    targetStream.close() ; 
    sourceStream.close() ; 

.............. 
+0

샘슨에게 감사드립니다. 시도하고 업데이트 해 줄 것입니다. – philantrovert

+0

그래서 시도해 봤지만 작동하지 않았다. 왜냐하면'har' 파일이 디렉토리이고 디렉토리를 압축 할 수 없기 때문이다. HDFS에서'har' ('org.apache.commons.compress' 사용) 대신'타르 (tar) '를 만들고 나서 gzipping을 제안 해 주시겠습니까? – philantrovert

+0

Duh ... HAR가 참 이상한 짐승 인 것 같습니다. 그러나 HDFS가 그것이 디렉토리이고 개별 파일에 액세스 할 수 있다고 말하면 HAR로부터 하나의 ZIP 파일을 만들 수 있어야합니다 (표준'java.util.zip.ZipOutputStream'과'putNextEntry()'사용). 등) - _ 면책 조항 : 나는 훌륭한 TAR 형식의 큰 팬이 아닙니다 ._ –