RocketMQ 过滤消息、事务消息
之前已经学习了几种常见的消息类型,今天介绍的这两种消息应该是RocketMQ中比较特色的,尤其是事务消息。
过滤消息
在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。
使用Tag过滤消息的消息生产者案例见:
org.apache.rocketmq.example.filter.TagFilterProducer
使用Tag过滤消息的消息消费者案例见:
org.apache.rocketmq.example.filter.TagFilterConsumer
主要是看消息消费者. consumer.subscribe(TagFilterTest”,”TagA 111TagC”;这句只订阅TagA和TagC的消息。
TAG是RocketMQ中特有的一个消息属性。RocketMQ的最佳实践中就建议,使用RocketMQ时,一应用可以就用一个Topic,而应用中的不同
业务就用TAG来区分。
但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了。这时候,可以使用SQL表达式来对
消息进行过滤。
SQL过滤的消息生产者案例见:
org.apache.rocketmq.example.filter.SqlFilterProducer
SQL过滤的消息消费者案例见:
org.apache.rocketmq.example.filter.SąlFilterConsumer
这个模式的关键是在消费者端使用MessageSelector.bySq(String sql)返回的一个MessageSelector,这里面的sql语句是按照SQL92标准来
执行的。sql中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性。
SQL92语法:
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
大家想一下,这个消息过滤是在Broker端进行的还是在Consumer端进行的?
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
值比较,比如: >, >=, <, <=, BETWEEN, =;
字符比较,比如: =, <>, IN;
15 NULL或者IS NOT NULL辑符号AND, OR, NOT;
常量支持类型为:
•数值,比如: 123, 3.1415;
字符,比如: ‘abe,必须用单引号包裹起来;
NULL,特殊的常量
•值, TRUE或FALSE
使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
事务消息
rocketmq最强大、最复杂的消息
是RocketMQ提供的一个非常有特色的功能,需要着重理解。
首先,我们了解下什么是事务消息。
官网的介绍是:事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。
其次,我们来理解下事务消息的编程模型。事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉
及到消息发送者,对于消息消费者来说,并没有什么特别的。
事务消息生产者的案例见:
org.apache.rocketmq.example.transaction.TransactionProducer
# TransactionProducer
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
# TransactionListenerImpl
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
事务消息的关键是在TransactionMQProducer中指定了一个Transactiontistener事务监听器,这个事务监听器就是事务消息的关键控制器。源码中的案例有点复杂,我这里准备了一个更清晰明了的事务监听器示例
然后,我们要了解下事务消息的使用限制:
1、事务消息不支持延迟消息和批量消息。
2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15次,但是用户可以通过Broker配置文件的transactioncheckMax参数来修改此限制,如果已经检查某条消息超过N次的话(N= transactioncheckMax)则Broker将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
3、事务消息将在Broker配置文件中的参数transactionMsgTimeout这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECKIMMUNITY TIME IN SECONDS来改变这个限制,该参数优先于transactionMsgTimeout参数
4、事务性消息可能不止一次被检查或消费。
5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6、事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者ID查询到消费者。事务消息的实现机制,参见下图:
事务消息是保证整个分布式事务的一半。
事务消息机制的关键是在发送消息时,会将消息转为一half半消息,并存入RocketMQ内部的一个RMQ SYS TRANS HALF TOPIC这个
Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。
最后,我们还需要思考下事务消息的作用。
大家想一下这个事务消息跟分布式事务有什么关系?为什么扑到了分布式事务相
具体业务场景
1、订单服务确认消息可用
2、订单服务保存本地事务
3、订单服务制定定时检查支付情况(一定时间内,执行15次),若最终未支付则丢弃事务,本地订单修改状态;若过程中支付成功,则通知下游服务,本地订单我修改状态
通过事务消息,可以高效率、简洁地实现多步骤、延迟场景的事务整体一致性。
原文始发于微信公众号(云户):撸掉RocketMQ 过滤消息、事务消息
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/35307.html