
前言
Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 Java NIO 提供的 API 实现。它提供了对 TCP、UDP 和文件传输的支持。作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞的。本文基于 Netty 实现消息推送。
另一篇关于日志实时查看的文章,参见《SpringBoot 整合websocket 实现日志实时查看》。
正文
1.引入依赖
<!-- Spring Data Redis (RedisTemplate, JedisConnectionFactory); version pinned explicitly -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.8.4.RELEASE</version>
</dependency>
<!-- Jedis client, taken from the com.github.sazzad16 group id -->
<dependency>
<groupId>com.github.sazzad16</groupId>
<artifactId>jedis</artifactId>
<version>2.9.2</version>
</dependency>
<!-- WebSocket log preview -->
<!-- No version given here; presumably managed by the Spring Boot parent/BOM (confirm in the full pom) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Netty, used for the standalone WebSocket server -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
<!-- Template engine -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
2.SpringUtils
从已有的Spring上下文取得已实例化的bean
/**
 * Static lookup helper for beans held by the Spring application context.
 *
 * <p>Spring hands the context to this component through
 * {@link #setApplicationContext(ApplicationContext)}; the first context received is kept in a
 * static field so that the static {@code getBean} methods can be called from anywhere.
 */
@Component
public class SpringUtils implements ApplicationContextAware {

    /** Context captured on the first {@code setApplicationContext} call; {@code null} until then. */
    private static ApplicationContext applicationContext;

    /**
     * Stores the given context unless one has already been stored.
     *
     * @param applicationContext the context supplied by Spring
     * @throws BeansException declared by the {@link ApplicationContextAware} contract
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        if (SpringUtils.applicationContext != null) {
            // Keep the context that was registered first.
            return;
        }
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * @return the stored application context, or {@code null} if none has been stored yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Looks a bean up by name.
     *
     * @param name the bean name
     * @return the bean instance
     */
    public static Object getBean(String name) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name);
    }

    /**
     * Looks a bean up by type.
     *
     * @param clazz the required bean type
     * @return the bean instance
     */
    public static <T> T getBean(Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(clazz);
    }

    /**
     * Looks a bean up by name and type.
     *
     * @param name the bean name
     * @param clazz the required bean type
     * @return the bean instance
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name, clazz);
    }
}
3.编写NettyServer
/**
 * NettyServer: configures and runs the Netty WebSocket server.
 *
 * <p>{@link #start()} listens on the default port; {@link #start(int)} accepts any port.
 * Both calls block the calling thread until the server channel is closed.
 */
public class NettyServer {

    /** Port used by the no-argument {@link #start()}. */
    private static final int DEFAULT_PORT = 12346;

    /**
     * Starts the server on the default port and blocks until the server channel closes.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void start() throws Exception {
        start(DEFAULT_PORT);
    }

    /**
     * Starts the server on the given port and blocks until the server channel closes.
     *
     * @param port the TCP port to listen on
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void start(int port) throws Exception {
        // The boss group only accepts connections on the single listening socket;
        // the worker group performs the I/O of the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup, workerGroup) // parent (acceptor) group first, child (I/O) group second
                    .channel(NioServerSocketChannel.class) // channel implementation to use
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .localAddress(port) // listening port
                    .childHandler(new ChannelInitializer<SocketChannel>() { // runs for every accepted connection
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // WebSocket starts as an HTTP handshake, so an HTTP codec is required.
                            ch.pipeline().addLast(new HttpServerCodec());
                            // Handler that writes content in chunks.
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            // Custom business handler. It is placed BEFORE the protocol handler
                            // because it rewrites the handshake request URI (drops the query
                            // string) before the protocol handler inspects the path.
                            ch.pipeline().addLast(new MyWebSocketHandler());
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
                        }
                    });
            // Bind and wait until the bind has completed.
            ChannelFuture cf = sb.bind().sync();
            System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Release both thread pools; the nested finally guarantees the boss group is
            // shut down even if waiting for the worker group fails.
            try {
                workerGroup.shutdownGracefully().sync();
            } finally {
                bossGroup.shutdownGracefully().sync();
            }
        }
    }
}
4.编写通道组池,管理所有websocket连接
/**
 * Holder of the channel group that tracks every open WebSocket channel.
 */
public class MyChannelHandlerPool {

    /** Kept public for compatibility with existing callers; the class carries no instance state. */
    public MyChannelHandlerPool() {
    }

    /**
     * Group of all connected channels, used for broadcasting.
     * Declared {@code final} so the shared group cannot be swapped out at runtime.
     */
    public static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
5.自定义Handler
- channelActive:与客户端建立连接
- channelInactive:与客户端断开连接
- channelRead:处理客户端发送的消息
/**
 * MyWebSocketHandler: WebSocket handler for connection bookkeeping and message broadcast.
 *
 * <ul>
 *   <li>{@code channelActive} / {@code channelInactive}: add/remove the channel in
 *       {@code MyChannelHandlerPool.channelGroup} and increment/decrement the Redis counter.</li>
 *   <li>{@code channelRead}: on the initial {@code FullHttpRequest}, stores the {@code name}
 *       query parameter as a channel attribute and strips the query string from the URI.</li>
 *   <li>{@code channelRead0}: broadcasts each text frame, suffixed with the counter value,
 *       to every channel in the group.</li>
 * </ul>
 */
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** Redis key of the connection counter. The spelling is the key actually stored; do not "fix" it. */
    private static final String ONLINE_COUNT_KEY = "socket:oline";

    /** Channel attribute under which the user name from the handshake URI is stored. */
    private static final AttributeKey<String> NAME_KEY = AttributeKey.valueOf("name");

    /**
     * Registers a newly connected channel and increments the counter.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与客户端建立连接,通道开启!");
        redisTemplate().opsForValue().increment(ONLINE_COUNT_KEY, 1);
        // Add the channel to the broadcast group.
        MyChannelHandlerPool.channelGroup.add(ctx.channel());
        // Forward the event so handlers further down the pipeline still see it.
        super.channelActive(ctx);
    }

    /**
     * Unregisters a closed channel and decrements the counter.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Remove the channel from the broadcast group.
        MyChannelHandlerPool.channelGroup.remove(ctx.channel());
        redisTemplate().opsForValue().increment(ONLINE_COUNT_KEY, -1);
        String name = ctx.channel().attr(NAME_KEY).get();
        System.out.println(name+"与客户端断开连接,通道关闭!");
        // Forward the event so handlers further down the pipeline still see it.
        super.channelInactive(ctx);
    }

    /**
     * Pre-processes the handshake request, then delegates to the superclass, which
     * dispatches text frames to {@link #channelRead0} and passes everything else on.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The first message on a connection is the HTTP handshake request.
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            String uri = request.uri();
            // Keep the user name on the channel so it can be read back later.
            String name = getUrlParams(uri).get("name");
            if (name != null) {
                ctx.channel().attr(NAME_KEY).setIfAbsent(name);
            }
            // Drop the query string so later handlers only see the bare path.
            int queryStart = uri.indexOf('?');
            if (queryStart >= 0) {
                request.setUri(uri.substring(0, queryStart));
            }
        }
        super.channelRead(ctx, msg);
    }

    /**
     * Broadcasts a text frame to all connected channels. Running here (instead of in
     * {@code channelRead}) means the superclass releases the frame even if broadcasting throws.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        Object online = redisTemplate().opsForValue().get(ONLINE_COUNT_KEY);
        sendAllMessage(textWebSocketFrame.text() + ",当前在线人数:" + online);
    }

    /** Sends {@code message} as a text frame to every channel in the group. */
    private void sendAllMessage(String message){
        MyChannelHandlerPool.channelGroup.writeAndFlush(new TextWebSocketFrame(message));
    }

    /** Fetches the Redis template through the Spring-managed {@code redisService} bean. */
    private static RedisTemplate redisTemplate() {
        RedisService redisService = (RedisService) SpringUtils.getBean("redisService");
        return redisService.getRedisTemplate();
    }

    /**
     * Parses the query string of {@code url} into a map.
     *
     * <p>Tolerates a missing or empty query string, empty pairs ({@code a=1&&b=2}), and
     * parameters without a value ({@code ?flag}, {@code ?name=}), which map to an empty string.
     * Values are URL-decoded as UTF-8; a value with a malformed escape sequence is kept as-is.
     *
     * @param url the request URI, possibly without a query string
     * @return parameter names mapped to decoded values; empty if there are none
     * @throws UnsupportedEncodingException if UTF-8 is not supported (declared by {@code URLDecoder})
     */
    private static Map<String, String> getUrlParams(String url) throws UnsupportedEncodingException {
        Map<String, String> params = new HashMap<>();
        if (url == null) {
            return params;
        }
        int queryStart = url.indexOf('?');
        if (queryStart < 0 || queryStart == url.length() - 1) {
            return params;
        }
        for (String pair : url.substring(queryStart + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String rawValue = eq < 0 ? "" : pair.substring(eq + 1);
            if (key.isEmpty()) {
                continue;
            }
            String value;
            try {
                // Decode so that non-ASCII (e.g. Chinese) values are not garbled.
                value = URLDecoder.decode(rawValue, "UTF-8");
            } catch (IllegalArgumentException malformedEscape) {
                // Client-supplied input: keep the raw text rather than failing the handshake.
                value = rawValue;
            }
            params.put(key, value);
        }
        return params;
    }
}
6.RedisTemplate工具类
小编这里是自己实现的多机房、多数据源配置,你也可以通过自动配置的方式装配 Redis。
/**
 * Builds one {@link RedisTemplate} per node listed in the {@code spring.redis.nodes} property.
 *
 * <p>{@code spring.redis.nodes} is a comma-separated list of
 * {@code host:port[:password[:database]]} entries. Pool settings, timeout and SSL flag are read
 * from the {@code spring.redis.*} properties and shared by all nodes.
 */
@Configuration
@Component
public class RedisMultiConfiguration extends CachingConfigurerSupport {

    /** Templates built so far; filled on the first successful {@link #getRedisTempMap()} call. */
    protected List<RedisTemplate> redisTemplist = new ArrayList<RedisTemplate>();

    @Autowired
    private Environment env;

    /**
     * Returns the configured templates, building them on first use.
     *
     * @return one template per configured node, in configuration order
     * @throws IllegalStateException if a required property is missing
     * @throws IllegalArgumentException if a node entry lacks {@code host:port}
     */
    public List<RedisTemplate> getRedisTempMap() {
        if (!redisTemplist.isEmpty()) {
            return redisTemplist;
        }
        JedisPoolConfig poolConfig = buildPoolConfig();
        String nodes = requiredProperty("spring.redis.nodes");
        // Build into a local list first so a failure half-way does not leave a partial result cached.
        List<RedisTemplate> built = new ArrayList<RedisTemplate>();
        for (String node : nodes.split(",")) {
            if (node.trim().isEmpty()) {
                continue;
            }
            built.add(buildTemplate(node, poolConfig));
        }
        redisTemplist.addAll(built);
        return redisTemplist;
    }

    /** Creates the connection-pool configuration from the {@code spring.redis.pool.*} properties. */
    private JedisPoolConfig buildPoolConfig() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxIdle(intProperty("spring.redis.pool.max-idle"));
        poolConfig.setMaxTotal(intProperty("spring.redis.pool.max-total"));
        poolConfig.setMaxWaitMillis(intProperty("spring.redis.pool.max-wait"));
        poolConfig.setMinIdle(intProperty("spring.redis.pool.min-idle"));
        return poolConfig;
    }

    /** Creates a fully initialised template for one {@code host:port[:password[:database]]} entry. */
    private RedisTemplate<String, String> buildTemplate(String node, JedisPoolConfig poolConfig) {
        String[] item = node.split(":");
        if (item.length < 2) {
            // The entry itself is not echoed: it may contain a password.
            throw new IllegalArgumentException(
                    "Invalid entry in spring.redis.nodes, expected host:port[:password[:database]]");
        }
        String ip = item[0];
        String port = item[1];
        // "host:port" and "host:port:" (trailing empty part dropped by split) mean "no password".
        String password = item.length >= 3 ? item[2] : "";
        String database = item.length == 4 ? item[3] : "0";

        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setHostName(ip);
        jedisConnectionFactory.setDatabase(Integer.parseInt(database));
        if (!password.isEmpty()) {
            jedisConnectionFactory.setPassword(password);
        }
        jedisConnectionFactory.setPort(Integer.parseInt(port));
        jedisConnectionFactory.setUsePool(true);
        jedisConnectionFactory.setPoolConfig(poolConfig);
        jedisConnectionFactory.setTimeout(intProperty("spring.redis.timeout"));
        jedisConnectionFactory.setUseSsl(Boolean.parseBoolean(env.getProperty("spring.redis.ssl")));
        jedisConnectionFactory.afterPropertiesSet();

        RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>();
        redisTemplate.setConnectionFactory(jedisConnectionFactory);
        Jackson2JsonRedisSerializer<Object> jsonSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        // NOTE(review): default typing lets the stored JSON choose which class to instantiate,
        // which is a known deserialization risk if Redis content is not fully trusted.
        // Left unchanged because existing stored values depend on this format.
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jsonSerializer.setObjectMapper(objectMapper);
        redisTemplate.setValueSerializer(jsonSerializer);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(jsonSerializer);
        redisTemplate.setHashValueSerializer(jsonSerializer);
        redisTemplate.setDefaultSerializer(jsonSerializer);
        redisTemplate.setEnableDefaultSerializer(true);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /** Reads a property that must be present, failing with a message that names it. */
    private String requiredProperty(String name) {
        String value = env.getProperty(name);
        if (value == null) {
            throw new IllegalStateException("Missing required property: " + name);
        }
        return value;
    }

    /** Reads a required integer property. */
    private int intProperty(String name) {
        return Integer.parseInt(requiredProperty(name));
    }
}
/**
 * Exposes the templates built by {@link RedisMultiConfiguration} through a {@code @Bean} method.
 */
@Component
public class RedisDataSourceConfig {

    /** Source of the Redis templates. */
    @Autowired
    protected RedisMultiConfiguration config;

    /**
     * @return the list returned by {@code RedisMultiConfiguration.getRedisTempMap()}
     */
    @Bean
    public List<RedisTemplate> getTemplateMap() {
        final List<RedisTemplate> templates = config.getRedisTempMap();
        return templates;
    }
}
/**
 * Convenience operations on the first configured {@link RedisTemplate}.
 *
 * <p>The string read/write methods go through the template's string serializer on a raw
 * connection, so keys and values are stored as plain strings.
 */
@Service
public class RedisService {

    /** Expiry passed to {@code setEx} by the expiring {@code set} methods. */
    @Value("${spring.redis.keyExpire}")
    private long expireTime;

    @Autowired
    private RedisDataSourceConfig redisDataSourceConfig;

    /**
     * Stores {@code value} under {@code key} with the configured expiry.
     *
     * @return {@code true} once the command has been issued
     */
    public boolean set(final String key, final String value) {
        return store(key, value, true);
    }

    /**
     * Stores {@code value} under {@code key} without an expiry.
     *
     * @return {@code true} once the command has been issued
     */
    public boolean setNoExpire(final String key, final String value) {
        return store(key, value, false);
    }

    /**
     * Stores the JSON text of {@code value} under {@code key} with the configured expiry.
     *
     * @return {@code true} once the command has been issued
     */
    public boolean set(final String key, final JSONObject value) {
        return store(key, value.toString(), true);
    }

    /**
     * Stores the JSON text of {@code value} under {@code key} without an expiry.
     *
     * @return {@code true} once the command has been issued
     */
    public boolean setNoExpire(final String key, final JSONObject value) {
        return store(key, value.toString(), false);
    }

    /**
     * Reads the string stored under {@code key}.
     *
     * @return the stored string, as returned by the string serializer
     */
    public String get(final String key) {
        final RedisTemplate<String, ?> template = primaryTemplate();
        return template.execute(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = template.getStringSerializer();
                byte[] raw = connection.get(serializer.serialize(key));
                return serializer.deserialize(raw);
            }
        });
    }

    /**
     * Reads the string stored under {@code key} and parses it as JSON.
     *
     * @return the result of {@code JSONObject.parseObject} on the stored string
     */
    public JSONObject getJSON(final String key) {
        return JSONObject.parseObject(get(key));
    }

    /** Same as {@link #setNoExpire(String, String)}; kept for existing callers. */
    public boolean setNoExpireTime(final String key, final String value) {
        return store(key, value, false);
    }

    /** Same as {@link #setNoExpire(String, JSONObject)}; kept for existing callers. */
    public boolean setNoExpireTime(final String key, final JSONObject value) {
        return store(key, value.toString(), false);
    }

    /**
     * Sets the time-to-live of {@code key}.
     *
     * @param expire time-to-live in seconds
     * @return {@code true} if the template reported success
     */
    public boolean expire(final String key, long expire) {
        return Boolean.TRUE.equals(primaryTemplate().expire(key, expire, TimeUnit.SECONDS));
    }

    /**
     * @return {@code true} if the template reports that {@code key} exists
     */
    public boolean hasKey(final String key) {
        return Boolean.TRUE.equals(primaryTemplate().hasKey(key));
    }

    /** Deletes {@code key}. */
    public void delkey(String key){
        primaryTemplate().delete(key);
    }

    /**
     * @return the first configured template, or {@code null} if none is configured
     */
    public RedisTemplate getRedisTemplate(){
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        if (redisTemplates == null || redisTemplates.isEmpty()) {
            return null;
        }
        return redisTemplates.get(0);
    }

    /**
     * Returns the first configured template, failing with a clear message instead of a
     * {@code NullPointerException} or {@code IndexOutOfBoundsException} when there is none.
     */
    @SuppressWarnings("unchecked") // the configuration only ever creates String-keyed templates
    private RedisTemplate<String, ?> primaryTemplate() {
        RedisTemplate<String, ?> template = getRedisTemplate();
        if (template == null) {
            throw new IllegalStateException("No RedisTemplate is configured");
        }
        return template;
    }

    /** Writes a string value, with the configured expiry when {@code withExpiry} is set. */
    private boolean store(final String key, final String value, final boolean withExpiry) {
        final RedisTemplate<String, ?> template = primaryTemplate();
        Boolean stored = template.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = template.getStringSerializer();
                byte[] rawKey = serializer.serialize(key);
                byte[] rawValue = serializer.serialize(value);
                if (withExpiry) {
                    connection.setEx(rawKey, expireTime, rawValue);
                } else {
                    connection.set(rawKey, rawValue);
                }
                return true;
            }
        });
        return Boolean.TRUE.equals(stored);
    }
}
7.页面
如果不想引入页面,可以通过在线工具 http://www.websocket-test.com/ 进行测试。
<!DOCTYPE html>
<!-- Test page: opens a WebSocket to the Netty server, sends messages and shows the replies. -->
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        // Connection shared by the event handlers below and by send().
        var socket;
        // Fall back to the Mozilla-prefixed constructor where the standard one is missing.
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            // The "name" query parameter is read by the server during the handshake.
            socket = new WebSocket("ws://127.0.0.1:12346/ws?name=一安未来");
            // Append every server message on its own line.
            socket.onmessage = function (event) {
                var ta = document.getElementById('responseText');
                ta.value += event.data + "\r\n";
            };
            // Connection opened: reset the text area with a status line.
            socket.onopen = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。连接 \r\n";
            };
            // Connection closed: replace the text area content with a status line.
            socket.onclose = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。关闭 \r\n";
            };
        } else {
            alert("您的浏览器不支持WebSocket协议!");
        }

        // Sends a message if the connection is open; otherwise tells the user.
        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("WebSocket 连接没有建立成功!");
            }
        }
    </script>
</head>
<body>
<!-- onSubmit returns false so that pressing Enter does not reload the page. -->
<form onSubmit="return false;">
    <!-- uid is filled in by the controller through the Thymeleaf model. -->
    <label>编号</label><input type="text" name="uid" th:value="${uid}"/> <br/>
    <label>内容</label><input type="text" name="message" value="这里输入消息"/> <br/>
    <br/> <input type="button" value="发送ws消息"
                 onClick="send(this.form.uid.value+':'+this.form.message.value)"/>
    <hr color="black"/>
    <h3>服务端返回的应答消息</h3>
    <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>
8.控制层
/**
 * Serves the WebSocket test page.
 */
@Controller
public class IndexController {

    /**
     * Renders the {@code index} view with a generated {@code uid} model attribute.
     *
     * <p>Declared {@code public}: a private handler method only works through reflection and
     * stops working correctly if the controller is ever wrapped in a class-based proxy.
     *
     * @param model the view model; receives the {@code uid} attribute
     * @return the view name {@code "index"}
     */
    @GetMapping(value = {"/","index.html"})
    public String indexPage(Model model) {
        // presumably a string of 6 random digits (hutool RandomUtil) - confirm against the library
        model.addAttribute("uid", RandomUtil.randomNumbers(6));
        return "index";
    }
}
9.启动类
/**
 * Application entry point: boots the Spring context, then runs the Netty server.
 *
 * <p>{@code RedisAutoConfiguration} is excluded from auto-configuration.
 */
@SpringBootApplication
@EnableAutoConfiguration(exclude = {RedisAutoConfiguration.class})
public class PaasTestApplication {

    /**
     * Starts Spring first, then the Netty server on the calling thread.
     *
     * @param args command-line arguments, passed through to Spring
     * @throws Exception if the Netty server fails to start or is interrupted
     */
    public static void main(String[] args) throws Exception {
        SpringApplication.run(PaasTestApplication.class, args);
        NettyServer nettyServer = new NettyServer();
        nettyServer.start();
    }
}
演示

号外!号外!
如果这篇文章对你有所帮助,或者有所启发的话,帮忙点赞、在看、转发、收藏,你的支持就是我坚持下去的最大动力!
Spring Boot 实现跨域的 5 种方式,总有一种适合你
别用Date了,Java8新特性之日期处理,现在学会也不迟!
原文始发于微信公众号(一安未来):Netty实战,Springboot + netty +websocket 实现推送消息
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/44625.html