对于socket编程,java语言首选当然是netty,不论是api的易用性,框架的灵活性以及社区的活跃度,这些都是java原生api所不能比拟的,但是要知道,netty底层也是使用java原生api实现的,只不过把一些复杂不容易使用的api做了封装,在框架中添加一些设计模式让我们使用者更加方便进行业务代码开发。如果要深入了解netty,对于java原生网络编程是必须要学习的,通过原生方式我们可以更好的理解TCP建立连接过程,数据发送和接收以及网络开发过程中的很多细节内容。总之,通过学习原生socket开发能让我们更加深入了解技术细节。
下面通过代码方式分别讲解阻塞式、非阻塞式以及IO多路复用这几种网络编程方式的实现。
在所有的示例代码之前,先约定好一个编解码方式,由于示例程序是通过客户端向服务器端发送字符串,所以这里就会涉及到字符串的编解码方式,为什么要使用编解码器,这是由于网络中只能传输字节码,我们常用的字符串要编码成字节数组才能发送,而且socket中的发送和接收数据是以流的方式,没有边界的概念,但是数据真正在网络传输过程中又会被拆分成一个个小的数据包,这里就会出现常说的“半包和粘包”,之所以出现这种情况,就是因为发送和接收的是数据流,数据流没有边界概念,而程序处理的却是一个有界的数据。所以自定义的编解码器就需要指定数据的边界,格式为:数据长度[4 bytes] + 数据内容[n bytes]。用4个字节表示数据长度,后面跟着数据内容。
自定义的编解码器如下:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
/**
* 自定义一个字符串编解码器:
* 该编解码器由两部分组成:字符串长度 + 字符串内容
*
* @Author xingo
* @Date 2023/10/24
*/
public class StrCodec {
/**
* 字符串编码
* @param str
* @return
*/
public static ByteBuffer encode(String str) {
byte[] bytes = str.trim().getBytes(StandardCharsets.UTF_8);
int len = bytes.length;
// 组装发送的数据:数据长度 + 数据内容
ByteBuffer buffer = ByteBuffer.allocate(len + 4);
buffer.putInt(len);
buffer.put(bytes);
buffer.flip();
return buffer;
}
/**
* 字符串解码
* @param channel
* @return
*/
public static String decode(SelectionKey key, SocketChannel channel) {
try {
// 构造4字节buffer用于读取数据长度
ByteBuffer lenBytes = ByteBuffer.allocate(4);
int read = channel.read(lenBytes);
if(read == -1) { // 正常断开
close(key, channel);
return null;
} else if(read > 0) { // 正常读取
// 先读取数据长度
lenBytes.flip();
int len = lenBytes.getInt();
System.out.println("数据的长度 : " + len);
if(len > 0) {
// 根据数据长度设置buffer用于读取数据内容
ByteBuffer datas = ByteBuffer.allocate(len);
byte[] dst = new byte[len];
channel.read(datas);
datas.flip();
datas.get(dst);
// 返回字符串
return new String(dst, StandardCharsets.UTF_8);
} else if(len == 0) {
return "";
}
}
} catch (IOException e) {
e.printStackTrace();
close(key, channel);
}
return null;
}
/**
* 根据channel读取到的数据关闭连接
* @param key
* @param channel
*/
private static void close(SelectionKey key, SocketChannel channel) {
if(key != null) {
key.cancel();
}
try {
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
上面说的阻塞式IO、非阻塞式IO、IO多路复用都是针对服务端的不同,这里统一写一个客户端用于向服务端发送数据:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
/**
* @Author xingo
* @Date 2023/10/17
*/
public class JavaClient implements Runnable {
/**
* 服务器地址
*/
private String SERVER_HOST = "127.0.0.1";
/**
* 服务器端口号
*/
private int SERVER_PORT = 8804;
/**
* 连接是否成功
*/
private boolean success;
/**
* 退出线程
*/
private boolean exit;
private SocketChannel channel;
private Selector selector;
public JavaClient() {
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public boolean isSuccess() {
return success;
}
/**
* 写数据
* @param connect
* @return
*/
public int write(String connect) {
if(connect == null || connect.equals("")) {
return 0;
}
ByteBuffer buffer = StrCodec.encode(connect.trim());
try {
return this.channel.write(buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 关闭连接
*/
public void close() {
try {
this.channel.close();
this.exit = true;
this.selector.wakeup();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
// 开启一个socket连接
this.channel = SocketChannel.open();
// 设置为非阻塞
this.channel.configureBlocking(false);
// 与服务端建立连接
this.channel.connect(new InetSocketAddress(SERVER_HOST, SERVER_PORT));
// 把channel注册到selector上:监听connect事件
SelectionKey sscKey = this.channel.register(this.selector, SelectionKey.OP_CONNECT, null);
// 指定关注的事件,通过方法interestOps()可以指定多个事件
sscKey.interestOps(SelectionKey.OP_CONNECT);
// 用于接收服务端发送的数据
while (true) {
// 等待事件
this.selector.select();
// 判断标识为true时退出循环
if(this.exit) {
System.out.println("client exit!");
break;
}
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if(selectionKey.isConnectable()) { // 发生连接事件
SocketChannel sc = (SocketChannel) selectionKey.channel();
if(sc.finishConnect()) {
this.success = true;
System.out.println("connect server success!");
} else {
System.out.println("connect server fail!");
}
selectionKey.interestOps(SelectionKey.OP_READ);
} else if(selectionKey.isReadable()) { // 发生可读事件
SocketChannel channel = (SocketChannel) selectionKey.channel();
String str = StrCodec.decode(selectionKey, channel);
System.out.println("接收到数据 : " + str);
if(str == null) {
break;
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
// 开启客户端
JavaClient client = new JavaClient();
Thread thread = new Thread(client);
thread.start();
// 等待客户端建立连接完成
while (!client.isSuccess()) {
TimeUnit.MILLISECONDS.sleep(10);
}
// 提示输入内容,并读取输入数据
System.out.println("input : ");
Scanner scanner = new Scanner(System.in);
String line = scanner.nextLine();
while (true) {
// 约定好特殊字符串 ##$$ 用于判断是否退出,退出时关闭连接
if("##$$".equals(line)) {
client.close();
break;
}
if(line != null && !line.trim().equals("")) {
client.write(line.trim());
}
System.out.println("input : ");
line = scanner.nextLine();
}
}
}
以上准备内容已经完成,接下来就讲解服务端的各种实现方式和示例代码。
1、阻塞式IO实现socket编程
阻塞式表示连接执行的每一步都是阻塞的,在这一步没有处理完成时,就不能执行其他操作,这种处理方式严重的影响性能,因为在程序在大部分时间都是在等待,不论是等待连接还是等待读数据,下面这段代码就是服务端阻塞式编程的实现:
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* 阻塞式IO服务端
*
* @Author xingo
* @Date 2023/10/17
*/
public class JavaIoServer implements Runnable {
public static final int SERVER_PORT = 8801;
@Override
public void run() {
try {
// 1、 开启一个服务端
ServerSocketChannel server = ServerSocketChannel.open();
// 2、绑定端口
server.bind(new InetSocketAddress("127.0.0.1", SERVER_PORT));
while (true) {
SocketChannel sc = server.accept(); // 阻塞等待客户端连接
if(sc != null) {
System.out.println("client connect : " + sc);
// 由于服务端等待客户端连接的过程是阻塞式的,
// 为了避免阻塞导致数据无法读取,客户端与服务端的每一个连接都需要开启新的线程
new Thread(new IoWorker(sc)).start();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
JavaIoServer server = new JavaIoServer();
new Thread(server).start();
}
}
这里的IoWorker类用于处理IO数据的读取,避免被主线程等待连接而阻塞数据的读取:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* 服务端连接:与客户端连接对应
*
* @Author xingo
* @Date 2023/10/17
*/
public class IoWorker implements Runnable {
private SocketChannel channel;
public IoWorker(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
while (true) {
try {
if(this.channel == null) {
break;
}
// 数据的前4个字节是数据长度,用于后面接收数据
ByteBuffer lenBuffer = ByteBuffer.allocate(4);
// 没有数据可读阻塞等待数据
int read = this.channel.read(lenBuffer);
if(read == -1) { // 正常断开
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
} else if(read > 0) {
lenBuffer.flip();
int len = lenBuffer.getInt();
// 根据数据长度标识创建ByteBuffer用于接收数据
ByteBuffer dataBuffer = ByteBuffer.allocate(len);
this.channel.read(dataBuffer);
dataBuffer.flip();
byte[] bytes = new byte[dataBuffer.limit()];
dataBuffer.get(bytes);
// 输出内容
System.out.println("---------------- 接收到的数据 ----------------");
System.out.println("数据长度 : " + len);
System.out.println("数据内容 : " + new String(bytes));
}
} catch (IOException e) {
e.printStackTrace();
try {
this.channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
break;
}
}
}
}
这种阻塞式IO很少用于大并发连接的场景,因为每一个连接都需要开启一个线程来处理,这大大增加了服务器的开销。
2、非阻塞式IO实现socket编程
对于阻塞式IO,为了能够尽快处理连接上的数据,我们需要为每个连接单独开启线程处理请求,而非阻塞式IO,即使连接上没有事件(读事件、写事件、连接事件)发生,程序也不会被阻塞,而是继续运行,这样我们就不必为每个连接都单独开启线程进行处理,我们可以开启一个线程池,只有需要处理的连接才分配线程进行处理,代码大致如下:
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* 非阻塞式IO服务端
*
* @Author xingo
* @Date 2023/10/18
*/
public class JavaNioServer implements Runnable {
public static final int SERVER_PORT = 8802;
@Override
public void run() {
try {
// 1、 开启一个服务端
ServerSocketChannel server = ServerSocketChannel.open();
// 设置服务连接是非阻塞模式
server.configureBlocking(false);
// 2、绑定端口
server.bind(new InetSocketAddress("127.0.0.1", SERVER_PORT));
// 开启一个集合存储所有的连接,异步遍历集合并取出channel读取数据
NioChannels channels = new NioChannels();
new Thread(channels).start();
while (true) {
SocketChannel channel = server.accept(); // 这里的accept不在是阻塞式的,就算是没有连接程序也会继续向下运行
if(channel != null) {
channel.configureBlocking(false);
System.out.println("connect channel : " + channel);
// 将连接放入集合中循环检查是否有数据
channels.add(channel);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
JavaNioServer server = new JavaNioServer();
new Thread(server).start();
}
}
服务端定义一个集合存储所有的channel,里面异步遍历集合,如果有数据就处理,没有数据放回集合等待下一次遍历。
定义集合类:
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 存放所有连接的集合,监听集合中的连接是否有数据处理
*
* @Author xingo
* @Date 2023/10/18
*/
public class NioChannels implements Runnable {
// 用于处理连接数据读取的线程池
private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 512, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(512));
// 存储连接的队列
LinkedBlockingQueue<SocketChannel> channels = new LinkedBlockingQueue<>();
public void add(SocketChannel channel) {
if(channel != null) {
try {
channel.configureBlocking(false);
this.channels.add(channel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void run() {
while (true) {
try {
SocketChannel channel = channels.take();
// 放入线程池执行
threadPoolExecutor.execute(new NioWorker(this, channel));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
处理连接中数据类:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* 读取客户端数据任务
*
* @Author xingo
* @Date 2023/10/18
*/
public class NioWorker implements Runnable {
private NioChannels channels;
private SocketChannel channel;
public NioWorker(NioChannels channels, SocketChannel channel) {
this.channels = channels;
this.channel = channel;
}
@Override
public void run() {
try {
ByteBuffer lenBuffer = ByteBuffer.allocate(4);
// 数据的前4个字节是数据长度,用于后面接收数据
int read = channel.read(lenBuffer);
if(read == -1) { // 连接正常关闭
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
} else if(read > 0) { // 正常读取
lenBuffer.flip();
int length = lenBuffer.getInt();
// 根据数据长度标识创建ByteBuffer用于接收数据
ByteBuffer dataBuffer = ByteBuffer.allocate(length);
this.channel.read(dataBuffer);
dataBuffer.flip();
byte[] bytes = new byte[dataBuffer.limit()];
dataBuffer.get(bytes);
// 输出内容
System.out.println("---------------- 接收到的数据 ----------------");
System.out.println("数据长度 : " + length);
System.out.println("数据内容 : " + new String(bytes));
channels.add(channel);
} else {
channels.add(channel);
}
} catch (IOException e) {
e.printStackTrace();
// 读取异常关闭连接
try {
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
非阻塞式IO相对阻塞式IO可以通过线程池方式来处理连接请求,但是会发现里面会有大量空轮询,就算是通道没有数据也要分配线程去处理。
3、通过多路复用selector实现socket编程
针对上面的非阻塞式IO的弊端,可以通过多路复用技术来解决,它不需要服务端通过轮询方式来得到有数据的连接,而是将通道关心的事件注册到系统,当关心的事件发生后,会获取到通知执行相关代码处理业务逻辑。
下面的服务端划分为关心两类事件:一类关心连接事件,当有客户端的连接请求时会通知系统执行;另一类关心读取事件,当客户端向服务端发送数据时得到通知:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* selector方式服务端
*
* @Author xingo
* @Date 2023/10/16
*/
public class JavaServer implements Runnable {
public static final int SERVER_PORT = 8803;
@Override
public void run() {
try {
// 开启服务端通道
ServerSocketChannel server = ServerSocketChannel.open();
// selector 注册器
Selector selector = Selector.open();
// 数据读取的selector
WorkerThread worker = new WorkerThread("worker");
// server绑定端口
server.socket().bind(new InetSocketAddress(SERVER_PORT));
// 设置server连接是非阻塞的
server.configureBlocking(false);
// 把channel注册到selector上:监听accept事件
SelectionKey sscKey = server.register(selector, SelectionKey.OP_ACCEPT, null);
// 指定关注的事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
while (true) {
// 没有事件发生,线程就会阻塞,有事件发生了才会继续运行
// 事件发生后,要么处理(channel.accept())要么取消(selectionKey.cancel()),否则循环不会阻塞
selector.select();
// 获取关注的事件集合(连接事件),遍历事件集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
// 发生连接事件
if(selectionKey.isAcceptable()) {
// 获取到channel
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
// 接受连接
SocketChannel clientChannel = serverChannel.accept();
worker.register(clientChannel);
System.out.println("Client connected: " + clientChannel);
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
Thread.currentThread().setName("bossThread");
JavaServer server = new JavaServer();
new Thread(server).start();
}
}
上面的服务端selector只关心连接事件,再定义一个selector用于处理读取数据事件:
import org.example.StrCodec;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* 定义一个线程,处理数据读取事件
*
* @Author xingo
* @Date 2023/10/16
*/
public class WorkerThread implements Runnable {
private String name;
private Selector selector;
public WorkerThread(String name) {
this.name = name;
try {
this.selector = Selector.open();
new Thread(this, this.name).start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void register(SocketChannel channel) {
if(channel != null) {
try {
// 设置channel为非阻塞
channel.configureBlocking(false);
// 注册到selector中
SelectionKey scKey = channel.register(this.selector, SelectionKey.OP_READ, null);
// 设置关注的事件
scKey.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
public void run() {
while (true) {
// 指定超时时间的等待:这里需要指定超时时间等待,避免在 accept 事件中的 register() 方法阻塞
// 如果先发生注册事件,再发生阻塞事件,客户端此时发送消息过来,run 方法中是不会再发生阻塞,会把这个事件处理完毕。
// 如果先发生阻塞事件,再发生注册事件,客户端此时发送消息过来,你的 read 事件还没注册到 Selector 上面,selector.select()会认为你现在没有事件要处理,会一直阻塞。
// 所以避免由于 select() 方法阻塞导致 channel 的 register() 注册不进来,这里要选择有超时时间的等待,让 channel 的注册方法在下次阻塞前将关注的事件注册进来
try {
if(this.selector.select(10) == 0) {
continue;
}
// this.selector.select();
} catch (IOException e) {
e.printStackTrace();
}
// 获取关注的事件集合(连接事件),遍历事件集合
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
// 发生读事件
if(selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
String str = StrCodec.decode(selectionKey, channel);
System.out.println("接收到数据 : " + str);
}
// 发生写事件
/*if(selectionKey.isWritable()) {
System.out.println("socket channel 写数据");
}*/
}
}
}
}
在上面代码中,有一段注释写到事件注册和事件读取的先后关系有可能导致读取不到数据的情况,这里的解决方式是通过读取等待超时的方式让事件有机会注册进来,如果不使用该方式,也可以通过线程唤醒方式实现让事件注册进来,对服务端代码的处理读数据事件做如下修改:
import org.example.StrCodec;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* selector读取数据:通过线程唤醒方式注入关心事件
*
* @Author xingo
* @Date 2023/10/23
*/
public class WorkerThread implements Runnable {
private String name;
private Selector selector;
/**
* 使用队列存储已经建立连接但还没有注册到selector中的channel
*/
private ConcurrentLinkedQueue<SocketChannel> queue = new ConcurrentLinkedQueue<>();
public WorkerThread(String name) {
this.name = name;
try {
this.selector = Selector.open();
new Thread(this, this.name).start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void register(SocketChannel channel) {
if(channel != null) {
// 将channel放入队列
queue.add(channel);
// 唤醒selector
this.selector.wakeup();
}
}
private void register0(SocketChannel channel) {
try {
// 设置channel为非阻塞
channel.configureBlocking(false);
// 注册到selector中
SelectionKey scKey = channel.register(this.selector, SelectionKey.OP_READ, null);
// 设置关注的事件
scKey.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void run() {
try {
while (true) {
// 这里采用阻塞式等待,当有channel到来时,该阻塞会被唤醒
this.selector.select();
// 拿出队列头元素,如果有数据返回,就交给selector读事件监听
SocketChannel sc = queue.poll();
while (sc != null) {
register0(sc);
sc = queue.poll();
}
// 获取关注的事件集合(连接事件),遍历事件集合
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
// 发生读事件
if(selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
String str = StrCodec.decode(selectionKey, channel);
System.out.println("接收到数据 : " + str);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
4、多路复用多线程
在项目中,通常数据读取需要更多的线程去监控,避免因为连接过多导致数据读取耗时久影响其他连接的数据,所以这里我们仿造netty的处理方式,将处理selector划分为boss和worker,boss专门负责连接的建立,通常一个线程就可以了,而worker设置为多线程,线程数量一般与CPU核数一致,下面是对服务端代码的改造:
server处理连接:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
/**
* boss服务:负责监听建立的连接,连接建立后将读数据交给worker线程执行,这里简单的按照轮询方式分配
*
* @Author xingo
* @Date 2023/10/23
*/
public class JavaBossServer implements Runnable {
public static final int SERVER_PORT = 8804;
@Override
public void run() {
try {
// 开启服务端通道
ServerSocketChannel server = ServerSocketChannel.open();
// selector 注册器
Selector selector = Selector.open();
// 数据读取的selector,一般设置为CPU的核数
int nums = Runtime.getRuntime().availableProcessors();
AtomicInteger index = new AtomicInteger(0);
WorkerThread[] workers = new WorkerThread[nums];
for(int i = 0; i < nums; i++) {
workers[i] = new WorkerThread("worker-" + (i + 1));
}
// server绑定端口
server.socket().bind(new InetSocketAddress(SERVER_PORT));
// 设置server连接是非阻塞的
server.configureBlocking(false);
// 把channel注册到selector上:监听accept事件
SelectionKey sscKey = server.register(selector, SelectionKey.OP_ACCEPT, null);
// 指定关注的事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
while (true) {
// 没有事件发生,线程就会阻塞,有事件发生了才会继续运行
// 事件发生后,要么处理(channel.accept())要么取消(selectionKey.cancel()),否则循环不会阻塞
selector.select();
// 获取关注的事件集合(连接事件),遍历事件集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
// 发生连接事件
if(selectionKey.isAcceptable()) {
// 获取到channel
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
// 接受连接
SocketChannel clientChannel = serverChannel.accept();
workers[index.incrementAndGet() % nums].register(clientChannel);
System.out.println("Client connected: " + clientChannel);
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
Thread.currentThread().setName("bossThread");
JavaBossServer server = new JavaBossServer();
new Thread(server).start();
}
}
工作线程处理读取数据事件,真正业务开发中,还应该有一个业务线程池用于耗时的业务处理,这样就可以让worker线程只关心数据的读取,而不用处理耗时的业务逻辑:
import org.example.StrCodec;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* 负责数据的读取
*
* @Author xingo
* @Date 2023/10/23
*/
public class WorkerThread implements Runnable {
private String name;
private Selector selector;
public WorkerThread(String name) {
this.name = name;
try {
this.selector = Selector.open();
new Thread(this, this.name).start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void register(SocketChannel channel) {
if(channel != null) {
try {
// 设置channel为非阻塞
channel.configureBlocking(false);
// 注册到selector中
SelectionKey scKey = channel.register(this.selector, SelectionKey.OP_READ, null);
// 设置关注的事件
scKey.interestOps(SelectionKey.OP_READ);
/*scKey.interestOps(SelectionKey.OP_WRITE);*/
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
public void run() {
try {
while (true) {
// 指定超时时间的等待:这里需要指定超时时间等待,避免在 accept 事件中的 register() 方法阻塞
// 如果先发生注册事件,再发生阻塞事件,客户端此时发送消息过来,run 方法中是不会再发生阻塞,会把这个事件处理完毕。
// 如果先发生阻塞事件,再发生注册事件,客户端此时发送消息过来,你的 read 事件还没注册到 Selector 上面,selector.select()会认为你现在没有事件要处理,会一直阻塞。
// 所以避免由于 select() 方法阻塞导致 channel 的 register() 注册不进来,这里要选择有超时时间的等待,让 channel 的注册方法在下次阻塞前将关注的事件注册进来
if(this.selector.select(10) == 0) {
continue;
}
// this.selector.select();
// 获取关注的事件集合(连接事件),遍历事件集合
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
// 发生读事件
if(selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
String str = StrCodec.decode(selectionKey, channel);
System.out.println("线程 " + Thread.currentThread().getName() + " 接收到数据 : " + str);
if(str != null) {
channel.write(StrCodec.encode("ok"));
}
}
// 发生写事件
/*if(selectionKey.isWritable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
System.out.println("socket channel 写数据");
}*/
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
以上就是java通过socket编程全部几种方式的代码实现,可以发现对比netty,还是比较复杂的,而且这只是简单的示例代码,更多的细节还没有考虑到,如果真在在项目中使用原生api进行socket开发会更加复杂,我们不单单需要知道相关的概念,还需要了解操作系统和网络相关的协议内容,所以在项目中还是建议使用netty做开发,会省去很多的问题和bug,让我们更专注业务逻辑的开发。
socket底层是走tcp协议的(也可以使用udp协议编程),有兴趣的同学可以使用tcpdump或Wireshark抓包工具更加深入的理解底层原理,通过对数据包的分析会更加明白建立连接和释放连接的整个过程都经历了什么。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/181845.html