1、简介
工作队列(又称任务队列),主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
2、实现工作队列
默认轮询分发(按人头分配,表面公平,其实不公平)
1)抽取工具类
package com.tuwer.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 土味儿
* Date 2022/3/24
* @version 1.0
*/
public class RabbitMqUtils {
private Connection connection = null;
private Channel channel = null;
/**
* 获取通道
*
* @param host 主机名
* @param port 端口
* @param username 用户名
* @param password 密码
* @param virtualHost 虚拟机路径
* @param connectionName 连接名
* @return
*/
public Channel getChannel(
String host,
int port,
String username,
String password,
String virtualHost,
String connectionName) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
try {
connection = connectionFactory.newConnection(connectionName);
System.out.println("获取连接!");
channel = connection.createChannel();
System.out.println("获取通道!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return channel;
}
/**
* 关闭通道及连接
*/
public void close() {
if (channel != null && channel.isOpen()) {
try {
channel.close();
System.out.println("通道关闭!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
System.out.println("连接关闭!");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2)两个工作线程
两个工作线程只有连接名称不一样:工作线程1、工作线程2
为了能够循环接收消息,连接不关闭
package com.tuwer.rabbitmq.work.polling;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.tuwer.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 土味儿
* Date 2022/3/24
* @version 1.0
*/
public class Work01 {
/**
* 队列名称
*/
private final static String QUEUE_NAME = "hello";
/**
* 处理消息
*/
public static void main(String[] args) {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 获取通道
Channel channel = mqUtils.getChannel("192.168.19.101", 5672, "admin", "admin", "/", "工作线程1");
// 接收消息处理方法
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("收到的消息:" + new String(message.getBody()));
};
// 接收失败处理方法
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息接收失败!" + consumerTag);
};
System.out.println("正在接收信息...");
try {
// 接收处理
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭:关闭后就不能再接收后面的信息
//mqUtils.close();
}
}
}
3)一个发送线程
package com.tuwer.rabbitmq.work.polling;
import com.rabbitmq.client.Channel;
import com.tuwer.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 土味儿
* Date 2022/3/24
* @version 1.0
*/
public class Producer {
/**
* 队列名称
*/
private final static String QUEUE_NAME = "hello";
/**
* 发送消息
*
* @param args
*/
public static void main(String[] args) {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 获取通道
Channel channel = mqUtils.getChannel("192.168.19.101", 5672, "admin", "admin", "/", "生产者");
// 声明队列
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
} catch (IOException e) {
e.printStackTrace();
}
// 消息
String message = "";
try {
// 循环发送消息
for (int i = 1; i < 21; i++) {
message = "Hello World! " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("第" + i + "条消息已发送!");
try {
// 休眠1秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("发送结束!");
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭
mqUtils.close();
}
}
}
4)测试
- 开启RabbitMq
- 开启发送线程
- 开启两个工作线程开始接收
3、消息应答
1)概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务,并且只完成了部分,突然它挂掉了,会发生什么情况。
RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,RabbitMQ引入消息 应答机制
,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了。
2)自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
3)手动应答
(1)方法
Channel.basicAck
肯定确认;RabbitMQ已知道该消息被成功处理,可以将其丢弃Channel.basicNack
否定确认Channel.basicReject
否定确认;与Channel.basicNack相比少了一个参数,不处理该消息了,直接拒绝,可以将其丢弃了。
(2)批量 Multiple
手动应答的好处是
可以批量应答并且减少网络拥堵
true
表示批量应答channel上未应答的消息,比如:channel上有传送tag的消息5,6,7,8,,当前tag是8,那么此时5-8的这些还未应答的消息就会被确认收到消息应答(累积确认)false
同上面相比只会应答tag = 8的消息,5,6,7这三个消息依然不会被确认收到消息应答
(3)重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
(4)实现手动应答
- 消费者(工作线程)
修改消费者(工作线程)代码
- channel.basicConsume()中的自动应答中改为 false
- 消息处理方法DeliverCallback中加入手工应答
public class Work01 {
/**
* 队列名称
*/
private final static String QUEUE_NAME = "hello";
/**
* 处理消息
*/
public static void main(String[] args) {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 获取通道
Channel channel = mqUtils.getChannel("192.168.19.101", 5672, "admin", "admin", "/", "工作线程1");
// 接收消息处理方法
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("收到的消息:" + new String(message.getBody()));
// 休眠1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// ------ 手工确认应答 ------
// 消息标记
long deliveryTag = message.getEnvelope().getDeliveryTag();
channel.basicAck(deliveryTag,false);
System.out.println(deliveryTag+ " 已处理!");
};
// 接收失败处理方法
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息接收失败!" + consumerTag);
};
System.out.println("正在接收信息...");
try {
// 接收处理
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭:关闭后就不能再接收后面的信息
//mqUtils.close();
}
}
}
(5)测试
4、持久化
刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当RabbitMQ服务停掉以后,未消费的消息不丢失。默认情况下RabbitMQ退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:队列
和 消息
都标记为持久化
1)队列持久化
在声明队列的时候把 durable
参数设置为持久化 true
channel.queueDeclare(队列名称, true, 排他性, 自动删除, 其它参数);
需要注意的是如果之前声明的队列不是持久化的,需要把原先队列先删除或者重新创建一个持久化的队列,不然就会出现错误
inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'true' but current is 'false'
2)消息持久化
在发送消息时设置 props
为 MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish(交换机, 队列名称, MessageProperties.PERSISTENT_TEXT_PLAIN, 消息);
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,就需要发布确认。
5、非公平分发
按劳分配(表面是不公平的,其实是公平的,能者多劳)
- 在消费端设置
// ----- 非公平分发 -----
// prefetchSize 默认为0,表示轮询分发;1就表示非公平分发
// fetch:取来,拿来;prefetch:预取值
int prefetchSize = 1;
// Qos表示服务质量;即为按劳分配
channel.basicQos(prefetchSize);
- 测试
6、预取值
本身消息的发送就是异步的,所以在任何时候,channel上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。
因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。
这个时候就可以通过使用 basicQos
方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。
例如:假设在通道上有未确认的消息5、6、7,8,并且通道的预取计数设置为4,此时RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被ack。比方说tag=6这个消息刚刚被确认ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。
消息应答和QoS预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。
虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗(随机存取存储器),应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同100到300范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。
预取值为1是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/70405.html