15GB보다 큰 파일을 전송할 때 데이터 전송 속도에 문제가 있습니다. 3 대의 서버와 1 대의 클라이언트가 있습니다. 클라이언트에서 서버로 파일을 전송할 때 파일을 블록 (일반적으로 각 블록은 256MB)으로 분할하고 각 블록을 2 개의 서버에 복제합니다. 복제는 파이프 라인 방법에서 발생합니다. 블록을 보낼 때 각 블록은 더 작은 패킷 (각 패킷은 일반적으로 128KB)으로 슬라이스되고 서버로 전송되며 서버 측에서 병합되어 하드 드라이브에 저장됩니다. 여기에는 모든 것이 좋습니다. 필자는 5GB에서 50GB까지 파일을 5GB 단위로 테스트했습니다. 평균 쓰기는 모든 파일에 대해 약 600MB/초입니다. 아래 차트를 참조하십시오. 여기서 나는 HDFS와 비교하고있다. 서버에서 동일한 파일을 읽을 때큰 파일 전송을 위해 15GB 이후에 데이터 전송 속도가 느려짐
문제가 발생합니다. 파일은 여러 서버에 분산되어 있습니다. 예를 들어 server1의 block1, server2의 block2 등을 읽을 수 있습니다. 직관적으로 클라이언트는 3 대의 서버를 병렬로 읽으므로 읽기는 쓰기보다 빠를 수 없습니다. {15GB, 10GB, 15GG}보다 작은 파일을 읽을 때 성능은 약 1.1GB/초입니다. 문제는 20GB (20GB, 25GB, ..., 50GB)보다 큰 파일을 읽을 때 발생합니다. 파일 크기가 커질수록 성능이 떨어집니다.
위 그림
는 50기가바이트 파일을 읽기위한 벤치 마크 테스트를 보여줍니다. 검은 색 점은 개별 블록 읽기 시간을 나타냅니다. 보시다시피, 성능은 60-70 블록 후에 감소하기 시작합니다. 흥미롭게도이 문제는 15GB를 초과하는 모든 파일에서 발생하며 같은 장소 (약 65 번째 블록) 주변에서 속도가 느려집니다. 파일 크기가 커짐에 따라 느린 부분이 지배적이며 성능이 악화되고 있습니다. 16GB 정도의 장애물이있는 것 같습니다. 내가 볼 수있는 유일한 힌트는 세 개의 서버가 약 65 번까지 블록을 임의로 병렬로 전송한다는 것입니다. 따라서 블록 전송은 겹쳐 있습니다. 그런 다음 한 서버가 한 번에 라운드 로빈 순서로 보냅니다. 나는 로그 결과에서 이것을 볼 수있다. 여전히 여기에 중복되는 부분이 있지만 65 번째 블록 이전에는 그렇지 않습니다.이 프로젝트에는 Java 1.8을 사용하고 netty 4.1.8을 사용하고 있습니다. TCP 서버로. OS는 CentOS 7입니다. 각 서버에는 2 개의 CPU (Intel (R) Xeon (R) CPU E5-2650 v3 @ 2.30GHz) = 40 코어가 있습니다. 64GB RAM 10GBit 이더넷.
많은 시간을 들여 문제의 근본 원인을 찾을 수 없습니다. Java VM, Netty, OS, OS TCP 기본값 또는 다른 이유로 인해 문제가 발생할 수 있습니다. 서버 측에서
서버 측 BlockSenderManager
@Override
public void run(){
while(nodeManager.isRunning()){
try
{
BlockRequest br = blockSenders.take();
if(br != null){
executor.execute(new BlockSender(br, this));
}
if(wait.take())
System.out.println(br.getBlockId()+" Delivered");
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
BlockSender : 클라이언트 측에서
@Override
public void run()
{
FileInputStream fis = null;
try
{
java.io.File file = new java.io.File(path+"/" + blockRequest.getBlockId());
fis = new FileInputStream(file);
fSize = file.length();
long rem = fSize;
sendBlockInfo();
int bufSize;
if (fSize < (long) packetSize)
bufSize = (int) fSize;
else
bufSize = packetSize;
int read = 0, packetOrder = 1;
byte[] data;
if(bufSize <= rem)
data = new byte[bufSize];
else
data = new byte[(int)rem];
while ((read = (fis.read(data))) > 0)
{
if (read < 1)
break;
BlockPacket bp = new BlockPacket();
bp.setRequestId(blockRequest.getRequestId());
bp.setBlockId(blockRequest.getBlockId());
bp.setData(data);
bp.setPacketSeqNo(packetOrder);
if(read < bufSize)
{
bp.setIsLastPacket(true);
}
executor.execute(new Sender(bp));
packetOrder++;
if(rem > bufSize)
rem = rem - bufSize;
if(bufSize <= rem)
data = new byte[bufSize];
else
{
data = new byte[(int)rem];
}
}
fis.close();
executor.shutdown();
}
catch (FileNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public class Sender implements Runnable
{
private final BlockPacket bp;
private final FileBlock fb;
private DataClient dc;
public Sender(BlockPacket bp)
{
this.bp = bp;
this.fb = null;
dc = getDataClient(requestClient);
}
public Sender(FileBlock fb)
{
this.bp = null;
this.fb = fb;
dc = getDataClient(requestClient);
}
@Override
public void run()
{
if (dc != null)
{
if (bp != null)
{
dc.send(bp);
}
else if (fb != null)
{
dc.send(fb);
}
}
}
}
ReceivedPacketProcessor는 -XX와
public void processBlockPacket(BlockPacket bp)
{
ByteBuffer buf = ByteBuffer.wrap(bp.getData());
try
{
inChannel.write(buf);
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void run()
{
try
{
aFile = new RandomAccessFile(path+"/"+fileName, "rw");
inChannel = aFile.getChannel();
//java.io.File f = new java.io.File(path+"/"+fileName);
//fop = new FileOutputStream(f);
String reqId = file.getFileID();
currentBlockId = reqId + "_" + currentBlockSeq;
while (true)
{
BlockPacket bp = null;
if (numberOfBlocks > 0)
{
try
{
bp = this.blockingQueue.take();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
if (bp.getBlockId().equals(currentBlockId))
{
if (currentPacket == bp.getPacketSeqNo())
{
if(fileBlocks.containsKey(currentBlockId))
{
processBlockPacket(bp);
if(currentPacket < fileBlocks.get(currentBlockId).getNoOfPackets())
currentPacket++;
else
{
if (fileBlocks.get(currentBlockId).getPackets().size() < 1)
{
removeFileBlock(currentBlockId);
currentBlockSeq++;
currentBlockId = reqId + "_" + currentBlockSeq;
currentPacket = 1;
numberOfBlocks--;
}
}
}
else
{
tempList.add(bp);
}
for(int k =numberOfBlocks; k>0; k--)
{
if(fileBlocks.containsKey(currentBlockId))
{
int pCount = fileBlocks.get(currentBlockId).getNoOfPackets();
int i;
for (i = currentPacket; i <= pCount; i++)
{
if (fileBlocks.get(currentBlockId).getPackets().containsKey(i))
{
processBlockPacket(fileBlocks.get(currentBlockId).getPackets().remove(i));
currentPacket++;
}
else
{
break;
}
}
if(i <= pCount)
{
break;
}
else
{
if (fileBlocks.get(currentBlockId).getPackets().size() < 1)
{
removeFileBlock(currentBlockId);
currentBlockSeq++;
currentBlockId = reqId + "_" + currentBlockSeq;
currentPacket = 1;
numberOfBlocks--;
}
}
}
}
}
}
else
{
if(fileBlocks.containsKey(bp.getBlockId())){
fileBlocks.get(bp.getBlockId()).getPackets().put(bp.getPacketSeqNo(), bp);
}else{
tempList.add(bp);
}
}
}
else{
break;
}
for(int i=0; i<tempList.size(); i++){
if(fileBlocks.containsKey(tempList.get(i).getBlockId())){
BlockPacket temp = tempList.remove(i);
fileBlocks.get(temp.getBlockId()).getPackets().put(temp.getPacketSeqNo(), temp);
}
}
}
System.out.println("CLOSING FILE....");
this.isCompleted.put(true);
inChannel.force(true);
inChannel.close();
}
catch (FileNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
: + PrintGCDetails는 here is a sample log이 켜져.
모든 의견/도움을 주실 수 있습니다.
JVM GC, 네트워크 대역폭 사용률, 서버의 IO 읽기를 확인합니다. 서버의 파일 시스템 캐시 활용. –
@ JigarJoshi 의견을 주셔서 감사합니다. 나는 JVM GC를 전혀 건드리지 않을 것이다. 그것은 기본값으로 실행됩니다. 네트워크 대역폭이 문제가된다면, 파일 크기가 15GB 미만이라면 문제가 될 것이라고 생각합니다. 로컬 IO에서 블록을 읽는 것이이 경우 병목 현상이되지는 않습니다. 나는 이미 확인했다. – celik
@celik Jigar가 말하고자하는 것은 GC 로그온을 켜서 '문제'일 수 있는지 확인하는 것입니다. 이 파일을 읽는 방법이 명확하지 않습니다 ... 코드 또는 구현 방법이 있습니까? 'FileChannels'? – Eugene