05【Redis的发布订阅】

追求适度,才能走向成功;人在顶峰,迈步就是下坡;身在低谷,抬足既是登高;弦,绷得太紧会断;人,思虑过度会疯;水至清无鱼,人至真无友,山至高无树;适度,不是中庸,而是一种明智的生活态度。

导读:本篇文章讲解 05【Redis的发布订阅】,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

五、Redis的发布订阅(pub/sub)

1.1 简介

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

发送者(发布者)并不是直接发送它们的消息给指定的接收者(订阅者),而是将消息发布到特定的消息通道,并且不需要知道订阅者的任何信息。订阅者可以订阅一个或多个感兴趣的消息通道,同时也只会收到他们感兴趣通道的信息,而不用去关心是谁发布的。这种发布者与订阅者的解耦,使其具备更强的扩展性并得到一个更加动态的网络拓扑。

订阅:客户端订阅喜欢的频道。

在这里插入图片描述

发布:消息发送给指定的频道,由频道发送给订阅它的客户端。

在这里插入图片描述

1.2 订阅发布应用

  • 订阅频道
subscribe channel [channel ...] 

示例:订阅a、b、c频道

subscribe a b c
  • 发布消息
publish channel message

示例:给a频道发送hello

publish a hello

在这里插入图片描述

  • 查看现有多少频道
127.0.0.1:6379> pubsub channels
1) "a"
2) "e"
3) "c"
4) "b"
127.0.0.1:6379>
  • 退订给定的频道
unsubscribe channel [channel ...]

示例:退订a频道

unsubscribe a 

tips:注意:由于redis客户端订阅操作会占用当前客户端窗口,因此执行不了任何redis命令,退订频道命令一般用于程序客户端操作使用(如Java客户端、C客户端、PHP客户端等)

  • 订阅多个符合条件的频道
psubscribe parent [parent ...]

示例:订阅所有以a结尾的频道

psubscribe *a
  • 退订多个符合条件的频道
punsubscribe parent [parent ...]

示例:退订以a结尾的所有频道

punsubscribe *a *b *c

案例:

client1:

127.0.0.1:6379> psubscribe goods.*

client2:

127.0.0.1:6379> psubscribe order.*

client3:

127.0.0.1:6379> publish goods.save hello~
(integer) 1
127.0.0.1:6379> publish ace hi~
(integer) 1
127.0.0.1:6379> 

观测变化:

在这里插入图片描述

1.3 Java操作发布订阅API

1.3.1 Jedis操作发布订阅

在Jedis中提供有JedisPubSub类,该类主要用于监听触发发布订阅指定命令的执行;当有发布订阅相关命令执行时,就会触发JedisPubSub中指定的方法;

  • JedisPubSub提供的方法如下:
方法 触发时机
onSubscribe 当有频道订阅时触发(subscribe)
onMessage 当有频道收到消息时触发(publish)
onUnsubscribe 当有频道退订时触发(unsubscribe)
onPSubscribe 当使用psubscribe命令订阅一批频道时触发
onPUnsubscribe 当使用punsubscribe命令退订一批频道时触发
onPMessage 当使用pubsub命令时调用
  • 编写监听类:
package com.dfbz.listener;

import redis.clients.jedis.JedisPubSub;

/**
 * @author lscl
 * @version 1.0
 * @intro:
 */
public class PubSubListener extends JedisPubSub {

    /**
     * 当有频道进行订阅时调用(subscribe)
     *
     * @param channel:              订阅的频道
     * @param subscribedChannels:   当前jedis连接订阅了几个频道
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("有频道【" + channel + "】订阅了: " + subscribedChannels);
    }

    /**
     * 当频道收到消息时调用(publish)
     *
     * @param channel
     * @param message
     */
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("接收到来自【" + channel + "】的信息【" + message + "】");

        // 如果发送的消息是close时退订这个频道
        if("close".equals(message)){
            this.unsubscribe(channel);
        }
    }

    /**
     * 取消订阅频道时调用(unsubscribe)
     *
     * @param channel
     * @param subscribedChannels
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("有频道【" + channel + "】取消订阅了: " + subscribedChannels);
    }

    /**
     * 使用psubscribe命令订阅一批频道时触发
     *
     * @param pattern
     * @param subscribedChannels
     */
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println("使用【" + pattern + "】表达式订阅了频道: " + subscribedChannels);

    }

    /**
     * 使用punsubscribe命令退订一批频道时触发
     *
     * @param pattern
     * @param subscribedChannels
     */
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("使用【" + pattern + "】表达式退订了频道: " + subscribedChannels);
    }


    /**
     * 使用pubsub命令时触发
     *
     * @param pattern
     * @param channel
     * @param message
     */
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("订阅表达式为【" + pattern + "】,订阅的频道是【" + channel + "】,消息【" + message + "】");
    }
}
  • 测试类:
package com.dfbz.demo01;

import com.dfbz.listener.PubSubListener;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * @author lscl
 * @version 1.0
 * @intro:
 */
public class Demo01 {

    private JedisPool jedisPool;

    @Before
    public void before() {
        jedisPool = new JedisPool("localhost", 6379);
    }

    @Test
    public void test1() throws Exception {

        PubSubListener pubSubListener = new PubSubListener();
        // 从连接池获取一个连接
        Jedis jedis = jedisPool.getResource();

        jedis.subscribe(pubSubListener, "a", "b", "c");

    }

    @Test
    public void test2() throws Exception {
        Jedis jedis = jedisPool.getResource();

        jedis.publish("a", "a say hello~");
        jedis.publish("b", "b say hello~");
        jedis.publish("c", "c say hello~");

        // 退订a频道
        jedis.publish("a", "close");

        // 退订是比较耗时的操作
        Thread.sleep(10);
        jedis.publish("a", "a say hello~");
        jedis.publish("b", "b say hello~");
        jedis.publish("c", "c say hello~");
    }
}

测试pattern模式:

package com.dfbz.demo01;

import com.dfbz.listener.PubSubListener;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * @author lscl
 * @version 1.0
 * @intro:
 */
public class Demo02 {

    private JedisPool jedisPool;

    @Before
    public void before() {
        jedisPool = new JedisPool("localhost", 6379);
    }

    @Test
    public void test1() throws Exception {

        PubSubListener pubSubListener = new PubSubListener();
        // 从连接池获取一个连接
        Jedis jedis = jedisPool.getResource();

        jedis.psubscribe(pubSubListener, "goods.*", "order.*");

    }

    @Test
    public void test2() throws Exception {
        Jedis jedis = jedisPool.getResource();

        jedis.publish("goods.save","goods.save hello~");
        jedis.publish("goods.delete","goods.delete hello~");
        jedis.publish("order.query","order.query hello~");
        jedis.publish("order.update","order.update hello~");
    }
}

1.3.2 SpringBoot操作发布订阅

  • 1)引入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dfbz</groupId>
    <artifactId>02_Redis_SpringBoot</artifactId>
    <version>1.0-SNAPSHOT</version>


    <parent>
        <artifactId>spring-boot-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.0.1.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>
  • 2)编写监听器:

CRMListener:

package com.dfbz.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

/**
 * @author lscl
 * @version 1.0
 * @intro: 监听CRM业务相关的消息
 */
@Component
public class CRMListener implements MessageListener {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] bytes) {
        // redis序列化工具
        RedisSerializer<?> serializer = redisTemplate.getValueSerializer();

        // 频道
        byte[] channel = message.getChannel();

        // 发送的数据
        byte[] body = message.getBody();
        System.out.println("我是CRMListener--来自【" + new String(channel) + "】频道的消息:【" +serializer.deserialize(body) + "】");
    }
}

OAListener:

package com.dfbz.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

/**
 * @author lscl
 * @version 1.0
 * @intro: 监听OA业务相关的消息
 */
@Component
public class OAListener implements MessageListener {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // redis序列化工具
        RedisSerializer<?> serializer = redisTemplate.getValueSerializer();

        // 频道
        byte[] channel = message.getChannel();

        // 发送的数据
        byte[] body = message.getBody();

        System.out.println("我是OAListener--来自【" + new String(channel) + "】频道的消息:【" + serializer.deserialize(body).toString() + "】");
    }
}

  • 启动类(注册监听):
package com.dfbz;

import com.dfbz.listener.CRMListener;
import com.dfbz.listener.OAListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import java.util.ArrayList;
import java.util.List;

/**
 * @author lscl
 * @version 1.0
 * @intro:
 */
@SpringBootApplication
public class RedisApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisApplication.class);
    }

    // 封装成监听适配器
    @Bean
    public MessageListenerAdapter CRMListenerAdapter(CRMListener crmListener) {
        return new MessageListenerAdapter(crmListener);
    }

    // 封装成监听适配器
    @Bean
    public MessageListenerAdapter OAListenerAdapter(OAListener oaListener) {
        return new MessageListenerAdapter(oaListener);
    }


    // 注册监听适配器
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory redisConnectionFactory,
            MessageListenerAdapter CRMListenerAdapter,
            MessageListenerAdapter OAListenerAdapter
    ) {

        // 监听规则集合
        List<Topic> oaList = new ArrayList<Topic>();

        // 普通订阅,订阅具体的频道
        ChannelTopic deptTopic = new ChannelTopic("dept");
        oaList.add(deptTopic);

        // 模式订阅,支持模式匹配订阅,*为模糊匹配符
        PatternTopic userTopic = new PatternTopic("user.*");
        oaList.add(userTopic);

        // 监听规则集合
        List<Topic> crmList = new ArrayList<Topic>();

        // 普通订阅,订阅具体的频道
        ChannelTopic examineTopic = new ChannelTopic("examine");
        crmList.add(examineTopic);

        // 模式订阅,支持模式匹配订阅,*为模糊匹配符
        PatternTopic reportTopic = new PatternTopic("report.*");
        crmList.add(reportTopic);

        // 监听容器
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        redisMessageListenerContainer.addMessageListener(CRMListenerAdapter, crmList);
        redisMessageListenerContainer.addMessageListener(OAListenerAdapter, oaList);
        return redisMessageListenerContainer;
    }
}
  • 测试类:
package com.dfbz.demo01;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author lscl
 * @version 1.0
 * @intro:
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo01 {
    @Autowired
    private RedisTemplate redisTemplate;


    @Test
    public void test1(){

        redisTemplate.convertAndSend("dept","dept hello~");
        redisTemplate.convertAndSend("user.save","user.save hello~");
        redisTemplate.convertAndSend("user.delete","user.delete hello~");
        redisTemplate.convertAndSend("examine","examine hello~");
        redisTemplate.convertAndSend("report.query","report.query hello~");
        redisTemplate.convertAndSend("report.update","report.update hello~");

    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/131734.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!