在项目中,要实现订阅发布功能通常会使用高性能的消息队列实现,如kafka、rocketmq等,它们都可以实现高吞吐量的发布订阅功能,其实redis也支持发布订阅功能,只不过功能比较简单,但是对于日常小并发量的使用还是足以应付的,redis的订阅是广播模式的,订阅了同一个主题的多个端都会收到相同的消息,下面使用jedis客户端演示一下如何使用该功能:
首先需要引入jedis客户端:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.0</version>
</dependency>
接下来我们可以定义一个订阅者,并指定订阅的通道,订阅了这个通道后当其他端发布消息时,这里就可以获取到消息:
import redis.clients.jedis.JedisPubSub;
import java.util.concurrent.atomic.AtomicInteger;
public class JedisConsumer extends JedisPubSub {
public static final AtomicInteger INCR = new AtomicInteger(0);
/**
* 接收到消息时的回调
* @param channel 通道名
* @param message 消息内容
*/
@Override
public void onMessage(String channel, String message) {
System.out.println("onMessage method | channel : " + channel + ", rec message : " + message + " | message count : " + INCR.incrementAndGet() + " | " + Thread.currentThread().getName());
}
/**
* 接收到消息时的回调
* @param pattern 匹配模式
* @param channel 通道名
* @param message 消息内容
*/
@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println("onPMessage method | pattern : " + pattern + ", channel : " + channel + ", rec message : " + message);
}
/**
* 订阅时的回调
* @param channel 通道名
* @param subscribedChannels 订阅的通道数
*/
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("onSubscribe method | channel : " + channel + ", subscribedChannels : " + subscribedChannels);
}
/**
* 取消订阅时的回调
* @param channel 通道名
* @param subscribedChannels 订阅的通道数
*/
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("onUnsubscribe method | channel : " + channel + ", subscribedChannels : " + subscribedChannels);
}
/**
* 取消订阅模式时的回调
* @param pattern 匹配模式
* @param subscribedChannels 订阅的通道数
*/
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println("onPUnsubscribe method | pattern : " + pattern + ", subscribedChannels : " + subscribedChannels);
}
/**
* 订阅指定模式时的回调
* @param pattern 匹配模式
* @param subscribedChannels 订阅的通道数
*/
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println("onPSubscribe method | pattern : " + pattern + ", subscribedChannels : " + subscribedChannels);
}
@Override
public void onPong(String pattern) {
System.out.println("onPong method | pattern : " + pattern);
}
@Override
public void ping() {
super.ping();
}
@Override
public int getSubscribedChannels() {
return super.getSubscribedChannels();
}
}
定义一个jedis连接池工具类,该工具类中可以获取到jedis连接和归还jedis连接:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.time.Duration;
/**
* jedis连接工具
*/
public class JedisUtil {
/**
* 连接地址
*/
private String host;
/**
* 连接端口号
*/
private int port;
/**
* 密码
*/
private String password;
/**
* 连接池
*/
private JedisPool jedisPool;
/**
* 连接初始化
* @param host
* @param port
* @param password
*/
public JedisUtil(String host, int port, String password) {
this.host = host;
this.port = port;
this.password = password;
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(256);
config.setMaxIdle(256);
config.setMinIdle(1);
config.setMaxWait(Duration.ofMillis(300));
// jedisPool = new JedisPool(config, host, port, 500, password);
jedisPool = new JedisPool(config, this.host, this.port, 500);
}
/**
* 关闭连接池
*/
public void close() {
if(jedisPool != null && !jedisPool.isClosed()) {
jedisPool.clear();
jedisPool.close();
}
}
/**
* 获取连接
* @return
*/
public Jedis getJedis() {
if(jedisPool != null && !jedisPool.isClosed()) {
return jedisPool.getResource();
}
return null;
}
/**
* 归还jedis对象
* @param jedis
*/
public void returnJedis(Jedis jedis) {
if(jedis != null) {
jedis.close();
}
}
}
在定义一个测试类,实现消费者订阅指定的通道:
import redis.clients.jedis.Jedis;
public class JedisTest {
/**
* 订阅的通道名
*/
public static final String CHANNEL_NAME = "test-channel";
public static void main(String[] args) {
JedisUtil util = new JedisUtil("127.0.0.1", 6379, "");
Jedis jedis = util.getJedis();
JedisConsumer consumer = new JedisConsumer();
jedis.subscribe(consumer, CHANNEL_NAME);
}
}
接下来测试一下,开启一个客户端连接到redis服务器,并发送3个命令:
[root@localhost src]# ./redis-cli
127.0.0.1:6379> publish test-channel Hello,world!
(integer) 1
127.0.0.1:6379> publish test-channel Hello,world!
(integer) 1
127.0.0.1:6379> publish test-channel Hello,world!
(integer) 1
127.0.0.1:6379>
在看程序这边可以收到订阅的消息了:
onSubscribe method | channel : test-channel, subscribedChannels : 1
onMessage method | channel : test-channel, rec message : Hello,world! | message count : 1 | main
onMessage method | channel : test-channel, rec message : Hello,world! | message count : 2 | main
onMessage method | channel : test-channel, rec message : Hello,world! | message count : 3 | main
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/181858.html