概述
BIO模型
Blocking IO(Input-Output)
模型是,一个线程对应一个客户端。
server中accept是阻塞的,client/server中socket读写也是阻塞的,性能很低。
代码简单示例如下:
- client
package com.study.nettystudy.bio;
import java.io.IOException;
import java.net.Socket;
/**
 * Minimal blocking-I/O (BIO) client.
 *
 * <p>Connects to 127.0.0.1:8888, sends one greeting, prints the single reply chunk it
 * reads, and closes the connection.
 */
public class Client {
    public static void main(String[] args) {
        // try-with-resources closes the socket on every exit path, including failures.
        try (Socket socket = new Socket("127.0.0.1", 8888)) {
            socket.getOutputStream().write("hello server".getBytes());
            final byte[] bytes = new byte[1024];
            // Blocks until the server answers; returns -1 if the server closed without replying.
            final int len = socket.getInputStream().read(bytes);
            if (len > 0) {
                System.out.println(new String(bytes, 0, len));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
- server
package com.study.nettystudy.bio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Objects;
/**
 * Blocking-I/O (BIO) echo server: one thread per accepted client.
 *
 * <p>Listens on 127.0.0.1:8888. Each connection is handed to a new thread that reads one
 * chunk, prints it, echoes it back and closes the socket.
 */
public class Server {
    public static void main(String[] args) throws IOException {
        // try-with-resources releases the listening socket if bind() or accept() fails.
        try (ServerSocket ss = new ServerSocket()) {
            ss.bind(new InetSocketAddress("127.0.0.1", 8888));
            while (true) {
                // Blocks until a client connects.
                final Socket socket = ss.accept();
                new Thread(() -> handle(socket)).start();
            }
        }
    }

    /**
     * Serves a single client: reads one chunk, prints it, writes the same bytes back.
     * The socket is always closed before returning.
     *
     * @param socket the accepted client connection
     */
    private static void handle(Socket socket) {
        try (Socket client = socket) {
            final byte[] bytes = new byte[1024];
            final int len = client.getInputStream().read(bytes);
            // read() returns -1 when the client closed without sending anything;
            // building a String with a negative length would throw.
            if (len <= 0) {
                return;
            }
            System.out.println(new String(bytes, 0, len));
            client.getOutputStream().write(bytes, 0, len);
            client.getOutputStream().flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
NIO模型
NIO单线程模型
概念:selector(监听器)通过主动轮询的方式,监听不同socket上感兴趣的事件(connect、read、write等);轮询到有事件发生后,再处理发生的事件。只有一个selector线程在维护所有客户端的连接、读写等事件;相当于一个selector监听多个socket上的连接、读写等事件,selector轮询感知到某些socket上发生了感兴趣的事件时再进行处理。
代码示例
server端代码
package com.study.nettystudy.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * Single-threaded NIO server: one selector thread accepts connections and serves reads.
 *
 * <p>Listens on 127.0.0.1:8888. For each readable client it reads one chunk, prints it,
 * replies with "Hello Client" and closes the connection.
 */
public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8888));
        ssc.configureBlocking(false);
        System.out.println("server started");
        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // Blocking call: returns once a key registered with the selector has an event.
            selector.select();
            final Set<SelectionKey> keys = selector.selectedKeys();
            final Iterator<SelectionKey> it = keys.iterator();
            while (it.hasNext()) {
                final SelectionKey key = it.next();
                // Remove the key, otherwise it would be processed again on the next pass.
                it.remove();
                handle(key);
            }
        }
    }

    /**
     * Dispatches one selected key to the accept or read path.
     *
     * @param key a key taken from the selector's selected-key set
     */
    private static void handle(SelectionKey key) {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            accept(key);
        } else if (key.isReadable()) {
            readAndReply(key);
        }
    }

    /** Accepts a pending connection and registers it for read events on the same selector. */
    private static void accept(SelectionKey key) {
        final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel sc = null;
        try {
            sc = ssc.accept();
            // A non-blocking accept() returns null when no connection is actually pending.
            if (sc == null) {
                return;
            }
            sc.configureBlocking(false);
            sc.register(key.selector(), SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leak a connection that was accepted but could not be registered.
            closeQuietly(sc);
        }
    }

    /** Reads one chunk from the client, prints it, sends the greeting, then closes the channel. */
    private static void readAndReply(SelectionKey key) {
        // Closing the channel also cancels its key.
        try (SocketChannel sc = (SocketChannel) key.channel()) {
            final ByteBuffer buffer = ByteBuffer.allocate(512);
            final int len = sc.read(buffer);
            // -1 means the peer already closed: nothing to print and nobody to reply to.
            if (len == -1) {
                return;
            }
            System.out.println(new String(buffer.array(), 0, len));
            final ByteBuffer bufferWrite = ByteBuffer.wrap("Hello Client".getBytes());
            // A non-blocking write may accept only part of the buffer; repeat until it is drained.
            while (bufferWrite.hasRemaining()) {
                sc.write(bufferWrite);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes the channel if it is non-null, reporting (not propagating) any failure. */
    private static void closeQuietly(SocketChannel sc) {
        if (sc == null) {
            return;
        }
        try {
            sc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Client端代码与Bio的相同,无需改变。
NIO-Reactor模式
NIO响应式(Reactor)模式:一个selector线程负责监听事件并处理accept连接事件,读写事件则交给worker线程池处理,从而减少selector线程的处理时间,提高系统的吞吐量。
代码示例
package com.study.nettystudy.nio;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * NIO server in Reactor style: the selector thread accepts connections and detects
 * readable channels, while the actual read/write work runs on a worker pool.
 */
public class PoolServer {
    private final ExecutorService pool = Executors.newFixedThreadPool(50);
    private Selector selector;

    public static void main(String[] args) throws IOException {
        final PoolServer poolServer = new PoolServer();
        poolServer.initServer(8888);
        poolServer.listen();
    }

    /**
     * Opens a non-blocking server channel on 127.0.0.1 and registers it for accept events.
     *
     * @param port the TCP port to listen on
     * @throws IOException if the channel or selector cannot be opened or bound
     */
    public void initServer(int port) throws IOException {
        final ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress("127.0.0.1", port));
        selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("pool server started");
    }

    /**
     * Runs the selector loop forever: accepts new clients on this thread and hands
     * readable clients to the worker pool.
     *
     * @throws IOException if selecting or accepting on the server channel fails
     */
    public void listen() throws IOException {
        while (true) {
            selector.select();
            final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                final SelectionKey key = iterator.next();
                iterator.remove();
                // A worker may have closed the channel since it was selected.
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    // Stop watching OP_READ while a worker owns the channel, so the same
                    // readiness is not dispatched a second time; the worker re-enables it.
                    key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
                    pool.execute(new ThreadHandler(key));
                }
            }
        }
    }

    /** Accepts one pending connection and registers it for read events. */
    private void accept(SelectionKey key) throws IOException {
        final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        final SocketChannel sc = ssc.accept();
        // A non-blocking accept() returns null when no connection is actually pending.
        if (sc == null) {
            return;
        }
        try {
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // A single client that cannot be set up must not leak or stop the server.
            e.printStackTrace();
            try {
                sc.close();
            } catch (IOException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}
/**
 * Worker task for one readable client: drains the available bytes, prints them and echoes
 * them back. Afterwards it either closes the channel (peer reached end-of-stream) or
 * re-enables read interest on the key.
 */
class ThreadHandler implements Runnable {
    private final SelectionKey key;

    ThreadHandler(SelectionKey key) {
        this.key = key;
    }

    @Override
    public void run() {
        final SocketChannel channel = (SocketChannel) key.channel();
        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        final ByteArrayOutputStream received = new ByteArrayOutputStream();
        try {
            int size;
            // Read until the channel has no more bytes right now (0) or hits end-of-stream (-1).
            while ((size = channel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                received.write(byteBuffer.array(), 0, byteBuffer.remaining());
                byteBuffer.clear();
            }
            final byte[] readBytes = received.toByteArray();
            System.out.println(new String(readBytes));
            final ByteBuffer writeBuffer = ByteBuffer.wrap(readBytes);
            // A non-blocking write may accept only part of the buffer; repeat until drained.
            while (writeBuffer.hasRemaining()) {
                channel.write(writeBuffer);
            }
            if (size == -1) {
                channel.close();
            } else {
                // Give the channel back to the selector and wake it so the change is noticed.
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                key.selector().wakeup();
            }
        } catch (IOException e) {
            e.printStackTrace();
            // Read interest is still switched off at this point, so the selector would
            // never report this channel again; close it rather than leak it.
            try {
                channel.close();
            } catch (IOException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}
AIO模型
无需selector不停的轮询,由操作系统通知我们,当有连接事件时,操作系统调用我们注册的观察者逻辑,读写的事件可以像NIO一样交给线程池处理。
代码示例
package com.study.nettystudy.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
 * Asynchronous-I/O (AIO) server built on completion handlers.
 *
 * <p>Listens on 127.0.0.1:8888. For each client it reads one chunk, prints it, replies
 * with "HelloClient1" and closes the connection once the reply has been written.
 */
public class Server {
    public static void main(String[] args) throws IOException, InterruptedException {
        final AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open();
        assc.bind(new InetSocketAddress("127.0.0.1", 8888));
        assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                // Re-arm accept first so the next client is not kept waiting on this one.
                assc.accept(null, this);
                final ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer readBuffer) {
                        // -1 means the client closed without sending anything.
                        if (result < 0) {
                            closeQuietly(client);
                            return;
                        }
                        readBuffer.flip();
                        // Decode only the bytes actually read, not the whole backing array.
                        System.out.println(new String(readBuffer.array(), 0, result));
                        reply(client, ByteBuffer.wrap("HelloClient1".getBytes()));
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer readBuffer) {
                        exc.printStackTrace();
                        closeQuietly(client);
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });
        // Keep the main thread alive; the accept and read callbacks do all the work.
        while (true) {
            Thread.sleep(1000);
        }
    }

    /** Writes the whole response asynchronously, then closes the client channel. */
    private static void reply(AsynchronousSocketChannel client, ByteBuffer response) {
        client.write(response, response, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer written, ByteBuffer pending) {
                if (pending.hasRemaining()) {
                    // Partial write: continue with the rest of the buffer.
                    client.write(pending, pending, this);
                } else {
                    closeQuietly(client);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer pending) {
                exc.printStackTrace();
                closeQuietly(client);
            }
        });
    }

    /** Closes the client channel, reporting (not propagating) any failure. */
    private static void closeQuietly(AsynchronousSocketChannel client) {
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
整个代码使用观察者模式,把accept的处理逻辑注册到操作系统层面,io的读逻辑也注册到操作系统层面,当有accept连接时,由操作系统调用注册的逻辑。相当于是我们观察操作系统的socket连接、读等事件,当有事件发生时,操作系统通知我们。
小结
NIO、AIO在Linux操作系统底层都是使用epoll(事件轮询)机制,因此AIO在操作系统层面还是和NIO一样都是轮询机制,只是AIO在Java层面把操作系统的轮询机制封装成了异步通知模式。
因此,Netty是对NIO模式的封装,接口看起来和AIO的类似。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/99958.html