RabbitMQ
什么是MQ
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
RabbitMQ的优点
由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言,社区活跃度高,更新频率快(但是商业版需要收费)
Rabbit的概念
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
四大核心概念
生产者
-
产生数据发送消息的程序
消费者
-
消费者大多时候是一个等待接收消息的程序,生产者消费者很多时候都不在同一个机器,同一个机器可以即是生产者又是消费者
交换机
-
一方面接收生产者的消息,另一方面将消息推送到队列中,可以将消息推送到特定队列或者多个队列,或者将消息丢弃
队列
-
RabbitMQ存储消息的一种数据结构,生产者发送的消息只能存存储在队列中,队列仅仅受主机内存以及磁盘的限制约束,本质上是一个很大的消息缓冲区,许多生产者可以将消息发送到一个队列,许多消费者可以从一个队列接收数据。
RabbitMQ的安装
-
官网地址
-
https://www.rabbitmq.com/download.html -
文件上传
-
上传到/usr/local/software 目录下(如果没有 software 需要自己创建) -
安装文件(分别按照以下顺序安装)
-
rpm -ivh erlang-21.3-1.el7.x86_64.rpm -
yum install socat -y -
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm -
常用命令(按照以下顺序执行)
-
添加开机启动 RabbitMQ 服务 chkconfig rabbitmq-server on
-
启动服务 /sbin/service rabbitmq-server start
-
查看服务状态 /sbin/service rabbitmq-server status
-
停止服务(选择执行) /sbin/service rabbitmq-server stop
-
开启 web 管理插件(在页面访问RabbitMQ的管理界面)r abbitmq-plugins enable rabbitmq_management
-
访问RabbitMQ的管理界面 http://{安装RabbitMQ机器的IP地址}:15672/,用默认的账号密码guest会出现权限问题(关闭防火墙或开放端口)
“
不关闭防火墙的话,需要开放15672和5672两个端口,一个是连接控制台,一个是连接服务 firewall-cmd –zone=public –add-port=15672/tcp –permanent firewall-cmd –zone=public –add-port=5672/tcp –permanent firewall-cmd –reload 如果是云服务器的话,还需要在管理端开放这两个端口
”
-
创建账号 rabbitmqctl add_user admin 123
-
设置用户角色 rabbitmqctl set_user_tags admin administrator
-
设置用户权限 set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" -
此处添加新的用户访问 -
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 当前用户和角色 rabbitmqctl list_users
-
再次使用admin登陆。 -
重置命令
-
关闭应用的命令为 rabbitmqctl stop_app
-
清除的命令为 rabbitmqctl reset
-
重新启动命令为 rabbitmqctl start_app
RabbitMQ的入门程序
1. 导入Maven依赖
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
2. 创建消息的生产者
public class Producer {
// 队列名称
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP,连接RabbitMQ队列(安装RabbitMQ机器的IP地址)
factory.setHost("xxx.xxx.xxx.xxx");
//用户名
factory.setUsername("username");
//密码
factory.setPassword("password");
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
/**
* 声明一个队列
* 1. 队列名称
* 2. 队列中的消息是否需要持久话(磁盘),默认情况消息存储在内存中
* 3. 该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false只能一个消费者消费
* 4. 是否自动删除,最后一个消费者断开连接后,该队列是否自动删除 true自动删除,false则相反
* 5. 其他参数
*/
channel.queueDeclare(QUERE_NAME,false,false,false,null);
String message = "hello world ";
/**
* 发送一个消息
* 1. 发送到那个哪个交换机
* 2. 路由的key值是哪个,本次是队列名称
* 3. 其他参数信息
* 4. 发送消息的消息体
*/
channel.basicPublish("",QUERE_NAME,null,message.getBytes());
}å
}
3. 创建消息的消费者
/**
* 消费者,接受消息
*/
public class Consumer {
// 队列名称
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP,连接RabbitMQ队列
factory.setHost("xxx.xxx.xxx.xxx");
//用户名
factory.setUsername("username");
//密码
factory.setPassword("password");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1. 消费哪个队列
* 2. 消费成功之后是否要自动应答 true 代表自动 false 代表手动
* 3. 推送的消息如何进行消费的接口回调
* 4. 消费者取消消费的回调
*/
channel.basicConsume(QUERE_NAME,true,deliverCallback,cancelCallback);
}
}
4. 启动消费者与生产者查看结果
RabbitMQ的工作队列
轮流接收消息
/**
* 测试一个工作线程(相当于一个消费者)
*/
public class Worker01 {
// 队列名称
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
xxxxxx上述的方式连接RabbitMQ,生成信道xxxxxxx
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
System.out.println("c1等待接收消息。。。。。");
//消息接收
channel.basicConsume(QUERE_NAME, true, deliverCallback, cancelCallback);
}
}
-
编写生产者并启动
/**
* 生产者 发送大量消息
*/
public class Task01 {
public static final String QUERE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
xxxxxx上述的方式连接RabbitMQ,生成信道xxxxxxx
// 队列的声明
channel.queueDeclare(QUERE_NAME,false,false,false,null);
// 从控制台接收信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUERE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
}
}
}
“
尚硅谷B站RabbitMQ教程:尚硅谷RabbitMQ
”
原文始发于微信公众号(卫颓废):RabbitMQ-RabbitMQ的安装及入门
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/47529.html