Redis实现发布订阅功能

不管现实多么惨不忍睹,都要持之以恒地相信,这只是黎明前短暂的黑暗而已。不要惶恐眼前的难关迈不过去,不要担心此刻的付出没有回报,别再花时间等待天降好运。真诚做人,努力做事!你想要的,岁月都会给你。Redis实现发布订阅功能,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

在项目中,要实现订阅发布功能通常会使用高性能的消息队列实现,如kafka、rocketmq等,它们都可以实现高吞吐量的发布订阅功能,其实redis也支持发布订阅功能,只不过功能比较简单,但是对于日常小并发量的使用还是足以应付的,redis的订阅是广播模式的,订阅了同一个主题的多个端都会收到相同的消息,下面使用jedis客户端演示一下如何使用该功能:

首先需要引入jedis客户端:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.3.0</version>
</dependency>

接下来我们可以定义一个订阅者,并指定订阅的通道,订阅了这个通道后当其他端发布消息时,这里就可以获取到消息:

import redis.clients.jedis.JedisPubSub;

import java.util.concurrent.atomic.AtomicInteger;

public class JedisConsumer extends JedisPubSub {

    public static final AtomicInteger INCR = new AtomicInteger(0);

    /**
     * 接收到消息时的回调
     * @param channel       通道名
     * @param message       消息内容
     */
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("onMessage method | channel : " + channel + ", rec message : " + message + " | message count : " + INCR.incrementAndGet() + " | " + Thread.currentThread().getName());
    }

    /**
     * 接收到消息时的回调
     * @param pattern       匹配模式
     * @param channel       通道名
     * @param message       消息内容
     */
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("onPMessage method | pattern : " + pattern + ", channel : " + channel + ", rec message : " + message);
    }

    /**
     * 订阅时的回调
     * @param channel               通道名
     * @param subscribedChannels    订阅的通道数
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("onSubscribe method | channel : " + channel + ", subscribedChannels : " + subscribedChannels);
    }

    /**
     * 取消订阅时的回调
     * @param channel               通道名
     * @param subscribedChannels    订阅的通道数
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("onUnsubscribe method | channel : " + channel + ", subscribedChannels : " + subscribedChannels);
    }

    /**
     * 取消订阅模式时的回调
     * @param pattern               匹配模式
     * @param subscribedChannels    订阅的通道数
     */
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("onPUnsubscribe method | pattern : " + pattern + ", subscribedChannels : " + subscribedChannels);
    }

    /**
     * 订阅指定模式时的回调
     * @param pattern               匹配模式
     * @param subscribedChannels    订阅的通道数
     */
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println("onPSubscribe method | pattern : " + pattern + ", subscribedChannels : " + subscribedChannels);
    }

    @Override
    public void onPong(String pattern) {
        System.out.println("onPong method | pattern : " + pattern);
    }

    @Override
    public void ping() {
        super.ping();
    }

    @Override
    public int getSubscribedChannels() {
        return super.getSubscribedChannels();
    }
}

定义一个jedis连接池工具类,该工具类中可以获取到jedis连接和归还jedis连接:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.time.Duration;

/**
 * jedis连接工具
 */
public class JedisUtil {

    /**
     * 连接地址
     */
    private String host;
    /**
     * 连接端口号
     */
    private int port;
    /**
     * 密码
     */
    private String password;
    /**
     * 连接池
     */
    private JedisPool jedisPool;

    /**
     * 连接初始化
     * @param host
     * @param port
     * @param password
     */
    public JedisUtil(String host, int port, String password) {
        this.host = host;
        this.port = port;
        this.password = password;

        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(256);
        config.setMaxIdle(256);
        config.setMinIdle(1);
        config.setMaxWait(Duration.ofMillis(300));

//        jedisPool = new JedisPool(config, host, port, 500, password);
        jedisPool = new JedisPool(config, this.host, this.port, 500);
    }

    /**
     * 关闭连接池
     */
    public void close() {
        if(jedisPool != null && !jedisPool.isClosed()) {
            jedisPool.clear();
            jedisPool.close();
        }
    }

    /**
     * 获取连接
     * @return
     */
    public Jedis getJedis() {
        if(jedisPool != null && !jedisPool.isClosed()) {
            return jedisPool.getResource();
        }
        return null;
    }

    /**
     * 归还jedis对象
     * @param jedis
     */
    public void returnJedis(Jedis jedis) {
        if(jedis != null) {
            jedis.close();
        }
    }
}

在定义一个测试类,实现消费者订阅指定的通道:

import redis.clients.jedis.Jedis;

public class JedisTest {

    /**
     * 订阅的通道名
     */
    public static final String CHANNEL_NAME = "test-channel";

    public static void main(String[] args) {
        JedisUtil util = new JedisUtil("127.0.0.1", 6379, "");

        Jedis jedis = util.getJedis();
        JedisConsumer consumer = new JedisConsumer();
        jedis.subscribe(consumer, CHANNEL_NAME);
    }
}

接下来测试一下,开启一个客户端连接到redis服务器,并发送3个命令:

[root@localhost src]# ./redis-cli 
127.0.0.1:6379> publish test-channel Hello,world!
(integer) 1
127.0.0.1:6379> publish test-channel Hello,world!
(integer) 1
127.0.0.1:6379> publish test-channel Hello,world!
(integer) 1
127.0.0.1:6379> 

在看程序这边可以收到订阅的消息了:

onSubscribe method | channel : test-channel, subscribedChannels : 1
onMessage method | channel : test-channel, rec message : Hello,world! | message count : 1 | main
onMessage method | channel : test-channel, rec message : Hello,world! | message count : 2 | main
onMessage method | channel : test-channel, rec message : Hello,world! | message count : 3 | main

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/181858.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!