Netty网络编程1
文章目录
1. IO概念
IO指 Input(输入流)、OutPut(输出流) 以支持JAVA应用对外传输交换数据。传输入目标可以是文件、网络、内存等。
随着对传输效率的要求越来越高,java也逐步演变出了三代IO模型分别是 BIO、NIO、AIO。
BIO 同步阻塞式 (Blocking I/O)
在java1.4之前 这种传输的实现只能通过inputStream和OutputStream 实现。这是一种阻塞式IO模型,应对连接数不多的文件系统还好,如果应对的是成千上万的网络服务,这种阻塞式模型就会造成大量的线程占用,造成服务器无法承载更高的并发。
NIO 同步非阻塞式(Non Blocking I/O)
为解决这个问题 java1.4之后引入了NIO ,它通过双向管道进行通信,并且支持以非阻塞式的方式进行,就解决了网络传输导致线程占用的问题。Netty其在底层就是采用这种通信模型。
AIO 异步非阻塞式(Asynchronous Blocking I/O)
NIO的非阻塞的实现是依赖选择器对管道状态进行轮循实现,如果同时进行的管道较多,性能必会受影响,所以java1.7引入了 异步非阻塞式IO,通过异步回调的方式代替选择器。这种改变在windows下是很明显,在linux系统中不明显。现大部分JAVA系统都是通过linux部署,所以AIO直正被应用的并不广泛。所以我们接下重点关注BIO与NIO的对比。
2.BIO与NIO模型区别
两组模型最大的别区在于阻塞与非阻塞,而所谓的阻塞是什么呢?而非阻塞又是如何解决的呢?
阻塞模型 BIO
在阻塞模型中客户端与服务端建立连接后,就会按顺序发生三个事件
1.客户端把数据写入流中。(阻塞)
2.通过网络通道(TPC/IP)传输到服务端。(阻塞)
3.服务端读取。
这个过程中服务端的线程会一直处阻塞等待,直到数据发送过来后执行第3步。
如果在第1步客户端迟迟不写入数据,或者第2步网络传输延迟太高,都会导致服务端线程阻塞时间更长。所以更多的并发,就意味着需要更多的线程来支撑。
BIO模型里是通1对1线程来等待 第1、2步的完成。而在NIO里是指派了选择器(Selector)来检查,是否满足执行第3步的条件,满足了就会通知线程来执行第3步,并处理业务。这样1、2步的延迟就与 用于处理业务线程无关。
非阻塞模型 NIO
3.NIO 基础组件 Channel 与Buffer
在BIO API中是通过InputStream 与outPutStream 两个流进行输入输出。而在NIO 使用一个双向通信的管道代替了它俩。管道(Channel)必须依赖缓冲区(Buffer)实现通信。
管道对比流多了一些如:非阻塞、堆外内存映射、零拷贝等特性。
注:不同的管道支持的功能不一样,并非所有管道都支持上述特性
缓冲区Buffer
所有管道都依赖了缓冲区Buffer,必须先了解它。
缓冲区Buffer定义
所谓缓冲区就是一个数据容器内部维护了一个数组来存储。Buffer缓冲区并不支持存储任何数据,只能存储一些基本类型,就连字符串也是不能直接存储的。
在平常的开发过程当中用的比较多的是byte类型
Buffer 内部结构
在Buffer内部维护了一个数组,同时有三个属性我们需要关注即:
capacity:容量, 即内部数组的大小,这个值一但声明就不允许改变
**position:位置,**当前读写位置,默认是0每读或写个一个位就会加1
limit:限制,即能够进行读写的最大值,它必须小于或等于capacity
有了capacity做容量限制为什么还要有limit,原因是往Buffer中写数据的时候 不一定会写满,而limit就是用来标记写到了哪个位置,读取的时候就不会超标。
如果读取超标就会报:BufferUnderflowException
同样写入超标也会报:BufferOverflowException
Buffer核心使用
-
allocate:声明一个指定大小的Buffer,position为0,limit为容量值
-
wrap:基于数组包装一个Buffer,position为0,limit为容量值
flip操作
为读取做好准备,将position 置为0,limit置为原position值
clear操作
为写入做好准备,将position 置为0,limit置为capacity值
注:clear不会实际清空数据
rewind操作
为写入做准备,将position 置为0,limit不变。也可用来的读,但是由于limit没有改变,可能会读取到未写入的空数据造成读取数据存在误差。
mark操作
添加标记,以便后续调用 reset 将position回到标记,常用场景如:替换某一段内容
目前我们知道,总共有4个值,分别是 mark、position、limit、capacity它们等于以下规则:
0 <= 标记 <= 位置 <= 限制 <= 容量
Channel 管道
由java.nio.channels包定义的,Channel表示IO源与目标打开的连接,Channel类似于传统的“流”,只不过Channel本身不能直接访问数据,Channel只能与Buffer进行交互。通道主要用于传输数据,从缓冲区的一侧传到另一侧的实体(如文件、套接字…),反之亦然;通道是访问IO服务的导管,通过通道,我们可以以最小的开销来访问操作系统的I/O服务;顺便说下,缓冲区是通道内部发送数据和接收数据的端点
Channel定义
channel 管道是用于连接文件、网络Socket等。它可同时执行读取和写入两个I/O 操作,固称双向管道,它有连接和关闭两个状态,在创建管道时处于打开状态,一但关闭在调用I/O操作就会报ClosedChannelException
。通过管道的isOpen
方法可判断其是否处于打开状态。
常用的 channel 管道
一般开发过程中比较常用的管道有:FileChannel 、DatagramChannel 、ServerSocketChannel、SocketChannel
FileChannel 文件管道
固名思议它就是用于操作文件的,除常规操作外它还支持以下特性:
-
支持对文件的指定区域进行读写
-
堆外内存映射,进行大文件读写时,可直接映射到JVM声明内存之外,从面提升读写效率。
-
零拷贝技术,通过
transferFrom
或transferTo
直接将数据传输到某个通道,极大提高性能。 -
锁定文件指定区域,以阻止其它程序员进行访问
打开FileChannel目前只能通过流间接打开,如inputStream.getChannel() 和outputStream.getChannel() ,通过输入流打开的管道只能进行取,而outputStream打开的只能写。否则会分别抛出NonWritableChannelException与NonReadableChannelException异常。
如果想要管道同时支持读写,必须用RandomAccessFile
读写模式才可以。
FileChannel示例:
FileChannel channel = new RandomAccessFile(file_name,"rw").getChannel();
ByteBuffer buffer=ByteBuffer.allocate(1024);
int count = channel.read(buffer);
read方法会将数据写入到buffer 直到Buffer写满或者数据已经读取完毕。count 返回读取的数量,-1表示已读取完毕。
关于堆外内存映射与零拷贝等技术术,将会在后续章节基于NIO的零拷贝中详细说明
DatagramChannel UDP套接字管道
udp 是一个无连接协议,DatagramChannel就是为这个协议提供服务,以接收客户端发来的消息。
udp实现步骤如下:
// 1.打开管道
DatagramChannel channel = DatagramChannel.open();
// 2.绑定端口
channel.bind(new InetSocketAddress(8080));
ByteBuffer buffer = ByteBuffer.allocate(8192);
// 3.接收消息,如果客户端没有消息,则当前会阻塞等待
channel.receive(buffer);
nc -vu 127.0.0.1 8080 这个命令可向udp发送消息
TCP套接字管道
TCP是一个有连接协议,须建立连接后才能通信。这就需要下面两个管道:
-
ServerSocketChannel: 用于与客户端建立连接
-
SocketChannel:用于和客户端进行消息读写
TCP管道实现步骤如下:
// 1.打开TCP服务管道
ServerSocketChannel channel = ServerSocketChannel.open();
// 2.绑定端口
channel.bind(new InetSocketAddress(8080));
// 3.接受客户端发送的连接请求,(如果没有则阻塞)
SocketChannel socketChannel = channel.accept();
ByteBuffer buffer=ByteBuffer.allocate(1024);
// 4.读取客户端发来的消息(如果没有则阻塞)
socketChannel.read(buffer);
// 5.回写消息
socketChannel.write(ByteBuffer.wrap("返回消息".getBytes()));
// 6.关闭管道
socketChannel.close();
可通过命令进行测试TCP服务 telnet 127.0.0.1 8080
上述例子接收一个消息,并返回客户端,然后关闭。它只能处理一个客户端的请求,然后整个服务就结束了。如果想要处理多个请求,我们可以加上一个循环来接收请求,然后在分配一个子线程去处理每个客户端请求。
ServerSocketChannel channel = ServerSocketChannel.open();
channel.bind(new InetSocketAddress(8080));
while(true){
SocketChannel socketChannel = channel.accept();
// 使用子线程处理请求
new Thread(()->handle(socketChannel)).start();
}
处理客户端请求
private void handle(SocketChannel socketChannel){
ByteBuffer buffer=ByteBuffer.allocate(1024);
// 1.读取客户端发来的消息(如果没有则阻塞)
socketChannel.read(buffer);
// 返回消息
socketChannel.write(ByteBuffer.wrap("返回消息".getBytes()));
// 关闭管道
socketChannel.close();
}
至此我们已完成了一个简单但完整的请求响应模型。这个模型中每个客户端连接都会有一个子线程进行处理。在没有读到消息前这个线程会一直阻塞在read方法中。所以不难推断这就是一个典型的BIO阻塞模型。
那怎么在管道中实现非阻塞的NIO模型呢?下面我们继续。
4.NIO多路复用选择器
选择器工作模型
上面我们用管道实现了一个阻塞式模型,在那个模型中,服务端建立连接后就会立马分配一个线程等待消息到达。由于不知道什么时候消息能到达客户端,所以主要一直阻塞等待。
能否等消息到达之后在分配线程进行处理?这就需要Selector出场了。只要将管道设置为非阻塞模式,然后注册至Selector。当消息到达后就会得到通知。
图中左边的管道没有注册至Selector 需要阻塞等待消息,右边已注册至Selector当消息到达时才会分配线程。避免了线程阻塞。
选择器是如何实现这个功能的?在解决这个问题之前先来了解一下选择器中的核心组件
选择器核心组件
选择器核心组件有三个:管道(SelectableChannel)、选择器(Selector)、选择键(SelectorKey)。
并非所有的Channel都支持向选择器注册,只有SelectableChannel子类才可以。当管道注册到Selector后就会返回一个Key,通过这个key就可以获取到关联的管道。接下来就分别介绍三个组件的作用:
SelectableChannel 管道
核心功能有两个,configureBlocking 设置阻塞模式类型,默认为true,代表是阻塞模式。所以在向选择器注册前必须置为false。第二就是是调用register方法注册到指定管道,并且指定要监听的事件。可选的事件有:
CONNECT(建立连接)、ACCEPT(接受连接)、READ(可读)、WRITE(可写) 。但是并不是所有管道都支持这四事件,可通过 validOps() 来查看当前管道支持哪些事件。
Selector 选择器
管道注册至Selector之后 会生成一个键(SelectorKey) 该键维护在Selector的keys中。并通过调用select 方法进行刷新,如果返回数大于0表示有指定数量的键状态发生了变更。
-
select(): 有键更新,立马返回。否则会一直阻塞直到有键更新为止。
-
**select(long ):**有键更新,立马返回。否则会阻塞参数指定毫秒时。
-
**selectNow():**无论有没有键更新都会立马返回
如果有键更新,接下来就可以调用selectedKeys 获取更新的键集了。
SelectionKey 选择键
键用于关联管道与选择器,并监听维护管道一至多个事件,监听事件可在注册时指定,也可后续通过调用SelectionKey.interestOps 来改变。
// 监听读取事件
key=socketChannel.register(selector, SelectionKey.OP_READ);
// 同时监听读写事件
key.interestOps(SelectionKey.OP_READ|OP_WRITE);
SelectionKey 中4个常量表示4个事件
但并非所有管道都支持这四个事件如:
ServerSocketChannel 仅支持OP_ACCEPT事件,
SocketChannel 支持 OP_CONNECT、OP_READ、OP_WRITE 三个事件
查看管道支持哪些事件可通过 validOps() 反回的值 然后进行‘&’运算 判断如
//表示该管道支持 OP_CONNECT 事件监听
socketChannel.validOps()&SelectionKey.OP_CONNECT != 0
此外Key还有如下主要功能:
-
channel() 获取管道
-
isAcceptable() 管道是否处于Accept状态
-
isConnectable 管道是否处于连接就绪状态
-
isReadable 管道是否处于读取就绪状态
-
isWritable 管道是否处于写就续状态
-
isValid() 判断该键是否有效,管道关闭、选择器关闭、键调用cancel()方法都会导致该键无效。
-
**cancel()**取消管道注册(不会直接关闭管道)
选择器的使用
认识了选择器的三个组件,接下来通过一个简单Demo逐步认识Selector 。
Demo演示
下面是一个UDP的例子,通过 DatagramChannel 开放管道,并注册至Selector。然后由Selector代为监听消息的状态。
@Test
public void selectorUdpTest() throws IOException {
Selector selector = Selector.open();
DatagramChannel channel = DatagramChannel.open();
channel.bind(new InetSocketAddress(8080));
// 设置非阻塞模式
channel.configureBlocking(false);
// 1.注册读取就续事件
channel.register(selector, SelectionKey.OP_READ);
while (true) {
//2.刷新键集
int count = selector.select();
if (count > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 3.遍历就续集
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//4.处理该就续键
handle(key);
//5.从就续集中移除
iterator.remove();
}
}
}
}
处理该键,从键中获取管道并读取消息
public void handle(SelectionKey key) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(8192);
DatagramChannel channel = (DatagramChannel) key.channel();
channel.receive(buffer);// 读取消息并写入缓冲区
}
使用流程
上述例子关键流程说明:
- 通过channel.register()注册管道。
- 通过selector.select()刷新已注册键的状态。
- selector.selectedKeys() 获取就续集并遍历。
- 处理键,即获取管道并读取消息。
- 从选择集中移除。
重点关注一下第 4、5步,如没有执行第4步(即读取管道中的消息),那么该管道目前还是处于读就续状态。当调用选择器select() 方法时会立马返回,造成死循环。如果执行了第四步但没有执行第5步,会导致其留存在选择集中,从而重复进行处理。
键集说明
在选择器中总共维护了三个键集,底层都是Set实现所以不会重复:
-
全部键集(keys):所有向该选择器注册的键都放置于此
-
选择键集(selectedKeys):存放准备就续的键
-
取消键集(cancelledKeys) :存放已取消的键
通过刷新或关闭选择器都会导致,键集发生变更。下图详细说明了键
- 调用select()会刷新键,将已就绪集添加至选择集中、清空取消键集并移除已取消键
- 移除选择集,选择集不会被选择器,需自行调用Set.remove()进行移除
- Cancel()或关闭选择器,关闭管道都会都会将键添加至取消集,但其不会被立马清除,只有下一次刷新时才会被清空 。
NIO实现简单httpServer
package com.example.nettydemo.nio.http;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class SimpleHttpServer {
private final Selector selector;
int port;
private Set<SocketChannel> allConnections = new HashSet<>();
volatile boolean run = false;
HttpServlet servlet;
ExecutorService executor = Executors.newFixedThreadPool(5);
public SimpleHttpServer(int port, HttpServlet servlet) throws IOException {
this.port = port;
this.servlet = servlet;
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
selector = Selector.open();
listenerChannel.bind(new InetSocketAddress(port));
listenerChannel.configureBlocking(false);
listenerChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public Thread start() {
run = true;
Thread thread = new Thread(() -> {
try {
while (run) {
dispatch();
}
} catch (IOException e) {
e.printStackTrace();
}
}, "selector-io");
thread.start();
return thread;
}
public void stop(int delay) {
run = false;
}
private void dispatch() throws IOException {
int select = selector.select(2000);
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
final SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
while (channel.read(buffer) > 0) {
buffer.flip();
out.write(buffer.array(), 0, buffer.limit());
buffer.clear();
}
if (out.size() <= 0) {
channel.close();
continue;
}
System.out.println("当前通道:" + channel);
// 解码并封装request
Request request = decode(out.toByteArray());
// 构建response 并附着于当前key
Response response = new Response();
key.attach(response);
// 提交任务至线程池
executor.submit(() -> {
servlet.doService(request, response);
// 其他线程如果因为调用了selector.select()或者selector.select(long)这两个方法而阻塞,
// 调用了selector.wakeup()之后,就会立即返回结果,并且返回的值!=0,
// 如果当前Selector没有阻塞在select方法上,
// 那么本次 wakeup调用会在下一次select阻塞的时候生效。
selector.wakeup();
key.interestOps(SelectionKey.OP_WRITE);
});
} else if (key.isWritable()) {
final SocketChannel channel = (SocketChannel) key.channel();
channel.write(ByteBuffer.wrap(encode((Response) key.attachment())));
key.interestOps(SelectionKey.OP_READ);
key.attach(null);
}
}
}
// 解码Http服务
private Request decode(byte[] bytes) throws IOException {
Request request = new Request();
BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
String firstLine = reader.readLine();
System.out.println(firstLine);
String[] split = firstLine.trim().split(" ");
request.method = split[0];
request.url = split[1];
request.version = split[2];
//读取请求头
Map<String, String> heads = new HashMap<>();
while (true) {
String line = reader.readLine();
if (line.trim().equals("")) {
break;
}
String[] split1 = line.split(":");
heads.put(split1[0], split1[1]);
}
request.heads = heads;
request.params = getUrlParams(request.url);
//读取请求体
request.body = reader.readLine();
return request;
}
//编码Http 服务
private byte[] encode(Response response) {
StringBuilder builder = new StringBuilder(512);
builder.append("HTTP/1.1 ")
.append(response.code).append(Code.msg(response.code)).append("\r\n");
if (response.body != null && response.body.length() != 0) {
builder.append("Content-Length: ")
.append(response.body.length()).append("\r\n")
.append("Content-Type: text/html\r\n");
}
if (response.headers != null && !response.headers.isEmpty()) {
String headStr = response.headers.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue())
.collect(Collectors.joining("\r\n"));
builder.append(headStr + "\r\n");
}
// builder.append ("Connection: close\r\n");// 执行完后关闭链接
builder.append("\r\n").append(response.body);
return builder.toString().getBytes();
}
public abstract static class HttpServlet {
void doService(Request request, Response response) {
if (request.method.equalsIgnoreCase("GET")) {
doGet(request, response);
} else {
doPost(request, response);
}
}
abstract void doGet(Request request, Response response);
abstract void doPost(Request request, Response response);
}
public static class Request {
Map<String, String> heads;
String url;
String method;
String version;
String body; //请求内容
Map<String, String> params;
}
public static class Response {
Map<String, String> headers;
int code;
String body; //返回结果
}
private static Map getUrlParams(String url) {
Map<String, String> map = new HashMap<>();
url = url.replace("?", ";");
if (!url.contains(";")) {
return map;
}
if (url.split(";").length > 0) {
String[] arr = url.split(";")[1].split("&");
for (String s : arr) {
if (s.contains("=")) {
String key = s.split("=")[0];
String value = s.split("=")[1];
map.put(key, value);
} else {
map.put(s, null);
}
}
return map;
} else {
return map;
}
}
}
package com.example.nettydemo.nio.http;
import java.io.IOException;
import java.util.HashMap;
public class HttpServerTest {
//keep-alive 问题不启作用
public void simpleHttpTest() throws IOException, InterruptedException {
SimpleHttpServer simpleHttpServer = new SimpleHttpServer(8080, new SimpleHttpServer.HttpServlet() {
@Override
void doGet(SimpleHttpServer.Request request, SimpleHttpServer.Response response) {
System.out.println(request.url);
response.body="hello word";
response.code=200;
response.headers=new HashMap<>();
if (request.params.containsKey("short")) {
response.headers.put("Connection", "close");
}else if(request.params.containsKey("long")){
response.headers.put("Connection", "keep-alive");
response.headers.put("Keep-Alive", "timeout=30,max=300");
}
}
@Override
void doPost(SimpleHttpServer.Request request, SimpleHttpServer.Response response) {
}
});
simpleHttpServer.start().join();
}
public static void main(String[] args) throws IOException, InterruptedException {
HttpServerTest httpServerTest = new HttpServerTest();
httpServerTest.simpleHttpTest();
}
}
5.基于NIO的零拷贝
什么是mmap?
平时会经常碰见的问题就是:RocketMQ为什么快?Kafka为什么快?什么是mmap?
这一类的问题都逃不过的一个点就是零拷贝,虽然还有一些其他的原因,但是本文的话题主要就是零拷贝。
传统IO
在开始谈零拷贝之前,首先要对传统的IO方式有一个概念。
基于传统的IO方式,底层实际上通过调用read()
和write()
来实现。
通过read()
把数据从硬盘读取到内核缓冲区,再复制到用户缓冲区;然后再通过write()
写入到socket缓冲区
,最后写入网卡设备。
整个过程发生了4次用户态和内核态的上下文切换和4次拷贝,具体流程如下:
- 用户进程通过
read()
方法向操作系统发起调用,此时上下文从用户态转向内核态 - DMA控制器会把数据从硬盘中拷贝到读缓冲区
- CPU把读缓冲区数据拷贝到应用缓冲区,上下文从内核态转为用户态,
read()
返回 - 用户进程通过
write()
方法发起调用,上下文从用户态转为内核态 - CPU将应用缓冲区中数据拷贝到socket缓冲区
- DMA控制器再把数据从socket缓冲区拷贝到网卡,上下文从内核态切换回用户态,
write()
返回
那么,这里指的用户态、内核态指的是什么?上下文切换又是什么?
简单来说,用户空间指的就是用户进程的运行空间,内核空间就是内核的运行空间。
如果进程运行在内核空间就是内核态,运行在用户空间就是用户态。
为了安全起见,他们之间是互相隔离的,而在用户态和内核态之间的上下文切换也是比较耗时的。
从上面我们可以看到,一次简单的IO过程产生了4次上下文切换,这个无疑在高并发场景下会对性能产生较大的影响。
那么什么又是DMA拷贝呢?
因为对于一个IO操作而言,都是通过CPU发出对应的指令来完成,但是相比CPU来说,IO的速度太慢了,CPU有大量的时间处于等待IO的状态。
因此就产生了DMA(Direct Memory Access)直接内存访问技术,本质上来说他就是一块主板上独立的芯片,通过它来进行内存和IO设备的数据传输,从而减少CPU的等待时间。
但是无论谁来拷贝,频繁的拷贝耗时也是对性能的影响。
传统IO代码演示
@Test
public void BufferTest() throws IOException {
String file_name = "D:/500M.txt";
String copy_name = "D:/500M_copy.txt";
File outputFile = new File(copy_name);
outputFile.delete();
outputFile.createNewFile();
long begin = System.nanoTime();
try (InputStream inputStream = new FileInputStream(file_name);
FileOutputStream outputStream = new FileOutputStream(outputFile);
BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream)) {
int c;
while ((c = bufferedInputStream.read()) != -1) {
bufferedOutputStream.write(c);
}
System.out.println((System.nanoTime() - begin) / 1.0e6);
} catch (Exception e) {
e.printStackTrace();
}
}
零拷贝
零拷贝技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域,这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。
那么对于零拷贝而言,并非真的是完全没有数据拷贝的过程,只不过是减少用户态和内核态的切换次数以及CPU拷贝的次数。
这里,仅仅有针对性的来谈谈几种常见的零拷贝技术。
mmap+write 内存映射
mmap+write简单来说就是使用mmap
替换了read+write中的read操作,减少了一次CPU的拷贝。
mmap
主要实现方式是将读缓冲区的地址和用户缓冲区的地址进行映射,内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次CPU拷贝。
整个过程发生了4次用户态和内核态的上下文切换和3次拷贝,具体流程如下:
- 用户进程通过
mmap()
方法向操作系统发起调用,上下文从用户态转向内核态 - DMA控制器把数据从硬盘中拷贝到读缓冲区
- 上下文从内核态转为用户态,mmap调用返回
- 用户进程通过
write()
方法发起调用,上下文从用户态转为内核态 - CPU将读缓冲区中数据拷贝到socket缓冲区
- DMA控制器把数据从socket缓冲区拷贝到网卡,上下文从内核态切换回用户态,
write()
返回
mmap
的方式节省了一次CPU拷贝,同时由于用户进程中的内存是虚拟的,只是映射到内核的读缓冲区,所以可以节省一半的内存空间,比较适合大文件的传输。
mmap+write代码演示
// 内存映射
@Test
public void mmapTest() throws IOException {
String file_name = "D:/500M.txt";
String copy_name = "D:/500M_copy.txt";
File file = new File(copy_name);
file.delete();
file.createNewFile();
long begin = System.nanoTime();
FileChannel channel = new RandomAccessFile(file_name, "rw").getChannel();
FileChannel copyChannel = new RandomAccessFile(copy_name, "rw").getChannel();
//1. 建立一个 映射
MappedByteBuffer mapped = channel
.map(FileChannel.MapMode.READ_WRITE, 0, channel.size());
copyChannel.write(mapped);//2. file内核缓冲区1--》cpu拷贝到 file内核缓冲区2
System.out.println((System.nanoTime() - begin) / 1.0e6);
copyChannel.close();
channel.close();
}
sendfile
相比mmap
来说,sendfile
同样减少了一次CPU拷贝,而且还减少了2次上下文切换。
sendfile
是Linux2.1内核版本后引入的一个系统调用函数,通过使用sendfile
数据可以直接在内核空间进行传输,因此避免了用户空间和内核空间的拷贝,同时由于使用sendfile
替代了read+write
从而节省了一次系统调用,也就是2次上下文切换。
整个过程发生了2次用户态和内核态的上下文切换和3次拷贝,具体流程如下:
- 用户进程通过
sendfile()
方法向操作系统发起调用,上下文从用户态转向内核态 - DMA控制器把数据从硬盘中拷贝到读缓冲区
- CPU将读缓冲区中数据拷贝到socket缓冲区
- DMA控制器把数据从socket缓冲区拷贝到网卡,上下文从内核态切换回用户态,
sendfile
调用返回
sendfile
方法IO数据对用户空间完全不可见,所以只能适用于完全不需要用户空间处理的情况,比如静态文件服务器。
sendfile+DMA Scatter/Gather
Linux2.4内核版本之后对sendfile
做了进一步优化,通过引入新的硬件支持,这个方式叫做DMA Scatter/Gather 分散/收集功能。
它将读缓冲区中的数据描述信息–内存地址和偏移量记录到socket缓冲区,由 DMA 根据这些将数据从读缓冲区拷贝到网卡,相比之前版本减少了一次CPU拷贝的过程
整个过程发生了2次用户态和内核态的上下文切换和2次拷贝,其中更重要的是完全没有CPU拷贝,具体流程如下:
- 用户进程通过
sendfile()
方法向操作系统发起调用,上下文从用户态转向内核态 - DMA控制器利用scatter把数据从硬盘中拷贝到读缓冲区离散存储
- CPU把读缓冲区中的文件描述符和数据长度发送到socket缓冲区
- DMA控制器根据文件描述符和数据长度,使用scatter/gather把数据从内核缓冲区拷贝到网卡
sendfile()
调用返回,上下文从内核态切换回用户态
DMA gather
和sendfile
一样数据对用户空间不可见,而且需要硬件支持,同时输入文件描述符只能是文件,但是过程中完全没有CPU拷贝过程,极大提升了性能。
sendfile代码演示
// sendFile 直接传输
@Test
public void transferFromTest() throws IOException {
String file_name = "D:/500M.txt";
String copy_name = "D:/500M_copy.txt";
File file = new File(copy_name);
file.delete();
file.createNewFile();
FileChannel channel = new RandomAccessFile(file_name, "rw").getChannel();
FileChannel copyChannel = new RandomAccessFile(copy_name, "rw").getChannel();
long begin = System.nanoTime();
// sendFile
// 2次切换 2次拷贝
copyChannel.transferFrom(channel, 0, channel.size());//从批定目标拷贝到当前管道
System.out.println((System.nanoTime() - begin) / 1.0e6);
channel.close();
copyChannel.close();
}
应用场景
前面说的两个场景:RocketMQ和Kafka都使用到了零拷贝的技术。
对于MQ而言,无非就是生产者发送数据到MQ然后持久化到磁盘,之后消费者从MQ读取数据。
对于RocketMQ来说这两个步骤使用的是mmap+write
,而Kafka则是使用mmap+write
持久化数据,发送数据使用sendfile
。
零拷贝总结
由于CPU和IO速度的差异问题,产生了DMA技术,通过DMA搬运来减少CPU的等待时间。
传统的IOread+write
方式会产生2次DMA拷贝+2次CPU拷贝,同时有4次上下文切换。
而通过mmap+write
方式则产生2次DMA拷贝+1次CPU拷贝,4次上下文切换,通过内存映射减少了一次CPU拷贝,可以减少内存使用,适合大文件的传输。
sendfile
方式是新增的一个系统调用函数,产生2次DMA拷贝+1次CPU拷贝,但是只有2次上下文切换。因为只有一次调用,减少了上下文的切换,但是用户空间对IO数据不可见,适用于静态文件服务器。
sendfile+DMA gather
方式产生2次DMA拷贝,没有CPU拷贝,而且也只有2次上下文切换。虽然极大地提升了性能,但是需要依赖新的硬件设备支持。
6.使用Netty实现简单UDP/TCP服务
下面我们用Netty实现一个简单的http服务和udp服务,这里只是简单的使用一下,后面我们将详细介绍Netty框架是怎么进行网络开发的
TCP案例
前面我们用原生NIO实现了一个简单的http服务,下面代码是通过netty实现的简单的http服务
package com.example.nettydemo.netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
public class BootstrapTest {
// 编写一个Http 服务
// http-->TCP
public void open(int port) {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(8);
bootstrap.group(boss, work)
.channel(NioServerSocketChannel.class)// 指定要打开的管道 自动进行进行注册==》NioServerSocketChannel ->
.childHandler(new ChannelInitializer<Channel>() {//指定 子管道
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast("decode", new HttpRequestDecoder()); // 输入
ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
ch.pipeline().addLast("encode", new HttpResponseEncoder());// 输出流
ch.pipeline().addLast("servlet", new MyServlet());
}
});
// NioServerSocketChannel.bind==》EventLoop.runTasks ==>ServerSocketChannel.bind()
ChannelFuture future = bootstrap.bind(port);
future.addListener(future1 -> System.out.println("注册成功"));
}
// 请求头
// 453 8192 8192 .... 2532
private class MyServlet extends SimpleChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// request (请求头)
// body (请求体)
if (msg instanceof FullHttpRequest) {
FullHttpRequest request= (FullHttpRequest) msg;
System.out.println("url:"+request.uri());
System.out.println(request.content().toString(Charset.defaultCharset()));
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
response.content().writeBytes("hello".getBytes());
ChannelFuture future = ctx.writeAndFlush(response);
future.addListener(ChannelFutureListener.CLOSE);
}
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
System.out.println("当前请求:" + request.uri());
}
if (msg instanceof HttpContent) {
// 写入文件流
ByteBuf content = ((HttpContent) msg).content();
OutputStream out = new FileOutputStream("D:/work/nettydemo/target/test.txt", true);
content.readBytes(out, content.readableBytes());
out.close();
}
if (msg instanceof LastHttpContent) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
response.content().writeBytes("上传完毕".getBytes());
ChannelFuture future = ctx.writeAndFlush(response);
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
public static void main(String[] args) throws IOException {
new BootstrapTest().open(8080);
System.in.read();
}
}
通过对比原生的NIO案例我们可以看到netty可以简化网络编程的难度,实际上netty的版本也更稳定,后面具体分析netty的实现后我们将对netty会有比较清晰的认识。
UDP 案例
下面的案例是一个客户端向服务端发起谚语查询的请求,服务端随机返回一个谚语的案例,代码如下:
udp服务端
package com.example.nettydemo.netty.udp;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import java.util.concurrent.ThreadLocalRandom;
/**
* @Author: dinghao
* @Date: 2021/10/27 20:48
*/
public class ChineseProverbServer {
public void run(int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChineseProverbServerHandler());
b.bind(port).sync().channel().closeFuture().await();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8685;
if(args.length>0){
try{
port = Integer.parseInt(args[0]);
}catch (NumberFormatException e){
e.printStackTrace();
}
}
new ChineseProverbServer().run(port);
}
private static class ChineseProverbServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
// 谚语列表
private static final String[] DICTIONARY = {"只要功夫深,铁杵磨成针。","旧时王谢堂前燕,飞入寻常百姓家。",
"洛阳亲友如相问,一片冰心在玉壶","一寸光阴一寸金,寸金难买寸光阴。",
"床前明月光,疑是地上霜。举头望明月,低头思故乡。"};
private String nextQuote(){
int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length);
return DICTIONARY[quoteId];
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
String req = msg.content().toString(CharsetUtil.UTF_8);
System.out.println(req);
if("谚语字典查询?".equals(req)){
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("谚语查询结果:" + nextQuote(), CharsetUtil.UTF_8),
msg.sender()));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
}
udp客户端
package com.example.nettydemo.netty.udp;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
/**
* @Author: dinghao
* @Date: 2021/10/27 21:09
*/
public class ChineseProverbClient {
public void run(int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChineseProverbClientHandler());
Channel channel = b.bind(0).sync().channel();
// 向网段内所有的的机器广播UDP消息
channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("谚语字典查询?", CharsetUtil.UTF_8),
new InetSocketAddress("255.255.255.255", port))).sync();
if(!channel.closeFuture().await(15000)){
System.out.println("查询超时!");
}
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8685;
if(args.length>0){
try{
port = Integer.parseInt(args[0]);
}catch (NumberFormatException e){
e.printStackTrace();
}
}
new ChineseProverbClient().run(port);
}
private static class ChineseProverbClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
String resp = msg.content().toString(CharsetUtil.UTF_8);
if(resp.startsWith("谚语查询结果:")){
System.out.println(resp);
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5311.html