저는 Java 전문가는 아니지만 Java의 기초를 알고 항상 Java 코드를 항상 깊이있게 이해하려고 노력합니다. 그것은 정말로 어리석은 의심 일 수 있지만 내 마음 속에서 그것을 분명하게 이해하는 것을 좋아합니다.
Java 커뮤니티 만 게시하고 있습니다. 의심스러운 부분은 Java에 관한 것입니다.래핑 된 유형은 Hadoop에서 어떻게 작동합니까?
지난 몇 달간 나는 hadoop을 사용하여 작업하고 있으며, 그 자체의 유형을 사용한다.이 유형은 직렬화와 비 직렬화를 기반으로 네트워크를 통해 데이터를 전송하는 효율성을 높이기 위해 Java의 기본 유형을 감싸고있다. 내 혼란이 여기에서 시작
, 우리는이 LongWritable, 텍스트, IntWritable처럼 HDFS의 일부 데이터가이 코드 하둡의 유형에서 하둡 코드
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.io.Text;
org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper
{
extends Mapper<LongWritable,Text,Text,IntWritable>
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
}
}
String line = value.toString();
for (String word : line.split(" ")){
if(word.length()>0){
context.write(new Text(word),new IntWritable(1));
}
에서 실행되는 다음과 같은 자바 코드를 사용하여 처리 할 수 있다고 가정 할 수 있습니다.
자바의 문자열 유형을 감싸는 텍스트 유형을 선택합니다 (잘못된 경우 수정하십시오). 우리는 이러한 매개 변수 아래
import package i.e org.apache.hadoop.io.Text;
에있는 코드와 상호 작용 얻는 방법을 위의 코드, 우리의 방법지도에 이러한 매개 변수를 전달하는 경우 여기
내 의심의 여지가 텍스트 클래스 코드
package org.apache.hadoop.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.MalformedInputException;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import java.util.Arrays;
import org.apache.avro.reflect.Stringable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@Stringable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Text
extends BinaryComparable
implements WritableComparable<BinaryComparable>
{
private static final Log LOG = LogFactory.getLog(Text.class);
private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal()
{
protected CharsetEncoder initialValue() {
return Charset.forName("UTF-8").newEncoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
}
};
private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal()
{
protected CharsetDecoder initialValue() {
return Charset.forName("UTF-8").newDecoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
}
};
private static final byte[] EMPTY_BYTES = new byte[0];
private byte[] bytes;
private int length;
public Text()
{
bytes = EMPTY_BYTES;
}
public Text(String string)
{
set(string);
}
public Text(Text utf8)
{
set(utf8);
}
public Text(byte[] utf8)
{
set(utf8);
}
public byte[] getBytes()
{
return bytes;
}
public int getLength()
{
return length;
}
public int charAt(int position)
{
if (position > length) return -1;
if (position < 0) { return -1;
}
ByteBuffer bb = (ByteBuffer)ByteBuffer.wrap(bytes).position(position);
return bytesToCodePoint(bb.slice());
}
public int find(String what) {
return find(what, 0);
}
public int find(String what, int start)
{
try
{
ByteBuffer src = ByteBuffer.wrap(bytes, 0, length);
ByteBuffer tgt = encode(what);
byte b = tgt.get();
src.position(start);
while (src.hasRemaining()) {
if (b == src.get()) {
src.mark();
tgt.mark();
boolean found = true;
int pos = src.position() - 1;
while (tgt.hasRemaining()) {
if (!src.hasRemaining()) {
tgt.reset();
src.reset();
found = false;
}
else if (tgt.get() != src.get()) {
tgt.reset();
src.reset();
found = false;
}
}
if (found) return pos;
}
}
return -1;
}
catch (CharacterCodingException e) {
e.printStackTrace(); }
return -1;
}
public void set(String string)
{
try
{
ByteBuffer bb = encode(string, true);
bytes = bb.array();
length = bb.limit();
} catch (CharacterCodingException e) {
throw new RuntimeException("Should not have happened " + e.toString());
}
}
public void set(byte[] utf8)
{
set(utf8, 0, utf8.length);
}
public void set(Text other)
{
set(other.getBytes(), 0, other.getLength());
}
public void set(byte[] utf8, int start, int len)
{
setCapacity(len, false);
System.arraycopy(utf8, start, bytes, 0, len);
length = len;
}
public void append(byte[] utf8, int start, int len)
{
setCapacity(length + len, true);
System.arraycopy(utf8, start, bytes, length, len);
length += len;
}
public void clear()
{
length = 0;
}
private void setCapacity(int len, boolean keepData)
{
if ((bytes == null) || (bytes.length < len)) {
if ((bytes != null) && (keepData)) {
bytes = Arrays.copyOf(bytes, Math.max(len, length << 1));
} else {
bytes = new byte[len];
}
}
}
public String toString()
{
try
{
return decode(bytes, 0, length);
} catch (CharacterCodingException e) {
throw new RuntimeException("Should not have happened " + e.toString());
}
}
public void readFields(DataInput in)
throws IOException
{
int newLength = WritableUtils.readVInt(in);
setCapacity(newLength, false);
in.readFully(bytes, 0, newLength);
length = newLength;
}
public static void skip(DataInput in) throws IOException
{
int length = WritableUtils.readVInt(in);
WritableUtils.skipFully(in, length);
}
public void write(DataOutput out)
throws IOException
{
WritableUtils.writeVInt(out, length);
out.write(bytes, 0, length);
}
public boolean equals(Object o)
{
if ((o instanceof Text))
return super.equals(o);
return false;
}
입니다 위의 hadoop 코드를 실행할 때 HDFS의 데이터가 map 메소드에서 언급 한 매개 변수를 통해 흐를 때 알려주시겠습니까?
일단 HDFS에서 첫 번째 데이터 세트가 Text 매개 변수에 도달하면 org.apache.hadoop.io.Text 클래스 내부로 어떻게 흐르고 있습니까?
어디에서부터 시작합니까? (지도 메서드와 동일한 매개 변수가 있기 때문에 클래스의 set 메서드에서 시작한다고 가정합니다.
어디에서 일반 문자열 형식이 Text로 바뀌나요? 코드를 입력 하시겠습니까?
내 두 번째 의심 : 데이터가 텍스트 형식으로 저장되면 누가 그것을 실행하여 필사적 작업을 시작합니까? 나는 누구가이 write (DataOutput out)을 호출하고, 데이터가 네트워크상의 해당 목적지에 도달하면 누가 readFields (DataInput in)를 호출했는지 의미합니까?
어떻게 작동하며 어디에서 볼 필요가 있습니까?
나는 분명히 묻고있는 것이 희망이다.