rabbitMq工作模式特性及整合springboot

导读:本篇文章讲解 rabbitMq工作模式特性及整合springboot,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

因为公司项目后面需要用到mq做数据的同步,所以学习mq并在此记录,这里的是rabbitMq

mq(message queue)消息队列

官网:www.rabbitmq.com
使用消息队列的优点:
    1、异步可加快访问速度 (以前一个订单接口需要做下单、库存、付款、快递等相关操作,有了mq只需要给相关信息传入队列,下单、库存、付款、快递等相关操作会自动从队列中收到信息进行异步操作)
    2、解耦下游服务或其他服务或语言可接入
    3、削峰高并发访问量可分摊多个队列分摊
缺点:
    1、系统可用性降低(一旦mq挂了系统就宕机了)
    2、系统复杂性增大 (增加了mq模块需要考虑更多)

RabbitMQ的高级特性

  • 消费端限流
  • TTL 全称time to live(存活时间/过期时间) – 当消息到达存活时间后还没被消费会被丢弃 ttl+死信队列可实现延迟队列效果
  • 死信队列
  • 延迟队列
  • 消息可靠性投递
  • Consumer ACK

rabbitMq为了确保消息投递的可靠性提供了两种方式 confirm和return

rabbitmq整个消息投递的路径为
producer--->rabbitmq broker--->exchange--->queue--->consumer
1.消息从producer到exchange则会返回一个confirmCallback.
2.消息从exchange到queue投递失败则会返回一个returnCallBack.
我们将利用这两个callback控制消息的可靠性投递

Consumer ACK

ack指acknowledge,确认。表示消费者端接收到消息后的确认方式
有三种方式确认:
    自动确认:acknowledge="none"
    手动确认:acknowledge="manual"
    根据异常情况确认:acknowledge="auto"
    
自动确认指,当消息一旦被消费者接收到,则自动确认收到,并将相应的message从mq的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认模式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,
则调用channel.basicNack()方法,让其自动重新发送消息。    

我这里学习了前面五种

1:简单模式
2:工作队列模式
3:发布订阅模式
4:路由模式
5:主题模式

简单模式:即一条线一个发送到队列,队列发送到接收者
工作队列模式:即有一个发送者发送信息到队列,队列发给多个接收者,比如群发
发布订阅模式:这个是使用的最多的,发布者需要先发送到交换机,交换机再发送到与之绑定的队列, 然后队列在发送到与之绑定队列的接收者
路由模式:路由模式在发布订阅上增加了条件筛选,在消息到达交换机后发送队列时进行条件匹配,匹配成功才能发送给对应绑定的队列,最后再发送给接收者
主题模式:主题模式在路由模式上面进行升级,条件可进行模糊匹配,通配符规则 #可以匹配多个词 * 只能匹配一个词 如:test.# 匹配 test.one.tow test.one.q.wqe / test.* 匹配 test.one test.two

先安装rabbitMq,不同的环境可安装相关的版本,我这里已经安装好了
在这里插入图片描述
然后运行sbin下面的rabbitmq-server.bat

在这里插入图片描述
然后网页localhost:15672,如下页面即安装成功

在这里插入图片描述
在这里插入图片描述
然后去rabbitmq的官网
在这里插入图片描述

在这里插入图片描述
左边是下载右边是文档

文档中也会有一些代码案例,点击文档可以看到mq有七种方式
在这里插入图片描述

第一个是在测试的时候需要引入的包,第二个是在springboot上需要引入的包

com.rabbitmq
amqp-client
5.3.0

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

一:简单模式

我给mq的连接封装在工具类里,一些队列名放在常量类里了
工具类代码:

package com.lansi.realtynavi.test.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Description 描述
 * @Date 2021/3/23 11:22
 * @Created by huyao
 */
public class RabbitUtils {

    public static ConnectionFactory factory = new ConnectionFactory();
    static {
        factory.setHost("localhost");
    }

    public static Connection getConnection() throws Exception{
        Connection connection = null;
        try {
            //获取长连接
            connection  = factory.newConnection();
        }catch (Exception e){
            e.printStackTrace();
        }/*finally {
            connection.close();
        } */
        return connection;
    }


}

常量类代码:

package com.lansi.realtynavi.test.constant;

/**
 * @Description 描述
 * @Date 2021/3/23 11:01
 * @Created by huyao
 */
public class MqConstant {

    public static final String MQ_HELLO_WORD = "helloWord";
    public static final String MQ_PUBLISH = "publish";
    public static final String MQ_ROUTING = "routing";
    public static final String MQ_TOPICS = "topics";
    public static final String MQ_WORK_QUEUES = "workQueues";


    public static final String MQ_QUEUE_BAIDU = "baidu";
    public static final String MQ_QUEUE_XINLANG = "xinlang";



    public static final String MQ_PUBLISH_JHJ = "jiaohuanji";
    public static final String MQ_ROUTING_JHJ = "jiaohuanjiRout";
    public static final String MQ_TOPIC_JHJ = "jiaohuanjiTopic";


}

生产者代码

package com.lansi.realtynavi.test.helloWord;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @Description 简单模式
 * @Date 2021/3/22 17:19
 * @Created by huyao
 */
public class Producer {


    public static void main(String[] args) throws Exception{


        Channel channel = null;

        Connection connection = null;
        try {
            //获取长连接
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();

            channel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);
            String message = "这是我发送的第三个队列消息";
            //第一个参数是交换机信息   简单队列不需要交换机  第二个参数队列名称 ,第三个额外信息,第四个需要发布的信息
            channel.basicPublish("", MqConstant.MQ_HELLO_WORD, null, message.getBytes());
            System.out.println("[x] Send ‘" + message + "’");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            channel.close();
            connection.close();
        }

    }

}

消费者代码:

package com.lansi.realtynavi.test.helloWord;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 描述
 * @Date 2021/3/22 17:27
 * @Created by huyao
 */
public class Consumer {

    public static void main(String[] argv) throws Exception {
        //连接
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明并创建一个队列
        //参数1 队列ID
        //参数2 是否持久化,false对应不持久化数据,mq停掉数据就会丢失
        //参数3 是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
        //参数4 是否自动删除, false代表连接停掉后不自动删除这个队列
        // 其他额外的参数,null
        channel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);


        //从MQ服务器中获取数据

        //创建一个消息消费者
        //参数1:队列ID
        //参数2:代表是否自动确认收到消息,false代表手动编程来确认消息,这是mq的推荐做法
        //参数3:参数要传入的DefaultConsumer的实现类
        channel.basicConsume(MqConstant.MQ_HELLO_WORD, false, new Reciver(channel));
    }
}

class Reciver extends DefaultConsumer {
    private Channel channel;
    //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中用到
    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("消费者接收到的消息:"+message);
        System.out.println("消息的ID:"+envelope.getDeliveryTag());

        //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(), false);

    }
}

测试的时候队列需要手动去创建,不过springboot的话可以自动创建
在这里插入图片描述
这里已经手动创建好了
运行接收者,运行启动者
在这里插入图片描述
在这里插入图片描述
这里接收者自动接收消息

二:工作队列模式

	 一个队列多个接收者

生产者代码:

package com.lansi.realtynavi.test.workQueues;

import com.google.gson.Gson;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @Description 工作队列模式
 * @Date 2021/3/22 17:33
 * @Created by huyao
 */
public class Producer {


    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);

        for(int i = 1; i<=20; i++){
            SMS sms = new SMS("乘客" + i, "123456789", "你的车票已预订成功");
            String message = new Gson().toJson(sms);
            channel.basicPublish("", MqConstant.MQ_WORK_QUEUES, null, message.getBytes());
        }

        System.out.println("发送数据成功");
        channel.close();
        connection.close();
    }

}

封装对象代码:

package com.lansi.realtynavi.test.workQueues;

/**
 * @Description 描述
 * @Date 2021/3/23 11:28
 * @Created by huyao
 */
public class SMS {

    private String name;
    private String mobile;
    private String content;

    public SMS(String name, String mobile, String content) {
        this.name = name;
        this.mobile = mobile;
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

三个接收者代码

接收者1

package com.lansi.realtynavi.test.workQueues;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 描述
 * @Date 2021/3/23 11:33
 * @Created by huyao
 */
public class Consumer1 {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);

        //如果不写baiscQos(1) 则自动mq会将所有请求平均发送给所有消费者
        //baiscQos,mq不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个

        channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("smsConsumer1-短信发送成功:"+message);

                //服务器好的话可以在这里睡眠   这里可动态配置开启和设置睡眠时间
                /*try {
                    Thread.sleep(10);
                }catch (Exception e){
                    e.printStackTrace();
                }*/

                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

接收者2

package com.lansi.realtynavi.test.workQueues;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 描述
 * @Date 2021/3/23 11:40
 * @Created by huyao
 */
public class Consumer2 {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);

        channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("smsConsumer2-短信发送成功:"+message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

}

接收者3

package com.lansi.realtynavi.test.workQueues;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 描述
 * @Date 2021/3/23 11:41
 * @Created by huyao
 */
public class Consumer3 {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);

        channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("smsConsumer1-短信发送成功:"+message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

}

启动三个接收类,启动发送类
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

三个接收都拿到了数据,我学习的时候队列是以轮询的方式给三个消费者发送数据,这里出现了接收数据不均衡的情况应该是缓存没用清理,给队列删掉重新创建就好了

三:发布订阅模式

生成者代码:

这里和前面两种模式不同,发送者绑定了交换机,没用绑定队列,需要消费者绑定交换机和队列

package com.lansi.realtynavi.test.publish;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.Scanner;

/**
 * @Description 发布订阅模式
 * @Date 2021/3/23 13:31
 * @Created by huyao
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);

        String input = new Scanner(System.in).next();


        //第一个参数交换机名字,其他参数和之前一样
        channel.basicPublish(MqConstant.MQ_PUBLISH_JHJ, "", null, input.getBytes());

        channel.close();
        connection.close();

    }
}

接收者1代码:

package com.lansi.realtynavi.test.publish;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerXinLang {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);


        //队列绑定交换机
        //参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_PUBLISH_JHJ, "");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者新浪收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}

接收者2代码:

package com.lansi.realtynavi.test.publish;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerBaiDu {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);


        //队列绑定交换机   目前交换机需要在rabbit也手动创建,在和spring整合的时候spring会自动帮我们创建
        //参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)
        channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_PUBLISH_JHJ, "");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者百度收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}

启动生产者消费者,在生产者控制台输入信息:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
两个消费者都接收到了

在这里插入图片描述
在这里插入图片描述

四 路由模式

路由模式发送需要携带路由key,用作接收者进行判断

生产者代码:

package com.lansi.realtynavi.test.routing;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * @Description 路由模式
 * @Date 2021/3/23 13:31
 * @Created by huyao
 *
 *
 * 交换机类型:fanout广播(发布订阅)   direct转发(路由)  topic通配符(通配模式)
 *
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);
        LinkedHashMap<String, String> map = new LinkedHashMap<>();
        map.put("test1","测试一数据");
        map.put("test2","测试二数据");
        map.put("test3","测试三数据");
        map.put("test4","测试四数据");
        map.put("test5","测试五数据");
        map.put("test6","测试六数据");
        map.put("test7","测试七数据");
        Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, String> next = iterator.next();
            //第一个参数交换机名字,第二个参数指定rout_key
            channel.basicPublish(MqConstant.MQ_ROUTING_JHJ, next.getKey(), null, next.getValue().getBytes());
        }



        channel.close();
        connection.close();

    }



}

接收者1:

package com.lansi.realtynavi.test.routing;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerBaiDu {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);


        //队列绑定交换机   目前交换机需要在rabbit也手动创建,在和spring整合的时候spring会自动帮我们创建
        //参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)
        channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test1");
        channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test2");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者百度收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}

接收者二

package com.lansi.realtynavi.test.routing;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerXinLang {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);


        //队列绑定交换机
        //参数1:队列名,参数2:交换机名,参数3:路由key
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test10");
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test6");
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test5");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者新浪收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}

在这里插入图片描述
在这里看到百度接收者只接受test1、test2,所以只接收到了1和2的数据,新浪同理
在这里插入图片描述

五 主题模式

	在路由的基础上增加了通配符匹配
	通配符规则  #可以匹配多个词  * 只能匹配一个词

生产者代码:

package com.lansi.realtynavi.test.topics;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * @Description 通配符模式
 * @Date 2021/3/23 13:31
 * @Created by huyao
 *
 *
 * 交换机类型:fanout广播(发布订阅)   direct转发(路由)  topic通配符(通配模式)
 *
 *  通配符规则  #可以匹配多个词  * 只能匹配一个词
 *              test.#  test.one.tow  test.one.q.wqe  /  test.*   test.one test.two
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_TOPIC_JHJ, false, false, false, null);
        LinkedHashMap<String, String> map = new LinkedHashMap<>();
        map.put("test.one","测试一数据");
        map.put("test2.two.one","测试二数据");
        map.put("test.wqe","测试三数据");
        map.put("test4.com.hash.oqp","测试四数据");
        map.put("test5.com.code.oqp","测试五数据");
        map.put("test6.com.code.oqp","测试六数据");
        Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, String> next = iterator.next();
            //第一个参数交换机名字,第二个参数指定rout_key
            channel.basicPublish(MqConstant.MQ_TOPIC_JHJ, next.getKey(), null, next.getValue().getBytes());
        }
        channel.close();
        connection.close();

    }



}

接收者1代码:

package com.lansi.realtynavi.test.topics;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerBaiDu {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);


        //队列绑定交换机   目前交换机需要在rabbit也手动创建,在和spring整合的时候spring会自动帮我们创建
        //参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)
        channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_TOPIC_JHJ, "*.*.*.oqp");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者百度收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}

接收者2代码

package com.lansi.realtynavi.test.topics;

import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Description 消费者
 * @Date 2021/3/23 13:50
 * @Created by huyao
 */
public class ConsumerXinLang {

    public static void main(String[] args) throws Exception{

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);


        //队列绑定交换机
        //参数1:队列名,参数2:交换机名,参数3:路由key
        channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_TOPIC_JHJ, "test.#");
        channel.basicQos(1);

        channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者新浪收到消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


    }



}

在这里插入图片描述
在这里插入图片描述

最后就是springboot上整合rabbitmq

需要用到的依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
然后配置rabbitmq连接
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
#发送者开启confirm确认机制
spring.rabbitmq.publisher-confirms=true
#发送者开启return确认机制
spring.rabbitmq.publisher-returns=true

#开启ack

spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false

接下来一个rabbitmq的配置

package com.lansi.realtynavi.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Description mq的配置
 * @Date 2021/3/24 14:19
 * @Created by huyao
 */
@Configuration
public class RabbitMqConfig {

    //定义交换机的名字
    public static final String EXCHANGE_NAME = "boot_topic_exchange";

    public static final String QUEUE_NAME = "boot_queue";

    //1.声明交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //2.声明队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //3.绑定
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}

接收者

package com.lansi.realtynavi.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Description mq监听/消费者手动签收消息
 * @Date 2021/3/24 14:44
 * @Created by huyao
 *
 *rabbitmq给了两种消息的可靠性  confirm和return
 *
 */
@Component
public class RabbitMqConsumer {


    //可监听分布式其他项目,只要mq连接的地址相同监听的队列名存在即可
    //消费者
    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message, Channel channel) throws Exception{

        System.out.println("消费者接收到消息:"+new String(message.getBody()));

        
        try{
            //开始业务处理

            System.out.println("开始业务处理");

            //int i = 5/0;

            System.out.println("业务处理完成");
            //业务处理完成确认收到消息  , 第二个参数为true支持多消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

        }catch (Exception e){
            System.out.println("业务处理异常");
            //业务异常,拒收消息,请求重发    参数三为true则重回队列发送
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
        }


    }

}

这里的生产者我写的一个controller中的列子(错误示范,只能调用一次)

testTopic1 是测试mq的高级特性,这里只用到testTopic就可以

package com.lansi.realtynavi.rabbitmq;

import com.lansi.realtynavi.config.RabbitMqConfig;
import com.lansi.realtynavi.dev.helloWord.HelloSender;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Description 描述
 * @Date 2021/3/24 13:46
 * @Created by huyao
 */
@RestController
@RequestMapping("api/rabbitMq")
public class RabbitMqController {


    @Autowired
    private HelloSender helloSender;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("helloWorld")
    public void hello(){
        helloSender.send();
    }

    @GetMapping("testTopic")
    public void testTopic(){
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.hhh", "topic的mq.......");
    }

    //mq的可靠性机制,必须要在配置文件中开启
    @GetMapping("testTopic1")
    public void testTopic1(){

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm方法被执行了。。。");
                if(b){
                    System.out.println("交换机确认成功!!");
                } else {
                    System.out.println("交换机确认失败!!");
                }

            }
        });

        //设置交换机处理失败消息的模式,为true的时候,消息打到不了队列时,会将消息重新返回给生产者
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * @param message 消息对象
             * @param returnCode 错误码
             * @param returnText 错误信息
             * @param exchange 交换机
             * @param routingKey 路由键
             *
             * */
            @Override
            public void returnedMessage(Message message, int returnCode, String returnText, String exchange,String routingKey) {
                System.out.println("return被执行了。。。");
                System.out.println("message:"+new String(message.getBody()));
                System.out.println("错误码:"+returnCode);
                System.out.println("错误信息:"+returnText);
                System.out.println("交换机:"+exchange);
                System.out.println("路由键:"+routingKey);
            }
        });
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"abc.boot.hhh", "topic的mq.......");
    }

}

运行后掉对应的接口,消费者接收
在这里插入图片描述
在这里插入图片描述
这样rabbitmq就整合进springboot中了

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/5300.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!