Netty 核心组件

  • Channel : 传入或传出数据的载体
  • 回调 : 一个被提供给另一个方法的方法引用
  • Future : 提供了另一种在完成时通知应用程序的方式. Netty 的出站I/O 操作都将返回一个 ChannelFuture.
  • 事件 和 ChannelHandler : Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。
  • EeventLoop : Netty 在内部会为每个 Channel 分配一个 EventLoop,用以处理所有事件
入站事件 -> 入站处理器 -> 入站事件 -> 入站处理器 -> ...

... <- 出站处理器 <- 出站事件 <- 出站处理器 <- 出站事件

关于 ChannelPipeline 的正确理解

doc

其实官方文档已经给出了非常详细的解释了~

echo 服务器

[github emacsist nett-echo]()

纯手工输入~自己输入一次,加深下印象

server 端代码

Handler

package com.github.emacsist.echo.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * Created by emacsist on 2017/6/25.
 */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Bootstrap 代码

package com.github.emacsist.echo.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * Created by emacsist on 2017/6/25.
 */
public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(serverHandler);
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 1 ){
            System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
            return;
        }
        int port = Integer.parseInt(args[0]);
        new EchoServer(port).start();
    }
}

client 端代码

Handler 代码

package com.github.emacsist.echo.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * Created by emacsist on 2017/6/25.
 */
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        System.out.println("Client received: " + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Bootstrap 代码

package com.github.emacsist.echo.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;


/**
 * Created by emacsist on 2017/6/25.
 */
public class EchoClient {
    private final String host;
    private final int port;
    public EchoClient(String host, int port){
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {

                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 2 ){
            System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        new EchoClient(host, port).start();
    }
}

运行及结果

server

[13:35:16] emacsist:echo-server $ mvn exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building echo-server 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> exec-maven-plugin:1.2.1:java (default-cli) > validate @ echo-server >>>
[INFO]
[INFO] <<< exec-maven-plugin:1.2.1:java (default-cli) < validate @ echo-server <<<
[INFO]
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:java (default-cli) @ echo-server ---
Server received: Netty rocks!


client

[13:35:05] emacsist:echo-client $ mvn exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building echo-client 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> exec-maven-plugin:1.2.1:java (default-cli) > validate @ echo-client >>>
[INFO]
[INFO] <<< exec-maven-plugin:1.2.1:java (default-cli) < validate @ echo-client <<<
[INFO]
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:java (default-cli) @ echo-client ---
Client received: Netty rocks!
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.327 s
[INFO] Finished at: 2017-06-25T13:35:28+08:00
[INFO] Final Memory: 9M/155M
[INFO] ------------------------------------------------------------------------
[13:35:28] emacsist:echo-client $

Netty 网络抽象

  • Channel – Socket

    • EmbeddedChannel
    • LocalServerChannel
    • NioDatagramChannel
    • NioSctpChannel
    • NioSocketChannel
  • EventLoop – 控制流、多线程处理、并发。它是Netty的核心抽象,用于处理连接的生命周期中所发生的事件。

    • 一个 EventLoopGroup 包含一个或多个 EventLoop
    • 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定
    • 所有由 EventLoop 处理的 I/O 事件都将在它专的 Thread 上被处理
    • 一个 Channel 在它的生命周期内只注册一个 EventLoop
    • 一个 EventLoop 可能会被分配给一个或多个 Channel
  • ChannelFuture – 异步通知

  • ChannelHandler – 它充当了所有处理入站和出站数据的应用程序逻辑的容器。(ChannelInboundHandler 它处理入站的数据, ChannelOutboundHandler 它处理出站的数据)

  • ChannelPipeline – 提供了 ChannelHandler 链的容器。

  • ChannelHandlerContext – 当 ChannelHandler 被添加到 ChannelPipeline 时,它将会被分配一个 ChannelHandlerContext ,其代表了 ChannelHandler 和 ChannelPipeline 之间的绑定。虽然这个对象可以被用于获取其底层的 Channel,但是它主要还是被用于写出站数据。

Netty 两种发送消息方式

  1. 直接写到 Channel => 这导致消息从 ChannelPipeline 的 尾端开始流动
  2. 写到和 ChannelHandler 相关联的 ChannelHandlerContext => 导致消息从 ChannelPipeline 中的下一个 ChannelHandler 开始流动

编码器与解码器

Netty 接收或发送一个消息时,会发生一次数据转换

  • 入站:解码 byte -> object
  • 出站:编码 object -> byte

内置的传输

  • NIO : 使用 java.nio.channels 包作为基础(支持的协议有 TCP, UDP, SCTP, UDT)
  • Epool : 使用 io.netty.channel.epoll ,由 JNI 驱动的 epoll() 和 非阻塞IO 。这个传输支持只有在Linux上可用的多种特性,如: SO_REUSEPORT,比NIO更快,而且是完全非阻塞的。(仅Linux,支持的协议有 TCP, UDP)
  • OIO : 使用 java.net 包作为基础,使用阻塞流(支持的协议有 TCP,UDP,SCTP, UDT)
  • Local : 使用 io.netty.channel.local ,可以在VM内部通过管道进行通信的本地传输
  • Embedded : 使用 io.netty.channel.embedded ,允许使用 ChannelHandler 而又不需要一个真正的基于网络的传输。这在测试你的 ChannelHandler 实现时非常有用。

ByteBuf —— Netty的数据容器

它维护了两个不同的索引:一个用于读取,一个用于写入。

当你从 ByteBuf 读取时,它的 readerIndex 将会被递增(以 read 开头的API)已经被读取的字节数。同样地,当你写入 ByteBuf 时,它的 writerIndex 也会被递增(以 write 开头的API)。以 set, get 开头的API 不会改变这两个索引。

使用模式

  • 堆缓冲区:这种模式称为 backing array,类似 JDK 的 ByteBuffer 的用法。它将数据存储在 JVM 的堆空间中。
  • 直接缓冲区:它的数据存储在 JVM 的堆外空间,主要缺点是:它的分配和释放都比较昂贵。
  • 复合缓冲区:它为多个 ByteBuf 提供了一个聚合视图(比如HTTP的头部+HTTP的Body)。(CompositeByteBuf,它可以同时包含直接内存分配和非直接内存分配的 ByteBuf)

派生缓冲区

创建方法:

  • duplicate()
  • slice()
  • slice(int, int)
  • Unpooled.unmodifiableBuffer()
  • order(ByteOrder)
  • readSlice(int)

每个这些方法都将返回一个新的ByteBuf 实例,它具有自己的读索引,写索引和标记索引。 其内部存储和JDK的ByteBuffer一样,也是共享 (注意,内容是共享的!!!)

ByteBufHolder 接口

除了实际的数据之外,还要存储各种属性值,如HTTP,字节的内容、状态码、cookie等。

  • content() : 返回这个 ByteBufHolder 所持有的 ByteBuf
  • copy() : 返回这个 ByteBufHolder 的一个深拷贝
  • duplicate() : 返回这个 ByteBufHolder 的一个浅拷贝

Channel 的生命周期

ChannelRegistered -> ChannelActive -> ChannelInactive -> ChannelUnregistered

ChannelHandler 的生命周期

handlerAdded : 添加到 ChannelPipeline 中时被调用
handlerRemoved : 从 ChannelPipeline 中移除时被调用
handlerCaught : 当处理过程中在 ChannelPipeline 中有错误产生时调用

资源管理

调用 ChannelInboundHandler.channelRead() 或者 ChannelOutboundHandler.write() 处理数据时,必须要保证没有任何的资源泄漏。(因为Netty是使用引用计数来处理池化的 ByteBuf 的)

为了检测资源泄漏的问题,Netty 提供了 class ResourceLeakDetector 来检测。

检测级别有:

  • DISABLED : 禁用泄漏检测
  • SIMPLE : 使用 1% 的默认采样率检测并报告。(这是默认级别)
  • ADVANCED : 使用默认的采样率,报告所发现的任何的泄漏以及对应的消息被访问的位置
  • PARANOID : 类似 ADVANCED ,但是会对每一次(对消息的)访问都采样。(这应该只是在调试阶段使用)

使用:

java -Dio.netty.leakDetectionLevel=ADVANCED

异常处理

  • ChannelHandler.execeptionCaught() 默认实现是简单地将当前异常转发给 ChannelPipeline 中的下一个 ChannelHandler
  • 如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理
  • 要想定义自定义的处理逻辑,你需要重写 exceptionCaught() 方法。然后你需要决定是否需要将该异常传播出去

JDK 5 线程模型

池化,它的思想:

  1. 从池中空闲线程列表中选择一个 Thread,并且派它去运行一个已经提交的任务(Runnable的实现)
  2. 当任务完成时,将该Thread返回给该列表,使其可被重用

这虽然减少了每个任务线程的创建和销毁,但它并不能消除由 上下文切换 所带来的开销,随着线程数量的增加很快变得明显,并且在高负载下,愈演愈烈。

EventLoop 接口

while(!terminated) {
         List<Runnable> readyEvents = blockUntilEventsReady()
         for (Runnable ev : readyEvents) {
                 ev.run()
         }
}

在这个线程模型中,一个 EventLoop 由一个永不变的 Thread 驱动。可根据可用核心不同,可能会创建多个 EventLoop 实例用以优化资源的使用,并且单个 EventLoop 可能会被指派用于服务多个 Channel

Netty 4 中的 I/O 和事件处理

在 Netty 4 中,所有的 I/O 操作和事件都已经被分配给了 EventLoop 的那个 Thread 来处理(注意,是处理,而不是触发哦)

Netty 3 中的 I/O 操作

这个模型只保证了入站(上游)事件会在所谓的 I/O 线程(对应 Netty 4 中的 EventLoop )执行。 所有的出站(下游)事件都由调用线程处理,其可能是 I/O 线程也可能是别的线程。

主要问题是:该模型不可能保证多个线程不会在同一时刻尝试访问出站事件。(例如,在不同的线程中调用 Channel.write() ,针对同一个 Channel 同时触发出站的事件,就会发生这种情况)

线程管理

取决于当前执行的 Thread 的身份的确定(EventLoop的 inEvenLoop(Thread) 来确定) 。

如果是支撑的 EventLoop 的线程,那么所提交的代码将会被(直接)执行。 否则 EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。

!!!!永远不要将一个长时间运行的任务放入到执行队列中,因为它将阻塞需要在同一线程上执行的任何的其他任务。

EventLoop 的线程分配

NIO 中

这种模型中,它们可能会被多个 Channel 所共享,这样可以通过尽可能少的 Thread 来支撑大量的 Channel ,而不是每个 Channel 分配一个 Thread 。EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin) 的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel 。

OIO 中

这里每个 Channel 都将被分配一个 EventLoop(以及它的Thread)

导引

                               <-- Bootstrap(客户端)
Cloneable <- AbstractBootstrap <--
                               <-- ServerBootstrap(服务器端)

为什么引导类是 Cloneable 的?

你有时可能会需要创建多个具有类似配置或者完全相同配置的 Channel 。为了支持这种模式而又不需要为每个 Channel 都创建并配置一个新的引导类实例,AbstractBootstrap 被标记为了 Cloneable 。在一个已经配置完成的引导类实例上调用 clone() 方法将返回一个可以立即使用的引导类实例。

注意,这种方式只会创建引导类实例的 EventLoopGroup 的一个浅拷贝,所以,后者将在所有克隆的 Channel 实例之间共享。

Bootstrap

内置编/解码器

/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/base64
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/bytes
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/compression
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/dns
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/haproxy
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/cookie
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/cors
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/multipart
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/websocketx
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/websocketx/extensions
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/websocketx/extensions/compression
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http2
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/json
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/marshalling
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/memcache
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/memcache/binary
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/mqtt
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/protobuf
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/redis
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/rtsp
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/sctp
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/serialization
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/smtp
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/socks
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/socksx
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/socksx/v4
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/socksx/v5
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/spdy
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/stomp
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/string
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/xml