2017-04-10 2 views
2

15GB보다 큰 파일을 전송할 때 데이터 전송 속도에 문제가 있습니다. 3 대의 서버와 1 대의 클라이언트가 있습니다. 클라이언트에서 서버로 파일을 전송할 때 파일을 블록 (일반적으로 각 블록은 256MB)으로 분할하고 각 블록을 2 개의 서버에 복제합니다. 복제는 파이프 라인 방법에서 발생합니다. 블록을 보낼 때 각 블록은 더 작은 패킷 (각 패킷은 일반적으로 128KB)으로 슬라이스되고 서버로 전송되며 서버 측에서 병합되어 하드 드라이브에 저장됩니다. 여기에는 모든 것이 좋습니다. 필자는 5GB에서 50GB까지 파일을 5GB 단위로 테스트했습니다. 평균 쓰기는 모든 파일에 대해 약 600MB/초입니다. 아래 차트를 참조하십시오. 여기서 나는 HDFS와 비교하고있다. 서버에서 동일한 파일을 읽을 때큰 파일 전송을 위해 15GB 이후에 데이터 전송 속도가 느려짐

enter image description here

문제가 발생합니다. 파일은 여러 서버에 분산되어 있습니다. 예를 들어 server1의 block1, server2의 block2 등을 읽을 수 있습니다. 직관적으로 클라이언트는 3 대의 서버를 병렬로 읽으므로 읽기는 쓰기보다 빠를 수 없습니다. {15GB, 10GB, 15GG}보다 작은 파일을 읽을 때 성능은 약 1.1GB/초입니다. 문제는 20GB (20GB, 25GB, ..., 50GB)보다 큰 파일을 읽을 때 발생합니다. 파일 크기가 커질수록 성능이 떨어집니다.

Performance graph for 50GB file when reading

위 그림

는 50기가바이트 파일을 읽기위한 벤치 마크 테스트를 보여줍니다. 검은 색 점은 개별 블록 읽기 시간을 나타냅니다. 보시다시피, 성능은 60-70 블록 후에 감소하기 시작합니다. 흥미롭게도이 문제는 15GB를 초과하는 모든 파일에서 발생하며 같은 장소 (약 65 번째 블록) 주변에서 속도가 느려집니다. 파일 크기가 커짐에 따라 느린 부분이 지배적이며 성능이 악화되고 있습니다. 16GB 정도의 장애물이있는 것 같습니다. 내가 볼 수있는 유일한 힌트는 세 개의 서버가 약 65 번까지 블록을 임의로 병렬로 전송한다는 것입니다. 따라서 블록 전송은 겹쳐 있습니다. 그런 다음 한 서버가 한 번에 라운드 로빈 순서로 보냅니다. 나는 로그 결과에서 이것을 볼 수있다. 여전히 여기에 중복되는 부분이 있지만 65 번째 블록 이전에는 그렇지 않습니다.

이 프로젝트에는 Java 1.8을 사용하고 netty 4.1.8을 사용하고 있습니다. TCP 서버로. OS는 CentOS 7입니다. 각 서버에는 2 개의 CPU (Intel (R) Xeon (R) CPU E5-2650 v3 @ 2.30GHz) = 40 코어가 있습니다. 64GB RAM 10GBit 이더넷.

많은 시간을 들여 문제의 근본 원인을 찾을 수 없습니다. Java VM, Netty, OS, OS TCP 기본값 또는 다른 이유로 인해 문제가 발생할 수 있습니다. 서버 측에서

서버 측 BlockSenderManager

@Override 
    public void run(){ 

     while(nodeManager.isRunning()){ 
      try 
      { 
       BlockRequest br = blockSenders.take(); 
       if(br != null){ 
        executor.execute(new BlockSender(br, this)); 
       } 

       if(wait.take()) 
        System.out.println(br.getBlockId()+" Delivered"); 
      } 
      catch (InterruptedException e) 
      { 
       e.printStackTrace(); 
      } 
     } 

BlockSender : 클라이언트 측에서

@Override 
     public void run() 
     { 
      FileInputStream fis = null; 

      try 
      { 
       java.io.File file = new java.io.File(path+"/" + blockRequest.getBlockId()); 

       fis = new FileInputStream(file); 
       fSize = file.length(); 
       long rem = fSize; 

       sendBlockInfo(); 
       int bufSize; 
       if (fSize < (long) packetSize) 
        bufSize = (int) fSize; 
       else 
        bufSize = packetSize; 
       int read = 0, packetOrder = 1; 

       byte[] data; 
       if(bufSize <= rem) 
        data = new byte[bufSize]; 
       else 
        data = new byte[(int)rem]; 
       while ((read = (fis.read(data))) > 0) 
       { 
        if (read < 1) 
         break; 

        BlockPacket bp = new BlockPacket(); 

        bp.setRequestId(blockRequest.getRequestId()); 
        bp.setBlockId(blockRequest.getBlockId()); 
        bp.setData(data); 
        bp.setPacketSeqNo(packetOrder); 
        if(read < bufSize) 
        { 
         bp.setIsLastPacket(true); 
        } 

        executor.execute(new Sender(bp)); 

        packetOrder++; 
        if(rem > bufSize) 
         rem = rem - bufSize; 

        if(bufSize <= rem) 
         data = new byte[bufSize]; 
        else 
        { 
         data = new byte[(int)rem]; 
        } 
       } 

       fis.close(); 
       executor.shutdown(); 
      } 
      catch (FileNotFoundException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      catch (IOException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

public class Sender implements Runnable 
    { 
     private final BlockPacket bp; 
     private final FileBlock fb; 
     private DataClient dc; 

     public Sender(BlockPacket bp) 
     { 
      this.bp = bp; 
      this.fb = null; 
      dc = getDataClient(requestClient); 
     } 

     public Sender(FileBlock fb) 
     { 
      this.bp = null; 
      this.fb = fb; 
      dc = getDataClient(requestClient); 
     } 

     @Override 
     public void run() 
     { 

      if (dc != null) 
      { 
       if (bp != null) 
       { 
        dc.send(bp); 
       } 
       else if (fb != null) 
       { 
        dc.send(fb); 
       } 
      } 

     } 
    } 

ReceivedPacketProcessor는 -XX와

public void processBlockPacket(BlockPacket bp) 
    { 
     ByteBuffer buf = ByteBuffer.wrap(bp.getData()); 
     try 
     { 
      inChannel.write(buf); 
     } 
     catch (IOException e) 
     { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    @Override 
     public void run() 
     { 
      try 
      { 
       aFile = new RandomAccessFile(path+"/"+fileName, "rw"); 
       inChannel = aFile.getChannel(); 
       //java.io.File f = new java.io.File(path+"/"+fileName); 
       //fop = new FileOutputStream(f); 
       String reqId = file.getFileID(); 
       currentBlockId = reqId + "_" + currentBlockSeq; 
       while (true) 
       { 
        BlockPacket bp = null; 
        if (numberOfBlocks > 0) 
        { 
         try 
         { 
          bp = this.blockingQueue.take(); 
         } 
         catch (InterruptedException e) 
         { 
          e.printStackTrace(); 
         } 
         if (bp.getBlockId().equals(currentBlockId)) 
         { 
          if (currentPacket == bp.getPacketSeqNo()) 
          { 

           if(fileBlocks.containsKey(currentBlockId)) 
           { 
            processBlockPacket(bp); 
            if(currentPacket < fileBlocks.get(currentBlockId).getNoOfPackets()) 
             currentPacket++; 
            else 
            { 
             if (fileBlocks.get(currentBlockId).getPackets().size() < 1) 
             { 
              removeFileBlock(currentBlockId); 
              currentBlockSeq++; 
              currentBlockId = reqId + "_" + currentBlockSeq; 
              currentPacket = 1; 
              numberOfBlocks--; 
             } 
            } 
           } 
           else 
           { 
            tempList.add(bp); 
           } 

           for(int k =numberOfBlocks; k>0; k--) 
           { 
            if(fileBlocks.containsKey(currentBlockId)) 
            { 
             int pCount = fileBlocks.get(currentBlockId).getNoOfPackets(); 
             int i; 
             for (i = currentPacket; i <= pCount; i++) 
             { 
              if (fileBlocks.get(currentBlockId).getPackets().containsKey(i)) 
              { 
               processBlockPacket(fileBlocks.get(currentBlockId).getPackets().remove(i)); 
               currentPacket++; 
              } 
              else 
              { 
               break; 
              } 
             } 
             if(i <= pCount) 
             { 
              break; 
             } 
             else 
             { 
              if (fileBlocks.get(currentBlockId).getPackets().size() < 1) 
              { 
               removeFileBlock(currentBlockId); 
               currentBlockSeq++; 
               currentBlockId = reqId + "_" + currentBlockSeq; 
               currentPacket = 1; 
               numberOfBlocks--; 
              } 
             } 
            } 
           } 
          } 
         } 
         else 
         { 
          if(fileBlocks.containsKey(bp.getBlockId())){ 
           fileBlocks.get(bp.getBlockId()).getPackets().put(bp.getPacketSeqNo(), bp); 
          }else{ 
           tempList.add(bp); 
          } 

         } 
        } 
        else{ 
         break; 
        } 
        for(int i=0; i<tempList.size(); i++){ 
         if(fileBlocks.containsKey(tempList.get(i).getBlockId())){ 
          BlockPacket temp = tempList.remove(i); 
          fileBlocks.get(temp.getBlockId()).getPackets().put(temp.getPacketSeqNo(), temp); 
         } 
        } 
       } 
       System.out.println("CLOSING FILE...."); 
       this.isCompleted.put(true); 
       inChannel.force(true); 
       inChannel.close(); 
      } 
      catch (FileNotFoundException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      catch (IOException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      catch (InterruptedException e) 
      { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

: + PrintGCDetails는 here is a sample log이 켜져.

모든 의견/도움을 주실 수 있습니다.

+1

JVM GC, 네트워크 대역폭 사용률, 서버의 IO 읽기를 확인합니다. 서버의 파일 시스템 캐시 활용. –

+0

@ JigarJoshi 의견을 주셔서 감사합니다. 나는 JVM GC를 전혀 건드리지 않을 것이다. 그것은 기본값으로 실행됩니다. 네트워크 대역폭이 문제가된다면, 파일 크기가 15GB 미만이라면 문제가 될 것이라고 생각합니다. 로컬 IO에서 블록을 읽는 것이이 경우 병목 현상이되지는 않습니다. 나는 이미 확인했다. – celik

+0

@celik Jigar가 말하고자하는 것은 GC 로그온을 켜서 '문제'일 수 있는지 확인하는 것입니다. 이 파일을 읽는 방법이 명확하지 않습니다 ... 코드 또는 구현 방법이 있습니까? 'FileChannels'? – Eugene

답변

0

메모리의 더티 페이지 비율 때문입니다. 들어오는 데이터 속도가 로컬 IO 플러시 처리량보다 높기 때문에 데이터가 메모리에 누적됩니다. 최대 허용 더티 페이지 비율에 도달하면 수신자는 더 많은 데이터를 허용하지 않습니다. 따라서 시스템은이 경우 네트워크가 아닌 로컬 IO로 제한됩니다. 따라서 수익의 감소는 약 15GB에서 발생합니다.

vm.dirty_background_ratio = 2 
vm.dirty_ratio = 80 
vm.dirty_expire_centisecs = 3000 
vm.dirty_writeback_centisecs = 500 

This 유용한 읽기 수 있습니다 : 당신은 같은

/etc/sysctl.conf에

의 일부 설정을 변경할 수 있습니다.

시스템 성능은 여전히 ​​로컬 IO 및 최대 허용 더티 페이지 비율로 제한됩니다. 당신은 반환 시간의 감소를 연기하기 위해 더러운 페이지 비율을 증가시킬 수 있습니다. 파일/데이터가 더 큰 경우이 지점에 다시 도달합니다. 새로운 결과 : enter image description here