Netty入门之可写事件以及多线程版的通信

往期文章简单讲解了Netty入门基础篇的相关基本概念:

本次主要讲解如何处理ByteBuffer的可写事件.

先上代码:

  • Server
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

Selector selector = Selector.open();
// 注册并绑定accept事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));

while(true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
// 这里记得要移除key
iter.remove();
if(key.isAcceptable()) {
// 因为serverSocketChannel的key只有一个。所以这里简写了。直接
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);

StringBuilder sb = new StringBuilder();
for(int i=0;i<=3000000; i++){
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
while (buffer.hasRemaining()){
// 这里不能保证一次全部写入进去 返回实际写入的字节数
int write = sc.write(buffer);
System.out.println(write);
}
}
}

}
}
  • Client
    public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
int count = 0;

while(true){
// 接收数据
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println(count);
buffer.clear();

}
}

上述代码的运行结果如图所示:

  • serverNetty入门之可写事件以及多线程版的通信
  • client
Netty入门之可写事件以及多线程版的通信

通过上述结果我们不难发现这个server端发送数据的时候并不是一次全部发送出去的,他尝试了很多次,效率很低, 并且有的时候Buffer是满的( server端打印0的时候,它是无法写的)他也无法发送,这样其实无法满足非阻塞模式的,接下来进行一个优化: 当buffer满的时候,我去进行别的操作,当buffer清空了触发一个写事件 上代码:

  • server(就是对上述代码进行了优化)
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

Selector selector = Selector.open();
// 注册并绑定accept事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));

while(true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
// 这里记得要移除key
iter.remove();
if(key.isAcceptable()) {
// 因为serverSocketChannel的key只有一个。所以这里简写了。直接
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector,0,null);
sckey.interestOps(SelectionKey.OP_READ);
StringBuilder sb = new StringBuilder();
for(int i=0;i<=3000000; i++){
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());



// 判断是否有剩余内容
if ( buffer.hasRemaining()){
// 关注可写事件 这里需要注意以下,避免替换掉之前关注的 可读事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE );
// sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE );
// 把未写完的数据挂到sckey上 通过附件的方式
sckey.attach(buffer);

}
}else if (key.isWritable()){
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println(write);
// 清理操作
if (!buffer.hasRemaining()){
// 清除buffer
key.attach(null);
// 不需要关注可写事件
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}

}
}
}

主要就是利用附件的特性和关注可写事件

关于可读事件就讲这些,接下来给大家说一下如何利用多线程来进行优化通信,充分利用多核CPU

如图所示:Netty入门之可写事件以及多线程版的通信

说明
  • 黄色框框代表客户端
  • Boss建立连接 accept事件
  • worker 关注读写事件
单个worker 版本
  • server
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("Boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
//1. 创建固定数量的worker 并初始化
Worker worker = new Worker("worker-0");
while (true){
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}",sc.getRemoteAddress());
//2. 关联selector
// 初始化selector 启动worker-0
log.debug("before register...{}",sc.getRemoteAddress());
worker.register(sc);
log.debug("after register...{}",sc.getRemoteAddress());

}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 还未初始化
private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
public Worker(String name){
this.name = name;
}
// 初始化线程 和selector
public void register(SocketChannel sc) throws IOException {
if (!start){
selector = Selector.open();
thread = new Thread(this,name);
thread.start();
start =true;
}
// 此时这里还是boss线程执行的 因为run方法才是worker-0线程 可以利用消息队列 ConcurrentLinkedDeque
// sc.register(selector,SelectionKey.OP_READ,null);
// 像队列添加任务 但是这个任务并没有立即执行 我们在worker-0线程中取出来执行
queue.add(()->{
try {
sc.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();// 唤醒selector
}

@Override
public void run() {
while (true){
try {
selector.select(); // worker-0 阻塞 wakeup
// 取出来的可能为空
Runnable task = queue.poll();
if (task !=null) {
task.run();// 执行了 sc.register(selector,SelectionKey.OP_READ,null);
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read ....{}",channel.getLocalAddress());
// 这里有很多细节 具体的我不在这里进行赘述,往期文章有写 比如这里的buffer可能会出现黏包半包
// 客户端异常断开 检测异常 还有写的数据量过多 等等问题 往期文章都有写这里简单写一下关于多线程的逻辑
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

// worker.register();log.debug("before register...{}",sc.getLocalAddress()); 启动worker-0
// sc.register(worker.selector,SelectionKey.OP_READ,null);

//*这两个代码是运行在两个线程中 当有客户端连接的时候它会阻塞住 这里如何解决? 方法有很多, 接下来模仿Netty的
// 让register 也运行在worker-0线程中*/
}
  • Client
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
sc.write(Charset.defaultCharset().encode("1234567890abcdef"));
System.in.read();
// System.out.println("waiting.......");
}

}
  • 注意:

这里采用了队列的方式

多个worker版本

主要修改的地方

Netty入门之可写事件以及多线程版的通信
  • 完整代码
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("Boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
//1. 创建固定数量的worker 并初始化
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
// 计时器
AtomicInteger index = new AtomicInteger();
while (true){
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()){
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}",sc.getRemoteAddress());
//2. 关联selector
// 初始化selector 启动worker-0
log.debug("before register...{}",sc.getRemoteAddress());
// round robin 负载均衡算法
workers[index.getAndIncrement() % workers.length].register(sc);
log.debug("after register...{}",sc.getRemoteAddress());

}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 还未初始化
private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
public Worker(String name){
this.name = name;
}
// 初始化线程 和selector
public void register(SocketChannel sc) throws IOException {
if (!start){
selector = Selector.open();
thread = new Thread(this,name);
thread.start();
start =true;
}
queue.add(()->{
try {
sc.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();// 唤醒selector
}

@Override
public void run() {
while (true){
try {
selector.select(); // worker-0 阻塞 wakeup
// 取出来的可能为空
Runnable task = queue.poll();
if (task !=null) {
task.run();// 执行了 sc.register(selector,SelectionKey.OP_READ,null);
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read ....{}",channel.getLocalAddress());
// 这里有很多细节 具体的我不在这里进行赘述,往期文章有写 比如这里的buffer可能会出现黏包半包
// 客户端异常断开 检测异常 还有写的数据量过多 等等问题 往期文章都有写这里简单写一下关于多线程的逻辑
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

多个worker版本的个数获取Runtime.getRuntime().availableProcessors() 这里需要注意

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
  • 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

关于多线程版的通信讲到这里就告一段落了。

微信搜索【码上遇见你】获取更多精彩内容


原文始发于微信公众号(码上遇见你):Netty入门之可写事件以及多线程版的通信

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/78766.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!