RabbitMQ学习一快速认知和初步使用

导读:本篇文章讲解 RabbitMQ学习一快速认知和初步使用,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

什么是MQ消息中间件及使用场景

全称MessageQueue,主要是⽤于程序和程序直接通信,异步+解耦+削峰

异步:用户注册-》发送邮件(注册活动)

解耦:订单系统-》物流系统

削峰:秒杀,日志处理

JMS与AMQP

什么是JMS: Java消息服务(Java Message Service),Java平 台中关于⾯向消息中间件的接⼝

JMS是⼀种与⼚商⽆关的 API,⽤来访问消息收发系统 消息,它类似于JDBC(Java

Database Connectivity)。 这⾥,JDBC 是可以⽤来访问许多不同关系数据库的 API

代表:kafka rocketMQ

消息类型:TextMessage/ObjectMessage/StreamMessage

什么是AMQP: AMQP不从API层进⾏限定,直接定义⽹络交换的数据 格式,这使得实现了AMQP

provider天然性就是跨平台    ⽐如Java语⾔产⽣的消息,可以⽤其他语⾔ ⽐如python 的进⾏消费

AQMP可以⽤http来进⾏类⽐,不关⼼实现接⼝的语⾔,只要都按照相应的数据格式去发送报⽂请

求,不同语⾔的client可以和不同语⾔的server进⾏通讯

代表:rabbitMQ

消息类型:Byte[]

RabbitMQ核心概念

Broker: RabbitMQ的服务端程序,可以认为一个mq节点就是一个broker

Producer: 创建消息,然后发布到rabbitMQ

Consumer: 消费队列里的消息

Queue:是rabbitMQ的内部对象,用于存储消息,消息只能存储在队列里面

Channel: ⼀条⽀持多路复⽤的通道,独⽴的双向数据流通 道,可以发布、订阅、接收消息。

         信道是建⽴在真实的TCP连接内的虚拟连接,复⽤TCP连接的通道

Connection连接 RabbitMQsocket链接,它封装了socket协议相 关部分逻辑,⼀个连接上可

         以有多个channel进⾏通信

Exchange:  生产者将消息发送到exchange,exchange将消息路由到一个或多个队列中,队列和交

         换机可以是多对多的关系

Routingkey:producer将消息发送到exchange时,一般会指定一个routingkey,用来指定这个息 

         的路由规则

Binding:通过绑定将exchange和queue关联起来,在绑定的时候一般会指定一个绑定key

     (bindingkey),这样rabbitMQ就知道如果将消息路由到对应的队列中了

Virtual host 虚拟主机:⽤于不同业务模块的逻辑隔离,⼀个Virtual Host

        ⾯可以有若⼲个ExchangeQueue,同⼀个VirtualHost ⾥⾯不能有相同名称的Exchange

Queue

默认是 /,可以自己创建

RabbitMQ学习一快速认知和初步使用

rabbitMQ消息模式及4种交换机说明

rabbitMQ的消息传递模式,也是AMQP类型的消息传递模式

RabbitMQ学习一快速认知和初步使用

 四种交换机

direct exchange: 定向(直连)

        将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。

        例如:一个队列绑定到交换机上要求路由键为”aabb”,则只有被标记为”aabb”的消息才被转发

        到队列上。

fanout exchange: 广播,扇形

        只需要简单的将队列绑定到交换机上,所有发送到交换机上的消息都会被转发到与该交

        换机绑定的所有队列上,用于发布订阅,广播形式。不需处理路由键

topic exchange: 通配符,主题(大多数都使用此交换机)

        主题交换机是一种发布、订阅的模式,结合了直连交换机和扇形交换机的特点,将路由键和

        模式进行匹配。符号”#”代表一个或多个词,符号”*”代表一个词。例如:”abc.#”能

        匹配”abc,ff,gi”,”abc.*”只能匹配”abc.xx”。

headers exchange:(基本不用,后面不做介绍)

        根据发送的消息内容中的headers属性进⾏匹配, 在绑定QueueExchange时指定⼀组键值

        对,当消息发送到RabbitMQ时会取到该消息的headers 与Exchange绑定时指定的键值对进⾏

        匹配。

三种交换机的实战代码演示

springboot依赖版本

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
   <version>2.4.3</version>
</dependency>

direct exchange 直连交换机

发送消息代码

public class Send {

    private static final String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //ip地址
        factory.setHost("ip");
        //端口号 一般都是5672
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("password");
        //虚拟主机
        factory.setVirtualHost("dev");

        //jdk7语法 创建链接 自动关闭
        try(Connection connection = factory.newConnection()){
            //创建信道
            Channel channel = connection.createChannel();
            //绑定交换机 直连交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String error = "订单服务的error信息";
            String info = "订单服务的info信息";
            String debug = "订单服务的debug信息";
            //direct 3条消息分别绑定3个路由键
            channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));
            System.out.println("direct交换机发送消息成功");
        }

    }
}

接受消息1

public class Recv1 {

    private static final String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("password");
        //虚拟主机
        factory.setVirtualHost("dev");

        //创建链接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //根据路由键绑定交换机和队列  
        channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
                //手工确认   不是多条确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //关闭消息自动确认
        channel.basicConsume(queueName, false, consumer);

    }
}

接收消息2

public class Recv2 {

    private static final String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("password");
        //虚拟主机
        factory.setVirtualHost("dev");

        //创建链接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //根据路由键绑定交换机和队列  
        channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
       
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
                //手工确认   不是多条确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //关闭消息自动确认
        channel.basicConsume(queueName, false, consumer);

    }
}

1比2多接收两条消息

fanout exchange 扇形交换机

发送消息

public class Send {

    private static final String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("dev");

        //jdk7语法 创建链接 自动关闭
        try(Connection connection = factory.newConnection()){
            Channel channel = connection.createChannel();
            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            String msg = "rabbitmq 扇形交换机发送消息";
            //fanout无需处理routingkey
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("fanout交换机发送消息成功");
        }

    }
}

接收消息1

public class Recv1 {

    private static final String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("dev");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列  fanout交换机不用处理routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
                //手工确认   不是多条确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //关闭消息自动确认
        channel.basicConsume(queueName, false, consumer);
    }
}

接收消息2


public class Recv2 {

    private static final String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("dev");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列  fanout交换机不用处理routingkey
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
                //手工确认   不是多条确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //关闭消息自动确认
        channel.basicConsume(queueName, false, consumer);
    }
}
 

1和2代码相同,接收的消息也都一致

topic exchange 主题交换机

public class Send {

    private static final String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("dev");

        //jdk7语法 创建链接 自动关闭
        try(Connection connection = factory.newConnection()){
            Channel channel = connection.createChannel();
            //绑定交换机 topic交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String error = "订单服务的error信息";
            String info = "订单服务的info信息";
            String debug = "商品服务的debug信息";
            //topic交换机 3条消息绑定3个路由键
            channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));
            System.out.println("topic交换机发送消息成功");
        }
    }
}

接收消息1

public class Recv1 {

    private static final String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("dev");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列  TOPIC交换机 #代表一个或多个单词
        channel.queueBind(queueName,EXCHANGE_NAME,"#.error");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
                //手工确认   不是多条确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //关闭消息自动确认
        channel.basicConsume(queueName, false, consumer);
    }
}

接收消息2

public class Recv2 {

    private static final String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("dev");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列  TOPIC交换机 *代表一个单词
        channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = "+new String(body, StandardCharsets.UTF_8));
                //手工确认   不是多条确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //关闭消息自动确认
        channel.basicConsume(queueName, false, consumer);
    }
}

1只能接收error日志,2能接收所有日志

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/1275.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!