2017-05-24 7 views
1

따라서 우리는 현재 MQTT 기반 메시징 백엔드에서 netty 3.x를 netty 4.1로 업그레이드하는 중입니다. 우리의 애플리케이션에서는 커스텀 MQTT 메시지 디코더와 인코더를 사용한다. 참조 카운트 된 ByteBuf 오브젝트를 netty에 올바르게 릴리스 4.1

public class MqttMessageDecoder extends ByteToMessageDecoder { 

    @Override 
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
     if (in.readableBytes() < 2) { 
      return; 
     } 

     ..... 
     ..... 
     ..... 

     byte[] data = new byte[msglength]; 
     in.resetReaderIndex(); 
     in.readBytes(data); 
     MessageInputStream mis = new MessageInputStream(
       new ByteArrayInputStream(data)); 
     Message msg = mis.readMessage(); 
     out.add(msg); 
     ReferenceCountUtil.release(in); 
    } 
} 

Message 우리의 사용자 지정 개체가

, 즉이 다음 ChannelHandlerchannelRead()에 전달되는 다음과 같이 우리의 디코더를 들어

, 나는 현재 ByteToMessageDecoder을 사용하고 있습니다. 보시다시피, 들어오는 ByteBuf 개체 in에서 개체를 만들면 바로 완료됩니다. Message 개체입니다. 따라서 ByteBuf은 netty에서 참조 횟수가 계산되므로 여기 in 개체를 ReferenceCountUtil.release(in)으로 불러 와야 할 필요가 있습니까? 이상적으로 이것은 doc에 따라 올바르게 보입니다. 나는이 작업을 수행 할 때, 나는 예외를 직면 할 것 :이 자식 채널이 닫혀있을 때, 파이프 라인에있는 모든 핸들러가 차례로 제거되었는지 알려줍니다

Wed May 24 io.netty.channel.DefaultChannelPipeline:? WARN netty-workers-7 An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. 
io.netty.channel.ChannelPipelineException: com.bsb.hike.mqtt.MqttMessageDecoder.handlerRemoved() has thrown an exception. 
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:631) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:867) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.DefaultChannelPipeline.access$300(DefaultChannelPipeline.java:45) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.DefaultChannelPipeline$9.run(DefaultChannelPipeline.java:874) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal] 
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:111) ~[netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.handler.codec.ByteToMessageDecoder.handlerRemoved(ByteToMessageDecoder.java:217) ~[netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:626) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    ... 7 common frames omitted 

. 이 디코더 핸들러가 닫히면, 아래의 메소드가 호출 될 때 예외를 발생시키는이 디코더에 첨부 된 ByteBuf을 명시 적으로 해제합니다.

@Override 
    public boolean release() { 
     for (;;) { 
      int refCnt = this.refCnt; 
      if (refCnt == 0) { 
       throw new IllegalReferenceCountException(0, -1); 
      } 

      if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) { 
       if (refCnt == 1) { 
        deallocate(); 
        return true; 
       } 
       return false; 
      } 
     } 
    } 

다음 ByteBuf 객체를 해제하는 올바른 방법은 무엇이며,이 문제가 발생하지 않도록 :

AbstractReferenceCountedByteBuf#release입니까? 당신은 구성에 관한 더 많은 정보가 필요하면 알려 주시기 바랍니다

new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 

-

나는 PooledByteBufAllocator을 사용하고 있습니다.


EDIT : 애드온 Ferrybig의 대답으로서

ByteToMessageDecoder#channelRead는 자체로 유입 ByteBuf (S)의 해제를 처리한다. finally 블록 참조 -

@Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     if (msg instanceof ByteBuf) { 
      CodecOutputList out = CodecOutputList.newInstance(); 
      try { 
       ByteBuf data = (ByteBuf) msg; 
       first = cumulation == null; 
       if (first) { 
        cumulation = data; 
       } else { 
        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); 
       } 
       callDecode(ctx, cumulation, out); 
      } catch (DecoderException e) { 
       throw e; 
      } catch (Throwable t) { 
       throw new DecoderException(t); 
      } finally { 
       if (cumulation != null && !cumulation.isReadable()) { 
        numReads = 0; 
        cumulation.release(); 
        cumulation = null; 
       } else if (++ numReads >= discardAfterReads) { 
        // We did enough reads already try to discard some bytes so we not risk to see a OOME. 
        // See https://github.com/netty/netty/issues/4275 
        numReads = 0; 
        discardSomeReadBytes(); 
       } 

       int size = out.size(); 
       decodeWasNull = !out.insertSinceRecycled(); 
       fireChannelRead(ctx, out, size); 
       out.recycle(); 
      } 
     } else { 
      ctx.fireChannelRead(msg); 
     } 
    } 

인바운드 ByteBuf 파이프 라인 아래로 다음 채널 핸들러에 전달되는 경우를,이 ByteBuf의 참조 횟수가 ByteBuf#retain을 통해 증가하고 디코더 후 다음 핸들러는 귀하의 비즈니스 그래서 경우 처리기 (대개의 경우)에서 메모리 누수가 발생하지 않도록하려면 ByteBuf 객체를 릴리스해야합니다. 이것도 docs에 나와 있습니다.

답변

1

모든 처리기가 전달 된 bytebuf를 파괴해야하는 것은 아닙니다. ByteToMessageDecoder이 그 중 하나입니다.

그 이유는이 핸들러는 여러 들어오는 bytebufs를 수집하고 코딩의 편의를 위해, 바이트의 1 개 연속 스트림으로 응용 프로그램에 노출하고, 자신이

기억이 덩어리를 처리하기 위해 필요로하지 않는 것입니다 javadoc에 명시된대로 readBytes 또는 readSlice을 사용하여 생성 한 모든 bytebufs를 수동으로 릴리스해야합니다.