背景
ChatGPT已经火了很久了,不得不佩服,技术确实很牛逼。使用过的人应该都体验过他的输出是打字机效果的,内容是一点一点输出的,极大的增强了用户体验,效果如下图所示。作为技术人,是不是有很多人跟我一样都好奇他是如何实现的呢,其实就是基于Server-Sent Events(SSE)协议来做的,接下来介绍下SSE,然后做一个简单的demo实现跟ChatGPT一样的打字机效果吧。
什么是Server-Sent Events(SSE)
SSE简介
Server-Sent Events,简称SSE,是一种基于HTTP协议的服务器推送技术,用于实现服务器向客户端实时发送事件流(也就是数据发送)。客户端通过建立一个持久的HTTP连接来接收服务端的数据推送。
SSE基于HTTP长连接,通过使用持久性的、单向的、实时的数据流,使服务器能够主动向客户端发送数据。与传统的HTTP请求-响应模式不同,SSE允许服务器在任何时候向客户端发送数据,而不需要客户端频繁地发起请求。
SSE协议定义了一种特殊的事件流格式,服务器通过将数据封装为事件,发送给客户端。客户端通过监听这些事件,可以实时接收服务器发送的数据。每个事件可以包含一个事件类型、事件标识符和事件数据。客户端可以通过EventSource API来接收和处理这些事件。
SSE特点
-
实时性:SSE提供了实时的数据传输,服务器可以即时向客户端发送数据,实现实时通信和实时更新。 -
简单易用:SSE使用简单的事件流格式,服务器和客户端之间的交互相对简单,易于实现和维护。 -
单向性:SSE是一种单向的通信机制,只允许服务器向客户端发送数据,客户端无法向服务器发送请求。 -
自动重连:SSE支持自动重连机制,当连接中断或丢失时,客户端会自动尝试重新建立连接,保持与服务器的持久连接。
SSE适用于需要实时数据更新的应用场景,例如实时聊天、股票行情更新、实时通知等。它与其他实时通信技术(如WebSocket)相比,更加简单易用,适用于一些简单的实时应用场景。需要注意的是,SSE在浏览器的支持程度上有一些限制,不同浏览器对SSE的支持程度可能会有所差异。但在大多数现代浏览器中,SSE都得到了良好的支持。
SSE工作原理
-
客户端通过发送一个 HTTP 请求来建立 SSE 连接(连接建立成功则服务端即可进行随时发送数据,失败时会自动进行重连)。 -
服务端在建立连接后保持该连接打开,可随时发送事件数据给客户端(也可以关闭,那这样就是单一的请求-响应模式了,发挥不了他的实时数据推送优势了)。 -
服务端头部使用“Content-Type:text/event-stream”来标识 SSE 连接,并使用特定格式的数据来发送事件给客户端。 -
客户端接收到事件后,可以使用 JavaScript 的 EventSource 接口来处理事件数据。SSE也是HTML5规范的一部分。
什么是SseEmitter?
SseEmitter是Spring框架提供的一个类,具体来说是在Spring MVC中,类全路径,org.springframework.web.servlet.mvc.method.annotation.SseEmitter
,用于支持服务器端向客户端实时发送事件流。它基于Server-Sent Events(SSE)协议,可以在HTTP长连接上发送持久性的、单向的、实时的数据流。SseEmitter提供了一种简单而有效的方式,用于实现服务器端的流式输出,适用于各种实时通信场景。
在Spring MVC中,SseEmitter类充当了服务器端和客户端之间的桥梁。服务器可以通过SseEmitter对象向客户端发送SSE事件,而客户端可以通过监听这些事件来接收服务器发送的数据。
使用SseEmitter时,服务器端可以通过SseEmitter对象的方法向客户端发送数据。例如,可以使用send()方法发送一个事件,并将数据作为事件的内容发送。还可以使用send(SseEmitter.SseEventBuilder)方法发送一个自定义的SSE事件。客户端可以通过使用HTML5的EventSource API来监听服务器发送的SSE事件。在客户端页面中,可以创建一个EventSource对象,并指定要接收SSE事件的URL。然后,可以通过监听message事件来处理接收到的数据。
接下来以一个基于springboot使用SseEmitter写的小例子,看一下服务端数据实时推送和打字机的效果。
核心代码实现
-
pom依赖,主要依赖了spring-boot-starter-web,SseEmitter就在spring.webmvc-5.3.26.jar里。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.10</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.star95</groupId>
<artifactId>pushmsg</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>pushmsg</name>
<description>pushmsg</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- mvc -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
-
后端
package com.star95.pushmsg.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 客户端接口
*/
@Slf4j
@RestController
@RequestMapping("/sse")
public class CustomerChatController {
/**
* 采用线程池异步去发送数据
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
/**
* 将所有连接进来的客户端保存到map中
*/
private static Map<String, SseEmitter> clientMap = new HashMap<>();
/**
* 建立连接,传tid作为客户端标识
*
* @param tid
* @return
*/
@GetMapping(value = "/connect", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter connect(String tid) {
log.info("有新客户端加入:{}", tid);
clientMap.put(tid, new SseEmitter(-1L));
executor.execute(() -> {
try {
clientMap.get(tid).send("欢迎" + tid + "的加入");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return clientMap.get(tid);
}
/**
* 客户端发送消息,服务端异步流式输出
*
* @param tid
* @param question
*/
@PostMapping(value = "/chat")
public void chat(@RequestParam String tid, String question) {
log.info("tid:{}, question:{}", tid, question);
// sse设置不超时
executor.execute(() -> {
try {
String s = "你好,我是AI智能机器人,有什么需要帮助的吗";
for (int i = 0; i < s.length(); i++) {
clientMap.get(tid).send(String.valueOf(s.charAt(i)));
Thread.sleep(500);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
/**
* 服务端主动推送消息
*
* @param msg
* @param clientId
*/
@PostMapping(value = "/server")
public void server(@RequestParam String msg, @RequestParam String clientId) {
log.info("服务端要发送的消息,clientId:{}, msg:{}", clientId, msg);
// sse设置不超时
executor.execute(() -> {
try {
clientMap.get(clientId).send(msg);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
模拟客户端页面:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE客户端</title>
<script type="text/javascript" src="jquery-3.7.1.min.js"></script>
</head>
<body>
<h3>会话内容</h3>
<div id="messageBox" style="border: 1px solid #000000">
</div>
<br/><br/><br/>
<div id="sendBox">
<h3>聊天窗口</h3>
<textarea name="question" id="question" placeholder="请输入您的问题" rows="10" cols="100"></textarea>
<div><button id="send" type="button">客户端发送</button></div>
</div>
</body>
<script>
const tid = Math.random();
const source = new EventSource('http://localhost:8080/sse/connect?tid=' + tid);
source.onmessage = (event) => {
console.log("----------", event.data)
var newDiv = document.createElement('span');
newDiv.innerText = event.data;
$("#messageBox").append(newDiv);
};
source.onerror = (error) => {
console.error("SSE error:", error);
};
document.getElementById("send").onclick = () => {
$.ajax({
method: 'post',
url: "http://localhost:8080/sse/chat",
data: {
"tid": tid,
"question": document.getElementById("question").value
},
success: function( result ) {
document.getElementById("question").value = "";
}
});
}
</script>
</html>
模拟服务端页面,给指定的客户端发送消息:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE服务端</title>
<script type="text/javascript" src="jquery-3.7.1.min.js"></script>
</head>
<body>
<div id="sendBox1" style="border: 1px solid #000000">
<div>模拟服务端发送消息</div>
<div><input type="text" name="clientId" id="clientId" placeholder="请输入客户端id"/></div>
<textarea name="msg" id="msg" placeholder="请输入消息" rows="10" cols="100"></textarea>
<div><button id="send1" type="button">模拟服务端发送</button></div>
</div>
</body>
<script>
document.getElementById("send1").onclick = () => {
$.ajax({
method: 'post',
url: "http://localhost:8080/sse/server",
data: {
"msg": document.getElementById("msg").value,
"clientId": document.getElementById("clientId").value
},
success: function( result ) {
document.getElementById("msg").value = "";
}
});
}
</script>
</html>
打字机效果
启动服务,访问客户端页面:http://localhost:8080/sse-client.html打开客户端的页面就会进行SSE的连接,“欢迎0.70358297679874的加入”就是服务端的输出。
在下面的聊天窗口,输入一句话,点击客户端发送,服务端响应的数据就会追加到会话内容的后面,这里就是打字机的效果如下:
服务端推送数据
访问模拟服务端发送消息给客户端的页面:http://localhost:8080/sse-server.html
效果如下:将刚才的客户端id复制过来,随便写一句话发送,客户端就可以收到,比如:
这个案例非常简单,也非常的简陋,哈哈,凑合着看吧。主要是想大家看一下SseEmitter实现的SSE,打字机效果以及服务端的数据实时推送。
重要知识点
以上的案例比较简单,里面也有一些重要的知识点,说明如下:
-
自动重连机制
SSE支持自动重连,我们把服务器关掉,可以看到客户端一直在尝试重新连接。
-
文中演示的知识普通的字符串返回,真实场景中可以返回JSON格式的字符串做对应的处理。 -
由于SseEmitter不支持序列化,所以无法使用redis作为共享存储,多台服务器集群时,在服务端需要考虑客户端存储的问题。 -
SseEmitter类还有两个方法,complete()表示输出完成,一旦调用了这个方法,客户端再还没有重新连接的情况下,如果使用send方法发送数据会出错,直到连接重新建立,可以继续发送,另外一个方法 completeWithError(Throwable ex)表示服务端异常时调用这个方法会返给客户端,客户端EventSource对象的onerror方法需要处理错误。 -
使用SseEmitter时会自动设置response header的Content-Type:text/event-stream,其他方式可能需要自行设置。 -
这里只是简单的SSE的demo示例,虽然使用 SSE 技术可以实现 ChatGPT 一样的打字机效果,但是文中的SSE是使用基础的JavaScript的EventSource,type 为 eventsource,而 实际ChatGPT 为 fetch。且受浏览器 EventSource API 限制,在使用 SSE 时不能自定义请求头、只能发出 GET 请求。在大多数浏览器中,URL 限制 2000个字符,也无法满足ChatGPT 参数传递需求, 所以可以使用 Fetch API等其他方案也可以实现客户端的接口,具体可自行查阅相关资料。
其他服务端流式输出方案
服务端向客户端推送数据的还有以下几种常见方案:
-
轮询(Polling):客户端定时向服务端发送请求,服务端返回最新数据。这种方案的缺点是会产生大量无效的请求,增加了网络负载和服务器压力。 -
长轮询(Long Polling):客户端发送请求到服务端,服务端保持连接打开,直到有新数据可推送给客户端或超时。客户端收到响应后再立即发送新的请求。这种方案减少了无效请求的数量,但仍然存在一定的延迟。 -
WebSocket:WebSocket是一种全双工通信协议,可以在客户端和服务端之间建立持久连接,实现实时双向通信。客户端和服务端可以随时发送和接收数据,实现实时推送。 -
MQTT(Message Queuing Telemetry Transport):MQTT是一种轻量级的发布/订阅消息传输协议,适用于低带宽和不稳定网络环境。客户端可以订阅感兴趣的主题,服务端向订阅者推送相应的消息。
这些方案网上资料较多,就不再介绍了,他们各有优缺点,结合具体的应用场景和需求选择合适的方案。
欢迎关注公众号,欢迎分享、点赞、在看
原文始发于微信公众号(小新成长之路):使用SseEmitter实现ChatGPT打字机效果的服务端流式输出
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/238296.html