目录
前言
什么是消息中间件
例如,有三个系统A、B、C,A和B、C系统之间存在调用,如果是非异步的情况下,A必须要等待B或C响应之后再进行下一业务。
假设我们加入了消息中间件,那就不一样了,我们可以把消息直接扔给消息中间件,由消息中间件把我们的消息转发给相应的系统即可。
常见的消息中间件:
ActiveMQ 、RabbitMQ、kafka、RcoketMQ
消息中间件的使用场景
异步与解耦
减少系统之间的耦合性,系统之间调用出现问题时不会相互影响。
流量削峰(削峰填谷)
将流量压力从系统分散到MQ之中,保证系统可用性。
数据分发
可以通过MQ实现数据的分发操作,假设A系统作为服务方接口,需要由B、C、D、E四个系统调用,此时引入消息中间件,A系统可以将数据放入MQ之中,由B、C、D、E四个系统去MQ之中取数据即可。
RocketMQ
简介
RocketMQ的角色
NameServer:服务注册与发现中心,消费者的服务和生产者的服务都在此注册。
BroberCluster: 负责消息的存储,如果存在集群架构,则以数据同步的方式将消息从主节点同步到从节点,生产者服务发送消息到它,它存储数据,消费者服务从它里面拿取消息,一般都从主节点发送消息和拿消息。
Producer: 消息生产者,首先,它会向nameServer中注册服务,发送消息时将消息发送给broberCluster。
Consumer: 消息消费者,它会向nameServer中注册服务,从broberCluster中取消息。
RocketMQ中的术语
Topic主题
标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。
Producer Group 生产者分组
标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。若事务消息,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其 他producer,确认这条消息应该commit还是rollback。但开源版本并不支持事务消息。
Consumer Group 消费者分组
标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。
消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。
注: RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息,即必须要听一样的topic(并且tag也一样)。
Message Queue 消息队列
简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若Topic同时创建在不通的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。
无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q地发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。
每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。
Offset 偏移量
RocketMQ中,有很多offset的概念。但通常我们只关心暴露到客户端的offset。一般我们不特指的话,就是指逻辑Message Queue下面的offset。
注: 逻辑offset的概念在RocketMQ中字面意思实际上和真正的意思有一定差别,这点在设计上显得有点混乱。祥见下面的解释。
可以认为一条逻辑的message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset。
max offset
字面上可以理解为这是标识message queue中的max offset表示消息的最大offset。但是从源码上看,这个offset实际上是最新消息的offset+1,即:下一条消息的offset。
min offset:
标识现存在的最小offset。而由于消息存储一段时间后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长。这意味着比min offset要小的那些消息已经不在broker上了,无法被消费。
consumer offset
字面上,可以理解为标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息offset+1,即实际上表示的是下次拉取的offset位置。
消费者拉取消息的时候需要指定offset,broker不主动推送消息, offset的消息返回给客户端。
consumer刚启动的时候会获取持久化的consumer offset,用以决定从哪里开始消费,consumer以此发起第一次请求。
每次消息消费成功后,这个offset在会先更新到内存,而后定时持久化。在集群消费模式下,会同步持久化到broker,而在广播模式下,则会持久化到本地文件
Tag 标签
RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic1的时候,如果你订阅的时候指定的是tagA,那么tagB的消息将不会投递。
Windows系统下的下载与安装
地址:
前置条件:
Windows系统是64位的
装了64位的JDK1.8
1. 下载
2. 解压
3. 配置环境变量:ROCKETMQ_HOME 配置安装目即可
4. 进入bin目录,打开命令行,执行命令,命令执行时,会自动打开黑窗口:
start mqnamesrv.cmd
5. 启动broker:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
6. 至此,安装完成。
Linux下的下载和安装
1. 使用rz命令将压缩包上传。
2. 使用命令进行解压,解压到指定目录:
unzip rocketmq-all-4.8.0-bin-release.zip -d /usr/local/rocketMQ
3. 启动nameserver
nohup sh mqnamesrv &
4. 查看日志:
tail -f ~/logs/rocketmqlogs/namesrv.log
5. 启动borker,首先需要修改配置文件
vi broker.conf
6. 加入一行内容
7. 修改bin目录下的runbroker.sh
8. 启动broker命令:
nohup sh mqbroker -c ./broker.conf -n 39.98.46.94:9876 autoCreateTopicEnable=true &
控制台的安装
下载地址:
地址1:https://codeload.github.com/apache/rocketmq-externals/zip/master
地址2:https://github.com/apache/rocketmq-dashboard
- 将下载的项目导入到idea。
- 修改dashbaord项目中的application.perporties文件,将ip端口改为本地的。
3. 将整个工厂打包成jar文件
4. 通过java-jar的方式启动jar包,浏览器访问地址:http://localhost:8089/#/
5. 如需修改ip和端口,修改以下文件即可
6. 如果不想麻烦,可以直接下载jar文件,jar包下载地址:
https://download.csdn.net/download/weixin_43195884/87409368
消息的发送
普通消息的发送分为三种类型:
同步消息发送:发送后同步等待响应
异步消息发送:发送后无需同步等待响应
单项消息发送:单向发送消息不获取响应结果
同步消息发送
1. 创建springboot项目,或maven项目都行,如果不会请参考:
关于SpringBoot框架,看这篇就够了。_只为code醉的博客-CSDN博客
2. 引入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
3. 创建发送消息的类,代码如下:
/**
* 测试rocketMQ发送消息案例
*/
public class SendMassage {
public static void main(String[] args) throws Exception {
//1. 创建消息生产者 DefaultMQProducer 创建时指定分组
DefaultMQProducer groupTest = new DefaultMQProducer("group_test");
//2. 设置NameServer的地址、开启故障延迟
groupTest.setNamesrvAddr("127.0.0.1:9876");
groupTest.setSendLatencyFaultEnable(true);
//3. 启动DefaultMQProducer的实例
groupTest.start();
//4. 循环10次,每次都发送一次消息
for (int i = 0; i < 10; i++) {
//5. 创建消息 指定主题topic 标签tag 和消息体 message 消息体需要转换为byties
Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//6. 发送消息
SendResult send = groupTest.send(message);
//7. 打印出发送消息结果
System.out.println("result: "+send);
}
//8. 不再发送消息 就关闭Producer实例
groupTest.shutdown();
}
}
4. 同步消息发送适合场景:短信通知等重要场景,这时启动nameserver、启动broker、启动仪表盘项目,执行上面的测试类:
5. 测试类执行结果,消息发送成功
6. 访问仪表盘,查看主题的状态,可以看到发送消息的偏移量maxOffset,加一起就是10,验证消息已经发送成功。
异步消息发送
直接将消息发出,并不持续等待响应,而是通过回调函数去执行成功和失败方法
1. 启动nameserver、启动broker、启动仪表盘jar包。
2. 新增测试类:
/**
* 测试rocketMQ发送消息案例----异步发送消息并获取响应
*/
public class SendMassageAsyncModel {
public static void main(String[] args) throws Exception {
//1. 创建消息生产者 DefaultMQProducer 创建时指定分组
DefaultMQProducer groupTest = new DefaultMQProducer("group_test");
//2. 设置NameServer的地址、开启故障延迟
groupTest.setNamesrvAddr("127.0.0.1:9876");
//3. 启动DefaultMQProducer的实例
groupTest.start();
//4. 循环10次,每次都发送一次消息
for (int i = 0; i < 10; i++) {
//5. 创建消息 指定主题topic 标签tag 和消息体 message 消息体需要转换为byties
Message message = new Message("TopicTest", "TagA",
("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//6. 发送消息
groupTest.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//7. 打印出发送消息结果
System.out.println("sendResult: "+sendResult);
}
@Override
public void onException(Throwable throwable) {
//8. 打印出发送消息异常
System.out.println("throwable: "+throwable.getMessage());
throwable.printStackTrace();
}
});
}
//8. 不再发送消息 就关闭Producer实例
groupTest.shutdown();
}
}
3. 执行测试类,查看仪表盘项目是否发送成功
单向发送
单向发送消息,不获取响应结果。
/**
* 测试rocketMQ发送消息案例----单向发送消息 并不获取响应结果
*/
public class SendMassageOneWayModel {
public static void main(String[] args) throws Exception {
//1. 创建消息生产者 DefaultMQProducer 创建时指定分组
DefaultMQProducer groupTest = new DefaultMQProducer("group_test");
//2. 设置NameServer的地址、开启故障延迟
groupTest.setNamesrvAddr("127.0.0.1:9876");
//3. 启动DefaultMQProducer的实例
groupTest.start();
//4. 循环10次,每次都发送一次消息
for (int i = 0; i < 10; i++) {
//5. 创建消息 指定主题topic 标签tag 和消息体 message 消息体需要转换为byties
Message message = new Message("TopicTest", "TagA",
("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//6. 单向发送消息
groupTest.sendOneway(message);
}
//8. 不再发送消息 就关闭Producer实例
groupTest.shutdown();
}
}
消息的消费
集群消费
发送的信息依次均摊发送给集群中的A、B、C节点,由各个节点均摊消费消息。
集群消费代码实现:
/**
* rocketMQ中的集群消费模式
*/
public class SendMassageByGroup {
public static void main(String[] args) throws Exception {
//1. 实例化消费者,指定分组名称
DefaultMQPushConsumer groupConsumer = new DefaultMQPushConsumer("group_consumer");
//2. 指定nameserver的信息
groupConsumer.setNamesrvAddr("127.0.0.1:9876");
//3. 通过主题订阅消息 topic
groupConsumer.subscribe("TopicTest","*");
//4. 集群模式消费
groupConsumer.setMessageModel(MessageModel.CLUSTERING);
//5. 注册回调函数,处理消息 监听消息 如果有消息进来就进行处理
groupConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
//6. 循环接收的list 打印消息
for (MessageExt m:list) {
String topic = m.getTopic();
String msg = null;
msg = new String(m.getBody(), "utf-8");
String tags = m.getTags();
System.out.println("收到消息:"+"topic: "+topic+" msg: "+msg+" tags: "+tags);
}
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//7. 启动消费者
groupConsumer.start();
//8. 注销消费者
groupConsumer.shutdown();
}
}
测试步骤:
- 启动生产者服务
- 启动三个集群消费者服务
- 在生产者服务中发送消息
- 查看三个集群消费者服务是否收到消息
- 消息被均摊到消费者服务中
广播消费
发送的所有的消息都被依次发送到A、B、C三个服务中,消息不再均摊发送,而是通知集群中的所有的节点都消费消息,适用于持久化消息到实例中的场景。
广播消费模式代码:
/**
* rocketMQ中的广播消费模式
*/
public class SendMassageByBroad {
public static void main(String[] args) throws Exception {
//1. 实例化消费者,指定分组名称
DefaultMQPushConsumer groupConsumer = new DefaultMQPushConsumer("group_consumer");
//2. 指定nameserver的信息
groupConsumer.setNamesrvAddr("127.0.0.1:9876");
//3. 通过主题订阅消息 topic
groupConsumer.subscribe("TopicTest","*");
//4. 广播模式消费
groupConsumer.setMessageModel(MessageModel.BROADCASTING);
//5. 注册回调函数,处理消息 监听消息 如果有消息进来就进行处理
groupConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
//6. 循环接收的list 打印消息
for (MessageExt m:list) {
String topic = m.getTopic();
String msg = null;
msg = new String(m.getBody(), "utf-8");
String tags = m.getTags();
System.out.println("收到消息:"+"topic: "+topic+" msg: "+msg+" tags: "+tags);
}
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//7. 启动消费者
groupConsumer.start();
//8. 注销消费者
groupConsumer.shutdown();
}
}
测试方法:
- 启动生产者服务
- 启动三个集群消费者服务
- 在生产者服务中发送消息
- 查看三个集群消费者服务是否收到消息
- 消息都被发送到集群中的各个节点上
RocketMQ的顺序消息
顺序消息生产和消费
在创建一个topic主题时,主题中只包含一个Queue,Queue本身就是先进先出,这样就能保障消息的顺序性。
假如存在6条消息,6条消息中分为两类消息,一类是付款、一类是生成订单,一类是发货,此时将订单分类存放在queue中,由三个消费者去消费,消费时确保一个消费者消费一个订单的全流程,这样就保障了顺序性。
生产部分顺序消息:
/**
* 顺序消费 ----部分顺序消息-----生成部分顺序消息
*/
public class ProducerMessageInOrder {
public static void main(String[] args) throws Exception {
//1. 创建消息生产者 DefaultMQProducer 创建时指定分组
DefaultMQProducer orderProducer = new DefaultMQProducer("OrderProducer");
//2. 设置NameServer的地址
orderProducer.setNamesrvAddr("127.0.0.1:9876");
//3. 启动DefaultMQProducer的实例
orderProducer.start();
//4. 生成订单列表
List<OrderInfo> orderInfos = producerOrderList();
//5. 循环订单列表 发送消息
for (int i = 0; i < orderInfos.size(); i++) {
//6. 将对象转换为字符串
String body = orderInfos.get(i).toString();
//7. 创建消息对象
Message mesg = new Message("PartOrder", null, "KEY" + i, body.getBytes());
//8. 发送消息
SendResult send = orderProducer.send(mesg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//9. 根据订单id选择发送queue
Long id = (Long) o;
long index = id % list.size();
return list.get((int) index);
}
}, orderInfos.get(i).getOrderID());
System.out.println("result : "+send.getSendStatus()+
" message queue: "+send.getMessageQueue().getQueueId()+
" body : "+body);
}
//9. 关闭连接
orderProducer.shutdown();
}
public static List<OrderInfo> producerOrderList(){
List<OrderInfo> list=new ArrayList<>();
OrderInfo orderInfo1 = new OrderInfo("订单1", "创建完成",1);
OrderInfo orderInfo2 = new OrderInfo("订单2", "创建完成",1);
OrderInfo orderInfo3 = new OrderInfo("订单3", "创建完成",1);
list.add(orderInfo1);
list.add(orderInfo2);
list.add(orderInfo3);
OrderInfo orderInfo4 = new OrderInfo("订单1", "待付款",2);
OrderInfo orderInfo5 = new OrderInfo("订单2", "待付款",2);
OrderInfo orderInfo6 = new OrderInfo("订单3", "待付款",2);
list.add(orderInfo4);
list.add(orderInfo5);
list.add(orderInfo6);
OrderInfo orderInfo7 = new OrderInfo("订单1", "已付款",3);
OrderInfo orderInfo8 = new OrderInfo("订单2", "已付款",3);
OrderInfo orderInfo9 = new OrderInfo("订单3", "已付款",3);
list.add(orderInfo7);
list.add(orderInfo8);
list.add(orderInfo9);
return list;
}
}
消费部分顺序消息:
/**
* 顺序消费 ---------消费者部分顺序消费消息
*/
public class ConsumerMessageInOrder {
public static void main(String[] args) throws Exception {
//1. 创建消费者对象
DefaultMQPushConsumer orderConsumer1 = new DefaultMQPushConsumer("OrderConsumer1");
//2. 设置远程地址
orderConsumer1.setNamesrvAddr("127.0.0.1:9876");
//3. 设置偏移量
orderConsumer1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//4. 订阅哪个topic下的哪些消息
orderConsumer1.subscribe("PartOrder","*");
//5. 注册消息监听 顺序消费
orderConsumer1.registerMessageListener(new MessageListenerOrderly() {
Random random= new Random();
//6. 消费消息
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,
ConsumeOrderlyContext consumeOrderlyContext) {
consumeOrderlyContext.setAutoCommit(true);
for (MessageExt msg:list) {
//7. 可以看到每个queue有唯一的consume线程消费 订单对每个queue有序
System.out.println("counsumeThread: "+Thread.currentThread().getName()+
",queueId="+msg.getQueueId()+",content:"+new String(msg.getBody()));
}
//8. 这里可以加上自定义业务处理逻辑 消费队列中的消息
// consumeMessage(list);
return ConsumeOrderlyStatus.SUCCESS;
}
});
//9. 开启
orderConsumer1.start();
}
}
RocketMQ的延时消息
生产者在生产完消息后将消息存储在mq之中,并不希望消费者直接进行消费,而是延时一段时间后推送给消费者。
生产者生产消息,并设置延迟时间
/**
* 延时消息---------生产延时消息
*/
public class ProducerDelayMessage {
public static void main(String[] args) throws Exception {
//1. 创建消费者对象
DefaultMQProducer orderConsumer1 = new DefaultMQProducer("ScheduledProducer");
//2. 设置远程地址
orderConsumer1.setNamesrvAddr("127.0.0.1:9876");
//3. 开启实例
orderConsumer1.start();
int totalMessage=10;
for (int i = 0; i < totalMessage; i++) {
Message scheduledTopic = new Message("ScheduledTopic", ("Hello Scheduled" + i).getBytes());
//4. 设置延时等级
//1-18个等级 1S 5S 10S 30S 1M 2M 3M 4M 5M 6M 7M 8M 9M 10M 20M 30M 1H 2H
scheduledTopic.setDelayTimeLevel(4);
//5. 发送消息
orderConsumer1.send(scheduledTopic);
}
//6. 关闭生产者
orderConsumer1.shutdown();
}
}
生产者消费消息
/**
* 延时消息 ----消费延时消息
*/
public class ConsumerMessageDelay {
public static void main(String[] args) throws Exception {
//1. 创建消费者对象
DefaultMQPushConsumer orderConsumer1 = new DefaultMQPushConsumer("OrderConsumer1");
//2. 设置远程地址
orderConsumer1.setNamesrvAddr("127.0.0.1:9876");
//4. 订阅哪个topic下的哪些消息
orderConsumer1.subscribe("ScheduledTopic","*");
//5. 注册消息监听 顺序消费
orderConsumer1.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg:list) {
//6. 打印消息内容,查看消息是多久之前发过来的
System.out.println("receive message: "+"{"+msg.getMsgId()+"}"+
(msg.getStoreTimestamp()-msg.getBornTimestamp())+"ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//6. 开启
orderConsumer1.start();
//7. 注销消费者
orderConsumer1.shutdown();
}
}
RocketMQ的批量消息
将一条条的数据打成包,批量的发送到MQ。
批量消息生产:
/**
* 批量消息----消息生产者---list大小不超过4m
*/
public class BatchProducerMessage {
public static void main(String[] args) throws MQClientException {
//1. 实例化消息生产者
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
//2. 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3. 启动Producer实例
producer.start();
//4. 制作批量消息 将消息放入list
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 2".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 3".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID004", "Hello world 4".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID005", "Hello world 5".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID006", "Hello world 6".getBytes()));
//5. 发送消息
try {
producer.send(messages);
} catch (Exception e) {
producer.shutdown();
e.printStackTrace();
}
//6. 关闭Producer实例。
producer.shutdown();
}
}
批量消息消费:
/**
* 批量消息----消息消费者---list大小不超过4m
*/
public class BatchConsumeMessage {
public static void main(String[] args) throws Exception {
// 1. 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
// 2. 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3. 订阅Topic
consumer.subscribe("BatchTest", "*");
//4. 负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
//5. 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//6. 启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
批量消费中,将消息拆分成小于4M的类:
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
* 批量消息-超过4m-生产者
*/
public class SplitBatchProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
String topic = "BatchTest";
//使用List组装
List<Message> messages = new ArrayList<>(100 * 1000);
//10万元素的数组
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//把大的消息分裂成若干个小的消息(1M左右)
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
Thread.sleep(100);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
System.out.printf("Consumer Started.%n");
}
}
class ListSplitter implements Iterator<List<Message>> {
private int sizeLimit = 1000 * 1000;//1M
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) { this.messages = messages; }
@Override
public boolean hasNext() { return currIndex < messages.size(); }
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > sizeLimit) {
if (nextIndex - currIndex == 0) {//单个消息超过了最大的限制(1M),否则会阻塞进程
nextIndex++; //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则退出循环
}
break;
}
if (tmpSize + totalSize > sizeLimit) { break; }
else { totalSize += tmpSize; }
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
RocketMQ的过滤消息
生产者在生产消息时将消息打上标签和属性,消费者在消费时对符合自己要求的消息进行过滤,分为tag过滤和sql过滤两种方法。
Tag过滤
过滤消息,消息生产者,在新增消息时,打上自己的tag标签。
/**
* 生产出tag标签的消息
*/
public class ProducerTagMessage {
public static void main(String[] args) throws Exception {
//1. 创建消息生产者
DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
//2. 设置地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3. 开启实例
producer.start();
//4. 设定三种tag标签
String[] tags = new String[] {"TagA", "TagB", "TagC"};
//5. 循环发送消息
for (int i = 0; i < 3; i++) {
//6. 在创建消息对象时 参数中带上tag标签
// 参数1 主题标签 参数2 tag标签 参数3 消息主体
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//6. 关闭实例
producer.shutdown();
}
}
过滤消息,消息消费者,消费时取出tag标签进行消费,订阅时就可以利用正则表达式进行过滤。
/**
* 消费tag类型消息 过滤消息
*/
public class ConsumerTagMessage {
public static void main(String[] args) throws Exception {
//1. 创建消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
//2. 指定远程地址、订阅消费主题、订阅时增加过滤条件
consumer.setNamesrvAddr("127.0.0.1:9876");
//3. 参数1 订阅的主题 参数2 过滤tag的表达式(可以是正则)
consumer.subscribe("TagFilterTest", "TagA || TagB");
//4. 开启消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
//5. 消费消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
//6. 循环 打印消息
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
//7. 可以取出生产者打上的tag标签和自定义属性
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : "
+ msgPro +" ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//8. 开启实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
SQL过滤
通过类似于sql查询的方法去实现对条件的过滤。
Sql过滤,消息的生产者:
/**
* sql----消息消费者sql过滤
*/
public class ProducerSqlMessage {
public static void main(String[] args) throws Exception {
//1. 创建消息生产对象
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
//2. 设置远程地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3. 开启实例对象
producer.start();
//4. 定义tag标签
String[] tags = new String[] {"TagA", "TagB", "TagC"};
//5. 循环发送消息
for (int i = 0; i < 10; i++) {
//6. 新增消息对象时 增加tag标签属性
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//7. 同时设置sql过滤的属性
msg.putUserProperty("a", String.valueOf(i));
//8. 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//9. 关闭实例
producer.shutdown();
}
}
Sql过滤,消息的消费者:
/**
* 使用sql对消息进行条件过滤
*/
public class ConsumerSqlMessage {
public static void main(String[] args) throws Exception {
//1. 创建消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
//2. 设置远程地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//3. 订阅消息 同时利用sql对条件进行过滤
//4. 查询标签为TAGA和TAGB、并且属性A的值在0和3之间的消息
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
//5. 注册消息消费者监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
//6. 循环消息 打印出接收的消息
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
//7. 打印出自定义的属性
String msgPro = msg.getProperty("a");
//8. 打印出tag标签
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro +" ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//9. 开启实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
RocketMQ在消息发送时的重要方法和属性
生产消息
/***
* 生产者生产消息的模板
*/
public class ProducerExample {
public static void main(String[] args) throws Exception{
//todo producerGroup:生产者所属组(针对 事务消息 高可用)
DefaultMQProducer producer = new DefaultMQProducer("produce_details");
//todo 默认主题在每一个Broker队列数量(对于新创建主题有效)
producer.setDefaultTopicQueueNums(8);
//todo 发送消息默认超时时间,默认3s (3000ms)
producer.setSendMsgTimeout(1000*3);
//todo 消息体超过该值则启用压缩,默认4k
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
//todo 同步方式发送消息重试次数,默认为2,总共执行3次
producer.setRetryTimesWhenSendFailed(2);
//todo 异步方式发送消息重试次数,默认为2,总共执行3次
producer.setRetryTimesWhenSendAsyncFailed(2);
//todo 消息重试时选择另外一个Broker时(消息没有存储成功是否发送到另外一个broker),默认为false
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
//todo 允许发送的最大消息长度,默认为4M
producer.setMaxMessageSize(1024 * 1024 * 4);
// 设置NameServer的地址
producer.setNamesrvAddr("106.55.246.66:9876");//106.55.246.66
// 启动Producer实例
producer.start();
//todo 0 查找该主题下所有消息队列
List<MessageQueue> MessageQueue = producer.fetchPublishMessageQueues("TopicTest");
for (int i = 0; i < MessageQueue.size(); i++) {
System.out.println(MessageQueue.get(i).getQueueId());
}
for (int i = 0; i < 10; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//todo 单向发送
//todo 1.1发送单向消息
producer.sendOneway(msg);
//todo 1.2指定队列单向发送消息(使用select方法)
producer.sendOneway(msg,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
},null);
//todo 1.3指定队列单向发送消息(根据之前查找出来的主题)
producer.sendOneway(msg,MessageQueue.get(0));
//todo 同步发送
//todo 2.1同步发送消息
SendResult sendResult0 = producer.send(msg);
//todo 2.1同步超时发送消息(属性设置:sendMsgTimeout 发送消息默认超时时间,默认3s (3000ms) )
SendResult sendResult1 = producer.send(msg,1000*3);
//todo 2.2指定队列同步发送消息(使用select方法)
SendResult sendResult2 = producer.send(msg,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
},null);
//todo 2.3指定队列同步发送消息(根据之前查找出来的主题队列信息)
SendResult sendResult3 = producer.send(msg,MessageQueue.get(0));
//todo 异步发送
//todo 3.1异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
});
//todo 3.1异步超时发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
},1000*3);
//todo 3.2选择指定队列异步发送消息(根据之前查找出来的主题队列信息)
producer.send(msg,MessageQueue.get(0),
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
});
//todo 3.3选择指定队列异步发送消息(使用select方法)
producer.send(msg,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
},
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
});
}
Thread.sleep(10000);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消费消息
/**
* 消费者
*/
public class ConsumerExample {
public static void main(String[] args) throws Exception {
//todo 属性
//todo consumerGroup:消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("king");
//todo 指定Namesrv地址信息.
consumer.setNamesrvAddr("106.55.246.66:9876");
//todo 消息消费模式(默认集群消费)
consumer.setMessageModel(MessageModel.CLUSTERING);
//todo 指定消费开始偏移量(上次消费偏移量、最大偏移量、最小偏移量、启动时间戳)开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//todo 消费者最小线程数量(默认20)
consumer.setConsumeThreadMin(20);
//todo 消费者最大线程数量(默认20)
consumer.setConsumeThreadMax(20);
//todo 推模式下任务间隔时间(推模式也是基于不断的轮训拉取的封装)
consumer.setPullInterval(0);
//todo 推模式下任务拉取的条数,默认32条(一批批拉)
consumer.setPullBatchSize(32);
//todo 消息重试次数,-1代表16次 (超过 次数成为死信消息)
consumer.setMaxReconsumeTimes(-1);
//todo 消息消费超时时间(消息可能阻塞正在使用的线程的最大时间:以分钟为单位)
consumer.setConsumeTimeout(15);
//todo 获取消费者对主题分配了那些消息队列
Set<MessageQueue> MessageQueueSet = consumer.fetchSubscribeMessageQueues("TopicTest");
Iterator iterator = MessageQueueSet.iterator();
while(iterator.hasNext()){
MessageQueue MessageQueue =(MessageQueue)iterator.next();
System.out.println(MessageQueue.getQueueId());
}
//todo 方法-订阅
//todo 基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//todo 基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
//todo 基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest",MessageSelector.byTag("tagA|TagB"));
//todo 取消消息订阅
consumer.unsubscribe("TopicTest");
//todo 注册监听器
//todo 注册并发事件监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
//没有成功 -- 到重试队列中来
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//todo
}
});
//todo 注册顺序消息事件监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
//todo 这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
//不加此项可能会导致消息无序
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
RocketMQ消息存储结构
RocketMQ因为有高可靠性的要求(宕机不丢失数据),所以数据要进行持久化存储。所以RocketMQ 采用文件进行存储。
存储文件
- commitLog:消息存储目录
- config:运行期间一些配置信息
- consumerqueue:消息消费队列存储目录
- index:消息索引文件存储目录
- abort:如果存在改文件则Broker非正常关闭
- checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
消息存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
- CommitLog:存储消息的元数据
- ConsumerQueue:存储消息在CommitLog的索引
- IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
CommitLog
CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,文件地址:$ {user.home} \store\$ { commitlog} \ $ { fileName}。在CommitLog 中,一个消息的存储长度是不固定的, RocketMQ采取一些机制,尽量向CommitLog 中顺序写 ,但是随机读。commitlog 文件默认大小为lG ,可通过在 broker 置文件中设置 mappedFileSizeCommitLog属性来改变默认大小。
Commitlog文件存储的逻辑视图如下,每条消息的前面4个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。
每个 CommitLog 文件的大小为 1G,一般情况下第一个 CommitLog 的起始偏移量为 0,第二个 CommitLog 的起始偏移量为 1073741824 (1G = 1073741824byte)。
每台Rocket只会往一个commitlog文件中写,写完一个接着写下一个。
indexFile 和 ComsumerQueue 中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个 CommitLog 文件上。
ConsumeQueue
ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件, 文件地址在$ {$storeRoot} \consumequeue\$ {topicName} \$ { queueld} \$ {fileName}。
ConsumeQueue中存储的是消息条目,为了加速 ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个 Consumequeue条目不会存储消息的全量信息,消息条目如下:
ConsumeQueue 即为Commitlog 文件的索引文件, 其构建机制是 当消息到达 Commitlog 文件后 由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue )与下文提到的索引文件。
存储机制这样设计有以下几个好处:
1 ) CommitLog 顺序写 ,可以大大提高写入效率。
(实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s ,超过了一般网卡的传输速度,这是磁盘比想象的快的地方 但是磁盘随机写的速度只有大概lOOKB/s,和顺序写的性能相差 6000 倍!)
2 )虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。
3 )为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构 ,因为ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog和ConsumeQueue 的一致性, CommitLog 里存储了 Consume Queues 、Message Key、 Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来。
IndexFile
RocketMQ还支持通过MessageID或者MessageKey来查询消息;使用ID查询时,因为ID就是用broker+offset生成的(这里msgId指的是服务端的),所以很容易就找到对应的commitLog文件来读取消息。但是对于用MessageKey来查询消息,RocketMQ则通过构建一个index来提高读取速度。
index 存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列 RocketMQ 专门为消息订阅构建的索引文件 ,提高根据主题与消息检索消息的速度 ,使用Hash索引机制,具体是Hash槽与Hash冲突的链表结构。(这里不做过多解释)
Config
config 文件夹中 存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。
topics.json : topic 配置属性
subscriptionGroup.json :消息消费组配置信息。
delayOffset.json :延时消息队列拉取进度。
consumerOffset.json :集群消费模式消息消进度。
consumerFilter.json :主题消息过滤信息。
其他
abort :如果存在 abort 文件说明 Broker 非正常闭,该文件默认启动时创建,正常退出之前删除
checkpoint :文件检测点,存储 commitlog 文件最后一次刷盘时间戳、 consumequeue最后一次刷盘时间、 index 索引文件最后一次刷盘时间戳。
RocketMQ的分布式事务解决方案
RocketMQ事务消息流程概要
1、发送消息(half消息).
2、服务端响应消息写入结果。
3、根据发送结果执行本地事物(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
4、根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引, 消息对消费者可见)
补偿流程
5、对没有Commit/Rollback的事务消息(pending状态的消息),定时任务从服务端发起一次“回查”。
6、Producer收到回查消息,检查回查消息对应的本地事务的状态。
7、根据本地事务状态,重新Commit或者Rollback。
RocketMQ事务消息设计
一阶段:Prepared阶段(预备阶段)事务消息在一阶段对用户不可见
发送half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC
消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
二阶段:Commit和Rollback操作(确认阶段)
Commit : 在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;
Rollback : 需要撤销一阶段的消息。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息。
分布式事务代码实现
事务生产者:
/**
* 事务生产者
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//1. 创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//2. 创建MQ事务生产者
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
//3. 设置远程服务地址
producer.setNamesrvAddr("127.0.0.1:9876");
//4. 创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//5. 设置生产者回查线程池
producer.setExecutorService(executorService);
//6. 生产者设置监听器
producer.setTransactionListener(transactionListener);
//7. 启动消息生产者
producer.start();
//8. 半事务的发送
try {
Message msg =
new Message("TransactionTopic", null, ("A向B系统转100块钱 ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
System.out.println(sendResult.getSendStatus()+"-"+df.format(new Date()));//半事务消息是否成功
} catch (MQClientException | UnsupportedEncodingException e) {
//todo 回滚rollback
e.printStackTrace();
}
//9. 半事务的发送成功
//10. 一些长时间等待的业务(比如输入密码,确认等操作):需要通过事务回查来处理
for (int i = 0; i < 1000; i++) {
Thread.sleep(1000);
}
//11. 注销实例
producer.shutdown();
}
}
事务消费者:
/**
* 事务消费者
*/
public class TranscationComuser {
public static void main(String[] args) throws Exception {
//1. 创建MQ消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TranscationComsuer");
//2. 设置远程服务地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//3. 订阅消息
consumer.subscribe("TransactionTopic", "*");
//4. 设置消费模式为集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//5. 注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
//todo 开启事务
for(MessageExt msg : msgs) {
//todo 执行本地事务 update B...(幂等性)
System.out.println("update B ... where transactionId:"+msg.getTransactionId());
//todo 本地事务成功
System.out.println("commit:"+msg.getTransactionId());
System.out.println("执行本地事务成功,确认消息");
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("执行本地事务失败,重试消费,尽量确保B处理成功");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//6. 启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
事务监听器,可在其中加入处理本地事务的逻辑。
/**
* 事务监听器
*/
public class TransactionListenerImpl implements TransactionListener {
//执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
//todo 执行本地事务 update A...
System.out.println("update A ... where transactionId:"+msg.getTransactionId() +":"+df.format(new Date()));
//System.out.println("commit");
//todo 情况1:本地事务成功
//return LocalTransactionState.COMMIT_MESSAGE;
//todo 情况2:本地事务失败
//System.out.println("rollback");
//return LocalTransactionState.ROLLBACK_MESSAGE;
//todo 情况3:业务复杂,还处于中间过程或者依赖其他操作的返回结果,就是unknow
System.out.println("业务比较长,还没有处理完,不知道是成功还是失败!");
return LocalTransactionState.UNKNOW;
}
//事务回查 默认是60s,一分钟检查一次
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//打印每次回查的时间
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
System.out.println("checkLocalTransaction:"+df.format(new Date()));// new Date()为获取当前系统时间
//todo 情况3.1:业务回查成功!
System.out.println("业务回查:执行本地事务成功,确认消息");
return LocalTransactionState.COMMIT_MESSAGE;
//todo 情况3.2:业务回查回滚!
// System.out.println("业务回查:执行本地事务失败,删除消息");
// return LocalTransactionState.ROLLBACK_MESSAGE;
//todo 情况3.3:业务回查还是UNKNOW!
//System.out.println("业务比较长,还没有处理完,不知道是成功还是失败!");
//return LocalTransactionState.UNKNOW;
}
}
RocketMQ选择队列的策略
默认策略:轮询+规避
- TopicA创建在双主中,BrokerA和BrokerB中,每一个Broker中有4个队列
- 选择队列是,默认是使用轮训的方式,比如发送一条消息A时,选择BrokerA中的Q4
- 如果发送成功,消息A发结束。
- 如果消息发送失败,默认会采用重试机制
retryTimesWhenSendFailed 同步模式下内部尝试发送消息的最大次数 默认值是2
retryTimesWhenSendAsyncFailed 异步模式下内部尝试发送消息的最大次数 默认值是2
默认不启用Broker故障延迟机制(规避策略):如果是BrokerA宕机,上一次路由选择的是BrokerA中的Q4,那么再次重发的队列选择是BrokerA中的Q1。但是这里的问题就是消息发送很大可能再次失败,引发再次重复失败,带来不必要的性能损耗。
注意,这里的规避仅仅只针对消息重试,例如在一次消息发送过程中如果遇到消息发送失败,规避 broekr-a,但是在下一次消息发送时,即再次调用 DefaultMQProducer 的 send 方法发送消息时,还是会选择 broker-a 的消息进行发送,只有继续发送失败后,重试时再次规避 broker-a。
为什么会默认这么设计?
- 某一时间段,从NameServer中读到的路由中包含了不可用的主机
- 不正常的路由信息也是只是一个短暂的时间而已。
生产者每隔30s更新一次路由信息,而NameServer认为broker不可用需要经过120s。
所以生产者要发送时认为broker不正常(从NameServer拿到)和实际Broker不正常有延迟
故障延迟机制策略
开启延迟规避机制,一旦消息发送失败(不是重试的)会将 broker-a “悲观”地认为在接下来的一段时间内该 Broker 不可用,在为未来某一段时间内所有的客户端不会向该 Broker 发送消息。这个延迟时间就是通过 notAvailableDuration、latencyMax 共同计算的,就首先先计算本次消息发送失败所耗的时延,然后对应 latencyMax 中哪个区间,即计算在 latencyMax 的下标,然后返回 notAvailableDuration 同一个下标对应的延迟值。
比如:在发送失败后,在接下来的固定时间(比如5分钟)内,发生错误的BrokeA中的队列将不再参加队列负载,发送时只选择BrokerB服务器上的队列。
如果所有的 Broker 都触发了故障规避,并且 Broker 只是那一瞬间压力大,那岂不是明明存在可用的 Broker,但经过你这样规避,反倒是没有 Broker 可用来,那岂不是更糟糕了。所以RocketMQ默认不启用Broker故障延迟机制。
负载均衡
Producer负载均衡
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。
Consumer负载均衡
集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式
需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。
通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。
但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所有的queue。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/116520.html