SpringBoot实战:整合Redis实现发布/订阅功能

前言

Redis 作为一个高性能的内存类型数据库,除了支持缓存与持久化外,还可以使用 Redis  实现延迟队列功能。接下来我们来聊聊如何使用 Redis 来实现 延迟队列功能(发布/订阅)Pub/Sub



Redis 发布/订阅简介

发布/订阅是一种消息通信模式,发布者发布消息,订阅者接收消息。发布者和订阅者之间没有之间的联系,消息由消息中间件传递。

SpringBoot实战:整合Redis实现发布/订阅功能

❷ Redis 发布/订阅的优点

  • 高性能:依托Redis的内存存储特性,其发布/订阅机制展现出极高的读写效率,能够迅速处理并分发消息,确保系统响应的敏捷性。

  • 简单易用:Redis的发布/订阅API设计直观,便于快速集成到各种应用程序中,降低了开发和维护的复杂度。

  • 实时性强:一旦消息被发布,Redis能够即时地将这些消息推送给所有活跃的订阅者,确保了信息传递的实时性和同步性。

❸ Redis 发布/订阅的缺点

  • 消息丢失风险:鉴于Redis是内存数据库,若发生Redis实例故障或重启,尚未处理的消息可能会因未持久化而丢失,影响消息传递的可靠性。

  • 缺乏持久化支持:Redis的发布/订阅模式当前不提供消息持久化功能,意味着历史消息无法被存储和后续检索,限制了消息系统的应用场景。

  • 订阅者管理难题:发布者无法直接管理订阅者的数量和状态,这可能导致消息传递的不确定性,无法保证所有预期的订阅者都能成功接收到消息。

  • 无消息确认机制:Redis的发布/订阅模式没有内置的消息确认机制,发布者无法直接确认订阅者是否已成功接收并处理了消息,增加了消息传递的不确定性。

❹ Redis发布订阅命令

SpringBoot实战:整合Redis实现发布/订阅功能

示例代码:


步骤一

开始之前需要首先配置好 Redis 服务器,在pom.xml 文件中添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

步骤二、配置相关参数

在项目的配置文件中,配置 Redis 相关配置

spring:
    #redis
    redis:
        # 地址
        host: 127.0.0.1
        # 端口,默认为6379
        port: 6379
        # 数据库索引
        database: 0
        # 密码
        password:
        # 连接超时时间
        timeout: 10s
        lettuce:
            pool:
                # 连接池中的最小空闲连接
                min-idle: 5
                # 连接池中的最大空闲连接
                max-idle: 8
                # 连接池的最大数据库连接数
                max-active: 20
                # #连接池最大阻塞等待时间(使用负值表示没有限制)
                max-wait: -1ms

步骤三:创建redis配置类

package com.example.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
public class RedisConfig {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter)
 
{
        //设置连接工厂RedisConnectionFactory
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅订阅名称 micro 的通道
        container.addMessageListener(listenerAdapter, new ChannelTopic("micro"));
        // 订阅名称 'test-' 开头的全部通道
        container.addMessageListener(listenerAdapter, new PatternTopic("test-*"));
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
        return new MessageListenerAdapter(receiver);
    }
}

步骤四、创建消息发布工具类

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessagePublisher {
    private final StringRedisTemplate redisTemplate;

    public MessagePublisher(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    public void publish(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

接收类

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageReceiver implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        //消息通道
        String channel = new String(message.getChannel());
        //消息内容
        String messageBody = new String(message.getBody());
        // 消息订阅的匹配规则,如 new PatternTopic("test-*") 中的 test-*
        String msgPattern = new String(pattern);
        log.info("接收消息: channel={} body={} pattern={} ", channel, messageBody, msgPattern);
        // 这里处理接收的消息
    }
}

步骤五、创建测试类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
public class PublisherController {

    @Autowired
    private MessagePublisher messagePublisher;

    @GetMapping("/publish")
    public String publish(@RequestParam String message) {
        messagePublisher.publish("micro", message);
        return "Message published: " + message;
    }
}

测试验证

SpringBoot实战:整合Redis实现发布/订阅功能

查看控制台, Redis实现发布和订阅功能执行情况

SpringBoot实战:整合Redis实现发布/订阅功能


原文始发于微信公众号(Java技术前沿):SpringBoot实战:整合Redis实现发布/订阅功能

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/299668.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!