【网课平台】Day7.分布式事务控制与微服务的远程调用

有时候,不是因为你没有能力,也不是因为你缺少勇气,只是因为你付出的努力还太少,所以,成功便不会走向你。而你所需要做的,就是坚定你的梦想,你的目标,你的未来,然后以不达目的誓不罢休的那股劲,去付出你的努力,成功就会慢慢向你靠近。

导读:本篇文章讲解 【网课平台】Day7.分布式事务控制与微服务的远程调用,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

一、需求:课程审核

1、需求分析

课程发布前要先审核,审核通过方可发布。
在这里插入图片描述
在课程基本表course_base表加审核状态字段。审核通过后,教学机构可发布(发布状态)。

在这里插入图片描述

2、建表与数据模型

思考:课程提交审核后教育方能修改课程吗?

  • 若不允许,万一提交完后发现少点啥,想改就得等审核完。如果审核周期较长,说不定等审核完,用户想修改啥自己都忘了,用户体验很差
  • 若允许改,如果审核方查数据和教学方改数据的表是同一份,会出现:审核员点击审核,获取到了视频A,觉得不错,准备审核通过,此时教学方修改了视频,而审核员获取到的还是旧数据,就有Bug

在这里插入图片描述
为解决上面的问题,设计课程预发布表来分开两方数据:

  • 教育方点击提交审核,汇总四张基本表中(当前这个时间点的)课程信息写入课程预发布表
  • 审核方从预发布表拿数据进行审核
  • 审核途中,教育方可以改数据,但改的是四张基本表,不写入预发布表
  • 审核通过,将预发布表的信息写入课程发布表

在这里插入图片描述

即修改后点击保存,是更改了基本信息表,只有点提交审核,才update预发布表。从而实现:

  • 提交后可修改
  • 修改后必须点击提交审核才能提交
  • 提交审核后必须等审核完才能再次将新的修改再次提交审核

课程预发布表:

在这里插入图片描述
注意这里,字段太多的信息直接用json格式,数据类型text

在这里插入图片描述

审核记录表:

在这里插入图片描述

审核后:

  • 更新课程基本信息表的课程审核状态为:已经提交
  • 课程审核后更新课程基本信息表的审核状态、课程预发布表的审核状态,并将审核结果写入课程审核记录

3、接口定义

//提交审核接口

@ResponseBody
@PostMapping ("/courseaudit/commit/{courseId}")
public void commitAudit(@PathVariable("courseId") Long courseId){

}

4、Mapper层开发

直接生成PO类、写Mapper接口继承BaseMapper<PO>

5、Service层开发

分析,在这里要实现的逻辑是:

  • 查询课程基本信息、课程营销信息、课程计划信息
  • 插入到,课程预发布表course_publish_pre。若已存在,则更新
  • 更新课程基本表course_base课程审核状态为:已提交

要做的业务校验(约束)有:

  • 对已提交审核的课程不允许提交审核(业务逻辑校验)
  • 本机构只允许提交本机构的课程(身份校验)
  • 没有上传图片或没有添加课程计划,不允许提交审核
Java
/**
 * @description 提交审核
 * @param courseId  课程id
*/
public void commitAudit(Long companyId,Long courseId);

写实现类:

@Override
@Transactional
public void commitAudit(Long companyId, Long courseId) {

	 CourseBase courseBase = courseBaseMapper.selectById(courseId);
	 //课程审核状态
	 String auditStatus = courseBase.getAuditStatus();
	 //当前审核状态为已提交不允许再次提交
	 if("202003".equals(auditStatus)){
	  MyException.cast("当前为等待审核状态,审核完成可以再次提交(只能保存修改)。");
	 }
	 //本机构只允许提交本机构的课程
	 if(!courseBase.getCompanyId().equals(companyId)){
	  MyException.cast("不允许提交其它机构的课程。");
	 }
	
	 //课程图片是否填写
	 if(StringUtils.isEmpty(courseBase.getPic())){
	  MyException.cast("提交失败,请上传课程图片");
	 }
	
	 //添加课程预发布记录
	 CoursePublishPre coursePublishPre = new CoursePublishPre();
	 //课程基本信息加部分营销信息
	 CourseBaseInfoDto courseBaseInfo = courseBaseInfoService.getCourseBaseInfo(courseId);
	 BeanUtils.copyProperties(courseBaseInfo,coursePublishPre);
	 //课程营销信息
	 CourseMarket courseMarket = courseMarketMapper.selectById(courseId);
	 //转为json
	 String courseMarketJson = JSON.toJSONString(courseMarket);
	 //将课程营销信息json数据放入课程预发布表
	 coursePublishPre.setMarket(courseMarketJson);
	
	 //查询课程计划信息
	 List<TeachplanDto> teachplanTree = teachplanService.findTeachplanTree(courseId);
	 if(teachplanTree.size()<=0){
	  XueChengPlusException.cast("提交失败,还没有添加课程计划");
	 }
	 //转json
	 String teachplanTreeString = JSON.toJSONString(teachplanTree);
	 coursePublishPre.setTeachplan(teachplanTreeString);
	
	 //设置预发布记录状态,已提交
	 coursePublishPre.setStatus("202003");
	 //教学机构id
	 coursePublishPre.setCompanyId(companyId);
	 //提交时间
	 coursePublishPre.setCreateDate(LocalDateTime.now());
	 CoursePublishPre coursePublishPreUpdate = coursePublishPreMapper.selectById(courseId);
	 if(coursePublishPreUpdate == null){
	  //添加课程预发布记录,不存在是插入,存在时更新
	  coursePublishPreMapper.insert(coursePublishPre);
	 }else{
	  coursePublishPreMapper.updateById(coursePublishPre);
	 }
	
	 //更新课程基本表的审核状态
	 courseBase.setAuditStatus("202003");
	 //同步更新状态
	 courseBaseMapper.updateById(courseBase);
}

注意这里引用类型对象转json:String objectJson = JSON.toJSONString(xxObject);

6、完善controller层

@ResponseBody
@PostMapping ("/courseaudit/commit/{courseId}")
public void commitAudit(@PathVariable("courseId") Long courseId){
     Long companyId = 1232141425L;
     coursePublishService.commitAudit(companyId,courseId);

 }

以上为提交审核的接口。运营方审核,即:

  • 更改预发布表中的审核状态为审核通过202004
  • 更改课程基本表的审核状态为审核通过202004
  • 审核接口不再贴代码

二、需求:课程发布

1、需求分析

审核员审核通过后,教学方可以选择发布。
在这里插入图片描述

2、建表与数据模型

课程发布后,课程信息的展示有两个问题(海量用户的查看和教学方的预览不是一个问题):

  • 如何快速搜索课程
  • 打开课程 详情页时仍然去查MySQL数据库,性能不够

为了提高网站的速度需要将课程信息进行缓存,并且要将课程信息加入索引库方便搜索,静态页面也不能再每次模型渲染,而是直接存储静态页面:

在这里插入图片描述
新建课程发布表,和预发布表一样,状态字段是发布状态,不再是审核状态

在这里插入图片描述

3、技术方案

分布式事务

和之前的事务不同,课程发布操作后将数据写入数据库、redis、elasticsearch、MinIO四个地方,这四个地方已经不限制在一个数据库内,是由四个分散的服务去提供,与这四个服务去通信需要网络通信,而网络存在不可到达性,这种分布式系统环境下,通过与不同的服务进行网络通信去完成事务称之为分布式事务

参考:本地事务和分布式事务的区别

以下场景都存在分布式事务:

  • 微服务架构下:
    在这里插入图片描述
  • 单服务多数据库:
    在这里插入图片描述
  • 多服务单数据库
    在这里插入图片描述

先根据实际场景定要满足CP还是AP,再技术选型。此需求我们只需要数据的最终一致性,不用强一致,因此选择AP

课程发布的分布式事务控制实现

本地消息表+任务调度机制完成分布式事务最终数据一致性的控制

在这里插入图片描述

  • 在内容管理服务的数据库中添加一个消息表,消息表和课程发布表在同一个数据库
  • 课程发布通过本地事务向课程发布表写入课程发布信息,同时向消息表写课程发布的消息。通过数据库进行控制,只要课程发布表插入成功消息表也插入成功,消息表的数据就记录了某门课程发布的任务
  • 定时调度内容管理服务去定时扫描消息表的记录
  • 消息表中扫描到数据后向redis、elasticsearch、MinIO同步数据
  • 同步成功后删除消息表的这条记录

时序图:

在这里插入图片描述

4、接口定义

@Api(value = "课程预览发布接口",tags = "课程预览发布接口")
@Controller
public class CoursePublishController {
...
 @ApiOperation("课程发布")
 @ResponseBody
 @PostMapping ("/coursepublish/{courseId}")
public void coursepublish(@PathVariable("courseId") Long courseId){

}

5、消息处理SDK

关于信息表,有以下操作:

  • 新增消息表
  • 扫描信息表
  • 更新信息表
  • 删除信息表

以后其他服务也可能需要这一套针对信息表的处理逻辑
在这里插入图片描述
为了复用代码,考虑将它抽成一个通用服务,此时,该服务就要连接多个数据库,且涉及到和其他微服务的网络通信 ====> 不合理

考虑将消息表的逻辑处理做成一个SDK工具包,而不是通用服务

在这里插入图片描述

消息表的设计:

  • 将小任务作为任务的不同的阶段,如课程发布任务需要执行三个同步操作:存储课程到redis、存储课程到索引库,存储课程页面到文件系统。
  • 每完成一个阶段在相应的阶段状态字段打上完成标记,即使这个大任务没有完成再重新执行时,如果小阶段任务完成了也不会重复执行某个小阶段的任务

在这里插入图片描述
SDK提供的接口定义:

package com.xuecheng.messagesdk.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.xuecheng.messagesdk.model.po.MqMessage;

import java.util.List;

/**
 *  消息处理服务接口
 *  MqMessage即消息表的PO类
 */
public interface MqMessageService extends IService<MqMessage> {

    /**
     * @description 扫描消息表记录,采用与扫描视频处理表相同的思路
     * @param shardIndex 分片序号
     * @param shardTotal 分片总数
     * @param count 扫描记录数
     * @return java.util.List 消息记录
     */
    public List<MqMessage> getMessageList(int shardIndex, int shardTotal,  String messageType,int count);

    /**
     * @description 完成任务
     * @param id 消息id
     * @return int 更新成功:1
     */
    public int completed(long id);

    /**
     * @description 完成阶段任务
     * @param id 消息id
     * @return int 更新成功:1
     */
    public int completedStageOne(long id);
    public int completedStageTwo(long id);
    public int completedStageThree(long id);
    public int completedStageFour(long id);

    /**
     * @description 查询阶段状态
     * @param id
     * @return int
    */
    public int getStageOne(long id);
    public int getStageTwo(long id);
    public int getStageThree(long id);
    public int getStageFour(long id);

}

消息SDK提供消息处理抽象类,此抽象类供使用者去继承使用:

package com.xuecheng.messagesdk.service;

import com.xuecheng.messagesdk.model.po.MqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;
import java.util.concurrent.*;

/**
 * @description 消息处理抽象类
 */
@Slf4j
@Data
public abstract class MessageProcessAbstract {

    @Autowired
    MqMessageService mqMessageService;


    /**
     * @param mqMessage 执行任务内容
     * @return boolean true:处理成功,false处理失败
     * @description 任务处理
     */
    public abstract boolean execute(MqMessage mqMessage);


    /**
     * @description 扫描消息表多线程执行任务
     * @param shardIndex 分片序号
     * @param shardTotal 分片总数
     * @param messageType  消息类型
     * @param count  一次取出任务总数
     * @param timeout 预估任务执行时间,到此时间如果任务还没有结束则强制结束 单位秒
     * @return void
    */
    public void process(int shardIndex, int shardTotal,  String messageType,int count,long timeout) {

        try {
            //扫描消息表获取任务清单
            List<MqMessage> messageList = mqMessageService.getMessageList(shardIndex, shardTotal,messageType, count);
            //任务个数
            int size = messageList.size();
            log.debug("取出待处理消息"+size+"条");
            if(size<=0){
                return ;
            }

            //创建线程池
            ExecutorService threadPool = Executors.newFixedThreadPool(size);
            //计数器
            CountDownLatch countDownLatch = new CountDownLatch(size);
            messageList.forEach(message -> {
                threadPool.execute(() -> {
                    log.debug("开始任务:{}",message);
                    //处理任务
                    try {
                        boolean result = execute(message);
                        if(result){
                            log.debug("任务执行成功:{})",message);
                            //更新任务状态,删除消息表记录,添加到历史表
                            int completed = mqMessageService.completed(message.getId());
                            if (completed>0){
                                log.debug("任务执行成功:{}",message);
                            }else{
                                log.debug("任务执行失败:{}",message);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        log.debug("任务出现异常:{},任务:{}",e.getMessage(),message);
                    }
                    //计数
                    countDownLatch.countDown();
                    log.debug("结束任务:{}",message);

                });
            });

            //等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
            countDownLatch.await(timeout,TimeUnit.SECONDS);
            System.out.println("结束....");
        } catch (InterruptedException e) {
           e.printStackTrace();

        }

    }

}

继承上面的抽象类,写任务的执行方法

/**
 * @description 消息处理测试类,继承MessageProcessAbstract
 */
@Slf4j
@Component
public class MessageProcessClass extends MessageProcessAbstract {


    @Autowired
    MqMessageService mqMessageService;

    //执行任务
    @Override
    public boolean execute(MqMessage mqMessage) {
        Long id = mqMessage.getId();
        log.debug("开始执行任务:{}",id);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        //取出阶段状态,判断这个阶段是否执行成功过
        int stageOne = mqMessageService.getStageOne(id);
        if(stageOne<1){
            log.debug("开始执行第一阶段任务");
            System.out.println();
            int i = mqMessageService.completedStageOne(id);
            if(i>0){
                log.debug("完成第一阶段任务");
            }

        }else{
            log.debug("无需执行第一阶段任务");
        }

        return true;
    }
}

集成上面的SDK,直接引入Maven坐标即可:

<dependency>
    <groupId>com.xuecheng</groupId>
    <artifactId>xuecheng-plus-message-sdk</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

6、Mapper层开发

自动生成信息表的PO类和Mapper继承BaseMapper

7、Service层开发

当前是课程审核通过后,教育方可以选择发布课程(上架商品),上架的数据流是:

  • 向课程发布表course_publish插入一条记录,记录来源于课程预发布表,如果存在则更新,发布状态为:已发布
  • 更新course_base表的课程发布状态为:已发布
  • 删除预发布表记录
  • 向信息表插入一条数据(任务),任务的处理由后续定时任务处理,发布接口插入这条数据即可
@Transactional
@Override
public void publish(Long companyId, Long courseId) {

	 //校验业务逻辑
	 //查询课程预发布表,预发布表没数据即都没提交
	 CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);
	 if(coursePublishPre == null){
	    XueChengPlusException.cast("请先提交课程审核,审核通过才可以发布");
	 }
	 //本机构只允许提交本机构的课程
	 if(!coursePublishPre.getCompanyId().equals(companyId)){
	  XueChengPlusException.cast("不允许提交其它机构的课程。");
	 }
	
	 //课程审核状态
	 String auditStatus = coursePublishPre.getStatus();
	 //审核通过方可发布
	 if(!"202004".equals(auditStatus)){
	  XueChengPlusException.cast("操作失败,课程审核通过方可发布。");
	 }
	 //保存课程发布信息
	 saveCoursePublish(courseId);
	
	 //保存消息表
	 saveCoursePublishMessage(courseId);
	
	//删除课程预发布表对应记录
	 coursePublishPreMapper.deleteById(courseId);

}

保存任务信息的saveCoursePublishMessage(courseId)方法:

Java
 /**
  * @description 保存消息表记录
  * @param courseId  课程id
  * @return void
  */
private void saveCoursePublishMessage(Long courseId){
	 MqMessage mqMessage = mqMessageService.addMessage("course_publish", String.valueOf(courseId), null, null);
	 if(mqMessage==null){
	  XueChengPlusException.cast(CommonError.UNKOWN_ERROR);
 	}
}

任务信息通过发布接口写入信息表后,需要通过定时任务进行:

  • 页面静态化并存储
  • 索引存入Elasticsearch
  • 课程缓存信息存入Redis

8、页面静态化

课程预览功能通过模板引擎在页面模板中填充数据来生成html页面,此过程是客户端请求服务器时,服务器才开始渲染填充出html,最后响应给服务器,而服务端的响应并发能力有限,这里的预览以后要给海量用户预览,而非发布课程前,教育方的一个人预览。
在这里插入图片描述

===>

因此考虑提前通过模板引擎技术生成html页面,而静态页面可以使用nginx、apache等高性能的web服务器,并发性能高

==>

页面静态化就是对于数据变化不频繁而又频繁请求的页面,直接生成html页面存起来。本需求中需要我完成两步:

  • 生成静态化页面
  • 上传html到文件系统

接口定义:

/**
 * @description 课程静态化
 * @param courseId  课程id
 * @return File 静态化文件
*/
public File generateCourseHtml(Long courseId);
/**
 * @description 上传课程静态化页面
 * @param file  静态化文件
 * @return void
*/
public void  uploadCourseHtml(Long courseId,File file);

实现接口:

public class CoursePublishServiceImpl implements CoursePublishService{


	@Override
    public File generateCourseHtml(Long courseId) {

        //静态化文件
        File htmlFile  = null;

        try {
            //配置freemarker
            Configuration configuration = new Configuration(Configuration.getVersion());

            //加载模板
            //选指定模板路径,classpath下templates下
            //得到classpath路径
            String classpath = this.getClass().getResource("/").getPath();
            configuration.setDirectoryForTemplateLoading(new File(classpath + "/templates/"));
            //设置字符编码
            configuration.setDefaultEncoding("utf-8");

            //指定模板文件名称
            Template template = configuration.getTemplate("course_template.ftl");

            //准备数据
            CoursePreviewDto coursePreviewInfo = this.getCoursePreviewInfo(courseId);

            Map<String, Object> map = new HashMap<>();
            map.put("model", coursePreviewInfo);

            //静态化
            //参数1:模板,参数2:数据模型
            String content = FreeMarkerTemplateUtils.processTemplateIntoString(template, map);
//            System.out.println(content);
            //将静态化内容输出到文件中
            InputStream inputStream = IOUtils.toInputStream(content);
            //创建临时静态化文件
            htmlFile = File.createTempFile("course",".html");
            log.debug("课程静态化,生成静态文件:{}",htmlFile.getAbsolutePath());
            //输出流
            FileOutputStream outputStream = new FileOutputStream(htmlFile);
            IOUtils.copy(inputStream, outputStream);
        } catch (Exception e) {
            log.error("课程静态化异常:{}",e.toString());
            XueChengPlusException.cast("课程静态化异常");
        }

        return htmlFile;
    }

    @Override
    public void uploadCourseHtml(Long courseId, File file) {
    	//上传生成的html需要调用媒资管理服务的上传接口
    	//看下面微服务调用
    }


}

9、微服务远程调用

内容管理服务对页面静态化生成html文件需要调用媒资管理服务的上传文件接口。在Spring Cloud中可以使用Feign进行远程调用。下面配置Feign实现HTTP请求的发送

  • 添加Maven依赖
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- Spring Cloud 微服务远程调用 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-httpclient</artifactId>
</dependency>
<!--feign支持Multipart格式传参-->
<dependency>
    <groupId>io.github.openfeign.form</groupId>
    <artifactId>feign-form</artifactId>
    <version>3.8.0</version>
</dependency>
<dependency>
    <groupId>io.github.openfeign.form</groupId>
    <artifactId>feign-form-spring</artifactId>
    <version>3.8.0</version>
</dependency>

  • 在nacos配置feign-dev.yaml公用配置文件
feign:
  hystrix:
    enabled: true  # 开启熔断
  circuitbreaker:
    enabled: true
hystrix:
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 30000  #熔断超时时间
ribbon:
  ConnectTimeout: 60000 #连接超时时间
  ReadTimeout: 60000 #读超时时间
  MaxAutoRetries: 0 #重试次数
  MaxAutoRetriesNextServer: 1 #切换实例的重试次数

  • 在内容管理service工程引入这个公共配置
shared-configs:
  - data-id: feign-${spring.profiles.active}.yaml
    group: xuecheng-plus-common
    refresh: true

  • 在调用方编写feign接口
/**
 * 在调用方定义接口
 * @description 媒资管理服务远程接口
 * value即被调用方的服务名
 * configuration后面的类是配置feign支持Multipart
 */
 @FeignClient(value = "media-api",configuration = MultipartSupportConfig.class)
public interface MediaServiceClient {

 @RequestMapping(value = "/media/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
 String uploadFile(@RequestPart("filedata") MultipartFile upload,@RequestParam(value = "objectName",required=false) String objectName);
}

  • 在启动类添加@EnableFeignClients注解
@EnableFeignClients(basePackages={"com.xuecheng.content.feignclient"})
  • 测试
@SpringBootTest
public class FeignUploadTest {

    @Autowired
    MediaServiceClient mediaServiceClient;

    //远程调用,上传文件
    @Test
    public void test() {
    
        MultipartFile multipartFile = MultipartSupportConfig.getMultipartFile(new File("D:\\develop\\test.html"));
        mediaServiceClient.uploadFile(multipartFile,"course","test.html");
    }

}

10、熔断降级

微服务之间互相调用,当某一个服务异常,无法被正常调用,如果不去处理,可能会导致雪崩效应。如:以A服务异常开始:

在这里插入图片描述

以上问题的处理方案是–配置熔断和降级

熔断

当下游服务(被调用方)异常而断开与上游服务的交互,它就相当于保险丝,下游服务异常触发了熔断,从而保证上游服务不受影响。

在这里插入图片描述

降级

下游服务异常触发熔断后,上游服务不再调用异常的微服务而转去走另一套处理逻辑 ==> 降级处理逻辑 ==> 这个降级处理逻辑可以是一个服务本地的方法

在这里插入图片描述
熔断和降级,熔断是为了保护系统,是一种保护系统的手段,降级则是一种熔断后的处理方法

开启熔断

  • 开启熔断,在feign-dev.yaml中
feign:
  hystrix:
    enabled: true
  circuitbreaker:
    enabled: true

  • 设置熔断的超时时间,为了防止一次处理时间较长触发熔断
hystrix:
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 30000  #熔断超时时间
ribbon:
  ConnectTimeout: 60000 #连接超时时间
  ReadTimeout: 60000 #读超时时间
  MaxAutoRetries: 0 #重试次数
  MaxAutoRetriesNextServer: 1 #切换实例的重试次数

定义降级逻辑

实现方式一 =====> fallback

//定义一个fallback类
//实现在调用方编写feign接口(MediaServiceClient接口)
//熔断后走这里
public class MediaServiceClientFallback implements MediaServiceClient {

	@Override
	public String uploadFile( MultipartFile upload,String objectName){
		return null;
	}
}

在原调用方编写的feign接口上加入属性fallback

@FeignClient(value = "media-api",configuration = MultipartSupportConfig.class,fallback = MediaServiceClientFallback.class)
@RequestMapping("/media")
public interface MediaServiceClient{
...

此方式无法取出熔断所抛出的异常

实现方式二 =====> fallbackFactory

//定义MediaServiceClientFallbackFactory如下:
//实现FallbackFactory接口
//泛型指定为原调用方编写的feign接口

@Slf4j
@Component
public class MediaServiceClientFallbackFactory implements FallbackFactory<MediaServiceClient> {
	//拿到异常信息
    @Override
    public MediaServiceClient create(Throwable throwable) {
        return new MediaServiceClient(){

	//发生熔断后,上游方法执行这个方法来降级处理
    @Override
    public String uploadFile(MultipartFile upload, String objectName) {
        //降级方法
        log.debug("调用媒资管理服务上传文件时发生熔断,异常信息:{}",throwable.toString(),throwable);
        return null;
            }
        };
    }
}

这里返回一个啥自己定义,这里定义返回null,上游请求若得到一个null,就说明熔断降级处理了

11、发布任务代码完善

微服务调用结束后,将生成的HTML页面上传,完善上传方法:

@Override
public void uploadCourseHtml(Long courseId, File file) {
     MultipartFile multipartFile = MultipartSupportConfig.getMultipartFile(file);
     String course = mediaServiceClient.uploadFile(multipartFile, "course/"+courseId+".html");
     if(course==null){
         XueChengPlusException.cast("上传静态文件异常");
     }
 }

完善定时任务执行代码:

//生成课程静态化页面并上传至文件系统
public void generateCourseHtml(MqMessage mqMessage,long courseId){
    log.debug("开始进行课程静态化,课程id:{}",courseId);
    //消息id
    Long id = mqMessage.getId();
    //消息处理的service
    MqMessageService mqMessageService = this.getMqMessageService();
    //消息幂等性处理
    int stageOne = mqMessageService.getStageOne(id);
    if(stageOne == 1){
        log.debug("课程静态化已处理直接返回,课程id:{}",courseId);
        return ;
    }

    //生成静态化页面
    File file = coursePublishService.generateCourseHtml(courseId);
    //上传静态化页面
    if(file!=null){
        coursePublishService.uploadCourseHtml(courseId,file);
    }
    //保存第一阶段状态
    mqMessageService.completedStageOne(id);

}

最后在xxl-job中加入任务:

在这里插入图片描述
到此,剩Redis和索引信息没处理。

三、需求:课程搜索

1、需求分析

用户通过课程搜索找到课程信息,进一步去查看课程的详细信息,进行选课、支付、学习。

在这里插入图片描述

关键点分析:

  • 根据一级分类、二级分类搜索课程信息
  • 根据关键字搜索课程信息,搜索方式为全文检索,关键字需要匹配课程的名称、 课程内容
  • 根据难度等级搜索课程
  • 搜索结点分页显示
  • 结果中关键字高亮显示

2、全文检索

全文检索即扫描文章中的每一个词,对每一个词建立一个索引,并指明该词在文章中出现的次数和位置。当用户查询时,检索程序根据事先建立的索引进行查找,即通过索引来搜到文章。这里需要对课程信息建立索引。

在这里插入图片描述

全文检索的速度非常快,早期应用在搜索引擎技术中,比如:百度、google

在这里插入图片描述
Elasticsearch与MySQL之间概念的对应关系见下表:

在这里插入图片描述

3、定义模型类

搜索条件Dto:

/**
 * @description 搜索课程参数dtl
 */
 @Data
 @ToString
public class SearchCourseParamDto {

  //关键字
  private String keywords;

  //大分类
  private String mt;

  //小分类
  private String st;
  //难度等级
  private String grade;


}

4、定义接口

/**
 * @description 课程搜索接口
 * PageResult、PageParams是自己定义的分页相关的通用类
 */
@Api(value = "课程搜索接口",tags = "课程搜索接口")
@RestController
@RequestMapping("/course")
public class CourseSearchController {



   @ApiOperation("课程搜索列表")
   @GetMapping("/list")
   public PageResult<CourseIndex> list(PageParams pageParams, SearchCourseParamDto searchCourseParamDto){

    
   
   }
}

5、开发Mapper层

生成po类,创建Mapper接口并继承BaseMapper<PO>

6、开发Service层

Service层接口定义:

/**
 * @description 课程搜索service
 * @author Mr.M
 * @date 2022/9/24 22:40
 * @version 1.0
 */
public interface CourseSearchService {


    /**
     * @description 搜索课程列表
     * @param pageParams 分页参数
     * @param searchCourseParamDto 搜索条件
    */
    SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto searchCourseParamDto);

实现类:

/**
 * @description 课程搜索service实现类
 */
@Slf4j
@Service
public class CourseSearchServiceImpl implements CourseSearchService {

    @Value("${elasticsearch.course.index}")
    private String courseIndexStore;
    @Value("${elasticsearch.course.source_fields}")
    private String sourceFields;

    @Autowired
    RestHighLevelClient client;

    @Override
    public SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto courseSearchParam) {

        //设置索引
        SearchRequest searchRequest = new SearchRequest(courseIndexStore);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        //source源字段过虑
        String[] sourceFieldsArray = sourceFields.split(",");
        searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
        
        //分页
        Long pageNo = pageParams.getPageNo();
        Long pageSize = pageParams.getPageSize();
        int start = (int) ((pageNo-1)*pageSize);
        searchSourceBuilder.from(start);
        searchSourceBuilder.size(Math.toIntExact(pageSize));
       //布尔查询
        searchSourceBuilder.query(boolQueryBuilder);
        
        //请求搜索
        searchRequest.source(searchSourceBuilder);
       
        
        SearchResponse searchResponse = null;
        try {
            searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
            log.error("课程搜索异常:{}",e.getMessage());
            return new SearchPageResultDto<CourseIndex>(new ArrayList(),0,0,0);
        }

        //结果集处理
        SearchHits hits = searchResponse.getHits();
        SearchHit[] searchHits = hits.getHits();
        //记录总数
        TotalHits totalHits = hits.getTotalHits();
        //数据列表
        List<CourseIndex> list = new ArrayList<>();

        for (SearchHit hit : searchHits) {

            String sourceAsString = hit.getSourceAsString();
            CourseIndex courseIndex = JSON.parseObject(sourceAsString, CourseIndex.class);
            list.add(courseIndex);

        }
        SearchPageResultDto<CourseIndex> pageResult = new SearchPageResultDto<>(list, totalHits.value,pageNo,pageSize);

        

        return pageResult;
    }


}

靠,没学过elasticsearch,这里有点吃力,先记个后端处理高亮的代码,后续工作遇到再细学吧。

@Override
public SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto courseSearchParam) {

    //设置索引
    SearchRequest searchRequest = new SearchRequest(courseIndexStore);

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    //source源字段过虑
    String[] sourceFieldsArray = sourceFields.split(",");
    searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});
    if(courseSearchParam==null){
        courseSearchParam = new SearchCourseParamDto();
    }
    //关键字
    if(StringUtils.isNotEmpty(courseSearchParam.getKeywords())){
        //匹配关键字
        MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(courseSearchParam.getKeywords(), "name", "description");
        //设置匹配占比
        multiMatchQueryBuilder.minimumShouldMatch("70%");
        //提升另个字段的Boost值
        multiMatchQueryBuilder.field("name",10);
        boolQueryBuilder.must(multiMatchQueryBuilder);
    }
    //过虑
    if(StringUtils.isNotEmpty(courseSearchParam.getMt())){
        boolQueryBuilder.filter(QueryBuilders.termQuery("mtName",courseSearchParam.getMt()));
    }
    if(StringUtils.isNotEmpty(courseSearchParam.getSt())){
        boolQueryBuilder.filter(QueryBuilders.termQuery("stName",courseSearchParam.getSt()));
    }
    if(StringUtils.isNotEmpty(courseSearchParam.getGrade())){
        boolQueryBuilder.filter(QueryBuilders.termQuery("grade",courseSearchParam.getGrade()));
    }
    //分页
    Long pageNo = pageParams.getPageNo();
    Long pageSize = pageParams.getPageSize();
    int start = (int) ((pageNo-1)*pageSize);
    searchSourceBuilder.from(start);
    searchSourceBuilder.size(Math.toIntExact(pageSize));
    //布尔查询
    searchSourceBuilder.query(boolQueryBuilder);
    //高亮设置
    HighlightBuilder highlightBuilder = new HighlightBuilder();
    highlightBuilder.preTags("<font class='eslight'>");
    highlightBuilder.postTags("</font>");
    //设置高亮字段
    highlightBuilder.fields().add(new HighlightBuilder.Field("name"));
    searchSourceBuilder.highlighter(highlightBuilder);
    //请求搜索
    searchRequest.source(searchSourceBuilder);
    //聚合设置
    buildAggregation(searchRequest);
    SearchResponse searchResponse = null;
    try {
        searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
        log.error("课程搜索异常:{}",e.getMessage());
        return new SearchPageResultDto<CourseIndex>(new ArrayList(),0,0,0);
    }

    //结果集处理
    SearchHits hits = searchResponse.getHits();
    SearchHit[] searchHits = hits.getHits();
    //记录总数
    TotalHits totalHits = hits.getTotalHits();
    //数据列表
    List<CourseIndex> list = new ArrayList<>();

    for (SearchHit hit : searchHits) {

        String sourceAsString = hit.getSourceAsString();
        CourseIndex courseIndex = JSON.parseObject(sourceAsString, CourseIndex.class);

        //取出source
        Map<String, Object> sourceAsMap = hit.getSourceAsMap();

        //课程id
        Long id = courseIndex.getId();
        //取出名称
        String name = courseIndex.getName();
        //取出高亮字段内容
        Map<String, HighlightField> highlightFields = hit.getHighlightFields();
        if(highlightFields!=null){
            HighlightField nameField = highlightFields.get("name");
            if(nameField!=null){
                Text[] fragments = nameField.getFragments();
                StringBuffer stringBuffer = new StringBuffer();
                for (Text str : fragments) {
                    stringBuffer.append(str.string());
                }
                name = stringBuffer.toString();

            }
        }
        courseIndex.setId(id);
        courseIndex.setName(name);

        list.add(courseIndex);

    }
    SearchPageResultDto<CourseIndex> pageResult = new SearchPageResultDto<>(list, totalHits.value,pageNo,pageSize);

    //获取聚合结果
    List<String> mtList= getAggregation(searchResponse.getAggregations(), "mtAgg");
    List<String> stList = getAggregation(searchResponse.getAggregations(), "stAgg");

    pageResult.setMtList(mtList);
    pageResult.setStList(stList);

    return pageResult;
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/146054.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!