[翻译]Netty中的引用计数对象
Contents
原文 http://netty.io/wiki/reference-counted-objects.html
从 Netty 4 版本开始, 某些对象的生命周期是通过它们的引用计数来进行管理, 当它不再被使用时, 方便 Netty 尽可能快地可以回收它们(或者它们共享的资源)到一个对象池(或一个对象分配器). GC 和引用队列对不可达对象并没有提供如此高效实时的保证, 而引用计数则提供了一个可替代的机制, 但牺牲了一点点方便性~
ByteBuf
是最值得注意的类型, 它利用引用计数来提高分配和释放内存的性能, 本页将解释在 Netty 中 ByteBuf
引用计数的工作原理.
引用计数的基础
一个新的引用计数对象的初始化值为1:
ByteBuf buf = ctx.alloc().directBuffer();
assert buf.refCnt() == 1;
当你释放release
该引用计数对象时, 引用计数会减1. 如果引用计数达到了0, 则引用计数对象会被释放内存或返回到它所源属的对象池:
assert buf.refCnt() == 1;
// release() returns true only if the reference count becomes 0.
boolean destroyed = buf.release();
assert destroyed;
assert buf.refCnt() == 0;
Dangling 引用
试图访问一个引用计数为0的引用计数对象, 将会触发一个 IllegalReferenceCountException
异常:
assert buf.refCnt() == 0;
try {
buf.writeLong(0xdeadbeef);
throw new Error("should not reach here");
} catch (IllegalReferenceCountExeception e) {
// Expected
}
增加引用计数
一个引用计数可以通过 retain()
方法来增加, 只要它还没有被销毁的话:
ByteBuf buf = ctx.alloc().directBuffer();
assert buf.refCnt() == 1;
buf.retain();
assert buf.refCnt() == 2;
boolean destroyed = buf.release();
assert !destroyed;
assert buf.refCnt() == 1;
谁销毁它?
一般的规则是: 最后访问引用计数对象的一方负责销毁引用计数对象. 进一步来说是:
- 如果一个
sending
(发送) 组件应该将一个引用计数对象传递给另一个receiving
(接收) 组件的话, 则sending
(发送)组件通常不需要销毁它, 而应该将决定推迟到receiving
(接收)组件. - 如果一个组件消费一个引用计数对象并且知道没有其他地方会再使用它的话(例如, 没有再传递一个引用到另一个组件), 则该组件(消费者)应该销毁它.
下面是一个简单的例子:
public ByteBuf a(ByteBuf input) {
input.writeByte(42);
return input;
}
public ByteBuf b(ByteBuf input) {
try {
output = input.alloc().directBuffer(input.readableBytes() + 1);
output.writeBytes(input);
output.writeByte(42);
return output;
} finally {
input.release();
}
}
public void c(ByteBuf input) {
System.out.println(input);
input.release();
}
public void main() {
...
ByteBuf buf = ...;
// This will print buf to System.out and destroy it.
c(b(a(buf)));
assert buf.refCnt() == 0;
}
动作 | 谁应该释放? | 谁已经被释放了? |
---|---|---|
1. main() 创建了 buf |
buf -> main() |
|
2. main() 以 buf 为参数来调用 a() |
buf -> a() |
|
3. a() 只是返回了 buf |
buf -> main() |
|
4. main() 以 buf 为参数来调用 b() |
buf -> b() |
|
5. b() 返回了 buf 的副本 copy |
buf -> b() , copy -> main() |
b() 释放(release )了 buf |
6. main 以 copy 为参数来调用 c() |
copy -> c() |
|
7. c() 直接消费了 copy |
copy -> c() |
c() 释放(release )了 copy |
derived buffer
ByteBuf.duplicate()
, ByteBuf.slice()
和 ByteBuf.order(ByteOrder)
会创建一个 derived buffer
, 它会共享父 buffer 的内存区域. 一个 derived buffer
并不会拥有它自己的引用计数, 而是与父 buffer 共享引用计数的.
ByteBuf parent = ctx.alloc().directBuffer();
ByteBuf derived = parent.duplicate();
// Creating a derived buffer does not increase the reference count.
assert parent.refCnt() == 1;
assert derived.refCnt() == 1;
相反, ByteBuf.copy()
和 ByteBuf.readBytes(int)
并不是 derived buffer
. 返回的 ByteBuf
是已经分配的将需要被释放(release
)的.
注意, 父 buffer 与它的 derived buffer
共享相同的引用计数, 当一个 derived buffer
被创建了的话, 引用计数并不会增加. 因此, 如果你正准备传递一个 derived buffer
到你应用的其他组件的话, 你首先需要调用 retain()
方法.
ByteBuf parent = ctx.alloc().directBuffer(512);
parent.writeBytes(...);
try {
while (parent.isReadable(16)) {
ByteBuf derived = parent.readSlice(16);
derived.retain();
process(derived);
}
} finally {
parent.release();
}
...
public void process(ByteBuf buf) {
...
buf.release();
}
ByteBufHolder 接口
有时, 一个 ByteBuf
是通过一个 buffer holder
来持有的, 例如: DatagramPacket
, HttpContent
, 和 WebSocketframe
. 这些类型都扩展自同一个称为 ByteBufHolder
的接口.
一个 buffer holder
会共享它持有的 buffer
的引用计数的, 类似一个 derived buffer
.
在 ChannelHandler 中的引用计数
Inbound message
当一个 event loop
读取数据到一个 ByteBuf
然后触发一个 channelRead()
事件时, 则在 pipeline
中相应的 ChannelHandler
负责释放(release
)该 buffer. 因此, 消费该接收的数据的 Handler 应该在它的 channelRead()
处理方法中调用消息数据的 release()
方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
try {
...
} finally {
buf.release();
}
}
正如在该文档的上面 谁销毁它?
部分的解释一样, 如果你的 Handler 传递该 buffer (或其他任意的引用计数对象) 到下一个 Handler, 则你不需要释放(release
)它:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
...
ctx.fireChannelRead(buf);
}
注意, ByteBuf
在 Netty 中不是唯一的引用计数类型. 如果你正处理的消息是通过 decoders
产生的话, 该消息很可能也是引用计数类型的:
// 假设你的 handler 是放在 `HttpRequestDecoder` 后面
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
...
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
try {
...
} finally {
content.release();
}
}
}
如果你比较困惑或你想简单地释放该消息, 你可以使用 ReferenceCountUtil.release()
:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
...
} finally {
ReferenceCountUtil.release(msg);
}
}
或者, 你可以考虑继承 SimpleChannelHandler
, 它会为所有你接收到的消息进行自动调用 ReferenceCountUtil.release()
.
Outbound message
不同于 Inbound message
, outbound message 是通过你的应用创建的, 在写完这些消息后, 是由 Netty 来负责释放的. 然而, 那些拦截写请求的 handler 应该确保适当地释放了内部的对象(例如, encoders
):
// Simple-pass through
public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
System.err.println("Writing: " + message);
ctx.write(message, promise);
}
// Transformation
public void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise) {
if (message instanceof HttpContent) {
// Transform HttpContent to ByteBuf.
HttpContent content = (HttpContent) message;
try {
ByteBuf transformed = ctx.alloc().buffer();
....
ctx.write(transformed, promise);
} finally {
content.release();
}
} else {
// Pass non-HttpContent through.
ctx.write(message, promise);
}
}
分析和解决 buffer 泄漏
引用计数的缺点是, 它很容易泄漏引用计数对象. 由于 JVM 不知道 Netty 的引用计数实现, 一旦它们变得不可达时会自动进行 GC, 尽管它们的引用计数并不为0. 一个对象一旦被 GC 则不能恢复了的, 因此并不能返回到它源属的对象池中, 并因此会产生内存泄漏.
幸运的是, 尽管很难发现泄漏, Netty 默认情况下会在所有分配的 buffer 中采样 1% 的样本来检测你的应用是否有泄漏. 如果有泄漏, 你会发现有以下的日志消息:
LEAK: ByteBuf.release() was not called before it’s garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option ‘-Dio.netty.leakDetectionLevel=advanced’ or call ResourceLeakDetector.setLevel()
按上面的日志消息提示, 以这些选项来再次启动你的应用, 你将会看到你应用的最近访问泄漏的 buffer 的代码位置. 以下是我们的一个单元测试 (XmlFrameDecoderTest.testDecodeWithXml()
) 的泄漏输出例子:
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.toString(AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)
...
Created at:
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.UnpooledUnsafeDirectByteBuf.copy(UnpooledUnsafeDirectByteBuf.java:465)
io.netty.buffer.WrappedByteBuf.copy(WrappedByteBuf.java:697)
io.netty.buffer.AdvancedLeakAwareByteBuf.copy(AdvancedLeakAwareByteBuf.java:656)
io.netty.handler.codec.xml.XmlFrameDecoder.extractFrame(XmlFrameDecoder.java:198)
io.netty.handler.codec.xml.XmlFrameDecoder.decode(XmlFrameDecoder.java:174)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:227)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:140)
io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:74)
io.netty.channel.embedded.EmbeddedEventLoop.invokeChannelRead(EmbeddedEventLoop.java:142)
io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:317)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
io.netty.channel.embedded.EmbeddedChannel.writeInbound(EmbeddedChannel.java:176)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(XmlFrameDecoderTest.java:147)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(XmlFrameDecoderTest.java:133)
...
如果你使用 Netty 5 或以上版本, 会再提供一些额外的信息来帮你查找是哪个 Handler 最近处理该泄漏的 buffer 的. 下面展示的例子, 打印了 Handler 名为 EchoServerHandler#0
的处理泄漏的 buffer 的输出, 然后 GC 掉了, 这意味着, 很可能是 EchoServerHandler#0
忘记释放了该 buffer
12:05:24.374 [nioEventLoop-1-1] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 2
#2:
Hint: 'EchoServerHandler#0' will handle the message from this point.
io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:329)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:133)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
java.lang.Thread.run(Thread.java:744)
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:589)
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:208)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:125)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
java.lang.Thread.run(Thread.java:744)
Created at:
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
java.lang.Thread.run(Thread.java:744)
泄漏检测级别
目前有4种泄漏检测级别:
DISABLED
: 完全禁用检测. 不建议SIMPLE
: 对所有分配的 buffer 进行 1% 采样检测是否泄漏. 这是默认值ADVANCED
: 对 1% buffer样本检测, 并且告知是在哪里访问泄漏的 buffer 的PARANOID
: 与ADVANCED
一样, 但它是对每一个单一的 buffer 都进行检测. 这在自动测试阶段非常有用. 如果你的构建输出中包含LEAK:
的话, 会导致构建失败.
你可以通过JVM选项 -Dio.netty.leakDetection.level
指定泄漏检测级别:
java -Dio.netty.leakDetection.level=advanced ...
注意: 该属性名以前称为
io.netty.leakDetectionLevel
避免泄漏的最佳实践
- 在你的单元测试和集成测试中使用
PARANOID
级别检测, 同样也在SIMPLE
级别检测 - 在你的应用加入整个应用集群之前, 在
SIMPLE
级别中执行比较合理的一段长时间的输出来查看是否有内存泄漏 - 如果存在泄漏, 则在
ADVANCED
级别中再收集一次来查看是从哪里泄漏的 - 不要部署一个存在泄漏的应用到整个集群里~
在单元测试中 fix 泄漏
在一个单元测试中, 非常容易忘记释放一个 buffer 或 message . 它会产生一个泄漏警告, 但这不意味着你的应用有泄漏. 你可以使用 ReferenceCountUtil.releaseLater()
工具方法, 而不是使用 try-finally
代码块来封装单元测试来释放所有的 buffer:
import static io.netty.util.ReferenceCountUtil.*;
@Test
public void testSomething() throws Exception {
// ReferenceCountUtil.releaseLater() will keep the reference of buf,
// and then release it when the test thread is terminated.
ByteBuf buf = releaseLater(Unpooled.directBuffer(512));
...
}