ChannelInitializer

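Its initChannel method is called every time a Channel is initialized. This is usually where you install the encoders, decoders and application Handlers for that Channel, for example: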

    @Override
    protected void initChannel(final Channel ch) {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpResponseEncoder());
        p.addLast(new HelloHandler());
    }

For debugging, I overrode two methods in HelloHandler:

    @Override
    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);
        System.out.println("hello handler added");
        System.out.println(ctx.pipeline().toMap());
    }

    @Override
    public void handlerRemoved(final ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
        System.out.println("hello handler removed");
        System.out.println(ctx.pipeline().toMap());
    }

When the service is accessed several times, it keeps printing:

hello handler added
{BidInitializer#0=com.uniweibo.dsp.server.BidInitializer@6c32dcc8, HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@508bcb4e, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@32e4e874, HelloHandler#0=com.uniweibo.dsp.server.HelloHandler@7c0161a1}
hello handler removed
{HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@508bcb4e, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@32e4e874}
hello handler added
{BidInitializer#0=com.uniweibo.dsp.server.BidInitializer@6c32dcc8, HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@15c22d08, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@50c0b1d4, HelloHandler#0=com.uniweibo.dsp.server.HelloHandler@7c0161a1}
hello handler removed
{HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@15c22d08, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@50c0b1d4}
hello handler added
{BidInitializer#0=com.uniweibo.dsp.server.BidInitializer@6c32dcc8, HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@47a59cc1, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@2a71a51, HelloHandler#0=com.uniweibo.dsp.server.HelloHandler@7c0161a1}
hello handler removed
{HttpRequestDecoder#0=io.netty.handler.codec.http.HttpRequestDecoder@47a59cc1, HttpResponseEncoder#0=io.netty.handler.codec.http.HttpResponseEncoder@2a71a51}

As the output shows, the same BidInitializer instance (@6c32dcc8) serves every connection, while a new HttpRequestDecoder and HttpResponseEncoder are created for each one. Once initialization is done, the initializer is no longer in the pipeline.
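You can watch the initializer's lifecycle without starting a server by using Netty's EmbeddedChannel. The sketch below assumes Netty 4.1 with netty-codec-http on the classpath. DemoInitializer and InitializerDemo are illustrative names. It checks that after registration the handlers installed by initChannel are in the pipeline, while the initializer itself is gone.

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

class InitializerDemo {
    // Hypothetical initializer: installs an HTTP decoder/encoder pair for each channel.
    static final class DemoInitializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new HttpRequestDecoder());
            ch.pipeline().addLast(new HttpResponseEncoder());
        }
    }

    // True if, after registration, the initializer is gone and its handlers remain.
    static boolean initializerRemovesItself() {
        // EmbeddedChannel registers itself on construction, which triggers initChannel(...)
        EmbeddedChannel ch = new EmbeddedChannel(new DemoInitializer());
        boolean ok = ch.pipeline().get(DemoInitializer.class) == null
                && ch.pipeline().get(HttpRequestDecoder.class) != null
                && ch.pipeline().get(HttpResponseEncoder.class) != null;
        ch.finish();
        return ok;
    }
}
```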

Understanding @Sharable handlers

If your Handler is stateless, you can annotate it with @Sharable and create a single instance of it, instead of creating a new one every time a Channel is initialized.

Here are all the @Sharable handlers under the io.netty.handler package (in the Netty version I used):

sharable class size =>  34
sharable class => io.netty.handler.codec.dns.DatagramDnsQueryEncoder
sharable class => io.netty.handler.codec.socksx.v4.Socks4ServerEncoder
sharable class => io.netty.handler.codec.string.LineEncoder
sharable class => io.netty.handler.traffic.GlobalTrafficShapingHandler
sharable class => io.netty.handler.codec.LengthFieldPrepender
sharable class => io.netty.handler.codec.dns.DatagramDnsQueryDecoder
sharable class => io.netty.handler.codec.protobuf.ProtobufDecoderNano
sharable class => io.netty.handler.codec.base64.Base64Encoder
sharable class => io.netty.handler.codec.string.StringEncoder
sharable class => io.netty.handler.ipfilter.RuleBasedIpFilter
sharable class => io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler
sharable class => io.netty.handler.codec.rtsp.RtspObjectEncoder
sharable class => io.netty.handler.codec.socks.SocksMessageEncoder
sharable class => io.netty.handler.codec.bytes.ByteArrayEncoder
sharable class => io.netty.handler.codec.serialization.ObjectEncoder
sharable class => io.netty.handler.ipfilter.UniqueIpFilter
sharable class => io.netty.handler.codec.socksx.v5.Socks5ClientEncoder
sharable class => io.netty.handler.codec.protobuf.ProtobufEncoder
sharable class => io.netty.handler.codec.socksx.v4.Socks4ClientEncoder
sharable class => io.netty.handler.codec.dns.DatagramDnsResponseEncoder
sharable class => io.netty.handler.logging.LoggingHandler
sharable class => io.netty.handler.codec.mqtt.MqttEncoder
sharable class => io.netty.handler.codec.socksx.v5.Socks5ServerEncoder
sharable class => io.netty.handler.traffic.GlobalChannelTrafficShapingHandler
sharable class => io.netty.handler.codec.string.StringDecoder
sharable class => io.netty.handler.codec.marshalling.MarshallingEncoder
sharable class => io.netty.handler.codec.protobuf.ProtobufDecoder
sharable class => io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec
sharable class => io.netty.handler.codec.dns.DatagramDnsResponseDecoder
sharable class => io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender
sharable class => io.netty.handler.codec.http.websocketx.WebSocket00FrameEncoder
sharable class => io.netty.handler.codec.base64.Base64Decoder
sharable class => io.netty.handler.codec.protobuf.ProtobufEncoderNano
sharable class => io.netty.handler.codec.marshalling.CompatibleMarshallingEncoder

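The code that prints this list (it uses the reflections library):

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.11</version>
            <scope>test</scope>
        </dependency>

    import io.netty.channel.ChannelHandler;
    import org.junit.Test; // JUnit 4 assumed
    import org.reflections.Reflections;

    import java.util.Set;

    @Test
    public void findSharable() {
        // Scan io.netty.handler for classes annotated with @ChannelHandler.Sharable
        Set<Class<?>> set = new Reflections("io.netty.handler").getTypesAnnotatedWith(ChannelHandler.Sharable.class);
        System.out.println("sharable class size =>  " + set.size());
        for (Class<?> clazz : set) {
            System.out.println("sharable class => " + clazz.getName());
        }
    }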

io.netty.channel.ChannelPipelineException: xxx is not a @Sharable handler, so can't be added or removed multiple times

The test code:

public class HelloHandler extends ChannelInboundHandlerAdapter {
   ...
}
public class AppChannelInitializer extends ChannelInitializer<Channel> {

    // One HelloHandler instance shared by every Channel, although HelloHandler is NOT @Sharable
    private static final HelloHandler helloHandler = new HelloHandler();

    @Override
    protected void initChannel(final Channel ch) {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpResponseEncoder());
        p.addLast(helloHandler);
    }
}

Note that HelloHandler is not annotated with @Sharable, yet we want to create only a single instance of it. First, let's look at the source:

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

Tracing through the source shows that the exception above is thrown by the checkMultiplicity(handler) call. Here is its source:

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

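The logic works like this. The first time HelloHandler is added, nothing goes wrong: h.added is false, so the if block is skipped, and h.added is then set to true. When the ChannelPipeline of the next Channel is initialized (Netty initializes and cleans up the handlers of every Channel), the same HelloHandler instance is added again. This time h.isSharable() is false and h.added is true (it was set on the first add), so the if block runs and a ChannelPipelineException is thrown.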

Solution

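If you are sure your Handler is thread-safe (for example, because it is stateless), annotate it with @Sharable. Otherwise, don't share it, and create a new instance for each Channel instead.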
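The sketch below shows both outcomes. It assumes Netty 4.1 and uses EmbeddedChannel as a stand-in for real connections. SharableDemo and its handler classes are hypothetical names. Adding the same non-@Sharable instance to a second pipeline fails, while adding a stateless @Sharable instance succeeds.

```java
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipelineException;
import io.netty.channel.embedded.EmbeddedChannel;

class SharableDemo {
    // Not annotated: checkMultiplicity rejects a second add of the same instance.
    static final class NotSharableHandler extends ChannelInboundHandlerAdapter { }

    // Stateless and annotated: one instance may live in many pipelines.
    @ChannelHandler.Sharable
    static final class StatelessHandler extends ChannelInboundHandlerAdapter { }

    // Tries to add the same handler instance to two different channels.
    static boolean canAddToTwoChannels(ChannelHandler handler) {
        EmbeddedChannel first = new EmbeddedChannel();
        EmbeddedChannel second = new EmbeddedChannel();
        try {
            first.pipeline().addLast(handler);
            second.pipeline().addLast(handler); // checkMultiplicity runs here
            return true;
        } catch (ChannelPipelineException e) {
            return false;
        } finally {
            first.finish();
            second.finish();
        }
    }
}
```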

Netty EventLoopGroup initialization

Default number of threads

MultithreadEventLoopGroup

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

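So the value is at least 1, and you can override it with the system property io.netty.eventLoopThreads (e.g. -Dio.netty.eventLoopThreads=16). Otherwise it defaults to twice the number of available CPU cores. You can also override the number of available cores with the system property io.netty.availableProcessors. If it is set, Netty uses it instead of Runtime.getRuntime().availableProcessors().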

See the following code in the NettyRuntime class:

        synchronized int availableProcessors() {
            if (this.availableProcessors == 0) {
                final int availableProcessors =
                        SystemPropertyUtil.getInt(
                                "io.netty.availableProcessors",
                                Runtime.getRuntime().availableProcessors());
                setAvailableProcessors(availableProcessors);
            }
            return this.availableProcessors;

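Netty Best Practices

Source: normanmaurer.me

A summary:

  • Only call flush(...) in channelReadComplete; in the other methods just call write(...), because flush results in a syscall
    • Call flush() as rarely as possible to reduce syscalls, since syscalls are quite expensive
    • Where possible, also reduce write(...) calls, because each one travels through the whole pipeline
  • Where possible, use the VoidChannelPromise: Channel.write(msg) => Channel.write(msg, Channel.voidPromise()) (reduces GC pressure)
    • Only do this if you are not interested in the ChannelFuture and no ChannelOutboundHandler needs to add a ChannelFutureListener to it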
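Here is a small sketch of the first two bullets, assuming Netty 4.1. BatchingEchoHandler is a hypothetical name. Each read only queues a write with a void promise, and a single flush is issued in channelReadComplete. The static run method drives the handler through an EmbeddedChannel, which fires channelReadComplete after delivering a batch of inbound messages.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.ArrayList;
import java.util.List;

// Echoes every inbound message, but batches the syscall-heavy flush.
class BatchingEchoHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Only queue the write. The void promise avoids allocating a ChannelPromise,
        // which is fine here because nobody listens for the result.
        ctx.write(msg, ctx.voidPromise());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // One flush for everything read in this batch.
        ctx.flush();
    }

    // Feeds messages through an EmbeddedChannel and returns what was written back.
    static List<Object> run(Object... inbound) {
        EmbeddedChannel ch = new EmbeddedChannel(new BatchingEchoHandler());
        ch.writeInbound(inbound);
        List<Object> out = new ArrayList<>();
        for (Object o; (o = ch.readOutbound()) != null; ) {
            out.add(o);
        }
        ch.finish();
        return out;
    }
}
```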
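  • When the receiver is slow, write only while Channel.isWritable() is true:
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public abstract class GracefulWriteHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    writeIfPossible(ctx.channel());
    ctx.fireChannelActive();
  }
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) {
    // Fired when the outbound buffer crosses the high/low water marks
    writeIfPossible(ctx.channel());
    ctx.fireChannelWritabilityChanged();
  }

  private void writeIfPossible(Channel channel) {
    // Stop as soon as the outbound buffer is full; resume on the next writability change
    while (needsToWrite() && channel.isWritable()) {
      channel.writeAndFlush(createMessage());
    }
  }

  // Application-specific placeholders (hypothetical hooks, not Netty API)
  protected abstract boolean needsToWrite();
  protected abstract Object createMessage();
}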
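  • Configure the write buffer's high and low water marks:
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
(In Netty 4.1 these two options are deprecated in favor of a single one: bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024));)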
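The sketch below models what the two marks control, in plain Java. It is not Netty code, only an approximation of the hysteresis Netty applies to Channel.isWritable(). WatermarkModel is a hypothetical name. Writability turns off once pending outbound bytes exceed the high mark, and only turns back on after they fall below the low mark. Those two transitions are when channelWritabilityChanged fires.

```java
class WatermarkModel {
    private final long low;
    private final long high;
    private long pending;           // bytes written but not yet accepted by the socket
    private boolean writable = true;

    WatermarkModel(long low, long high) {
        if (low > high) {
            throw new IllegalArgumentException("low water mark > high water mark");
        }
        this.low = low;
        this.high = high;
    }

    // Like write() adding a message to the outbound buffer.
    void queued(long bytes) {
        pending += bytes;
        if (pending > high) {
            writable = false;
        }
    }

    // Like the socket accepting flushed data.
    void flushed(long bytes) {
        pending -= bytes;
        if (pending < low) {
            writable = true;
        }
    }

    boolean isWritable() {
        return writable;
    }
}
```

Between the two marks the state stays as it is, so writability does not flip back and forth on every small write.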
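  • Prefer ByteToMessageDecoder over ReplayingDecoder where possible (ReplayingDecoder has more overhead and relies on a replay error being thrown internally whenever not enough bytes have arrived)
  • Be careful with unpooled buffers (allocating and releasing them is slower). Use pooled buffers where possible: bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); . That alone is not enough; you also need to configure the arenas: java -Dio.netty.allocator.numDirectArenas=... -Dio.netty.allocator.numHeapArenas=... (Note: in newer Netty versions the pooled allocator is already the default.)
  • Use pooled buffers to reduce the time spent allocating and freeing memory
  • Only use heap buffers when a ChannelOutboundHandler needs to work on a byte[]. By default, ByteBufAllocator.buffer(...) returns a direct ByteBuf where the platform supports it (i.e. prefer writing to direct buffers)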
  • Searching in a ByteBuf:
The slower way:
ByteBuf buf = ...;
int index = -1;
for (int i = buf.readerIndex(); index == -1 && i <  buf.writerIndex(); i++) {
  if (buf.getByte(i) == '\n') {
    index = i;
  }
}

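The faster way:
ByteBuf buf = ...;
// forEachByte() keeps going while process() returns true and returns the index of
// the first byte for which it returns false (or -1), so test for "not '\n'".
// In Netty 4.1, ByteBufProcessor is deprecated in favor of io.netty.util.ByteProcessor.
int index = buf.forEachByte(new ByteBufProcessor() {
  @Override
  public boolean process(byte value) {
    return value != '\n';
  }
});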

This is because a ByteBufProcessor:

  1. lets range checks be eliminated
  2. can be created once and shared
  3. is easier for the JIT to inline
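In Netty 4.1 the same search can use a ready-made processor. The sketch below assumes Netty 4.1, where ByteProcessor.FIND_LF stops at the first '\n'. LineSearch is a hypothetical name.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;

class LineSearch {
    // forEachByte visits the readable bytes until the processor returns false and
    // returns that (absolute) index, or -1 if it never does.
    static int indexOfLf(ByteBuf buf) {
        return buf.forEachByte(ByteProcessor.FIND_LF);
    }

    static int demo(String text) {
        ByteBuf buf = Unpooled.copiedBuffer(text, CharsetUtil.US_ASCII);
        try {
            return indexOfLf(buf);
        } finally {
            buf.release();
        }
    }
}
```

ByteBuf.indexOf(fromIndex, toIndex, value) and ByteBuf.bytesBefore(value) are alternatives when you only need a single byte value.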
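  • Other buffer tips

    • Use alloc() instead of Unpooled
    • Use slice() and duplicate() instead of copy(); they share the underlying memory instead of copying it
    • Use bulk operations instead of loops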
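Here is a sketch of the slice-versus-copy tip, assuming Netty 4.1. SliceVsCopy is a hypothetical name. A write through a slice is visible in the original buffer because no bytes were copied, while a copy gets its own memory. The buffer comes from an allocator, as the first tip suggests; inside a handler that allocator would be ctx.alloc().

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;

class SliceVsCopy {
    // Returns {byte 0 of the original after writing via a slice,
    //          byte 1 of the original after writing via a copy}.
    static byte[] demo() {
        ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT; // in a handler: ctx.alloc()
        ByteBuf buf = alloc.buffer(8);
        try {
            buf.writeBytes(new byte[] {1, 2, 3, 4});

            ByteBuf slice = buf.slice(0, 2);  // a view: same memory, no bytes copied
            slice.setByte(0, 42);
            byte viaSlice = buf.getByte(0);   // the change shows up in buf

            ByteBuf copy = buf.copy(0, 2);    // a new buffer with its own memory
            try {
                copy.setByte(1, 99);
            } finally {
                copy.release();
            }
            byte viaCopy = buf.getByte(1);    // buf is unaffected
            return new byte[] {viaSlice, viaCopy};
        } finally {
            buf.release();                    // the slice shares buf's reference count
        }
    }
}
```

duplicate() behaves like a slice over the whole buffer, with its own reader and writer indexes. Derived buffers like these share the parent's reference count, so only the original is released here.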
  • Use zero-copy to transfer file contents efficiently:

Channel channel = ...;
FileChannel fc = ...;
channel.writeAndFlush(new DefaultFileRegion(fc, 0, fileLength));

Only do this when you don't need to modify the file data. If you do, use ChunkedWriteHandler with ChunkedNioFile instead.
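Under the NIO transport, DefaultFileRegion hands the work to the JDK's FileChannel.transferTo. This lets the OS move the bytes without copying them through user space, where the platform supports it. It is also why a FileRegion cannot be used when the data must be changed on the way, SSL encryption included. The sketch below shows that primitive using only the JDK. FileTransfer is a hypothetical name, and it copies file to file rather than file to socket.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

class FileTransfer {
    // Copies src to dst with FileChannel.transferTo and returns the number of bytes moved.
    static long copy(Path src, Path dst) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE,
                     StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = in.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so loop until done.
            while (position < size) {
                position += in.transferTo(position, size - position, out);
            }
            return position;
        }
    }

    // Writes text to a temp file, copies it with copy(...) and checks the result.
    static boolean roundTrip(String text) {
        try {
            Path src = Files.createTempFile("src", ".txt");
            Path dst = Files.createTempFile("dst", ".txt");
            try {
                byte[] data = text.getBytes(StandardCharsets.UTF_8);
                Files.write(src, data);
                return copy(src, dst) == data.length
                        && Arrays.equals(data, Files.readAllBytes(dst));
            } finally {
                Files.deleteIfExists(src);
                Files.deleteIfExists(dst);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```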

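  • Never block the EventLoop! This includes:

    • Thread.sleep()
    • CountDownLatch.await() or any other blocking operation from java.util.concurrent
    • Long-running computations
    • Operations that block for a while (e.g. DB queries)
  • EventLoop extends ScheduledExecutorService, so make use of it

    • Scheduling and executing tasks through the EventLoop reduces the number of threads you need, but make sure those tasks are thread-safe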
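Because an EventLoop is a ScheduledExecutorService, you should express a delay as a scheduled task rather than with Thread.sleep(). The sketch below uses a plain single-threaded JDK scheduler as a stand-in for one EventLoop, so that it runs without Netty. In a handler, the equivalent call would be ctx.executor().schedule(...) or ctx.channel().eventLoop().schedule(...). ScheduleNotSleep is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduleNotSleep {
    // Returns the order in which two tasks ran on a single "event loop" thread.
    static List<String> run() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        try {
            // Sleeping inside a task would freeze this thread for 100 ms.
            // Instead, ask the loop to run the work later...
            loop.schedule(() -> { order.add("delayed"); done.countDown(); },
                    100, TimeUnit.MILLISECONDS);
            // ...so the thread stays free for other tasks (I/O events, in Netty's case).
            loop.execute(() -> { order.add("immediate"); done.countDown(); });
            // Waiting is fine here: this is the caller's thread, not the loop.
            done.await(5, TimeUnit.SECONDS);
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }
}
```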
  • Reuse EventLoopGroups where possible: share the same EventLoopGroup to reduce resource usage

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap().group(group);
Bootstrap bootstrap2 = new Bootstrap().group(group);
  • Context switching in proxy applications

Don't do this:

public class ProxyHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();
    Bootstrap b = new Bootstrap();
    // BAD: a new EventLoopGroup (and new threads) per connection, and data has to
    // hop between the inbound and outbound channels' threads
    b.group(new NioEventLoopGroup());
    ...
    ChannelFuture f = b.connect(remoteHost, remotePort);
    ...
  }
}

Do this instead:

public class ProxyHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();
    Bootstrap b = new Bootstrap();
    // GOOD: reuse the inbound channel's EventLoop, so one thread serves both channels
    b.group(inboundChannel.eventLoop());
    ...
    ChannelFuture f = b.connect(remoteHost, remotePort);
    ...
  }
}

Remember: always share EventLoops across your entire application.

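  • When calling from outside the EventLoop, combine operations

Not recommended (each call made from outside the EventLoop is submitted to it as a separate task):

channel.write(msg1);
channel.writeAndFlush(msg3);

Recommended (a single task for the whole batch):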

channel.eventLoop().execute(new Runnable() {
  @Override
  public void run() {
    channel.write(msg1);
    channel.writeAndFlush(msg3);
  }
});
  • When operating inside a ChannelHandler
public class YourHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    // BAD (most of the times)
    ctx.channel().writeAndFlush(msg); 

    // GOOD
    ctx.writeAndFlush(msg); 
   }
}

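This is because Channel.* methods start at the tail of the ChannelPipeline, so an outbound operation passes through every outbound handler. ChannelHandlerContext.* methods start at the current ChannelHandler and only pass through the outbound handlers between it and the head.

In short: use the shortest path possible to get the best performance.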
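You can observe the difference directly. The sketch below assumes Netty 4.1 and uses hypothetical names (WritePathDemo, Recorder, Replier). The pipeline, from head to tail, is A (outbound), Replier (inbound), B (outbound). A write through the Channel passes through B and then A, while a write through the Replier's context only passes through A.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.ArrayList;
import java.util.List;

class WritePathDemo {
    // Outbound handler that records every write passing through it.
    static final class Recorder extends ChannelOutboundHandlerAdapter {
        private final String name;
        private final List<String> log;

        Recorder(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            log.add(name);
            ctx.write(msg, promise);
        }
    }

    // Inbound handler that answers each read via the Channel or via its own context.
    static final class Replier extends ChannelInboundHandlerAdapter {
        private final boolean viaContext;

        Replier(boolean viaContext) {
            this.viaContext = viaContext;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            if (viaContext) {
                ctx.writeAndFlush(msg);           // starts here, moves toward the head
            } else {
                ctx.channel().writeAndFlush(msg); // starts at the tail
            }
        }
    }

    // Returns the outbound handlers a reply passed through, in order.
    static List<String> outboundPath(boolean viaContext) {
        List<String> log = new ArrayList<>();
        EmbeddedChannel ch = new EmbeddedChannel(
                new Recorder("A", log), new Replier(viaContext), new Recorder("B", log));
        ch.writeInbound("ping");
        ch.finish();
        return log;
    }
}
```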

  • If a ChannelHandler is stateless, share it
@ChannelHandler.Sharable
public class StatelessHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    // logger: a logger field of your choice (e.g. SLF4J), not shown here
    logger.debug("New client from " + ctx.channel().remoteAddress());
    ctx.fireChannelActive();
  }
}

public class MyInitializer extends ChannelInitializer<Channel> {
  private static final ChannelHandler INSTANCE = new StatelessHandler();
  @Override
  public void initChannel(Channel ch) {
    ch.pipeline().addLast(INSTANCE);
  }
}
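  • Remove a ChannelHandler once it is no longer needed
public class OneTimeHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    doOneTimeAction();            // your one-time logic
    ctx.pipeline().remove(this);  // this handler will not be invoked again
    ctx.fireChannelActive();      // let the following handlers see the event too
  }
}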
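  • Use the appropriate buffer type in MessageToByteEncoder
public class EncodeActsOnByteArray extends MessageToByteEncoder<YourMessage> {
  // preferDirect = false: the encoder allocates a heap ByteBuf, so out.array() is available
  public EncodeActsOnByteArray() { super(false); }
  @Override
  protected void encode(ChannelHandlerContext ctx, YourMessage msg, ByteBuf out) {
    out.ensureWritable(maxEncodedLength(msg)); // make room first; this may replace the backing array
    byte[] array = out.array();
    int offset = out.arrayOffset() + out.writerIndex();
    out.writerIndex(out.writerIndex() + encode(msg, array, offset)); // encode returns bytes written
  }
  // Hypothetical helper: an upper bound on the encoded size of msg
  private int maxEncodedLength(YourMessage msg) { ... }
  private int encode(YourMessage msg, byte[] array, int offset) { ... }
}

This avoids an extra byte copy: the message is encoded directly into the ByteBuf's backing array, instead of into a temporary byte[] that then has to be copied into the buffer.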

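  • Auto-read vs. manual read (very useful when writing proxy applications)
channel.config().setAutoRead(false); // stop reading from the socket automatically
channel.read();                      // explicitly request one read
channel.config().setAutoRead(true);  // resume automatic reading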
  • SSL: if you care about performance, don't use the JDK SSLEngine

The JDK SSLEngine is too slow and produces a lot of garbage (GC pressure).

Use Twitter's OpenSSL-based SSLEngine instead: finagle-native

import com.twitter.finagle.ssl.Ssl;
...

SSLEngine engine = Ssl.server("/path/to/cert.pem", "/path/to/key.pem", "/path/to/cachain.pem", null, null).self();
pipeline.addLast("ssl", new SslHandler(engine));
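Netty also ships its own OpenSSL binding (netty-tcnative), which you can select through SslContextBuilder. The sketch below assumes Netty 4.1 with netty-handler; netty-tcnative must also be on the classpath for OpenSSL to actually be used. SslSetup is a hypothetical name, and you supply the PEM files yourself.

```java
import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import java.io.File;
import javax.net.ssl.SSLException;

class SslSetup {
    // Prefer the OpenSSL engine when netty-tcnative can be loaded, else fall back to the JDK.
    static SslProvider provider() {
        return OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
    }

    // certChain and key are PEM files.
    static SslContext serverContext(File certChain, File key) throws SSLException {
        return SslContextBuilder.forServer(certChain, key)
                .sslProvider(provider())
                .build();
    }

    // Build the SslContext once and reuse it; create one SslHandler per channel.
    static void install(ChannelPipeline pipeline, SslContext sslContext) {
        pipeline.addLast("ssl", sslContext.newHandler(pipeline.channel().alloc()));
    }
}
```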
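  • You never know when a ChannelFuture will be notified, so don't let its listeners pin large objects in memory

Bad (an anonymous listener created in an instance method normally captures OuterClass.this, keeping instance reachable until the future completes):

public class OuterClass {
  private final HeavyObject instance = ....;
  public void write() { channel.writeAndFlush(msg).addListener(
      new ChannelFutureListener() { ...}); }
}

Good (a static nested class holds no reference to OuterClass, so a pending future does not keep OuterClass and its HeavyObject alive):

public class OuterClass {
  private final HeavyObject instance = ....;
  public void write() { channel.writeAndFlush(msg).addListener(new ListenerImpl()); }

  private static final class ListenerImpl implements ChannelFutureListener { ... }
}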
  • Native transports: less GC and lower latency:

https://github.com/netty/netty/wiki/Native-transports

The epoll transport is only available on Linux.

Bootstrap bootstrap = new Bootstrap().group(new EpollEventLoopGroup());
bootstrap.channel(EpollSocketChannel.class);
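When the same code must also run on non-Linux machines, you can choose the transport at runtime. The sketch below assumes Netty 4.1 with the netty-transport-native-epoll artifact on the classpath. Transports is a hypothetical name.

```java
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

class Transports {
    // Epoll.isAvailable() is false on non-Linux systems or when the native library can't load.
    static EventLoopGroup newGroup(int threads) {
        return Epoll.isAvailable() ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
    }

    // The channel class must match the group's transport.
    static Class<? extends SocketChannel> channelClass() {
        return Epoll.isAvailable() ? EpollSocketChannel.class : NioSocketChannel.class;
    }
}
```

For a server, the matching pair is EpollServerSocketChannel / NioServerSocketChannel.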

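Netty did not close the channel

Symptom

I wrote a simple HTTP request/response handler and sent it a request with curl, but curl kept waiting for the server's response and never disconnected.

Server code: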

initializer:

    @Override
    protected void initChannel(final Channel channel) {
        final ChannelPipeline p = channel.pipeline();
        p.addLast(new HttpServerCodec());
        p.addLast(new HttpObjectAggregator(_1M));
        p.addLast(new HelloHandler());
    }

HelloHandler:

@ChannelHandler.Sharable
public class HelloHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) throws Exception {
        DefaultFullHttpResponse defaultHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer("hello world".getBytes()));
        ctx.write(defaultHttpResponse);
        System.out.println("write ok");
    }

    @Override
    public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

curl:

curl http://localhost:8080/helloworld -v
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> GET /helloworld HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
* no chunk, no close, no size. Assume close to signal end
<

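As you can see, curl just hangs. The response has no Content-Length header, it is not chunked, and the HTTP/1.1 connection is kept open. So curl cannot tell where the body ends ("no chunk, no close, no size. Assume close to signal end") and waits for the server to close the connection, which never happens.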

Solution

Set the Content-Type and Content-Length headers. Content-Length is what tells the client where the body ends:

private static final AsciiString CONTENT_TYPE = AsciiString.cached("Content-Type");
private static final AsciiString CONTENT_LENGTH = AsciiString.cached("Content-Length");
private static final AsciiString TEXT = AsciiString.cached("text/plain");


defaultHttpResponse.headers().set(CONTENT_TYPE, TEXT);
defaultHttpResponse.headers().setInt(CONTENT_LENGTH, defaultHttpResponse.content().readableBytes());
ctx.write(defaultHttpResponse);
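Below is a revised HelloHandler along these lines. It assumes Netty 4.1 and uses its HttpHeaderNames/HttpUtil helpers instead of hand-made AsciiString constants. It also honors keep-alive: if the client did not ask for keep-alive, the connection is closed after the response is written. The static contentLengthFor method exists only to exercise the handler through an EmbeddedChannel.

```java
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

@ChannelHandler.Sharable
class HelloHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
        FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8));
        res.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN);
        // Tells the client exactly where the body ends, so it doesn't wait for a close.
        HttpUtil.setContentLength(res, res.content().readableBytes());

        boolean keepAlive = HttpUtil.isKeepAlive(req);
        HttpUtil.setKeepAlive(res, keepAlive);
        if (keepAlive) {
            ctx.write(res);
        } else {
            // The client asked to close: close once the response has been written.
            ctx.write(res).addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    // Sends one GET through an EmbeddedChannel and returns the Content-Length of the reply.
    static int contentLengthFor(String uri) {
        EmbeddedChannel ch = new EmbeddedChannel(new HelloHandler());
        ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri));
        FullHttpResponse res = ch.readOutbound();
        try {
            return res.headers().getInt(HttpHeaderNames.CONTENT_LENGTH);
        } finally {
            res.release();
            ch.finish();
        }
    }
}
```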

Writing an HTTP server

p.addLast(new HttpRequestDecoder());
p.addLast(new HttpResponseEncoder());

can be combined into one:

p.addLast(new HttpServerCodec());

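What HttpObjectAggregator does

It aggregates an HttpMessage and the HttpContent parts that follow it into a single FullHttpRequest (or FullHttpResponse), so it is usually used together with FullHttpRequest.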

p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(1 * MB));
p.addLast(appHandler);

Note that HttpServerCodec must come before HttpObjectAggregator, because the aggregator works on the HTTP objects that the codec decodes.

With this configuration, your handler receives FullHttpRequest objects directly:

public class AppHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
	...
}
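The sketch below shows what the pair produces, assuming Netty 4.1. AggregatorDemo is a hypothetical name. Raw request bytes go into an EmbeddedChannel with HttpServerCodec followed by HttpObjectAggregator, and a single FullHttpRequest comes out.

```java
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.util.CharsetUtil;

class AggregatorDemo {
    // Feeds raw request bytes through codec + aggregator and returns the
    // URI and body of the single FullHttpRequest that comes out.
    static String[] parse(String rawRequest) {
        EmbeddedChannel ch = new EmbeddedChannel(
                new HttpServerCodec(),                  // bytes -> HttpRequest + HttpContent parts
                new HttpObjectAggregator(1024 * 1024)); // parts -> one FullHttpRequest (body up to 1 MiB)
        ch.writeInbound(Unpooled.copiedBuffer(rawRequest, CharsetUtil.US_ASCII));
        FullHttpRequest req = ch.readInbound();
        try {
            return new String[] {req.uri(), req.content().toString(CharsetUtil.US_ASCII)};
        } finally {
            req.release();
            ch.finish();
        }
    }
}
```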

References