《Netty实战》读书笔记
Contents
Netty 核心组件
- Channel : 传入或传出数据的载体
- 回调 : 一个被提供给另一个方法的方法引用
- Future : 提供了另一种在完成时通知应用程序的方式. Netty 的出站I/O 操作都将返回一个 ChannelFuture.
- 事件 和 ChannelHandler : Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。
EeventLoop : Netty 在内部会为每个 Channel 分配一个 EventLoop,用以处理所有事件
入站事件 -> 入站处理器 -> 入站事件 -> 入站处理器 -> ... ... <- 出站处理器 <- 出站事件 <- 出站处理器 <- 出站事件
关于 ChannelPipeline 的正确理解
其实官方文档已经给出了非常详细的解释了~
echo 服务器
[github emacsist nett-echo]()
纯手工输入~自己输入一次,加深下印象
server 端代码
Handler
package com.github.emacsist.echo.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* Created by emacsist on 2017/6/25.
*/
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Bootstrap 代码
package com.github.emacsist.echo.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* Created by emacsist on 2017/6/25.
*/
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
if (args.length != 1 ){
System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
return;
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
}
client 端代码
Handler 代码
package com.github.emacsist.echo.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* Created by emacsist on 2017/6/25.
*/
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println("Client received: " + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Bootstrap 代码
package com.github.emacsist.echo.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* Created by emacsist on 2017/6/25.
*/
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port){
this.host = host;
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
if (args.length != 2 ){
System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
运行及结果
server
[13:35:16] emacsist:echo-server $ mvn exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building echo-server 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> exec-maven-plugin:1.2.1:java (default-cli) > validate @ echo-server >>>
[INFO]
[INFO] <<< exec-maven-plugin:1.2.1:java (default-cli) < validate @ echo-server <<<
[INFO]
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:java (default-cli) @ echo-server ---
Server received: Netty rocks!
client
[13:35:05] emacsist:echo-client $ mvn exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building echo-client 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> exec-maven-plugin:1.2.1:java (default-cli) > validate @ echo-client >>>
[INFO]
[INFO] <<< exec-maven-plugin:1.2.1:java (default-cli) < validate @ echo-client <<<
[INFO]
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:java (default-cli) @ echo-client ---
Client received: Netty rocks!
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.327 s
[INFO] Finished at: 2017-06-25T13:35:28+08:00
[INFO] Final Memory: 9M/155M
[INFO] ------------------------------------------------------------------------
[13:35:28] emacsist:echo-client $
Netty 网络抽象
Channel – Socket
- EmbeddedChannel
- LocalServerChannel
- NioDatagramChannel
- NioSctpChannel
- NioSocketChannel
EventLoop – 控制流、多线程处理、并发。它是Netty的核心抽象,用于处理连接的生命周期中所发生的事件。
- 一个 EventLoopGroup 包含一个或多个 EventLoop
- 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定
- 所有由 EventLoop 处理的 I/O 事件都将在它专的 Thread 上被处理
- 一个 Channel 在它的生命周期内只注册一个 EventLoop
- 一个 EventLoop 可能会被分配给一个或多个 Channel
ChannelFuture – 异步通知
ChannelHandler – 它充当了所有处理入站和出站数据的应用程序逻辑的容器。(ChannelInboundHandler 它处理入站的数据, ChannelOutboundHandler 它处理出站的数据)
ChannelPipeline – 提供了 ChannelHandler 链的容器。
ChannelHandlerContext – 当 ChannelHandler 被添加到 ChannelPipeline 时,它将会被分配一个 ChannelHandlerContext ,其代表了 ChannelHandler 和 ChannelPipeline 之间的绑定。虽然这个对象可以被用于获取其底层的 Channel,但是它主要还是被用于写出站数据。
Netty 两种发送消息方式
- 直接写到 Channel => 这导致消息从 ChannelPipeline 的 尾端开始流动
- 写到和 ChannelHandler 相关联的 ChannelHandlerContext => 导致消息从 ChannelPipeline 中的下一个 ChannelHandler 开始流动
编码器与解码器
Netty 接收或发送一个消息时,会发生一次数据转换
- 入站:解码 byte -> object
- 出站:编码 object -> byte
内置的传输
- NIO : 使用 java.nio.channels 包作为基础(支持的协议有 TCP, UDP, SCTP, UDT)
- Epool : 使用 io.netty.channel.epoll ,由 JNI 驱动的 epoll() 和 非阻塞IO 。这个传输支持只有在Linux上可用的多种特性,如: SO_REUSEPORT,比NIO更快,而且是完全非阻塞的。(仅Linux,支持的协议有 TCP, UDP)
- OIO : 使用 java.net 包作为基础,使用阻塞流(支持的协议有 TCP,UDP,SCTP, UDT)
- Local : 使用 io.netty.channel.local ,可以在VM内部通过管道进行通信的本地传输
- Embedded : 使用 io.netty.channel.embedded ,允许使用 ChannelHandler 而又不需要一个真正的基于网络的传输。这在测试你的 ChannelHandler 实现时非常有用。
ByteBuf —— Netty的数据容器
它维护了两个不同的索引:一个用于读取,一个用于写入。
当你从 ByteBuf 读取时,它的 readerIndex 将会被递增(以 read 开头的API)已经被读取的字节数。同样地,当你写入 ByteBuf 时,它的 writerIndex 也会被递增(以 write 开头的API)。以 set, get 开头的API 不会改变这两个索引。
使用模式
- 堆缓冲区:这种模式称为 backing array,类似 JDK 的 ByteBuffer 的用法。它将数据存储在 JVM 的堆空间中。
- 直接缓冲区:它的数据存储在 JVM 的堆外空间,主要缺点是:它的分配和释放都比较昂贵。
- 复合缓冲区:它为多个 ByteBuf 提供了一个聚合视图(比如HTTP的头部+HTTP的Body)。(CompositeByteBuf,它可以同时包含直接内存分配和非直接内存分配的 ByteBuf)
派生缓冲区
创建方法:
- duplicate()
- slice()
- slice(int, int)
- Unpooled.unmodifiableBuffer()
- order(ByteOrder)
- readSlice(int)
每个这些方法都将返回一个新的ByteBuf 实例,它具有自己的读索引,写索引和标记索引。 其内部存储和JDK的ByteBuffer一样,也是共享 (注意,内容是共享的!!!)
ByteBufHolder 接口
除了实际的数据之外,还要存储各种属性值,如HTTP,字节的内容、状态码、cookie等。
- content() : 返回这个 ByteBufHolder 所持有的 ByteBuf
- copy() : 返回这个 ByteBufHolder 的一个深拷贝
- duplicate() : 返回这个 ByteBufHolder 的一个浅拷贝
Channel 的生命周期
ChannelRegistered -> ChannelActive -> ChannelInactive -> ChannelUnregistered
ChannelHandler 的生命周期
handlerAdded : 添加到 ChannelPipeline 中时被调用
handlerRemoved : 从 ChannelPipeline 中移除时被调用
handlerCaught : 当处理过程中在 ChannelPipeline 中有错误产生时调用
资源管理
调用 ChannelInboundHandler.channelRead() 或者 ChannelOutboundHandler.write() 处理数据时,必须要保证没有任何的资源泄漏。(因为Netty是使用引用计数来处理池化的 ByteBuf 的)
为了检测资源泄漏的问题,Netty 提供了 class ResourceLeakDetector 来检测。
检测级别有:
- DISABLED : 禁用泄漏检测
- SIMPLE : 使用 1% 的默认采样率检测并报告。(这是默认级别)
- ADVANCED : 使用默认的采样率,报告所发现的任何的泄漏以及对应的消息被访问的位置
- PARANOID : 类似 ADVANCED ,但是会对每一次(对消息的)访问都采样。(这应该只是在调试阶段使用)
使用:
java -Dio.netty.leakDetectionLevel=ADVANCED
异常处理
- ChannelHandler.execeptionCaught() 默认实现是简单地将当前异常转发给 ChannelPipeline 中的下一个 ChannelHandler
- 如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理
- 要想定义自定义的处理逻辑,你需要重写 exceptionCaught() 方法。然后你需要决定是否需要将该异常传播出去
JDK 5 线程模型
池化,它的思想:
- 从池中空闲线程列表中选择一个 Thread,并且派它去运行一个已经提交的任务(Runnable的实现)
- 当任务完成时,将该Thread返回给该列表,使其可被重用
这虽然减少了每个任务线程的创建和销毁,但它并不能消除由 上下文切换 所带来的开销,随着线程数量的增加很快变得明显,并且在高负载下,愈演愈烈。
EventLoop 接口
while(!terminated) {
List<Runnable> readyEvents = blockUntilEventsReady()
for (Runnable ev : readyEvents) {
ev.run()
}
}
在这个线程模型中,一个 EventLoop 由一个永不变的 Thread 驱动。可根据可用核心不同,可能会创建多个 EventLoop 实例用以优化资源的使用,并且单个 EventLoop 可能会被指派用于服务多个 Channel
Netty 4 中的 I/O 和事件处理
在 Netty 4 中,所有的 I/O 操作和事件都已经被分配给了 EventLoop 的那个 Thread 来处理(注意,是处理,而不是触发哦)
Netty 3 中的 I/O 操作
这个模型只保证了入站(上游)事件会在所谓的 I/O 线程(对应 Netty 4 中的 EventLoop )执行。 所有的出站(下游)事件都由调用线程处理,其可能是 I/O 线程也可能是别的线程。
主要问题是:该模型不可能保证多个线程不会在同一时刻尝试访问出站事件。(例如,在不同的线程中调用 Channel.write() ,针对同一个 Channel 同时触发出站的事件,就会发生这种情况)
线程管理
取决于当前执行的 Thread 的身份的确定(EventLoop的 inEvenLoop(Thread) 来确定) 。
如果是支撑的 EventLoop 的线程,那么所提交的代码将会被(直接)执行。 否则 EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。
!!!!永远不要将一个长时间运行的任务放入到执行队列中,因为它将阻塞需要在同一线程上执行的任何的其他任务。
EventLoop 的线程分配
NIO 中
这种模型中,它们可能会被多个 Channel 所共享,这样可以通过尽可能少的 Thread 来支撑大量的 Channel ,而不是每个 Channel 分配一个 Thread 。EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin) 的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel 。
OIO 中
这里每个 Channel 都将被分配一个 EventLoop(以及它的Thread)
导引
<-- Bootstrap(客户端)
Cloneable <- AbstractBootstrap <--
<-- ServerBootstrap(服务器端)
为什么引导类是 Cloneable 的?
你有时可能会需要创建多个具有类似配置或者完全相同配置的 Channel 。为了支持这种模式而又不需要为每个 Channel 都创建并配置一个新的引导类实例,AbstractBootstrap 被标记为了 Cloneable 。在一个已经配置完成的引导类实例上调用 clone() 方法将返回一个可以立即使用的引导类实例。
注意,这种方式只会创建引导类实例的 EventLoopGroup 的一个浅拷贝,所以,后者将在所有克隆的 Channel 实例之间共享。
Bootstrap
内置编/解码器
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/base64
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/bytes
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/compression
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/dns
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/haproxy
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/cookie
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/cors
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/multipart
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/websocketx
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/websocketx/extensions
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http/websocketx/extensions/compression
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/http2
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/json
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/marshalling
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/memcache
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/memcache/binary
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/mqtt
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/protobuf
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/redis
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/rtsp
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/sctp
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/serialization
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/smtp
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/socks
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/socksx
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/socksx/v4
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/socksx/v5
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/spdy
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/stomp
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/string
/Users/emacsist/.m2/repository/io/netty/netty-all/4.1.12.Final/netty-all-4.1.12.Final.jar!/io/netty/handler/codec/xml