SpringCloudStream集成RocketMQ


SpringCloudStream集成RocketMQ


SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MO消息中间件产品。目前官方支持RabbitMQ、Apache Kafka和Kafka Streams。我们来看看SpringCloudstream如何来集成RocketMQ.

框架封装的目的就是尽可能的简化开发代码,让开发人员只需要专注于自己的业务中。

引入依赖

需要注意版本,由于我们环境搭建的是4.7.1的RocketMQ,因此需要修改对应的版本。

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.2.3.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rockemq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rockemq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.3.RELEASE</version>
        </dependency>

编码

使用sc-stream的代码非常简单,该框架本身就是为了简化消息对接的开发量,个性化的属性配置都在配置文件中进行。

//定义消费者
@Component
public class ScConsumer {
    //关于消费者过滤消息的条件,可以在@StreamListener的condition属性中写SPEL表达式配置
    //但是一般建议还是在配置文件中来配置
    //@StreamListener(value=Sink.INPUT,condition= "headers['"+RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS+"']=='testTag')
    @StreamListener(value = Sink.INPUT)
    public void onMessage(String message){
        System.out.println("recieved message :"+message+"from binding:"+Sink.INPUT );
    }
}
//定义生产者
@Component
public class ScProducer {
    @Resource
    private Source source;
    public void sendMessage(String message){
        Map<String,Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS,"testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> msg = MessageBuilder.createMessage(message,messageHeaders);
        this.source.output().send(msg);

//        MessageBuilder<String> builder = MessageBuilder.withPayload(message)
//                .setHeader(RocketMQHeaders.TAGS,"testTag")
//                .setHeader(RocketMQHeaders.KEYS,"myKey")
//                .setHeader("DELAY","1");
//        Message<String> messages = builder.build();
//        this.source.output().send(messages);
    }
}

//业务测试
@RestController
@RequestMapping("/MQTest")
public class RocketMQController {
    @Resource
    private ScProducer scProducer;
    @GetMapping("/send")
    public String sendMessage(String message){
        scProducer.sendMessage(message);
        return "消息发送完成";
    }
}
//启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding({Source.classSink.class})
@SpringBootApplication
public class ScRocketMQApplication 
{
    public static void main(String[] args) {
        SpringApplication.run(ScRocketMQApplication.class,args);
    }
}

配置文件

这个是sc-stream的重点

#ScStream通用的配置以spring.cloud.stream开头
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
spring.cloud.stream.bindings.output.destination=TestTopic
#RocketMQ的个性化配置以Spring.cloud.stream.rocketmq开头
#Binder配置
spring.cloud.stream.rocketmq.binder.name-server=192.168.3.210:9876
#spring.cloud.stream.rocketmq.binder.access-key=
#spring.cloud.stream.rocketmq.binder.secret-key=
##是否为Producer和Consumer开启消息轨迹
#spring.cloud.stream.rocketmq.binder.enable-msg-trace=false
##消息轨迹开启后存储的topic名称
#spring.cloud.stream.rocketmq.binder.customized-trace-topic=

#消费者配置
#spring.cloud.stream.rocketmq.bindings.input.consumer.enabled=true
#spring.cloud.stream.rocketmq.bindings.input.consumer.tags=testTag
#spring.cloud.stream.rocketmq.bindings.input.consumer.sql=
#spring.cloud.stream.rocketmq.bindings.input.consumer.broadcasting=false
#spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true
#spring.cloud.stream.rocketmq.bindings.input.consumer.delayLevelWhenNextConsume=-1or0or>0
##同步消费消息模式下消费失败后再次消费的时间间隔
#spring.cloud.stream.rocketmq.bindings.input.consumer.suspendCurrentQueueTimeMillis=testTag

#生产者配置
#spring.cloud.stream.rocketmq.bindings.output.producer.enabled=true
#spring.cloud.stream.rocketmq.bindings.output.producer.group=sendGroup
#spring.cloud.stream.rocketmq.bindings.output.producer.maxMessageSize=true
#spring.cloud.stream.rocketmq.bindings.output.producer.transactional=true
#spring.cloud.stream.rocketmq.bindings.output.producer.sync=true
##是否在VIP channel上发送消息
#spring.cloud.stream.rocketmq.bindings.output.producer.vipChannelEnabled=true
##发送消息的超时时间(毫秒)
#spring.cloud.stream.rocketmq.bindings.output.producer.sendMessageTimeOut=3000
##消息体压缩阈值(当消息体超过4K的时候会被压缩)
#spring.cloud.stream.rocketmq.bindings.output.producer.compressMessageBodyThreshold=4096
##在同步发送消息的模式下,消息发送失败的重试次数
#spring.cloud.stream.rocketmq.bindings.output.producer.retryTimesWhenSendFailed=2
##在同步发送消息的模式下,消息发送失败的重试次数
#spring.cloud.stream.rocketmq.bindings.output.producer.retryTimesWhenSendAsyncFailed=2
##消息发送失败的情况下是否重试其他的broker
#spring.cloud.stream.rocketmq.bindings.output.producer.retryNextServer=true
server.port=8099

注意

关于SpringCloudStream,这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。

例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka, kafka Stream, RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。

所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0,这个差距就非常大了。

另一方面, RocketMQ这帮大神不屑于写文档的问题也特别严重, SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。总之,对于RocketMQ来说, SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。

总结

虽然目前阿里巴巴来维护的rocketmq的stream并不是很实用,但是考虑到stream的这种设计思路,我还是希望自己来花时间实现对于rocketmq的支持。


原文始发于微信公众号(云户):SpringCloudStream集成RocketMQ

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/35295.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!