따라서 우리는 현재 MQTT 기반 메시징 백엔드에서 netty 3.x를 netty 4.1로 업그레이드하는 중입니다. 우리의 애플리케이션에서는 커스텀 MQTT 메시지 디코더와 인코더를 사용한다. 참조 카운트 된 ByteBuf 오브젝트를 netty에 올바르게 릴리스 4.1
public class MqttMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 2) {
return;
}
.....
.....
.....
byte[] data = new byte[msglength];
in.resetReaderIndex();
in.readBytes(data);
MessageInputStream mis = new MessageInputStream(
new ByteArrayInputStream(data));
Message msg = mis.readMessage();
out.add(msg);
ReferenceCountUtil.release(in);
}
}
Message
우리의 사용자 지정 개체가
ChannelHandler
의
channelRead()
에 전달되는 다음과 같이 우리의 디코더를 들어
, 나는 현재
ByteToMessageDecoder을 사용하고 있습니다. 보시다시피, 들어오는
ByteBuf
개체
in
에서 개체를 만들면 바로 완료됩니다.
Message
개체입니다. 따라서
ByteBuf
은 netty에서 참조 횟수가 계산되므로 여기
in
개체를
ReferenceCountUtil.release(in)
으로 불러 와야 할 필요가 있습니까? 이상적으로 이것은
doc에 따라 올바르게 보입니다. 나는이 작업을 수행 할 때, 나는 예외를 직면 할 것 :이 자식 채널이 닫혀있을 때, 파이프 라인에있는 모든 핸들러가 차례로 제거되었는지 알려줍니다
Wed May 24 io.netty.channel.DefaultChannelPipeline:? WARN netty-workers-7 An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.channel.ChannelPipelineException: com.bsb.hike.mqtt.MqttMessageDecoder.handlerRemoved() has thrown an exception.
at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:631) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:867) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.access$300(DefaultChannelPipeline.java:45) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline$9.run(DefaultChannelPipeline.java:874) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:111) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.handler.codec.ByteToMessageDecoder.handlerRemoved(ByteToMessageDecoder.java:217) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:626) [netty-all-4.1.0.Final.jar:4.1.0.Final]
... 7 common frames omitted
. 이 디코더 핸들러가 닫히면, 아래의 메소드가 호출 될 때 예외를 발생시키는이 디코더에 첨부 된 ByteBuf
을 명시 적으로 해제합니다.
@Override
public boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
다음 ByteBuf
객체를 해제하는 올바른 방법은 무엇이며,이 문제가 발생하지 않도록 :
이
은AbstractReferenceCountedByteBuf#release
입니까? 당신은 구성에 관한 더 많은 정보가 필요하면 알려 주시기 바랍니다
new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-
나는 PooledByteBufAllocator
을 사용하고 있습니다.
EDIT : 애드온 Ferrybig의 대답으로서
는 ByteToMessageDecoder#channelRead
는 자체로 유입 ByteBuf
(S)의 해제를 처리한다. finally
블록 참조 -
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
인바운드 ByteBuf
파이프 라인 아래로 다음 채널 핸들러에 전달되는 경우를,이 ByteBuf
의 참조 횟수가 ByteBuf#retain
을 통해 증가하고 디코더 후 다음 핸들러는 귀하의 비즈니스 그래서 경우 처리기 (대개의 경우)에서 메모리 누수가 발생하지 않도록하려면 ByteBuf
객체를 릴리스해야합니다. 이것도 docs에 나와 있습니다.