远程调用失败kafka重试方案

大家好呀,我是小羊,如果大家喜欢我的文章的话,就关注我一起学习进步吧~

一.简介

今天微服务和远程调用已经使用的很广泛了,可以解决我们很多的问题,不过由于远程调用不可控因素更多,失败的可能性更大,但是一些接口可能需要很高的要求,需要每一次调用都需要成功。比如订单流转。调用失败订单就丢失了,影响很大。我这里介绍一种简单的解决办法,采用消息队列解构接口调用,定时器重新发送,mysql持久化。配合人工处理可以较好的解决这个问题。

二.时序图

远程调用失败kafka重试方案

三.项目简介

3.1创建springboot 父子工程

service-rpc-kafka
├── service-api -- 系统共用api模块
├── service-consumer -- rpc服务消费模块(8062)
├── service-producter-- rpc服务生产模块(8061)
├── service-rpc-fail-dispose -- rpc失败服务处理模块(8063)
├── sql -- 数据库初始化脚本

3.2父工程pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>service-api</module>
        <module>service-consumer</module>
        <module>service-producer</module>
        <module>service-rpc-fail-dispose</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.yangzheng</groupId>
    <artifactId>service-rpc-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-rpc-kafka</name>
    <description>远程调用kafka解耦,处理远程调用失败</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!--引入dubbo环境-->
        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>0.2.0</version>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.29</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.freemarker</groupId>
            <artifactId>freemarker</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>

        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>

        <!-- hutool 工具集合包 -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.5.0</version>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.3 api接口依赖

TestService  在service-api 模块中给其他服务依赖使用

package com.yangzheng.service;

import com.yangzheng.vo.UserVo;

/**
 * @author yangzheng
 * @Description: //TODO
 * @Title: TestService
 * @ProjectName service-rpc-kafka
 * @date 2020/6/3/003 11:30
 */

public interface TestService {

    String test(UserVo userVo);
}

UserVo

import lombok.Data;

import java.io.Serializable;

/**
 * @author yangzheng
 * @Description: //TODO
 * @Title: UserVo
 * @ProjectName service-rpc-kafka
 * @date 2020/6/3/003 15:59
 */

@Data
public class UserVo implements Serializable {
    private String name;
    private Integer age;
}

3.4 producer 服务生产者

启动类加上开启dubbo

@EnableDubbo

yml配置文件

dubbo:
  application:
    name: service-producer
  registry:
    address: localhost:2181
    protocol: zookeeper
    check: false
  protocol:
    name: dubbo
    port: 30003
  monitor:
    protocol: register
  consumer:
    check: false
    timeout: 3000

server:
  port: 8061


spring:
  datasource:
    name: user
    driver-class-namecom.mysql.cj.jdbc.Driver
    ###################以下为druid增加的配置###########################
    typecom.alibaba.druid.pool.DruidDataSource
    urljdbc:mysql://localhost:3306/service_rpc_kafka?serverTimezone
=Asia/Chongqing&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8&useSSL=false&verifyServerCertificate=false&autoReconnct=true&autoReconnectForPools=true&allowMultiQueries=true
    username: root
    password: 123456
    # 下面为连接池的补充设置,应用到上面所有数据源中
    # 初始化大小,最小,最大
    initialSize: 5
    minIdle: 5
    maxActive: 100
    # 配置获取连接等待超时的时间
    maxWait: 60000
    # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
    timeBetweenEvictionRunsMillis: 60000
    # 配置一个连接在池中最小生存的时间,单位是毫秒
    minEvictableIdleTimeMillis: 300000
    validationQuery: SELECT 1 FROM DUAL
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    # 打开PSCache,并且指定每个连接上PSCache的大小
    poolPreparedStatements: true
    maxPoolPreparedStatementPerConnectionSize: 20
    # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
    filters: stat,wall,logback
    # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
    connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    # 合并多个DruidDataSource的监控数据
    useGlobalDataSourceStat: true

    # druid recycle
    removeAbandoned:  true
    removeAbandonedTimeout: 300
    logAbandoned: true
    ###############以上为配置druid添加的配置########################################
  transaction:
    rollback-on-commit-failure: true

业务实现服务,测试接口是否调用成功

import com.alibaba.dubbo.config.annotation.Service;
import com.yangzheng.service.TestService;
import com.yangzheng.vo.UserVo;
import lombok.extern.slf4j.Slf4j;

/**
 * @author yangzheng
 * @Description: //TODO
 * @Title: TestServiceImpl
 * @ProjectName service-rpc-kafka
 * @date 2020/6/3/003 11:34
 */

@Service
@Slf4j
public class TestServiceImpl implements TestService {
    @Override
    public String test(UserVo userVo) {
        log.info("dubbo服务调用成功,服务调用者为"+userVo.getName());
        return "dubbo服务调用成功,服务调用者为"+userVo.getName();
    }
}

3.5 服务消费者

controller 先发送到 业务 topic

import com.alibaba.fastjson.JSON;
import com.yangzheng.vo.UserVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author yangzheng
 * @Description: //TODO
 * @Title: TestController
 * @ProjectName service-rpc-kafka
 * @date 2020/6/3/003 13:17
 */

@RestController
public class TestController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @RequestMapping("/test")
    public String test(){
        UserVo userVo = new UserVo();
        userVo.setName("yangzheng");
        userVo.setAge(18);
        kafkaTemplate.send("test", JSON.toJSONString(userVo, true));
        return "success";
    }
}

kafka listener监听topic,消费消息调用服务生产者的接口,成功就调用成功的工具类方法,失败就调用失败的工具类方法

import cn.hutool.json.JSONUtil;
import com.alibaba.druid.util.StringUtils;
import com.alibaba.dubbo.config.annotation.Reference;
import com.yangzheng.service.TestService;
import com.yangzheng.serviceconsumer.util.MqMessageUtil;
import com.yangzheng.vo.UserVo;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @author yangzheng
 * @Description: //TODO
 * @Title: KafkaTestListener
 * @ProjectName service-rpc-kafka
 * @date 2020/6/3/003 13:51
 */

@Component
@Slf4j
public class KafkaTestListener {


    @Reference(check=false, timeout = 60000)
    TestService testService;


    @KafkaListener(topics = "test",groupId = "test")
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            JSONObject json = JSONObject.fromObject(record.value());
            log.info("test 消费了: Topic:" + topic + ",Message:" + json);
            try {
                UserVo userVo = JSONUtil.toBean(json.toString(), UserVo.class);
                String result = testService.test(userVo);
                if (!StringUtils.isEmpty(result)) {
                    MqMessageUtil.handleSuccessMsg(record);
                } else {
                    MqMessageUtil.handleFailMsg(record);
                }
            } catch (Exception e) {
                MqMessageUtil.handleFailMsg(record);
                log.error("",e);
            }
            ack.acknowledge();
        }
    }
}

MqMessageUtil工具类,发送消息到调用接口失败或成功的topic,用于失败重试

import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.KafkaTemplate;

/**
 * @author yudong
 * @date 2019/12/31
 */

public class MqMessageUtil {
    public static final String RESEND_FLAG = "mq_message_resend_flag";
    private static KafkaTemplate<String, Object> kafkaTemplate = SpringUtil.getApplicationContext().getBean(KafkaTemplate.class);

    public static void handleSuccessMsg(ConsumerRecord<?, ?> record) {
        JSONObject json = JSONObject.fromObject(record.value());
        if (json.has(RESEND_FLAG)) {
            int id = json.getInt(RESEND_FLAG);
            JSONObject object = new JSONObject();
            object.put("id", id);
            kafkaTemplate.send("mall_moonmall_statistical_success", object.toString());
        }
    }

    public static void handleFailMsg(ConsumerRecord<?, ?> record) {
        String key = record.key() != null ? String.valueOf(record.key()) : "";
        JSONObject json = new JSONObject();
        json.put("topic", record.topic());
        json.put("key", key);
        json.put("value", record.value());
        kafkaTemplate.send("mall_moonmall_statistical_fail", json.toString());
    }

}

SpringUtil

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringUtil implements ApplicationContextAware {

 private static ApplicationContext applicationContext;
 
 @Override
 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  if(SpringUtil.applicationContext == null){
            SpringUtil.applicationContext  = applicationContext;
     }
  System.out.println("========ApplicationContext配置成功,在普通类可以通过调用SpringUtils.getAppContext()获取applicationContext对象,applicationContext="+SpringUtil.applicationContext+"========");
 }
 
 public static ApplicationContext getApplicationContext() {
  return applicationContext;
 }
}

3.6调用消息处理

监听 MqMessageUtil 发送过来的消息,第一次失败就插入数据库,其他失败就直接返回

import com.yangzheng.servicerpcfaildispose.dao.MallMqLogMapper;
import com.yangzheng.servicerpcfaildispose.model.MallMqLog;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @author yangzheng
 * @Description: //TODO
 * @Title: RpcDisposeListener
 * @ProjectName service-rpc-kafka
 * @date 2020/6/3/003 15:44
 */

@Component
@Slf4j
public class RpcDisposeListener {

    @Autowired
    private MallMqLogMapper mallMqLogMapper;


    /**
     * 监听成功调用
     * @param record
     * @param ack
     * @param topic
     */

    @KafkaListener(topics = "mall_moonmall_statistical_success",groupId = "test")
    public void RpcSuccess(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("mall_moonmall_statistical_success 消费了: Topic:" + topic + ",Message:" + msg);
            JSONObject json = JSONObject.fromObject(record.value());
            int id = json.getInt("id");
            MallMqLog mallMqLog = new MallMqLog();
            mallMqLog.setId(id);
            mallMqLog.setMqStatus(2);
            int affect = mallMqLogMapper.updateById(mallMqLog);
            log.info("updateSuccessMq affect:{},{}", id, affect);
            ack.acknowledge();
        }
    }
    /**
     * 监听失败调用
     * @param record
     * @param ack
     * @param topic
     */

    @KafkaListener(topics = "mall_moonmall_statistical_fail",groupId = "test")
    public void RpcFail(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("mall_moonmall_statistical_fail 消费了: Topic:" + topic + ",Message:" + msg);
            JSONObject json = JSONObject.fromObject(record.value());
            String topicRpc = json.getString("topic");
            String key = json.getString("key");
            String valueStr = json.getString("value");
            JSONObject valueJson = JSONObject.fromObject(valueStr);
            if (valueJson.has("mq_message_resend_flag")) {
                return;
            }
            MallMqLog mallMqLog = new MallMqLog();
            mallMqLog.setTopic(topicRpc);
            mallMqLog.setMqKey(key);
            mallMqLog.setMqValue(valueStr);
            mallMqLog.setResendTimes(0);
            mallMqLog.setMqStatus(2);
            mallMqLogMapper.insert(mallMqLog);
            ack.acknowledge();
        }
    }
}

定时器读取数据库里面的需要重发的消息,重新发给业务topic调用服务

import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.baomidou.mybatisplus.mapper.Wrapper;
import com.baomidou.mybatisplus.toolkit.StringUtils;
import com.google.common.collect.Lists;
import com.yangzheng.servicerpcfaildispose.dao.MallMqLogMapper;
import com.yangzheng.servicerpcfaildispose.model.MallMqLog;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.Date;
import java.util.List;

/**
 * @author yangzheng
 * @Description: //TODO
 * @Title: RpcFailSchedule
 * @ProjectName service-rpc-kafka
 * @date 2020/6/3/003 16:35
 */

@Component
@Slf4j
public class RpcFailSchedule {

    @Autowired
    MallMqLogMapper mallMqLogMapper;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Scheduled(cron = "* */1 * * * ?")
    public void handleFailRpc() {
        List<String> resList = Lists.newArrayList();
        Wrapper<MallMqLog> wrapper = new EntityWrapper<>();
        wrapper.and("mq_status",1);
        wrapper.le("resend_times",6);

        // 查询所有处理失败的消息
        List<MallMqLog> list = mallMqLogMapper.selectList(wrapper);
        for (MallMqLog mallMqLog : list) {
            resList.add(mallMqLog.getId() + "," + mallMqLog.getTopic() + "," + mallMqLog.getMqKey());
            Integer resendTimes = mallMqLog.getResendTimes();
            String value = mallMqLog.getMqValue();
            JSONObject json = JSONObject.fromObject(value);
            // 添加重发标志
            json.put("mq_message_resend_flag", mallMqLog.getId());
            value = json.toString();
            String key = StringUtils.isNotEmpty(mallMqLog.getMqKey()) ? mallMqLog.getMqKey() : null;
            // 重新发送处理失败的消息
            kafkaTemplate.send(mallMqLog.getTopic(), key, value)
                    .addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                        @Override
                        public void onFailure(Throwable ex) {
                            // do nothing
                        }

                        @Override
                        public void onSuccess(SendResult<String, Object> result) {
                            // 发送成功,重发次数加1
                            MallMqLog log = new MallMqLog();
                            log.setId(mallMqLog.getId());
                            Integer times = (resendTimes + 1);
                            log.setResendTimes(times);
                            log.setOpTime(new Date());
                            mallMqLogMapper.updateById(log);
                        }
                    });
        }
        log.info("rehandleFailMq:{}", resList);
    }
}

四.总结

这个方法在我们公司的实际生产环境中有用到,效果还不错,因为大版本上线或者系统切换的时候,有可能会有很多原因导致服务调用失败,所以我们就做了这个重试机制,自动重试一定次数,超过最大次数就短信报警人工解决问题,解决问题后,把数据库中失败消息的重试次数改小一点,这些失败的调用又可以自动重试了,不用再人工处理。这个方法优点使用范围比较广,做好之后其他服务想用只有用工具类发消息即可,但是也有一定的缺点,就是流程变得更长了。

五.源码

service-rpc-kafka

https://github.com/Yanyf765/service-rpc-kafka

喜欢这篇文章就给点个赞吧


原文始发于微信公众号(小羊架构):远程调用失败kafka重试方案

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/171794.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!