使用SseEmitter实现ChatGPT打字机效果的服务端流式输出

背景

ChatGPT已经火了很久了,不得不佩服,技术确实很牛逼。使用过的人应该都体验过他的输出是打字机效果的,内容是一点一点输出的,极大的增强了用户体验,效果如下图所示。使用SseEmitter实现ChatGPT打字机效果的服务端流式输出作为技术人,是不是有很多人跟我一样都好奇他是如何实现的呢,其实就是基于Server-Sent Events(SSE)协议来做的,接下来介绍下SSE,然后做一个简单的demo实现跟ChatGPT一样的打字机效果吧。

什么是Server-Sent Events(SSE)

SSE简介

Server-Sent Events,简称SSE,是一种基于HTTP协议的服务器推送技术,用于实现服务器向客户端实时发送事件流(也就是数据发送)。客户端通过建立一个持久的HTTP连接来接收服务端的数据推送。

SSE基于HTTP长连接,通过使用持久性的、单向的、实时的数据流,使服务器能够主动向客户端发送数据。与传统的HTTP请求-响应模式不同,SSE允许服务器在任何时候向客户端发送数据,而不需要客户端频繁地发起请求。

SSE协议定义了一种特殊的事件流格式,服务器通过将数据封装为事件,发送给客户端。客户端通过监听这些事件,可以实时接收服务器发送的数据。每个事件可以包含一个事件类型、事件标识符和事件数据。客户端可以通过EventSource API来接收和处理这些事件。

SSE特点

  1. 实时性:SSE提供了实时的数据传输,服务器可以即时向客户端发送数据,实现实时通信和实时更新。
  2. 简单易用:SSE使用简单的事件流格式,服务器和客户端之间的交互相对简单,易于实现和维护。
  3. 单向性:SSE是一种单向的通信机制,只允许服务器向客户端发送数据,客户端无法向服务器发送请求。
  4. 自动重连:SSE支持自动重连机制,当连接中断或丢失时,客户端会自动尝试重新建立连接,保持与服务器的持久连接。

SSE适用于需要实时数据更新的应用场景,例如实时聊天、股票行情更新、实时通知等。它与其他实时通信技术(如WebSocket)相比,更加简单易用,适用于一些简单的实时应用场景。需要注意的是,SSE在浏览器的支持程度上有一些限制,不同浏览器对SSE的支持程度可能会有所差异。但在大多数现代浏览器中,SSE都得到了良好的支持。

SSE工作原理

  1. 客户端通过发送一个 HTTP 请求来建立 SSE 连接(连接建立成功则服务端即可进行随时发送数据,失败时会自动进行重连)。
  2. 服务端在建立连接后保持该连接打开,可随时发送事件数据给客户端(也可以关闭,那这样就是单一的请求-响应模式了,发挥不了他的实时数据推送优势了)。
  3. 服务端头部使用“Content-Type:text/event-stream”来标识 SSE 连接,并使用特定格式的数据来发送事件给客户端。
  4. 客户端接收到事件后,可以使用 JavaScript 的 EventSource 接口来处理事件数据。SSE也是HTML5规范的一部分。

什么是SseEmitter?

SseEmitter是Spring框架提供的一个类,具体来说是在Spring MVC中,类全路径,org.springframework.web.servlet.mvc.method.annotation.SseEmitter,用于支持服务器端向客户端实时发送事件流。它基于Server-Sent Events(SSE)协议,可以在HTTP长连接上发送持久性的、单向的、实时的数据流。SseEmitter提供了一种简单而有效的方式,用于实现服务器端的流式输出,适用于各种实时通信场景。

在Spring MVC中,SseEmitter类充当了服务器端和客户端之间的桥梁。服务器可以通过SseEmitter对象向客户端发送SSE事件,而客户端可以通过监听这些事件来接收服务器发送的数据。

使用SseEmitter时,服务器端可以通过SseEmitter对象的方法向客户端发送数据。例如,可以使用send()方法发送一个事件,并将数据作为事件的内容发送。还可以使用send(SseEmitter.SseEventBuilder)方法发送一个自定义的SSE事件。客户端可以通过使用HTML5的EventSource API来监听服务器发送的SSE事件。在客户端页面中,可以创建一个EventSource对象,并指定要接收SSE事件的URL。然后,可以通过监听message事件来处理接收到的数据。

接下来以一个基于springboot使用SseEmitter写的小例子,看一下服务端数据实时推送和打字机的效果。

核心代码实现

  1. pom依赖,主要依赖了spring-boot-starter-web,SseEmitter就在spring.webmvc-5.3.26.jar里。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.10</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.star95</groupId>
    <artifactId>pushmsg</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>pushmsg</name>
    <description>pushmsg</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- mvc -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  1. 后端
package com.star95.pushmsg.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 客户端接口
 */

@Slf4j
@RestController
@RequestMapping("/sse")
public class CustomerChatController {

    /**
     * 采用线程池异步去发送数据
     */

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(21060, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));


    /**
     * 将所有连接进来的客户端保存到map中
     */

    private static Map<String, SseEmitter> clientMap = new HashMap<>();

    /**
     * 建立连接,传tid作为客户端标识
     *
     * @param tid
     * @return
     */

    @GetMapping(value = "/connect", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter connect(String tid) {
        log.info("有新客户端加入:{}", tid);
        clientMap.put(tid, new SseEmitter(-1L));
        executor.execute(() -> {
            try {
                clientMap.get(tid).send("欢迎" + tid + "的加入");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return clientMap.get(tid);
    }

    /**
     * 客户端发送消息,服务端异步流式输出
     *
     * @param tid
     * @param question
     */

    @PostMapping(value = "/chat")
    public void chat(@RequestParam String tid, String question) {
        log.info("tid:{}, question:{}", tid, question);
        // sse设置不超时
        executor.execute(() -> {
            try {
                String s = "你好,我是AI智能机器人,有什么需要帮助的吗";
                for (int i = 0; i < s.length(); i++) {
                    clientMap.get(tid).send(String.valueOf(s.charAt(i)));
                    Thread.sleep(500);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * 服务端主动推送消息
     *
     * @param msg
     * @param clientId
     */

    @PostMapping(value = "/server")
    public void server(@RequestParam String msg, @RequestParam String clientId) {
        log.info("服务端要发送的消息,clientId:{}, msg:{}", clientId, msg);
        // sse设置不超时
        executor.execute(() -> {
            try {
                clientMap.get(clientId).send(msg);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}

  1. 前端

模拟客户端页面:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE客户端</title>
    <script type="text/javascript" src="jquery-3.7.1.min.js"></script>
</head>
<body>
    <h3>会话内容</h3>
    <div id="messageBox" style="border: 1px solid #000000">

    </div>
    <br/><br/><br/>
    <div id="sendBox">
        <h3>聊天窗口</h3>
        <textarea name="question" id="question" placeholder="请输入您的问题" rows="10" cols="100"></textarea>
        <div><button id="send" type="button">客户端发送</button></div>
    </div>
</body>
<script>
    const tid = Math.random();
    const source = new EventSource('http://localhost:8080/sse/connect?tid=' + tid);

    source.onmessage = (event) => {
        console.log("----------", event.data)
        var newDiv = document.createElement('span');
        newDiv.innerText = event.data;
        $("#messageBox").append(newDiv);
    };

    source.onerror = (error) => {
        console.error("SSE error:", error);
    };

    document.getElementById("send").onclick = () => {
        $.ajax({
            method: 'post',
            url: "http://localhost:8080/sse/chat",
            data: {
                "tid": tid,
                "question": document.getElementById("question").value
            },
            success: function( result ) {
                document.getElementById("question").value = "";
            }
        });
    }
</script>
</html>

模拟服务端页面,给指定的客户端发送消息:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE服务端</title>
    <script type="text/javascript" src="jquery-3.7.1.min.js"></script>
</head>
<body>
    <div id="sendBox1" style="border: 1px solid #000000">
        <div>模拟服务端发送消息</div>
        <div><input type="text" name="clientId" id="clientId" placeholder="请输入客户端id"/></div>
        <textarea name="msg" id="msg" placeholder="请输入消息" rows="10" cols="100"></textarea>
        <div><button id="send1" type="button">模拟服务端发送</button></div>
    </div>
</body>
<script>
    document.getElementById("send1").onclick = () => {
        $.ajax({
            method: 'post',
            url: "http://localhost:8080/sse/server",
            data: {
                "msg": document.getElementById("msg").value,
                "clientId": document.getElementById("clientId").value
            },
            success: function( result ) {
                document.getElementById("msg").value = "";
            }
        });
    }
</script>
</html>

打字机效果

启动服务,访问客户端页面:http://localhost:8080/sse-client.html使用SseEmitter实现ChatGPT打字机效果的服务端流式输出打开客户端的页面就会进行SSE的连接,“欢迎0.70358297679874的加入”就是服务端的输出。

在下面的聊天窗口,输入一句话,点击客户端发送,服务端响应的数据就会追加到会话内容的后面,这里就是打字机的效果如下:使用SseEmitter实现ChatGPT打字机效果的服务端流式输出

服务端推送数据

访问模拟服务端发送消息给客户端的页面:http://localhost:8080/sse-server.html

效果如下:使用SseEmitter实现ChatGPT打字机效果的服务端流式输出将刚才的客户端id复制过来,随便写一句话发送,客户端就可以收到,比如:使用SseEmitter实现ChatGPT打字机效果的服务端流式输出使用SseEmitter实现ChatGPT打字机效果的服务端流式输出这个案例非常简单,也非常的简陋,哈哈,凑合着看吧。主要是想大家看一下SseEmitter实现的SSE,打字机效果以及服务端的数据实时推送。

重要知识点

以上的案例比较简单,里面也有一些重要的知识点,说明如下:

  1. 自动重连机制

SSE支持自动重连,我们把服务器关掉,可以看到客户端一直在尝试重新连接。使用SseEmitter实现ChatGPT打字机效果的服务端流式输出

  1. 文中演示的知识普通的字符串返回,真实场景中可以返回JSON格式的字符串做对应的处理。
  2. 由于SseEmitter不支持序列化,所以无法使用redis作为共享存储,多台服务器集群时,在服务端需要考虑客户端存储的问题。
  3. SseEmitter类还有两个方法,complete()表示输出完成,一旦调用了这个方法,客户端再还没有重新连接的情况下,如果使用send方法发送数据会出错,直到连接重新建立,可以继续发送,另外一个方法 completeWithError(Throwable ex)表示服务端异常时调用这个方法会返给客户端,客户端EventSource对象的onerror方法需要处理错误。
  4. 使用SseEmitter时会自动设置response header的Content-Type:text/event-stream,其他方式可能需要自行设置。
  5. 这里只是简单的SSE的demo示例,虽然使用 SSE 技术可以实现 ChatGPT 一样的打字机效果,但是文中的SSE是使用基础的JavaScript的EventSource,type 为 eventsource,而 实际ChatGPT 为 fetch。且受浏览器 EventSource API 限制,在使用 SSE 时不能自定义请求头、只能发出 GET 请求。在大多数浏览器中,URL 限制 2000个字符,也无法满足ChatGPT 参数传递需求, 所以可以使用 Fetch API等其他方案也可以实现客户端的接口,具体可自行查阅相关资料。

其他服务端流式输出方案

服务端向客户端推送数据的还有以下几种常见方案:

  1. 轮询(Polling):客户端定时向服务端发送请求,服务端返回最新数据。这种方案的缺点是会产生大量无效的请求,增加了网络负载和服务器压力。
  2. 长轮询(Long Polling):客户端发送请求到服务端,服务端保持连接打开,直到有新数据可推送给客户端或超时。客户端收到响应后再立即发送新的请求。这种方案减少了无效请求的数量,但仍然存在一定的延迟。
  3. WebSocket:WebSocket是一种全双工通信协议,可以在客户端和服务端之间建立持久连接,实现实时双向通信。客户端和服务端可以随时发送和接收数据,实现实时推送。
  4. MQTT(Message Queuing Telemetry Transport):MQTT是一种轻量级的发布/订阅消息传输协议,适用于低带宽和不稳定网络环境。客户端可以订阅感兴趣的主题,服务端向订阅者推送相应的消息。

这些方案网上资料较多,就不再介绍了,他们各有优缺点,结合具体的应用场景和需求选择合适的方案。

欢迎关注公众号,欢迎分享、点赞、在看

使用SseEmitter实现ChatGPT打字机效果的服务端流式输出



原文始发于微信公众号(小新成长之路):使用SseEmitter实现ChatGPT打字机效果的服务端流式输出

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/238296.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!