RabbitMQ入门学习以及包含源码 使用Mac

不管现实多么惨不忍睹,都要持之以恒地相信,这只是黎明前短暂的黑暗而已。不要惶恐眼前的难关迈不过去,不要担心此刻的付出没有回报,别再花时间等待天降好运。真诚做人,努力做事!你想要的,岁月都会给你。RabbitMQ入门学习以及包含源码 使用Mac,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

RabbitMQ学习包含源码

这是自己在学习当中写的一篇博客,所有的内容都是自己亲测有效的。

安装RabbitMQ

由于我用的是mac,所以在这里就用mac讲解了

  1. 首先安装brew brew官网
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
  1. 使用brew安装rabbitmq
brew update
brew install rabbitmq
  1. 启动rabbitmq
brew services start rabbitmq
#关闭
brew services stop rabbitmq
  1. 进入 http://localhost:15672/
  2. 账号和密码默认都是guest

RabbitMQ入门

搭建工程

使用idea搭建一个maven工程即可

在这里插入图片描述

添加依赖在pom.xml文件中

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

编写生产者

package com.duoduo.rabbitmq.simple;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ClassName Producer.java
 * @Author 拾光
 * @Date 2020年07月07日 17:58:00
 * @Description mq生产者
 */
public class Producer {
    static final String QUEUE_NAME = "simple_que";

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //主机地址;默认为 localhost
        connectionFactory.setHost("localhost");
        //虚拟主机名称 默认为 /
        connectionFactory.setVirtualHost("/");
        //设置端口 默认为 5672
        connectionFactory.setPort(5672);
        //连接用户名;默认为  guest
        connectionFactory.setUsername("guest");
        //连接密码;默认为  guest
        connectionFactory.setPassword("guest");
        //创建新的连接
        Connection connection = connectionFactory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        //要发送的消息
        String msg="你好啊,小宝贝";
        /**
         *   参数  1:交换机名称,如果没有指定则使用默认  Default   Exchage
         *   参数  2:路由  key,简单模式可以传递队列名称
         *   参数  3:消息其它属性
         *   参数  4:消息内容
         */
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.println("已发送消息:"+msg);
        //关闭资源
        channel.close();
        connection.close();
    }
}

抽取获得连接的工具类

package com.duoduo.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @ClassName ConnectionUtil.java
 * @Author 拾光
 * @Date 2020年07月07日 18:43:00
 * @Description 连接mq工具类
 */
public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //主机地址;默认为 localhost
        connectionFactory.setHost("localhost");
        //虚拟主机名称 默认为 /
        connectionFactory.setVirtualHost("/");
        //设置端口 默认为 5672
        connectionFactory.setPort(5672);
        //连接用户名;默认为  guest
        connectionFactory.setUsername("guest");
        //连接密码;默认为  guest
        connectionFactory.setPassword("guest");
        //创建连接
        return connectionFactory.newConnection();
    }
}

创建消费者

package com.duoduo.rabbitmq.simple;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Consumer.java
 * @Author 拾光
 * @Date 2020年07月07日 18:47:00
 * @Description mq消费者
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
        //创建消费者
       DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
           /**
            * @param consumerTag 消息者标签
            * @param envelope  消息包的内容,可以从中获取ID,消息交换机,消息路由
            * @param properties  属性信息
            * @param body 消息
            */
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
               System.out.println("路由key为:"+envelope.getRoutingKey());
               //交换机
               System.out.println("交换机为:"+envelope.getExchange());
               //消息ID
               System.out.println("消息id为:"+envelope.getDeliveryTag());
               //收到的消息
               System.out.println("收到的消息为:"+new String(body,"utf-8"));
           }
       };
       //监听消息
        /**
         * 参数一 队列名称
         * 参数二 是否自动确认
         * 参数三 消息接受到后回调
         */
        channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);

        //关闭资源
//        channel.close();
//        connection.close();
    }
}

小结:

在这里插入图片描述
上述的案列中用的简单模式,再上图的模型中,有如下概念:
P:生产者,也就是要发送消息的程序
C:消费者,消息的接受者,会一直等着消息的到来
queue: 消息队列,上图中红色的部分,类似一个邮箱,可以缓存消息。
在 rabitMQ 中消费者是一定要到某个消息队列中去获取消息的

AMQP

介绍

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

rabbitmq运转流程

生产者发送消息

生产者连接到Rabbitmq Broker,建立一个连接(Connection),开启一个信道(Channel)
生产者声明一个交换机,并设置相关属性,比如交换机类型、是否持久化
生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
生产者通过路由键将交换机和队列绑定起来
生产者发送消息至Rabbitmq Broker,其中包含路由键、交换器等信息
相应的交换器根据接收到的路由键查找相匹配的队列
如果找到,则将从生产者发送过来的消息存入相应的队列中
如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
关闭信道
关闭连接

消费者接受消息

消费者连接到Rabbitmq Broker,建立一个连接(Connection),开启一个信道(Channel)
消费者向Rabbitmq Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
等待Rabbitmq Broker回应并投递相应队列中的消息,消费者接受消息
消费者确认(ack)接收到的消息
Rabbitmq从队列中删除相应已经被确认的消息
关闭信道
关闭连接

工作模式介绍

官网介绍:RabbitMQ介绍官网

这里简单介绍下五种工作模式的主要特点:

  1. 简单模式:一个生产者,一个消费者

  2. work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。

  3. 订阅模式:一个生产者发送的消息会被多个消费者获取。

  4. 路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key

  5. topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。

Work queues 工作队列模式

代码

生产者

package com.duoduo.rabbitmq.work;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName WorkProduce.java
 * @Author 拾光
 * @Date 2020年07月08日 17:03:00
 * @Description 工作模式生产者
 */
public class WorkProduce {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        //通过使用工具类来创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //要发送的消息
        for (int i = 0; i < 20; i++) {
            String msg="你好啊,小宝贝-----工作模式--"+i;
            /**
             *   参数  1:交换机名称,如果没有指定则使用默认  Default   Exchage
             *   参数  2:路由  key,简单模式可以传递队列名称
             *   参数  3:消息其它属性
             *   参数  4:消息内容
             */
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("已发送消息:"+msg);
        }


        //关闭资源
        channel.close();
        connection.close();
    }
}

消费者1

package com.duoduo.rabbitmq.work;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Consumer.java
 * @Author 拾光
 * @Date 2020年07月07日 18:47:00
 * @Description mq消费者
 */
public class WorkConsumer1 {
    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(WorkConsumer1.QUEUE_NAME,true,false,false,null);
        //一次只接受并处理一条消息
        channel.basicQos(1);
        //创建消费者
       DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
           /**
            * @param consumerTag 消息者标签
            * @param envelope  消息包的内容,可以从中获取ID,消息交换机,消息路由
            * @param properties  属性信息
            * @param body 消息
            */
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               try {
                   //路由key
                   System.out.println("路由key为:"+envelope.getRoutingKey());
                   //交换机
                   System.out.println("交换机为:"+envelope.getExchange());
                   //消息ID
                   System.out.println("消息id为:"+envelope.getDeliveryTag());
                   //收到的消息
                   System.out.println("收到的消息为:"+new String(body,"utf-8"));
                   Thread.sleep(10000);
                   //确认消息
                   channel.basicAck(envelope.getDeliveryTag(),false);
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       };
       //监听消息
        /**
         * 参数一 队列名称
         * 参数二 是否自动确认
         * 参数三 消息接受到后回调
         */
        channel.basicConsume(WorkConsumer1.QUEUE_NAME,false,defaultConsumer);

        //关闭资源
//        channel.close();
//        connection.close();
    }
}

消费者2

package com.duoduo.rabbitmq.work;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Consumer.java
 * @Author 拾光
 * @Date 2020年07月07日 18:47:00
 * @Description mq消费者
 */
public class WorkConsumer2 {
    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(WorkConsumer2.QUEUE_NAME,true,false,false,null);
        //一次只接受并处理一条消息
        channel.basicQos(1);
        //创建消费者
       DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
           /**
            * @param consumerTag 消息者标签
            * @param envelope  消息包的内容,可以从中获取ID,消息交换机,消息路由
            * @param properties  属性信息
            * @param body 消息
            */
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               try {
                   //路由key
                   System.out.println("路由key为:"+envelope.getRoutingKey());
                   //交换机
                   System.out.println("交换机为:"+envelope.getExchange());
                   //消息ID
                   System.out.println("消息id为:"+envelope.getDeliveryTag());
                   //收到的消息
                   System.out.println("收到的消息为:"+new String(body,"utf-8"));
                   Thread.sleep(10000);
                   //确认消息
                   channel.basicAck(envelope.getDeliveryTag(),false);
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       };
       //监听消息
        /**
         * 参数一 队列名称
         * 参数二 是否自动确认
         * 参数三 消息接受到后回调
         */
        channel.basicConsume(WorkConsumer2.QUEUE_NAME,false,defaultConsumer);

        //关闭资源
//        channel.close();
//        connection.close();
    }
}

测试

在这里插入图片描述
在这里插入图片描述

小结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
在这里就不写订阅模式了,因为下面要介绍发布订阅模式

Publish/Subscribe 发布与订阅模式

模式说明

发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的 队列都将接收到消息

生产者

package com.duoduo.rabbitmq.发布订阅模式;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName WorkProduce.java
 * @Author 拾光
 * @Date 2020年07月08日 17:03:00
 * @Description 工作模式生产者
 */
public class PublicProduce {
    /**
     * 交换机名称
     */
    static  final String FANOUT_EXCHANGE="fanout_exchange";
    //队列名称
    static  final String FANOUT_QUEUE_1="fanout_queue_1";
    //队列名称
    static  final String FANOUT_QUEUE_2="fanout_queue_2";

    public static void main(String[] args)throws Exception {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //申明交换机
        /**
         * 申明交换机
         * 参数一: 交换机名称
         * 参数二: 交换机类型 fanout ,topic, direct,headers
         */
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
        channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
        channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
        for (int i = 1; i < 10; i++) {
            String message="你好朋友,发布订阅模式"+i;
            /**
             *   参数  1:交换机名称,如果没有指定则使用默认  Default   Exchage
             *   参数  2:路由  key,简单模式可以传递队列名称
             *   参数  3:消息其它属性
             *   参数  4:消息内容
             */
            channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
            System.out.println("已发送消息:"+message);
        }
        //关闭资源
        channel.close();
        connection.close();
    }
}

消费者1

package com.duoduo.rabbitmq.发布订阅模式;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Consumer.java
 * @Author 拾光
 * @Date 2020年07月07日 18:47:00
 * @Description mq消费者
 */
public class PubliceConsumer1 {
    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(PublicProduce.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);

        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(PublicProduce.FANOUT_QUEUE_1,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(PublicProduce.FANOUT_QUEUE_1,PublicProduce.FANOUT_EXCHANGE,"");
        //一次只接受并处理一条消息
        channel.basicQos(1);
        //创建消费者
       DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
           /**
            * @param consumerTag 消息者标签
            * @param envelope  消息包的内容,可以从中获取ID,消息交换机,消息路由
            * @param properties  属性信息
            * @param body 消息
            */
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               try {
                   //路由key
                   System.out.println("路由key为:"+envelope.getRoutingKey());
                   //交换机
                   System.out.println("交换机为:"+envelope.getExchange());
                   //消息ID
                   System.out.println("消息id为:"+envelope.getDeliveryTag());
                   //收到的消息
                   System.out.println("收到的消息为:"+new String(body,"utf-8"));
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       };
       //监听消息
        /**
         * 参数一 队列名称
         * 参数二 是否自动确认
         * 参数三 消息接受到后回调
         */
        channel.basicConsume(PublicProduce.FANOUT_QUEUE_1,true,defaultConsumer);

        //关闭资源
//        channel.close();
//        connection.close();
    }
}

消费者2

package com.duoduo.rabbitmq.发布订阅模式;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Consumer.java
 * @Author 拾光
 * @Date 20200707 18:47:00
 * @Description mq消费者
 */
public class PubliceConsumer2 {
    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //申明交换机
        channel.exchangeDeclare(PublicProduce.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);

        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(PublicProduce.FANOUT_QUEUE_2,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(PublicProduce.FANOUT_QUEUE_2,PublicProduce.FANOUT_EXCHANGE,"");
        //一次只接受并处理一条消息
        channel.basicQos(1);
        //创建消费者
       DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
           /**
            * @param consumerTag 消息者标签
            * @param envelope  消息包的内容,可以从中获取ID,消息交换机,消息路由
            * @param properties  属性信息
            * @param body 消息
            */
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               try {
                   //路由key
                   System.out.println("路由key为:"+envelope.getRoutingKey());
                   //交换机
                   System.out.println("交换机为:"+envelope.getExchange());
                   //消息ID
                   System.out.println("消息id为:"+envelope.getDeliveryTag());
                   //收到的消息
                   System.out.println("收到的消息为:"+new String(body,"utf-8"));
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       };
       //监听消息
        /**
         * 参数一 队列名称
         * 参数二 是否自动确认
         * 参数三 消息接受到后回调
         */
        channel.basicConsume(PublicProduce.FANOUT_QUEUE_2,true,defaultConsumer);

        //关闭资源
//        channel.close();
//        connection.close();
    }
}

测试

在这里插入图片描述
在这里插入图片描述

小结:

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

Routing路由模式

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的
Routingkey 与消息的 Routing key 完全一致,才会接收到消息

生产者

package com.duoduo.rabbitmq.route;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName WorkProduce.java
 * @Author 拾光
 * @Date 2020年07月08日 17:03:00
 * @Description 路由模式
 */
public class RouteProduce {
    /**
     * 交换机名称
     */
    static  final String DIRECT_EXCHANGE="direct_exchange";
    //队列名称
    static  final String DIRECT_QUEUE_INSERT="direct_queue_insert";
    //队列名称
    static  final String DIRECT_QUEUE_UPDATE="direct_queue_update";

    public static void main(String[] args)throws Exception {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //申明交换机
        /**
         * 申明交换机
         * 参数一: 交换机名称
         * 参数二: 交换机类型 fanout ,topic, direct,headers
         */
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(DIRECT_QUEUE_INSERT,true,false,false,null);
        channel.queueDeclare(DIRECT_QUEUE_UPDATE,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(DIRECT_QUEUE_INSERT,DIRECT_EXCHANGE,"insert");
        channel.queueBind(DIRECT_QUEUE_UPDATE,DIRECT_EXCHANGE,"update");
            /**
             *   参数  1:交换机名称,如果没有指定则使用默认  Default   Exchage
             *   参数  2:路由  key,简单模式可以传递队列名称
             *   参数  3:消息其它属性
             *   参数  4:消息内容
             */
            String message="新增了路由模式:route key为insert";
        channel.basicPublish(DIRECT_EXCHANGE,"insert",null,message.getBytes());
        System.out.println("已发送消息:"+message);
         message="新增了路由模式:route key为update";
        channel.basicPublish(DIRECT_EXCHANGE,"update",null,message.getBytes());
        System.out.println("已发送消息:"+message);

        //关闭资源
        channel.close();
        connection.close();
    }
}

消费者1

package com.duoduo.rabbitmq.route;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.duoduo.rabbitmq.发布订阅模式.PublicProduce;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Consumer.java
 * @Author 拾光
 * @Date 2020年07月07日 18:47:00
 * @Description mq消费者
 */
public class RouteConsumer1 {
    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(RouteProduce.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);

        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(RouteProduce.DIRECT_QUEUE_INSERT,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(RouteProduce.DIRECT_QUEUE_INSERT,RouteProduce.DIRECT_EXCHANGE,"insert");
        //创建消费者
       DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
           /**
            * @param consumerTag 消息者标签
            * @param envelope  消息包的内容,可以从中获取ID,消息交换机,消息路由
            * @param properties  属性信息
            * @param body 消息
            */
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               try {
                   //路由key
                   System.out.println("路由key为:"+envelope.getRoutingKey());
                   //交换机
                   System.out.println("交换机为:"+envelope.getExchange());
                   //消息ID
                   System.out.println("消息id为:"+envelope.getDeliveryTag());
                   //收到的消息
                   System.out.println("收到的消息为:"+new String(body,"utf-8"));
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       };
       //监听消息
        /**
         * 参数一 队列名称
         * 参数二 是否自动确认
         * 参数三 消息接受到后回调
         */
        channel.basicConsume(RouteProduce.DIRECT_QUEUE_INSERT,true,defaultConsumer);

        //关闭资源
//        channel.close();
//        connection.close();
    }
}

消费者2

package com.duoduo.rabbitmq.route;

import com.duoduo.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Consumer.java
 * @Author 拾光
 * @Date 2020年07月07日 18:47:00
 * @Description mq消费者
 */
public class RouteConsumer2 {
    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //申明交换机
        channel.exchangeDeclare(RouteProduce.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);

        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候删除队列
         * 参数5:队列其他参数
         */
        channel.queueDeclare(RouteProduce.DIRECT_QUEUE_UPDATE,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(RouteProduce.DIRECT_QUEUE_UPDATE,RouteProduce.DIRECT_EXCHANGE,"update");
        //创建消费者
       DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
           /**
            * @param consumerTag 消息者标签
            * @param envelope  消息包的内容,可以从中获取ID,消息交换机,消息路由
            * @param properties  属性信息
            * @param body 消息
            */
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               try {
                   //路由key
                   System.out.println("路由key为:"+envelope.getRoutingKey());
                   //交换机
                   System.out.println("交换机为:"+envelope.getExchange());
                   //消息ID
                   System.out.println("消息id为:"+envelope.getDeliveryTag());
                   //收到的消息
                   System.out.println("收到的消息为:"+new String(body,"utf-8"));
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       };
       //监听消息
        /**
         * 参数一 队列名称
         * 参数二 是否自动确认
         * 参数三 消息接受到后回调
         */
        channel.basicConsume(RouteProduce.DIRECT_QUEUE_UPDATE,true,defaultConsumer);

        //关闭资源
//        channel.close();
//        connection.close();
    }
}

测试

在这里插入图片描述
在这里插入图片描述

小结:

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合routing key的队列。

源码地址:https://github.com/chaiduolai/Learn_RabbitMQ.git
如果有不对的地方欢迎交流!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/159017.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!