如何用MQ实现RPC远程调用?(附代码)

追求适度,才能走向成功;人在顶峰,迈步就是下坡;身在低谷,抬足既是登高;弦,绷得太紧会断;人,思虑过度会疯;水至清无鱼,人至真无友,山至高无树;适度,不是中庸,而是一种明智的生活态度。

导读:本篇文章讲解 如何用MQ实现RPC远程调用?(附代码),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

💗推荐阅读文章💗

🎉本博客知识点收录于🎉👉🚀《RabbitMQ系列教程》🚀—>✈️《RabbitMQ系列教程-第四章-06-RabbitMQ工作模式之RPC模式》✈️

4.6 RPC 模式

4.6.1 简介

以前的几种模式的通信都是基于Producer发送消息到Consumer,然后Consumer进行消费,假设我们需要Consumer操作完毕之后返回给Producer一个回调呢?前面几种模式就行不通了;

例如我们要做一个远程调用加钱操作,客户端远程调用服务端进行加钱操作,操作完毕之后服务端将用户最新的余额返回给客户端;客户端进行后续操作,例如更新到数据库等;

  • RPC业务分析

在这里插入图片描述

在RPC模式中,客户端和服务器都是Producer也都是Consumer;

RPC模式官网介绍:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

  • RPC调用图解:

在这里插入图片描述

4.6.2 客户端

package com.dfbz.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {

    public Connection connection;
    public Channel channel;
    public static final String RPC_QUEUE_NAME = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.132");
        factory.setPort(5672);
        factory.setUsername("lscl");
        factory.setPassword("admin");
        factory.setVirtualHost("/lscl");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) throws Exception {
        // 初始化信息
        RPCClient rpcClient = new RPCClient();
        // 发起远程调用
        Integer response = rpcClient.call(20);

        System.out.println(response);

        rpcClient.channel.close();
        rpcClient.connection.close();
    }

    public Integer call(Integer money) throws IOException, InterruptedException {

        // 随机生成一个correlationId(密钥)
        final String corrId = UUID.randomUUID().toString();

        // 后期服务端回调给客户端的队列名(随机生成的回调队列名)
        String replyQueueName = channel.queueDeclare().getQueue();
        
        // 设置发送消息的一些参数
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)			// 密钥
                .replyTo(replyQueueName)		// 回调队列名
                .build();

		// 采用Simple模式发送给Server端
        channel.basicPublish("", RPC_QUEUE_NAME, props, (money + "").getBytes("UTF-8"));

        // 定义延迟队列
        final BlockingQueue<Integer> response = new ArrayBlockingQueue<>(1);

        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {

            // 回调方法,当收到消息之后,会自动执行该方法
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                if (properties.getCorrelationId().equals(corrId)) {
                    System.out.println("响应的消息:" + new String(body));

                    // 往延迟队列中添加信息(服务端响应的最新余额)
                    response.offer(Integer.parseInt(new String(body, "UTF-8")));
                }
            }
        });

        // 获取延迟队列中的信息(如果没有信息将一直阻塞)
        return response.take();
    }

    public void close() throws IOException {
        connection.close();
    }
}

4.6.2 服务端

package com.dfbz.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    // 总金额
    private static Integer money = 0;

    /**
     * 加钱方法
     * @param n
     * @return
     */
    private static Integer addMoney(int n) {
        money += n;
        return money;
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.132");
        factory.setPort(5672);
        factory.setUsername("lscl");
        factory.setPassword("admin");
        factory.setVirtualHost("/lscl");

        try (
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()
        ) {

            channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);

            System.out.println("等待客户端请求.....");

            while (true) {

                // 接受到客户端的请求(消息)
                channel.basicConsume(RPC_QUEUE_NAME, true, new DefaultConsumer(channel) {

                    // 回调方法,当收到消息之后,会自动执行该方法
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        
                        // 本次的消息配置
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                                .Builder()
                                .correlationId(properties.getCorrelationId())		// 客户端发送的密钥
                                .build();

                        System.out.println("客户端的消息: "+new String(body,"UTF-8"));
                        String response = "";

                        try {
                            String message = new String(body, "UTF-8");

                            // 调用加钱方法
                            response = addMoney(Integer.parseInt(message)) + "";

                        } finally {

                            // 发送一个消息给客户端
                            /*
                            properties.getReplyTo(): Client端设置的回调队列名
                            replyProps:	封装的参数(主要是CorrelationId)
                             */
                            channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        }
                    }
                });
            }

        }
    }
}

4.6.3 RPC模式小结

严格意义上来说RPC并不是一种新的交换模式,他其实还是借助原有的模式(上述案例是采用Simple模式)来达到一些不同的功能,在RPC模式中只有客户端和服务端,并且客户端和服务端都既是Producer也是Consumer;

在这里插入图片描述

Tips:RPC模式已经违背了消息队列设计的初衷了;即一些无需及时返回且耗时的操作;

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/131621.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!