大家好,我是程序员阿晶,本篇文章主要介绍如何在SpringBoot项目中使用Netty实现远程调用
。
记得收藏关注本人小站:https://www.xiaojingge.com
以及个人导航资源子站:http://nav.xiaojingge.com
前言
在进行网络连接的时候,建立套接字(Socket)连接是一个比较消耗性能的事情,特别是在分布式的情况下,用线程池去保持多个客户端连接,是一种非常消耗线程的行为。
那么我们该通过什么技术解决上述的问题呢,不得不提一个网络连接利器:Netty
介绍
Netty
Netty是一个NIO客户端服务器框架:
-
它可快速轻松地开发网络应用程序,例如协议服务器和客户端。 -
它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器。
NIO是一种非阻塞IO ,它具有以下的特点
-
单线程可以连接多个客户端。 -
选择器可以实现但线程管理多个 Channel
,新建的通道都要向选择器注册。 -
一个 SelectionKey
键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。 -
selector
进行select()
操作可能会产生阻塞,但是可以设置阻塞时间,并且可以用wakeup()
唤醒selector
,所以NIO是非阻塞IO。
Netty模型selector模式
它相对普通NIO的在性能上有了提升,采用了:
-
NIO采用多线程的方式可以同时使用多个 selector
-
通过绑定多个端口的方式,使得一个 selector
可以同时注册多个ServerSocketServer
-
单个线程下只能有一个 selector
,用来实现Channel
的匹配及复用
下面用一张图,描述一下关系:

Netty模型reactor模式
半包问题
TCP/IP在发送消息时,可能会拆包,这就导致接收端无法知道什么时候收到的是完整的数据。
在传统的BIO(同步阻塞IO模型)在读取不到数据时会发生阻塞,但是NIO(同步非阻塞IO模型)不会。
为了解决NIO的半包问题,Netty在Selector
模型的基础上,提出了reactor
模式,从而解决客户端请求在服务端不完整的问题。

上图,简单地可以描述为“Boss接活,让worker干
”:
manReactor
用来接收请求(会与客户端进行握手验证)
subReactor
用来处理请求(不与客户端直接连接)
编码
依赖
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<optional>true</optional>
</dependency>
<!-- netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.47.Final</version>
</dependency>
服务端
NettyServer.java:服务启动监听器
package com.itjing.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @author lijing
* @date 2022年05月28日 17:36
* @description 服务启动监听器
*/
@Slf4j
public class NettyServer {
public void start() {
InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082);
// new 一个主线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// new 一个工作线程组
EventLoopGroup workGroup = new NioEventLoopGroup(200);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer())
.localAddress(socketAddress)
// 设置队列大小
.option(ChannelOption.SO_BACKLOG, 1024)
// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
try {
ChannelFuture future = bootstrap.bind(socketAddress).sync();
log.info("服务器启动开始监听端口: {}", socketAddress.getPort());
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("服务器开启失败", e);
} finally {
// 关闭主线程组
bossGroup.shutdownGracefully();
// 关闭工作线程组
workGroup.shutdownGracefully();
}
}
}
ServerChannelInitializer.java:netty服务初始化器
package com.itjing.netty.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
* @author lijing
* @date 2022年05月28日 17:38
* @description netty服务初始化器
*/
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加编解码
socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}
NettyServerHandler.java:netty服务端处理器
package com.itjing.netty.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* @author lijing
* @date 2022年05月28日 17:39
* @description netty服务端处理器
*/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接会触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active......");
}
/**
* 客户端发消息会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("服务器收到消息: {}", msg.toString());
ctx.write("你也好哦");
ctx.flush();
}
/**
* 发生异常触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
主启动类
package com.itjing.netty;
import com.itjing.netty.remotecall.NettyServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@Slf4j
public class SpringbootNettyApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootNettyApplication.class, args);
//开启Netty服务
NettyServer nettyServer = new NettyServer();
nettyServer.start();
log.info("======服务已经启动========");
}
}
客户端
NettyClientUtil.java:Netty客户端
package com.itjing.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* @author lijing
* @date 2022年05月28日 18:01
* @description NettyClient工具类
*/
@Slf4j
public class NettyClientUtil {
public static Map<String, Object> helloNetty(String msg) {
NettyClientHandler nettyClientHandler = new NettyClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(group)
// 该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("decoder", new StringDecoder());
socketChannel.pipeline().addLast("encoder", new StringEncoder());
socketChannel.pipeline().addLast(nettyClientHandler);
}
});
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync();
log.info("客户端发送成功....");
// 发送消息
future.channel().writeAndFlush(msg);
// 等待连接被关闭
future.channel().closeFuture().sync();
return nettyClientHandler.getResponse();
} catch (Exception e) {
log.error("客户端Netty失败", e);
throw new RuntimeException(e);
} finally {
// 以一种优雅的方式进行线程退出
group.shutdownGracefully();
}
}
}
NettyClientHandler.java:客户端处理器
package com.itjing.netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
* @author lijing
* @date 2022年05月28日 18:02
* @description 客户端处理器
*/
@Slf4j
@Setter
@Getter
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private Map<String, Object> response = new HashMap<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端Active .....");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("客户端收到消息: {}", msg.toString());
this.response.put("result", msg.toString());
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
验证
package com.itjing.netty.controller;
import com.itjing.netty.client.NettyClientUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* @author lijing
* @date 2022年05月28日 18:07
* @description
*/
@RestController
@Slf4j
public class NettyController {
@GetMapping("/helloNetty")
public Map<String, Object> helloNetty(String msg) {
return NettyClientUtil.helloNetty(msg);
}
}
访问接口即可。
比如我访问:http://localhost:8080/helloNetty?msg=你好啊

原文始发于微信公众号(程序员阿晶):SpringBoot+Netty实现远程调用
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/19611.html