매일 다른 사람들로부터 많은 gzip 파일 (* .gz)을 받고 HDFS에 넣고 분석하기 전에 모든 파일 (손상된 파일이 삭제됨)의 무결성을 확인해야합니다. gzip -t file_name 로컬 컴퓨터에서 확인하려면 작동하지만 전체 파일 크기가 매우 커서 전체 프로세스가 너무 느리고 대부분의 파일이 로컬 유효성 검사에 많은 시간을 필요로합니다.대형 gzip 파일 (.gz)의 무결성을 검사하기 위해 Hadoop 작업을 어떻게 사용할 수 있습니까?
에서 :
그래서 내가 병렬 검증을 할 수있는 하둡 작업을 사용하도록 설정되어, 각 파일은 매퍼에서 확인되고, 손상된 파일 경로가 파일로 출력됩니다, 여기 내 코드입니다 하둡 작업 설정 : 매퍼Job job = new Job(getConf());
job.setJarByClass(HdfsFileValidateJob.class);
job.setMapperClass(HdfsFileValidateMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(JustBytesInputFormat.class);
:
public class HdfsFileValidateMapper extends Mapper<JustBytesWritable, NullWritable, Text, NullWritable> {
private static final Logger LOG = LoggerFactory.getLogger(HdfsFileValidateJob.class);
private ByteArrayOutputStream bos;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
/* specify a split size(=HDFS block size here) for the ByteArrayOutputStream, which prevents frequently allocating
* memory for it when writing data in [map] method */
InputSplit inputSplit = context.getInputSplit();
bos = new ByteArrayOutputStream((int) ((FileSplit) inputSplit).getLength());
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
InputSplit inputSplit = context.getInputSplit();
String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath(); // e.g. "/user/hadoop/abc.txt"
bos.flush();
byte[] mergedArray = bos.toByteArray(); // the byte array which stores the data of the whole file
if (!testUnGZip(mergedArray)) { // broken file
context.write(new Text(filePath), NullWritable.get());
}
bos.close();
}
@Override
public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
bos.write(key.getBytes());
}
/**
* Test whether we can un-gzip a piece of data.
*
* @param data The data to be un-gzipped.
* @return true for successfully un-gzipped the data, false otherwise.
*/
private static boolean testUnGZip(byte[] data) {
int numBytes2Read = 0;
ByteArrayInputStream bis = null;
GZIPInputStream gzip = null;
try {
bis = new ByteArrayInputStream(data);
gzip = new GZIPInputStream(bis);
byte[] buf = new byte[1024];
int num;
while ((num = gzip.read(buf, 0, buf.length)) != -1) {
numBytes2Read += num;
if (numBytes2Read % (1024 * 1024) == 0) {
LOG.info(String.format("Number of bytes read: %d", numBytes2Read));
}
}
} catch (Exception e) {
return false;
} finally {
if (gzip != null) {
try {
gzip.close();
} catch (IOException e) {
LOG.error("Error while closing GZIPInputStream");
}
}
if (bis != null) {
try {
bis.close();
} catch (IOException e) {
LOG.error("Error while closing ByteArrayInputStream");
}
}
}
return true;
}
}
어떤에서 내가 여기에서 찾을 수 있습니다 JustBytesInputFormat 및 JustBytesWritable라는 두 개의 클래스, 사용 https://issues.apache.org/jira/secure/attachment/12570327/justbytes.jar
보통을,이 솔루션은 잘 작동하지만 단일 GZIP 파일이 충분히 큰 경우 (예를 들어,. 1.5G) Hadoop 작업은 Java 힙 공간 문제로 인해 실패합니다. 이유는 분명합니다. 각 파일에 대해 먼저 모든 데이터를 메모리 버퍼에 수집하고 마지막에 일회성 유효성 검사를 수행하므로 파일 크기가 너무 클 수 없습니다. 그것이 잘 작성하지 못했습니다이 모든에서 작동 않는 문제를 실패하지만
private boolean testUnGzipFail = false;
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
InputSplit inputSplit = context.getInputSplit();
String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath(); // e.g. "/user/hadoop/abc.txt"
if (testUnGzipFail) { // broken file
context.write(new Text(filePath), NullWritable.get());
}
}
@Override
public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
if (!testUnGZip(key.getBytes())) {
testUnGzipFail = true;
}
}
이 버전은 하둡 작업을 해결 :
그래서 나는 내 코드의 일부를 수정! 내 E2E 테스트에서 완전히 좋은 gzip 파일 (크기 : 1.5G)은 손상된 파일로 취급됩니다!내 문제는 여기에 있습니다. 어떻게 유효성 검사를 올바르게 수행하고 단일 파일의 모든 내용을 메모리로 읽지 않아도됩니까?
어떤 아이디어라도 감사 할 것입니다. 미리 감사드립니다.
병렬로 gzip -t를 호출하는 것이 좋은 방법이지만 로컬 파일 경로가 필요하며 Hadoop 작업에서 읽는 매퍼의 파일은 로컬 fs에 없지만 원격 시스템에서 읽을 수 있으므로 가능하지 않다. – celt
Hadoop에는 'InputStream'(또는 청크로 파일을 읽는 다른 방법)을 제공하는 API가 있어야합니다. 그것을 사용하십시오. 또한 파일이 원격 시스템에있는 경우 동일한 CPU에서 작업을 실행해야합니다. 네트워크를 통해 파일을 복사하는 것은 비용이 많이 듭니다. –
손상된 데이터를 Hadoop에 업로드하기 전에 "확인"단계를 진행할 것을 제안합니다. Hadoop을 사용하면 클러스터에서 작업을 쉽게 배포 할 수 있지만 비용을 절감하지는 못합니다. 실제로 Hadoop은 로컬에서 데이터를 처리하는 것보다 훨씬 느립니다. 그러나 동시에 여러 작업을 실행할 수 있으므로 두 가지 목표의 균형을 맞출 수 있습니다. 가능하면 파일을 업로드하는 동안 파일을 검사하여 손상된 파일의 디스크 공간을 낭비하지 마십시오. –