Netty Notes: Collected and Organized
Contents
ChannelInitializer
It is called every time a Channel is initialized; this is usually where you set up the encoder, decoder, and other handlers. For example:
@Override
protected void initChannel(final Channel ch) {
    ChannelPipeline p = ch.pipeline();
    p.addLast(new HttpRequestDecoder());
    p.addLast(new HttpResponseEncoder());
    p.addLast(new HelloHandler());
}
For debugging, I overrode two methods here:

@Override
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
    System.out.println("hello handler added");
    System.out.println(ctx.pipeline().toMap());
}

@Override
public void handlerRemoved(final ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
    System.out.println("hello handler removed");
    System.out.println(ctx.pipeline().toMap());
}

When the service is accessed several times, you can see it keeps printing:

hello handler added
{BidInitializer#0=com.uniweibo.dsp.server.BidInitializer@6c32dcc8, HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@508bcb4e, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@32e4e874, HelloHandler#0=com.uniweibo.dsp.server.HelloHandler@7c0161a1}
hello handler removed
{HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@508bcb4e, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@32e4e874}
hello handler added
{BidInitializer#0=com.uniweibo.dsp.server.BidInitializer@6c32dcc8, HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@15c22d08, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@50c0b1d4, HelloHandler#0=com.uniweibo.dsp.server.HelloHandler@7c0161a1}
hello handler removed
{HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@15c22d08, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@50c0b1d4}
hello handler added
{BidInitializer#0=com.uniweibo.dsp.server.BidInitializer@6c32dcc8, HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@47a59cc1, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@2a71a51, HelloHandler#0=com.uniweibo.dsp.server.HelloHandler@7c0161a1}
hello handler removed
{HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@47a59cc1, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@2a71a51}
Understanding Sharable for Handlers
If your Handler is stateless, you can declare it as @Sharable and create just one instance, instead of new-ing one every time a Channel is initialized.
Here are all the @Sharable handlers in the Netty packages:
sharable class size => 34
sharable class => io.netty.handler.codec.dns.DatagramDnsQueryEncoder
sharable class => io.netty.handler.codec.socksx.v4.Socks4ServerEncoder
sharable class => io.netty.handler.codec.string.LineEncoder
sharable class => io.netty.handler.traffic.GlobalTrafficShapingHandler
sharable class => io.netty.handler.codec.LengthFieldPrepender
sharable class => io.netty.handler.codec.dns.DatagramDnsQueryDecoder
sharable class => io.netty.handler.codec.protobuf.ProtobufDecoderNano
sharable class => io.netty.handler.codec.base64.Base64Encoder
sharable class => io.netty.handler.codec.string.StringEncoder
sharable class => io.netty.handler.ipfilter.RuleBasedIpFilter
sharable class => io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler
sharable class => io.netty.handler.codec.rtsp.RtspObjectEncoder
sharable class => io.netty.handler.codec.socks.SocksMessageEncoder
sharable class => io.netty.handler.codec.bytes.ByteArrayEncoder
sharable class => io.netty.handler.codec.serialization.ObjectEncoder
sharable class => io.netty.handler.ipfilter.UniqueIpFilter
sharable class => io.netty.handler.codec.socksx.v5.Socks5ClientEncoder
sharable class => io.netty.handler.codec.protobuf.ProtobufEncoder
sharable class => io.netty.handler.codec.socksx.v4.Socks4ClientEncoder
sharable class => io.netty.handler.codec.dns.DatagramDnsResponseEncoder
sharable class => io.netty.handler.logging.LoggingHandler
sharable class => io.netty.handler.codec.mqtt.MqttEncoder
sharable class => io.netty.handler.codec.socksx.v5.Socks5ServerEncoder
sharable class => io.netty.handler.traffic.GlobalChannelTrafficShapingHandler
sharable class => io.netty.handler.codec.string.StringDecoder
sharable class => io.netty.handler.codec.marshalling.MarshallingEncoder
sharable class => io.netty.handler.codec.protobuf.ProtobufDecoder
sharable class => io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec
sharable class => io.netty.handler.codec.dns.DatagramDnsResponseDecoder
sharable class => io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender
sharable class => io.netty.handler.codec.http.websocketx.WebSocket00FrameEncoder
sharable class => io.netty.handler.codec.base64.Base64Decoder
sharable class => io.netty.handler.codec.protobuf.ProtobufEncoderNano
sharable class => io.netty.handler.codec.marshalling.CompatibleMarshallingEncoder
The code that prints this list:
<dependency>
    <groupId>org.reflections</groupId>
    <artifactId>reflections</artifactId>
    <version>0.9.11</version>
    <scope>test</scope>
</dependency>
@Test
public void findSharable() {
    Set<Class<?>> set = new Reflections("io.netty.handler").getTypesAnnotatedWith(ChannelHandler.Sharable.class);
    System.out.println("sharable class size => " + set.size());
    for (Class<?> clazz : set) {
        System.out.println("sharable class => " + clazz.getName());
    }
}
If you add the same non-@Sharable handler instance to a pipeline more than once, you get:

io.netty.channel.ChannelPipelineException: xxx is not a @Sharable handler, so can't be added or removed multiple times
The code that triggers it:
public class HelloHandler extends ChannelInboundHandlerAdapter {
    ...
}

public class AppChannelInitializer extends ChannelInitializer<Channel> {
    private static final HelloHandler helloHandler = new HelloHandler();

    @Override
    protected void initChannel(final Channel ch) {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpResponseEncoder());
        p.addLast(helloHandler);
    }
}
As you can see, HelloHandler is not annotated with @Sharable, yet we want to create only one instance of it.
Let's look at the logic in the Netty source first:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
Tracing through the source shows that the exception above is thrown by the checkMultiplicity(handler) call, whose source is:
private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}
From this logic we can see: the first time HelloHandler is added, everything is fine (h.added is false, so the if block is not entered). The problem appears the second time a ChannelPipeline is initialized (Netty initializes and cleans up the handlers for every Channel): since it is the same HelloHandler instance, h.isSharable() is false and h.added is true (it was set to true on the first add), so the if block is entered and a ChannelPipelineException is thrown.
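To make the failure mode concrete, the guard can be simulated with a small self-contained sketch. The Handler class and the exception type below are hypothetical stand-ins for illustration, not Netty's real classes:

```java
// Minimal simulation of Netty's checkMultiplicity guard.
// "Handler" here is a hypothetical stand-in, not Netty's ChannelHandlerAdapter.
public class SharableCheckDemo {
    static class Handler {
        final boolean sharable;
        boolean added; // flipped to true on the first add, like ChannelHandlerAdapter.added

        Handler(boolean sharable) { this.sharable = sharable; }
    }

    static void checkMultiplicity(Handler h) {
        // Same shape as Netty's check: reject a second add of a non-sharable instance.
        if (!h.sharable && h.added) {
            throw new IllegalStateException("not a @Sharable handler, so can't be added twice");
        }
        h.added = true;
    }

    public static void main(String[] args) {
        Handler notSharable = new Handler(false);
        checkMultiplicity(notSharable); // first add: fine, added becomes true
        try {
            checkMultiplicity(notSharable); // second add of the SAME instance: throws
        } catch (IllegalStateException e) {
            System.out.println("second add rejected: " + e.getMessage());
        }

        Handler sharable = new Handler(true);
        checkMultiplicity(sharable);
        checkMultiplicity(sharable); // sharable: adding the same instance twice is fine
        System.out.println("sharable instance added twice without error");
    }
}
```

The first add of the non-sharable instance flips added to true; the second add of the same instance trips the guard, which is exactly what happens when a ChannelInitializer keeps re-adding one HelloHandler instance.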
Solution
If you are sure your own Handler is thread-safe, declare it @Sharable; otherwise, don't share it: create a new instance each time.
Netty EventLoopGroup Initialization
Default thread count
MultithreadEventLoopGroup
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
As you can see, the minimum is 1, and it can be overridden with the io.netty.eventLoopThreads system property; otherwise it defaults to twice the number of available CPU cores. (The available core count itself can also be overridden with the io.netty.availableProcessors system property; if set, it takes precedence over Runtime.getRuntime().availableProcessors().)
See the following code in the NettyRuntime class:
synchronized int availableProcessors() {
    if (this.availableProcessors == 0) {
        final int availableProcessors =
                SystemPropertyUtil.getInt(
                        "io.netty.availableProcessors",
                        Runtime.getRuntime().availableProcessors());
        setAvailableProcessors(availableProcessors);
    }
    return this.availableProcessors;
}
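The resolution order can be reproduced with plain JDK calls. This is a sketch of the same computation, not Netty code; note that real Netty reads the property once in a static initializer, so it must be set before the class loads:

```java
// Sketch of how DEFAULT_EVENT_LOOP_THREADS is resolved:
// -Dio.netty.eventLoopThreads if set, otherwise 2 * cores, with a floor of 1.
public class DefaultThreadsDemo {
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        // Integer.getInteger reads the system property, falling back if unset/unparsable.
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    public static void main(String[] args) {
        System.out.println("default event loop threads: " + defaultEventLoopThreads());
    }
}
```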
Netty Best Practices
A summary of the key points:
- Call flush(...) only in channelReadComplete(...); in the other methods just write(...), because flush triggers a syscall.
- Minimize flush() calls to cut down on syscalls, since syscalls are quite expensive.
- If you can, also minimize write(...) calls, because each one travels through the whole pipeline.
- If you can, use a void promise (VoidChannelPromise): Channel.write(msg) => Channel.write(msg, Channel.voidPromise()) to reduce GC pressure. Only do this when you are not interested in the ChannelFuture and no ChannelOutboundHandler needs to add a ChannelFutureListener.
The correct way to write when the receiver is too slow:
public class GracefulWriteHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        writeIfPossible(ctx.channel());
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        writeIfPossible(ctx.channel());
    }

    private void writeIfPossible(Channel channel) {
        // needsToWrite is an application-defined flag
        while (needsToWrite && channel.isWritable()) {
            channel.writeAndFlush(createMessage());
        }
    }
}
Configure the write high/low watermarks:
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
- Prefer ByteToMessageDecoder over ReplayingDecoder (the latter has more overhead, and you have to deal with ReplayingError).
- Be careful with unpooled buffers (allocation and release are slow); prefer pooled buffers:
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
That alone is not enough; you also need to configure:
java -Dio.netty.allocator.numDirectArenas=... -Dio.netty.allocator.numHeapArenas=...
(Note: in recent Netty versions this is already the default; see GitHub.)
- Use pooled buffers to reduce the time spent on memory allocation and release.
- Use heap buffers only when you need to operate on a byte[] inside a ChannelOutboundHandler; by default ByteBufAllocator.buffer(...) returns a direct ByteBuf (i.e. always write into direct buffers when possible).
Searching inside a ByteBuf:
The slow way:

ByteBuf buf = ...;
int index = -1;
for (int i = buf.readerIndex(); index == -1 && i < buf.writerIndex(); i++) {
    if (buf.getByte(i) == '\n') {
        index = i;
    }
}

The faster way:

ByteBuf buf = ...;
int index = buf.forEachByte(new ByteBufProcessor() {
    @Override
    public boolean process(byte value) {
        // return true to keep scanning; the search stops at the first '\n'
        return value != '\n';
    }
});
This is faster because a ByteBufProcessor:
- can eliminate range checks
- can be created once and shared
- is easier for the JIT to inline
Other buffer tips
- Use alloc() instead of Unpooled
- Use slice() and duplicate() instead of copy
- Use bulk operations instead of loops
Use zero-memory-copy to transfer file contents efficiently:

Channel channel = ...;
FileChannel fc = ...;
channel.writeAndFlush(new DefaultFileRegion(fc, 0, fileLength));

This only applies when you don't need to modify the file data; if you do, use ChunkedWriteHandler and ChunkedNioFile.
Never block the EventLoop!!!
- Thread.sleep()
- CountDownLatch.await(), or any other blocking call from java.util.concurrent
- Long-running computations
- Operations that can block for a while (e.g. DB queries)
EventLoop extends ScheduledExecutorService, so use it!
- Scheduling and executing tasks through the EventLoop reduces the number of threads you need, but make sure the tasks are thread-safe.
Reuse EventLoopGroups where you can. Sharing the same EventLoopGroup reduces resource usage:

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap().group(group);
Bootstrap bootstrap2 = new Bootstrap().group(group);
Context-switching problems in application-level proxies
Don't do this:
public class ProxyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();
        Bootstrap b = new Bootstrap();
        b.group(new NioEventLoopGroup());
        ...
        ChannelFuture f = b.connect(remoteHost, remotePort);
        ...
    }
}
Do this instead:
public class ProxyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop());
        ...
        ChannelFuture f = b.connect(remoteHost, remotePort);
        ...
    }
}
Remember: always share EventLoops across the whole application.
- When calling from outside the EventLoop, it is recommended to always batch the operations together.
Not recommended:
channel.write(msg1);
channel.writeAndFlush(msg3);
Recommended:
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        channel.write(msg1);
        channel.writeAndFlush(msg3);
    }
});
When operating inside a ChannelHandler:
public class YourHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // BAD (most of the time)
        ctx.channel().writeAndFlush(msg);

        // GOOD
        ctx.writeAndFlush(msg);
    }
}
This is because the Channel.* methods start the operation from the tail of the ChannelPipeline, whereas the ChannelHandlerContext.* methods start from the current ChannelHandler and flow through the rest of the pipeline.
In other words: take the shortest path you can for maximum performance.
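The difference in path length can be illustrated with a toy model of a pipeline. The class and handler names below are made up for illustration; this is not Netty's implementation, just a sketch of which handlers an outbound write visits:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model of outbound event traversal in a pipeline (hypothetical names).
public class PipelinePathDemo {
    // Handlers listed head -> tail; outbound events travel tail -> head.
    static final List<String> PIPELINE = Arrays.asList("codec", "aggregator", "appHandler");

    // channel.writeAndFlush(...) starts at the tail: every handler is visited.
    static List<String> pathFromChannel() {
        List<String> path = new ArrayList<>(PIPELINE);
        Collections.reverse(path);
        return path;
    }

    // ctx.writeAndFlush(...) from the handler at `index` starts at that handler:
    // only the handlers closer to the head are visited.
    static List<String> pathFromContext(int index) {
        List<String> path = new ArrayList<>(PIPELINE.subList(0, index));
        Collections.reverse(path);
        return path;
    }

    public static void main(String[] args) {
        System.out.println("channel.write path: " + pathFromChannel());
        System.out.println("ctx.write path from appHandler: " + pathFromContext(2));
    }
}
```

With three handlers, a write from the Channel visits all of them, while a write from appHandler's context skips itself and only visits the two handlers ahead of it: the shorter path the text describes.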
Share a ChannelHandler if it is stateless
@ChannelHandler.Sharable
public class StatelessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.debug("Now client from " + ctx.channel().remoteAddress().toString());
    }
}

public class MyInitializer extends ChannelInitializer<Channel> {
    private static final ChannelHandler INSTANCE = new StatelessHandler();

    @Override
    public void initChannel(Channel ch) {
        ch.pipeline().addLast(INSTANCE);
    }
}
Remove a ChannelHandler once it is no longer needed:
public class OneTimeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        doOneTimeAction();
        ctx.channel().pipeline().remove(this);
    }
}
Use the appropriate buffer type in MessageToByteEncoder
public class EncodeActsOnByteArray extends MessageToByteEncoder<YourMessage> {
    public EncodeActsOnByteArray() {
        super(false); // prefer heap buffers, so out.array() is accessible
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, YourMessage msg, ByteBuf out) {
        byte[] array = out.array();
        int offset = out.arrayOffset() + out.writerIndex();
        out.writerIndex(out.writerIndex() + encode(msg, array, offset));
    }

    private int encode(YourMessage msg, byte[] array, int offset) { ... }
}
This avoids extra byte copies.
Auto-read vs. manual read (very useful when writing proxy-style applications):
channel.config().setAutoRead(false);
channel.read();
channel.config().setAutoRead(true);
SSL: don't use the JDK SSLEngine if you care about performance
The JDK SSLEngine is too slow and produces quite a lot of garbage.
Use Twitter's OpenSSL-based SSLEngine instead: finagle-native
import com.twitter.finagle.ssl.Ssl;
...
SSLEngine engine = Ssl.server("/path/to/cert.pem", "/path/to/key.pem", "/path/to/cachain.pem", null, null).self();
pipeline.addLast("ssl", new SslHandler(engine));
- You never know when a ChannelFuture will be notified
The bad way:
public class OuterClass {
    private final HeavyObject instance = ....;

    public void write() {
        channel.writeAndFlush(msg).addListener(new ChannelFutureListener() { ... });
    }
}
The good way:
public class OuterClass {
    private final HeavyObject instance = ....;

    public void write() {
        channel.writeAndFlush(msg).addListener(new ListenerImpl());
    }

    private static final class ListenerImpl implements ChannelFutureListener { ... }
}
- Native transports: less GC and lower latency:
https://github.com/netty/netty/wiki/Native-transports
Only available on Linux.
Bootstrap bootstrap = new Bootstrap().group(new EpollEventLoopGroup());
bootstrap.channel(EpollSocketChannel.class);
Netty thread doesn't close the channel
Symptom
I wrote a simple HTTP request/response server and hit it with curl, but curl kept waiting for the server's response and never disconnected.
The server code:
initializer:
@Override
protected void initChannel(final Channel channel) {
    final ChannelPipeline p = channel.pipeline();
    p.addLast(new HttpServerCodec());
    p.addLast(new HttpObjectAggregator(_1M));
    p.addLast(new HelloHandler());
}
HelloHandler:
@ChannelHandler.Sharable
public class HelloHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) throws Exception {
        DefaultFullHttpResponse defaultHttpResponse = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("hello world".getBytes()));
        ctx.write(defaultHttpResponse);
        System.out.println("write ok");
    }

    @Override
    public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}
curl:
curl http://localhost:8080/helloworld -v
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> GET /helloworld HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
* no chunk, no close, no size. Assume close to signal end
<
As you can see, curl just hangs there.
Fix
Set the Content-Type and Content-Length headers:
private static final AsciiString CONTENT_TYPE = AsciiString.cached("Content-Type");
private static final AsciiString CONTENT_LENGTH = AsciiString.cached("Content-Length");
private static final AsciiString TEXT = AsciiString.cached("text/plain");
defaultHttpResponse.headers().set(CONTENT_TYPE, TEXT);
defaultHttpResponse.headers().setInt(CONTENT_LENGTH, defaultHttpResponse.content().readableBytes());
ctx.write(defaultHttpResponse);
Writing an HTTP Server
p.addLast(new HttpRequestDecoder());
p.addLast(new HttpResponseEncoder());
These two can be merged directly into one:
p.addLast(new HttpServerCodec());
What HttpObjectAggregator does
It is usually used together with FullHttpRequest.
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(1 * MB));
p.addLast(appHandler);
Note that HttpServerCodec generally has to be placed before HttpObjectAggregator.
With this configuration, you can work with FullHttpRequest objects directly:
public class AppHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
...
}