目录
前言:
通过简陋
的页面,实现多个用户之间的消息通信,为一个聊天室
功能实现,只实现了聊天室
的基本功能,没有实现聊天室
的好看的页面;
1、pom文件
pom引入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、静态工具类
用于记录聊天室
交互过程中的常量储存,代码如下:
import org.springframework.web.socket.WebSocketSession;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class UserRecordParamManager {
/**
* 在线人数
*/
public static AtomicInteger onlineNumber = new AtomicInteger(0);
/**
* 当前在线人员Id
*/
public static List<String> onlineUser = Collections.synchronizedList(new ArrayList<>());
/**
* 用户和连接session映射
*/
public static Map<String, WebSocketSession> userSession = new ConcurrentHashMap<>();
}
3、实现HandshakeInterceptor
通过实现HandshakeInterceptor
接口的beforeHandshake
与afterHandshake
接口,作用如下:
- beforeHandshake: 在握手之前执行该方法, 继续握手返回true, 中断握手返回false. 通过attributes参数设置WebSocketSession的属性
- afterHandshake: 在握手之后执行该方法. 无论是否握手成功都指明了响应状态码和相应头。
- 代码如下:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* @Description: 创建握手 此类用来获取登录用户信息并交由websocket管理
**/
@Component
public class UserWebSocketInterceptor implements HandshakeInterceptor {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 在握手之前执行该方法, 继续握手返回true, 中断握手返回false. 通过attributes参数设置WebSocketSession的属性
*
* @param request
* @param response
* @param webSocketHandler
* @param attributes
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
logger.info("握手前请求连接URL:" + request.getURI());
// 获取userId(token)
String userId = ((ServletServerHttpRequest) request).getServletRequest().getParameter("userId");
// 验证userId(token)是否有效
//如果有效
logger.info("用户:{},建立连接...", userId);
// 加入到属性中
attributes.put("userId", userId);
return true;
}
/**
* 在握手之后执行该方法. 无论是否握手成功都指明了响应状态码和相应头.
*/
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
}
}
4、配置消息发送类
自定义了两个方法用于实现,服务端将消息发送到相应的webSocket
客户端。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
/**
* @Description:
**/
@Component
public class UserSendMessageManager {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public void sendMessageToUser(String userId, String message) {
try {
WebSocketSession session = UserRecordParamManager.userSession.get(userId);
if (session != null && session.isOpen()) {
TextMessage textMessage = new TextMessage(message.getBytes());
session.sendMessage(textMessage);
}
} catch (IOException e) {
logger.error(e.getMessage());
}
}
public void sendMessageAll(String message) {
// 遍历取出所有session进行发送消息
try {
for (WebSocketSession session : UserRecordParamManager.userSession.values()) {
if (session.isOpen()) {
TextMessage textMessage = new TextMessage(message.getBytes());
session.sendMessage(textMessage);
}
}
} catch (IOException e) {
logger.error(e.getMessage());
}
}
}
5、配置实体类
该实体类用于解析客户端传递到服务端的json数据,代码如下:
public class RecevieUserMessage {
/**
* 消息类型:server、ping、user、all
*/
private String type;
/**
* 内容
*/
private String msg;
/**接收人员Id(发给指定人时)
*
*/
private String recevieUserId;
}
6、实现WebSocketHandler
通过实现WebSocketHandler
接口的afterConnectionEstablished
、handleMessage
、handleTransportError
、afterConnectionClosed
、supportsPartialMessages
方法,作用如下:
- afterConnectionEstablished: 客户端成功连接后触发。
- handleMessage: 接收到客户端消息时触发。
- handleTransportError: 客户端连接失败触发。
- afterConnectionClosed: 客户端连接失败后触发。
代码如下:
import com.alibaba.fastjson.JSON;
import com.lhz.socket.entity.RecevieUserMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
/**
* @Description: 消息处理器
**/
@Component
public class UserWebSocketMessageHandler implements WebSocketHandler {
@Resource
private UserSendMessageManager userSendMessageManager;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 用户进入系统监听
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 自加
int userNum = UserRecordParamManager.onlineNumber.incrementAndGet();
// 获取用户Id
String userId = session.getAttributes().get("userId").toString();
logger.info("有新连接加入! sessionId:{},", session.getId());
logger.info("userId:{},在线人数{}", userId, userNum);
// 加自己加入内存中
UserRecordParamManager.onlineUser.add(userId);
UserRecordParamManager.userSession.put(userId, session);
}
@Override
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
try {
String message = webSocketMessage.getPayload().toString();
String userId = webSocketSession.getAttributes().get("userId").toString();
// 解析内容为对象
RecevieUserMessage userMessage = JSON.parseObject(message, RecevieUserMessage.class);
String msg = userMessage.getMsg();
String type = userMessage.getType();
/**
* 处理客户端接收的不同类型的消息
* 消息类型:server、ping、user、all
*/
switch (type) {
case "server":
userSendMessageManager.sendMessageToUser(userId, "这里是服务端,已收到消息!");
break;
case "ping":
userSendMessageManager.sendMessageToUser(userId, "收到心跳!");
break;
case "user":
// 获取接收人Id
String recevieUserId = userMessage.getRecevieUserId();
// 验证对方是否在线
if (!UserRecordParamManager.onlineUser.contains(recevieUserId)) {
// 发送消息内容为错误码
userSendMessageManager.sendMessageToUser(userId, "201");
}
userSendMessageManager.sendMessageToUser(recevieUserId, msg);
break;
case "all":
userSendMessageManager.sendMessageAll(msg);
break;
}
} catch (Exception e) {
e.printStackTrace();
logger.info("发生了错误了");
}
}
/**
* 用户连接出错
*/
@Override
public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
logger.info("服务端发生了错误:" + throwable.getMessage());
closeConnection(webSocketSession);
}
/**
* 用户退出后的处理
*/
@Override
public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus status) throws Exception {
closeConnection(webSocketSession);
}
@Override
public boolean supportsPartialMessages() {
return false;
}
private void closeConnection(WebSocketSession webSocketSession) {
String sessionId = webSocketSession.getId();
String offUserId = webSocketSession.getAttributes().get("userId").toString();
// 自减
int userNum = UserRecordParamManager.onlineNumber.decrementAndGet();
UserRecordParamManager.onlineUser.remove(offUserId);
UserRecordParamManager.userSession.remove(offUserId);
logger.info("有连接关闭! sessionId:{},", sessionId);
logger.info("userId:{},在线人数{}", offUserId, userNum);
}
}
7、WebSocketConfig配置
配置WebSocket,配置socket的连接地址
,配置Handler
与Interceptors
。
import com.lhz.socket.websocket.user.UserWebSocketMessageHandler;
import com.lhz.socket.websocket.user.UserWebSocketInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.annotation.Resource;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Resource
private UserWebSocketMessageHandler userWebSocketMessageHandler;
@Resource
private UserWebSocketInterceptor userWebSocketInterceptor;
/**
* 注册WebSocket处理类
*
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 加载连接地址及拦截器
registry.addHandler(userWebSocketMessageHandler, "/user")
.addInterceptors(userWebSocketInterceptor)
.setAllowedOrigins("*");
//.withSockJS(); //通过socketJS方式连接
//不同的地址配置不同的拦截器及处理器
}
/**
* 支持websocket
*/
@Bean
public ServerEndpointExporter createServerEndExporter() {
return new ServerEndpointExporter();
}
}
8、前端页面
样式如下:
代码如下:
代码中,需要根据实际情况对连接地址
做修改,代码中的地址是ws://localhost:9009/user?userId={userId}
,需要根据自己的环境,修改地址中的ip值以及port值
。
<!DOCTYPE HTML>
<html>
<head>
<title>Test My WebSocket</title>
</head>
<body>
<p style="text-align: center"}>Socket测试<p/>
<input style="width: 300px" id="address" type="text" value="ws://localhost:9009/user?userId={userId}" />
<button id="conection" onclick="conection()">连接</button>
<button id="close" onclick="closeWebSocket()">断开</button>
<p/>
<input style="width: 300px" id="server-text" type="text" value="发送测试消息到服务器!" />
<button id="send-server" onclick="sendServer('server')">发送至服务</button>
<p/>
<input style="width: 80px" id="user-id" type="text" value="userId" />
<input style="width: 300px" id="user-text" type="text" value="发送给指定人员!" />
<button id="send-user" onclick="sendUser('user')">发送给用户</button>
<p/>
<input style="width: 300px" id="all-text" type="text" value="发送给所有人!" />
<button id="send-all" onclick="sendAll('all')">发送所有人</button>
<div id="message"></div>
</body>
<script type="text/javascript">
//避免重复连接
var lockReconnect = false;
// socket对象
var websocket = null;
//建立连接
function conection(){
createWebSocket(true);
}
function createWebSocket(tag) {
var address = document.getElementById("address").value;
console.log("createWebSocket...");
if (tag) {//true表示正常连接
count = 1;
}
try {
//判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
websocket = new WebSocket(address);
} else {
alert('当前浏览器不支持\n请更换浏览器');
}
init();
} catch (e) {
console.log('catch' + e);
reconnect();
}
}
function init() {
//连接发生错误的回调方法
websocket.onerror = function(){
setMessageInnerHTML("连接失败...");
};
//连接成功建立的回调方法
websocket.onopen = function(event){
setMessageInnerHTML("连接成功...");
//心跳检测重置
heartCheck.start();
}
//接收到消息的回调方法
websocket.onmessage = function(event){
var data = event.data;
if (data === '201') {
data = "对方不在线,无法发送!"
}
if(data != '收到心跳!'){
setMessageInnerHTML(data);
}
//心跳检测重置
heartCheck.start();
}
//连接关闭的回调方法
websocket.onclose = function(){
setMessageInnerHTML("close");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function(){
websocket.close();
}
}
function reconnect() {
if (lockReconnect) {
return;
}
//最多重新连接五次
if (count >= 5) {
alert("无法连接到服务,请稍后刷新重试!");
return;
}
console.log("第" + count + "次尝试连接中...");
count++;
lockReconnect = true;
//没连接上会一直重连,设置延迟避免请求过多
tt && clearTimeout(tt);
var tt = setTimeout(function () {
createWebSocket(false);
lockReconnect = false;
}, 1000);
}
//心跳检测
var heartCheck = {
timeout: 5000, //30000,
timeoutObj: null,
serverTimeoutObj: null,
start: function () {
console.log('开启心跳检测');
this.timeoutObj && clearTimeout(this.timeoutObj);
this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
this.timeoutObj = setTimeout(function () {
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
console.log("ping...");
//心跳识别名称叫做 heartCheck
sendHeartCheck();
}, this.timeout)
}
};
//将消息显示在网页上
function setMessageInnerHTML(innerHTML){
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
//关闭连接
function closeWebSocket(){
websocket.close();
}
//发送消息
function sendServer(type) {
var text = document.getElementById("server-text").value;
var msgData = {};
msgData['type'] = type;
msgData['msg'] = text;
console.log(msgData);
websocket.send(JSON.stringify(msgData));
}
function sendUser(type) {
var recevieUserId = document.getElementById("user-id").value;
var text = document.getElementById("user-text").value;
var msgData = {};
msgData['type'] = type;
msgData['msg'] = text;
msgData['recevieUserId'] = recevieUserId;
console.log(msgData);
websocket.send(JSON.stringify(msgData));
}
function sendAll(type) {
var text = document.getElementById("all-text").value;
var msgData = {};
msgData['type'] = type;
msgData['msg'] = text;
console.log(msgData);
websocket.send(JSON.stringify(msgData));
}
function sendHeartCheck(){
var msgData = {};
msgData['type'] = 'ping';
msgData['msg'] = 'ping';
console.log(msgData);
websocket.send(JSON.stringify(msgData));
}
</script>
</html>
9、测试
将上述《8、前端页面》
中的前端代码,复制到html文件中,打开两个html
页面,页面效果如下:
9.1、连接测试
将两个html
页面中的{userId}分别修改为1、2,并且点击连接
按钮;
服务端控制台信息如下:
9.2、发送消息测试
客户端
发送信息到服务端
,服务端收到消息后,给与一个回馈。
对应前端代码如下:
对应后端代码如下:
类:UserWebSocketMessageHandler
9.3、用户间消息测试
我们通过用户1
客户端给用户2
客户端发送一个消息;
对应前端代码如下:
对应后端代码如下:
类:UserWebSocketMessageHandler
9.4、消息群发测试
在客户端用户2
中发送群消息,效果如下:
对应前端代码如下:
对应后端代码如下:
类:UserWebSocketMessageHandler
9.5、服务端主动推送测试
为了测试服务器
主动推送消息到socket客户端
,我们实现一个controller
,通过http接口
的方式触发消息推送条件。
controller:
import com.lhz.socket.websocket.user.UserSendMessageManager;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* 通过http请求接口进行触发,由服务器主动发送内容
*/
@RestController
public class UserController {
@Resource
private UserSendMessageManager userSendMessageManager;
@GetMapping("/send/{userId}")
public void sendToUser(@PathVariable("userId") String userId) {
userSendMessageManager.sendMessageToUser(userId, userId + "——这里是服务端主动推送!");
}
@GetMapping("/sendAll")
public void sendAll() {
userSendMessageManager.sendMessageAll("ALL——这里是服务端主动推送!");
}
}
使用:
通过http请求接口即可
-
sendToUser接口:
浏览器访问接口:http://localhost:9009/send/1
,服务端主动给客户端用户1
发送消息,效果如下:
-
sendAll接口:
浏览器访问接口:http://localhost:9009/sendAll
,服务端主动给所有的客户端
发送消息,效果如下:
10、建议
在阅读文章时,可以直接先按步骤把代码建立好,然后再根据测试步骤
进行代码的梳理。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/18126.html