RabbitMQ之六种工作模式的使用

生活中,最使人疲惫的往往不是道路的遥远,而是心中的郁闷;最使人痛苦的往往不是生活的不幸,而是希望的破灭;最使人颓废的往往不是前途的坎坷,而是自信的丧失;最使人绝望的往往不是挫折的打击,而是心灵的死亡。所以我们要有自己的梦想,让梦想的星光指引着我们走出落漠,走出惆怅,带着我们走进自己的理想。

导读:本篇文章讲解 RabbitMQ之六种工作模式的使用,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

RabbitMQ工作模式

RabbitMQ提供了6种模式(官网模式介绍):简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式。

一、简单模式

在这里插入图片描述

P(producer/ publisher):生产者,一个发送消息的用户应用程序。

C(consumer):消费者,一个用来等待接收消息的用户应用程序

队列(红色区域):队列,消息存储在队列中。
				队列只受主机的内存和磁盘限制,队列本质上是一个大的消息缓冲区。
				生产者发送消息到队列,消费者尝试从队列接收数据。

简而言之,生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

1.引入依赖

	<dependencies>
        <!--rabbitmq java 客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

2.编写生产者

public class Producer_SimpleQueue {
    
    private final static String QUEUE_NAME = "simple_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列,如果持久化,Mq重启后队列还在
         * exclusive:是否独占本次连接,队列只允许在该链接中访问,如果连接关闭则队列自动删除。设置true可用于创建临时队列.
         * autoDelete:是否在不使用的时候自动删除队列,该参数与exclusive设置为true就可以实现临时队列。
         * arguments:参数。设置队列的扩展参数
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        String body = "simple_queue";
        //6. 发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         *
         * exchange 交换机名称,如果没有指定则使用默认Default Exchage
         * routingKey 路由key,交换机根据路由key将消息转发到指定队列。如果使用默认交换机,路由key设置为队列名称。
         * props 消息其它属性
         * body 消息内容
         *
         */
        channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
        System.out.println("已发送消息:" + body);

        //7.释放资源
      channel.close();
      connection.close();

    }
}

3.编写消费者


public class Consumer_SimpleQueue {
    
    private final static String QUEUE_NAME = "simple_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
    
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列,如果持久化,Mq重启后队列还在
         * exclusive:是否独占本次连接,队列只允许在该链接中访问,如果连接关闭则队列自动删除。设置true可用于创建临时队列.
         * autoDelete:是否在不使用的时候自动删除队列,该参数与exclusive设置为true就可以实现临时队列。
         * arguments:参数。设置队列的扩展参数
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        // 6.定义队列的消费者,并设置消息处理
        Consumer consumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消费者标签,用来标识消费者,在监听队列时设置channel.basicConsume
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 属性信息
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
           		//交换机
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                
               String msg=new String(body,"utf-8");
                System.out.println("接受到消息:"+msg);
            }
        };
    
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         * queue: 队列名称
         * autoAck 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * callback 消息接收到后回调,当消费者接收到消息要执行的方法
         */
         //7.监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);


        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();

    }
}

4.执行测试

1.启动生产者

在这里插入图片描述
进入队列页面,可以看到新建了一个队列:simple_queue
在这里插入图片描述
点击simple_queue进入队列详情页,点击Get messages查看消息,在控制台查看消息,消息并不会被消费。
在这里插入图片描述

2.启动消费者

在这里插入图片描述
此时在控制台查看消息,发现消费者已经获取了消息,并且还在监听队列中是否有新的消息。
在这里插入图片描述

二、work模式

简单模式相比,多了一个或一些消费者,多个消费者共同消费同一个队列中的消息。
在这里插入图片描述

多个消费者存在竞争关系:

1、一条消息只会被一个消费者接收;

2、rabbit采用轮询的方式将消息是平均发送给消费者的;

3、消费者在处理完某条消息后,才会收到下一条消息。

适用于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

1.引入依赖

	<dependencies>
        <!--rabbitmq java 客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

2.编写生产者


public class Producer_WorkQueues {
    private final static String QUEUE_NAME = "work_queues";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列
         * exclusive:是否独占本次连接
         * autoDelete:是否在不使用的时候自动删除队列
         * arguments:参数。
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         *
         * exchange 交换机名称,如果没有指定则使用默认Default Exchage
         * routingKey 路由key,简单模式可以传递队列名称
         * props 消息其它属性
         * body 消息内容
         *
         */
        for (int i = 1; i <= 20; i++) {
            String body = i+":work_queues...";
            //6. 发送消息
            channel.basicPublish("","work_queues",null,body.getBytes());
            System.out.println("发送消息:"+body);
        }
        
        //7.释放资源
        channel.close();
        connection.close();
        
    }
}

3.编写消费者1

public class Consumer_WorkQueues1 {
    private final static String QUEUE_NAME = "work_queues";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列
         * exclusive:是否独占本次连接
         * autoDelete:是否在不使用的时候自动删除队列
         * arguments:参数。
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        
        // 6.定义队列的消费者,并设置消息处理
        Consumer consumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 属性信息
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                System.out.println("消费者1,接受到消息:"+msg);
            }
        };
        
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         * queue: 队列名称
         * autoAck 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * callback 消息接收到后回调
         */
         //7.监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);
        
        
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
        
    }
}

3.编写消费者2

public class Consumer_WorkQueues2 {
    private final static String QUEUE_NAME = "work_queues";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列
         * exclusive:是否独占本次连接
         * autoDelete:是否在不使用的时候自动删除队列
         * arguments:参数。
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        
        // 6.定义队列的消费者,并设置消息处理
        Consumer consumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 属性信息
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                System.out.println("消费者2,接受到消息:"+msg);
            }
        };
        
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         * queue: 队列名称
         * autoAck 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * callback 消息接收到后回调
         */
         //7.监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);
        
        
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
        
    }
}

4.执行测试

1.启动两个消费者

启动两个消费者,查看控制台,发现work_queues队列创建,同时有两个消费者监听队列。
在这里插入图片描述

2.启动生产者

启动生产者,发送消息。两个消费者各自消费了不同的10条消息
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

三、Publish/Subscribe发布与订阅模式(Fanout)

1、每个消费者监听自己的队列。

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
在这里插入图片描述

该工作模式下,多了一个 Exchange 角色

P:生产者,要发送消息的程序,但是不再发送到队列中,而是发给交换机

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

Exchange有常见以下3种类型:
		 Fanout:广播,将消息交给所有绑定到交换机的队列 
		 Direct:定向,把消息交给符合指定routing key 的队列
		 Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列 
		 
 Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

Fanout,也称为广播。将消息交给所有绑定到交换机的队列

在广播模式下,消息发送流程:

1.  可以有多个消费者
2.  每个消费者有自己的queue(队列)
3.  每个队列都要绑定到Exchange(交换机)
4.  生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5.  交换机把消息发送给绑定过的所有队列
6.  队列的消费者都能拿到消息。实现一条消息被多个消费者消费

1.引入依赖

	<dependencies>
        <!--rabbitmq java 客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

2.编写生产者

public class Producer_PubSub {
    
    private final static String EXCHANGE_NAME = "test_fanout";
    private final static String QUEUE_NAME1 = "test_fanout_queue1";
    private final static String QUEUE_NAME2 = "test_fanout_queue2";
    public static void main(String[] args) throws IOException, TimeoutException {
    
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        
        //5. 创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         *
         * exchange 交换机名称
         * type 换机类型 fanout:广播、topic:通配符、direct:定向、headers:参数匹配
         * durable 是否持久化
         * autoDelete 是否自动删除
         * internal 内部使用。 一般false
         * arguments 参数
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6. 创建队列
        channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,true,false,false,null);
        //7. 绑定队列和交换机
          /**
         * queueBind(String queue, String exchange, String routingKey)
         * queue 队列名称
         * exchange 交换机名称
         * routingKey 路由键,绑定规则,如果交换机的类型为fanout ,routingKey设置为"" 作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
         */
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");

        String body = "famout工作模式。。。";
        //8. 发送消息
         /**
          * 参数明细:
          * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
          * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
          * 3、props,消息的属性
          * 4、body,消息内容
         */
        channel.basicPublish(EXCHANGE_NAME,"",null,body.getBytes());
        System.out.println("发送消息:"+body);

        //9. 释放资源
        channel.close();
        connection.close();

    }
}

3编写消费者1

public class Consumer_PubSub1 {
    
    private final static String QUEUE_NAME = "test_fanout_queue1";
    private final static String EXCHANGE_NAME = "test_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
    
    
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列
         * exclusive:是否独占本次连接
         * autoDelete:是否在不使用的时候自动删除队列
         * arguments:参数。
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    
    
        // 6.绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 7.定义队列的消费者,并设置消息处理
        Consumer consumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 属性信息
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                System.out.println("消费者1,接受到消息:"+msg);
            }
        };
    
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         * queue: 队列名称
         * autoAck 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * callback 消息接收到后回调
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);

        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();

    }
}

4.编写消费者2

public class Consumer_PubSub2 {
    
    private final static String QUEUE_NAME = "test_fanout_queue2";
    private final static String EXCHANGE_NAME = "test_fanout";
    public static void main(String[] args) throws IOException, TimeoutException {
        
        
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列
         * exclusive:是否独占本次连接
         * autoDelete:是否在不使用的时候自动删除队列
         * arguments:参数。
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    
        // 6.绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        
        // 7.定义队列的消费者,并设置消息处理
        Consumer consumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 属性信息
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                System.out.println("消费者2,接受到消息:"+msg);
            }
        };
        
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         * queue: 队列名称
         * autoAck 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * callback 消息接收到后回调
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);
        
        
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
        
        
    }
}

5.执行测试

启动生产者,查看控制台,发现创建了test_fanout交换机,同时也创建了两个队列,每个队列都收到了一条消息。
在这里插入图片描述在这里插入图片描述
在这里插入图片描述
启动两个消费者,查看控制台,发现两个消费者控制台都输出了都一条消息,同时队列中的消息已被消费,消费者任然在监听消息。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

四、Routing 路由模式(Direct)

1、每个消费者监听自己的队列,并且设置routingkey。

2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
在这里插入图片描述

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。

在路由模式中,添加了一个功能 ,只能订阅一部分消息。不同的消息被不同的队列消费。 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

1.引入依赖

	<dependencies>
        <!--rabbitmq java 客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

2.编写生产者

public class Producer_Routing {
    
    private final static String EXCHANGE_NAME = "test_direct";
    private final static String QUEUE_NAME1 = "test_direct_queue1";
    private final static String QUEUE_NAME2 = "test_direct_queue2";
    
    public static void main(String[] args) throws IOException, TimeoutException {
    
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
    
        //5. 创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         *
         * exchange 交换机名称
         * type 换机类型 fanout:广播、topic:通配符、direct:定向、headers:参数匹配
         * durable 是否持久化
         * autoDelete 是否自动删除
         * internal 内部使用。 一般false
         * arguments 参数
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6. 创建队列
        channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,true,false,false,null);
        //7. 绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * queue 队列名称
         * exchange 路由键,绑定规则
         * routingKey 如果交换机的类型为fanout ,routingKey设置为""
         */
        //队列1绑定 error
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"error");
        //队列2绑定 info  error  warning
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"warning");

        String body = "路由key是: error,队列1和队列2的消费者都能消费。";
        //8. 发送消息
        channel.basicPublish(EXCHANGE_NAME,"error",null,body.getBytes());
        System.out.println("生产者:"+body);

        //9. 释放资源
        channel.close();
        connection.close();

    }
}

2.编写消费者1

public class Consumer_Routing1 {
    private final static String QUEUE_NAME = "test_direct_queue1";
    private final static String EXCHANGE_NAME = "test_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        
        
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列
         * exclusive:是否独占本次连接
         * autoDelete:是否在不使用的时候自动删除队列
         * arguments:参数。
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        
        
        // 6.绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        
        // 7.定义队列的消费者,并设置消息处理
        Consumer consumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 属性信息
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                System.out.println("消费者1,接受到消息:"+msg);
            }
        };
        
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         * queue: 队列名称
         * autoAck 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * callback 消息接收到后回调
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);
        
        
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
        
        
    }
}

3.编写消费者2

public class Consumer_Routing2 {
    private final static String QUEUE_NAME = "test_direct_queue2";
    private final static String EXCHANGE_NAME = "test_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        
        
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("IP");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列
         * exclusive:是否独占本次连接
         * autoDelete:是否在不使用的时候自动删除队列
         * arguments:参数。
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        
        // 6.绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
        
        
        // 7.定义队列的消费者,并设置消息处理
        Consumer consumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 属性信息
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                System.out.println("消费者2,接受到消息:"+msg);
            }
        };
        
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         * queue: 队列名称
         * autoAck 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * callback 消息接收到后回调
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);
        
        
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
        
        
    }
}

4.执行测试

测试一:

发送路由key为:error,由于队列1与队列2都与交换器绑定了路由key为error,所以队列1与队列2都能收到消息。
在这里插入图片描述
在这里插入图片描述在这里插入图片描述
在这里插入图片描述

测试二

发送路由key为:info,由于队列2都与交换器绑定了路由key为info,所以队列2都才能收到消息。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
查看控制台,可以查看交换器与队列的绑定关系,以及创建的队列。
在这里插入图片描述
在这里插入图片描述

五、Topics主题模式(topics)

1、每个消费者监听自己的队列,并且设置带统配符的routingkey。

2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
在这里插入图片描述
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!更加灵活。

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: old.orange.rabbit

通配符规则

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

#.rabbit:能够匹配old.orange.rabbit或者 blue.rabbit

green.*:只能匹配green.rabbit

1.引入依赖

	<dependencies>
        <!--rabbitmq java 客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

2.编写生产者

public class Producer_Topics {
    private final static String EXCHANGE_NAME = "test_topic";
    private final static String QUEUE_NAME1 = "test_topic_queue1";
    private final static String QUEUE_NAME2 = "test_topic_queue2";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("ip");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        
        //5. 创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         *
         * exchange 交换机名称
         * type 换机类型 fanout:广播、topic:通配符、direct:定向、headers:参数匹配
         * durable 是否持久化
         * autoDelete 是否自动删除
         * internal 内部使用。 一般false
         * arguments 参数
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,true,false,false,null);
        //6. 创建队列
        channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,true,false,false,null);
        //7. 绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * queue 队列名称
         * exchange 路由键,绑定规则
         * routingKey 如果交换机的类型为fanout ,routingKey设置为""
         */
        //队列1绑定 error
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange");
        //队列2绑定 info  error  warning
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"#.rabbit");


        String body = "一只老的有分量的兔子....,只能匹配到队列2";
        //8. 发送消息
        channel.basicPublish(EXCHANGE_NAME,"old.weighty.rabbit",null,body.getBytes());
        System.out.println("生产者:"+body);

        //9. 释放资源
        channel.close();
        connection.close();

    }
}

3.编写消费者1


public class Consumer_Topic1 {
    private final static String QUEUE_NAME = "test_topic_queue1";
    private final static String EXCHANGE_NAME = "test_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        
        
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("ip");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列
         * exclusive:是否独占本次连接
         * autoDelete:是否在不使用的时候自动删除队列
         * arguments:参数。
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        
        
        // 6.绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        
        // 7.定义队列的消费者,并设置消息处理
        Consumer consumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 属性信息
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                System.out.println("消费者1,接受到消息:"+msg);
            }
        };
        
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         * queue: 队列名称
         * autoAck 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * callback 消息接收到后回调
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);
        
        
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
        
        
    }
}

4.编写消费者2

public class Consumer_Topic2 {
    private final static String QUEUE_NAME = "test_topic_queue2";
    private final static String EXCHANGE_NAME = "test_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        
        
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        //ip  默认值 localhost
        factory.setHost("ip");
        //端口  默认值 5672
        factory.setPort(5672);
        //虚拟机 默认值/
        factory.setVirtualHost("/test");
        //用户名 默认 guest
        factory.setUsername("test");
        //密码 默认值 guest
        factory.setPassword("test");
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /**
         *  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         *
         * queue:队列名称
         * durable:是否持久化队列
         * exclusive:是否独占本次连接
         * autoDelete:是否在不使用的时候自动删除队列
         * arguments:参数。
         *
         * 没有名字叫‘simple_queue’的队列,则会创建该队列,否则不会创建
         *
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        
        // 6.绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
        
        
        // 7.定义队列的消费者,并设置消息处理
        Consumer consumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 属性信息
             * @param body 消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                System.out.println("消费者2,接受到消息:"+msg);
            }
        };
        
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         * queue: 队列名称
         * autoAck 是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * callback 消息接收到后回调
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);
         
        //不关闭资源,应该一直监听消息
        //channel.close();
        //connection.close();
    }
}

5.执行测试

测试一

使用通配符路由key:old.orange该路由key能匹配队列1与队列2
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试二

使用通配符路由key:old.weighty.rabbit 该路由key只能匹配队列2
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
查看控制台,发现创建了test_topic交换器,也创建了两个队列,同时与队列进行了关系绑定
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

六、RPC模式

在这里插入图片描述

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1、客户端既是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果

3、服务端将RPC方法 的结果发送到RPC响应队列

4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

七、Header模式

RabbitMQ官网介绍有6种工作模式,突然看到有个Header模式,特此记录

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

1.生产者

		//声明一个交换机
        channel.exchangeDeclare("exchange_headers", BuiltinExchangeType.HEADERS);
        //声明headers模式匹配规则
        Map<String, Object> headers_one = new Hashtable<String, Object>();
        headers_one.put("headers", "one");
        Map<String, Object> headers_two = new Hashtable<String, Object>();
        headers_two.put("headers", "two");

        //交换机和队列绑定
        channel.queueBind("headers_one","exchange_headers","",headers_one);
        channel.queueBind("headers_two","exchange_headers","",headers_two);


            String message = "headers...";
            Map<String,Object> headers = new Hashtable<String, Object>();
            //匹配通知消费者绑定的header_one
            headers.put("headers", "one");
            //匹配通知消费者绑定的headers_two
            headers.put("headers", "two");
            AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
            properties.headers(headers);

            channel.basicPublish("exchange_headers", "", properties.build(), message.getBytes());

2.消费者

消费者1

		//声明一个交换机
        channel.exchangeDeclare("exchange_headers", BuiltinExchangeType.HEADERS);
        //声明headers模式匹配规则
        Map<String, Object> headers_one = new Hashtable<String, Object>();
        headers_one.put("headers", "one");
        headers_two.put("headers", "two");
        
        //交换机和队列绑定
        channel.queueBind("headers_one","exchange_headers","",headers_one);

消费者2

		//声明一个交换机
        channel.exchangeDeclare("exchange_headers", BuiltinExchangeType.HEADERS);
        //声明headers模式匹配规则
        Map<String, Object> headers_two = new Hashtable<String, Object>();
        headers_two.put("headers", "two");
        
        //交换机和队列绑定
        channel.queueBind("headers_two","exchange_headers","",headers_two);

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137126.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!