💗推荐阅读文章💗
- 🌸JavaSE系列🌸👉1️⃣《JavaSE系列教程》
- 🌺MySQL系列🌺👉2️⃣《MySQL系列教程》
- 🍀JavaWeb系列🍀👉3️⃣《JavaWeb系列教程》
- 🌻SSM框架系列🌻👉4️⃣《SSM框架系列教程》
🎉本博客知识点收录于🎉👉🚀《RabbitMQ系列教程》🚀—>✈️《RabbitMQ系列教程-第四章-06-RabbitMQ工作模式之RPC模式》✈️
4.6 RPC 模式
4.6.1 简介
以前的几种模式的通信都是基于Producer发送消息到Consumer,然后Consumer进行消费,假设我们需要Consumer操作完毕之后返回给Producer一个回调呢?前面几种模式就行不通了;
例如我们要做一个远程调用加钱操作,客户端远程调用服务端进行加钱操作,操作完毕之后服务端将用户最新的余额返回给客户端;客户端进行后续操作,例如更新到数据库等;
- RPC业务分析
在RPC模式中,客户端和服务器都是Producer也都是Consumer;
RPC模式官网介绍:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
- RPC调用图解:
4.6.2 客户端
package com.dfbz.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient implements AutoCloseable {
public Connection connection;
public Channel channel;
public static final String RPC_QUEUE_NAME = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.132");
factory.setPort(5672);
factory.setUsername("lscl");
factory.setPassword("admin");
factory.setVirtualHost("/lscl");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] argv) throws Exception {
// 初始化信息
RPCClient rpcClient = new RPCClient();
// 发起远程调用
Integer response = rpcClient.call(20);
System.out.println(response);
rpcClient.channel.close();
rpcClient.connection.close();
}
public Integer call(Integer money) throws IOException, InterruptedException {
// 随机生成一个correlationId(密钥)
final String corrId = UUID.randomUUID().toString();
// 后期服务端回调给客户端的队列名(随机生成的回调队列名)
String replyQueueName = channel.queueDeclare().getQueue();
// 设置发送消息的一些参数
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId) // 密钥
.replyTo(replyQueueName) // 回调队列名
.build();
// 采用Simple模式发送给Server端
channel.basicPublish("", RPC_QUEUE_NAME, props, (money + "").getBytes("UTF-8"));
// 定义延迟队列
final BlockingQueue<Integer> response = new ArrayBlockingQueue<>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
// 回调方法,当收到消息之后,会自动执行该方法
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
System.out.println("响应的消息:" + new String(body));
// 往延迟队列中添加信息(服务端响应的最新余额)
response.offer(Integer.parseInt(new String(body, "UTF-8")));
}
}
});
// 获取延迟队列中的信息(如果没有信息将一直阻塞)
return response.take();
}
public void close() throws IOException {
connection.close();
}
}
4.6.2 服务端
package com.dfbz.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
// 总金额
private static Integer money = 0;
/**
* 加钱方法
* @param n
* @return
*/
private static Integer addMoney(int n) {
money += n;
return money;
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.132");
factory.setPort(5672);
factory.setUsername("lscl");
factory.setPassword("admin");
factory.setVirtualHost("/lscl");
try (
Connection connection = factory.newConnection();
Channel channel = connection.createChannel()
) {
channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);
System.out.println("等待客户端请求.....");
while (true) {
// 接受到客户端的请求(消息)
channel.basicConsume(RPC_QUEUE_NAME, true, new DefaultConsumer(channel) {
// 回调方法,当收到消息之后,会自动执行该方法
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 本次的消息配置
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId()) // 客户端发送的密钥
.build();
System.out.println("客户端的消息: "+new String(body,"UTF-8"));
String response = "";
try {
String message = new String(body, "UTF-8");
// 调用加钱方法
response = addMoney(Integer.parseInt(message)) + "";
} finally {
// 发送一个消息给客户端
/*
properties.getReplyTo(): Client端设置的回调队列名
replyProps: 封装的参数(主要是CorrelationId)
*/
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
}
}
});
}
}
}
}
4.6.3 RPC模式小结
严格意义上来说RPC并不是一种新的交换模式,他其实还是借助原有的模式(上述案例是采用Simple模式)来达到一些不同的功能,在RPC模式中只有客户端和服务端,并且客户端和服务端都既是Producer也是Consumer;
Tips:RPC模式已经违背了消息队列设计的初衷了;即一些无需及时返回且耗时的操作;
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/131621.html