SpringBoot整合RocketMQ实录
使用SpringBoot开发的小伙伴也很多,因此spring官方也集成了RocketMQ调用模板,在理解原生API调用的基础上使用会有助于开发效率的提升。
SpringBoot整合
这部分我们看下SpringBoot如何快速集成RocketMQ.
在使用SpringBoot的tarter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-bootstarter.2.0.4版本开发的代码,升级到目前最新的rocketmq-springboot-starter.2.1.1后,基本就用不了了。
需要注意
SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置的RocketMQTemplate来与RocketMQ交互。相关属性都以rockemq.开头。具体所有的配置信息可以参见org.apache.rocketmq.spring.autoconfigure.RocketMQProperties这个类。
SpringBoot依赖中的Message对象和RocketMQ-client中的Message对象是两个不同的对象,这在使用的时候要非常容易弄错。例如RocketMQ-client中的Message里的TAG属性,在SpringBoot依赖中的Message中就没有。Tag属性被移到了发送目标中,与Topic一起,以Topic:Tag的方式指定。
最后强调一次,一定要注意版本。rocketmq-spring-boot-starter的更新进度一般都会略慢于RocketMQ的版本更新,并且版本不同会引发很多奇怪的问题。
apache有一个官方的rocketmq-spring示例,地址: https://github.com/apache/rocketmq-spring.git以后如果版本更新了,可以参考下这个示例代码。
配置信息(生产者)

消费者的配置
消费者的配置都在注释中,@RocketMQMessageListener
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup",topic = "TestTopic",consumeMode = ConsumeMode.CONCURRENTLY)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("Recieved message :"+s);
}
}
//RocketMQMessageListener
public @interface RocketMQMessageListener {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
String consumerGroup();
String topic();
SelectorType selectorType() default SelectorType.TAG;//过滤消息,支持TAG和SQL92
String selectorExpression() default "*";
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;//顺序消费和并发消费
MessageModel messageModel() default MessageModel.CLUSTERING;//集群和广播
int consumeThreadMax() default 64;
long consumeTimeout() default 15L;
String accessKey() default "${rocketmq.consumer.access-key:}";//acl权限控制
String secretKey() default "${rocketmq.consumer.secret-key:}";//acl权限控制
boolean enableMsgTrace() default true;//消息轨迹
String customizedTraceTopic() default "${rocketmq.consumer.customized-trace-topic:}";//消息轨迹
String nameServer() default "${rocketmq.name-server:}";
String accessChannel() default "${rocketmq.access-channel:}";
}
消息体
SpringBoot的消息体为spring的消息对象,因此获取一些属性时会有些变化。
//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
String tags = message.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
//RocketMQHeaders
public class RocketMQHeaders {
public static final String PREFIX = "rocketmq_";
public static final String KEYS = "KEYS";
public static final String TAGS = "TAGS";
public static final String TOPIC = "TOPIC";
public static final String MESSAGE_ID = "MESSAGE_ID";
public static final String BORN_TIMESTAMP = "BORN_TIMESTAMP";
public static final String BORN_HOST = "BORN_HOST";
public static final String FLAG = "FLAG";
public static final String QUEUE_ID = "QUEUE_ID";
public static final String SYS_FLAG = "SYS_FLAG";
public static final String TRANSACTION_ID = "TRANSACTION_ID";
public RocketMQHeaders() {
}
}
多个事务处理的情况
需要新创建一个rocketMQtemplate
@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
将对应的事务处理逻辑对象指向该rocketMQtemplate
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
public class MyTransactionListenerImpl implements RocketMQLocalTransactionListener
原文始发于微信公众号(云户):SpringBoot整合RocketMQ实录
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/25812.html