0、安装前环境准备
本篇是基于Linux操作系统中的安装,故先准备一台干净的Linux操作系统,并在系统上先安装好JDK和Maven,本文中所有的操作基于CentOS8进行安装演示;
1、官网下载RocketMQ安装包
cd /usr/local/
mkdir source
cd source/
wget https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
2、解压RocketMQ安装包,阅读其中的README.md文件
unzip rocketmq-all-4.9.2-bin-release.zip
cd rocketmq-4.9.2/
vim README.md
3、启动RocketMQ服务
cd bin/
./mqnamesrv
4、新起一个终端窗口,修改runbroker.sh文件
cd /usr/local/source/rocketmq-4.9.2/bin
vim runbroker.sh
默认设置的虚拟机内存参数过大,正常人的虚拟机根本跑不起来,改小点可以启动。
./mqbroker -n localhost:9876
5、新起一个终端窗口,测试RocketMQ功能
修改tools.sh文件
cd /usr/local/source/rocketmq-4.9.2/bin
vim tools.sh
执行 ./tools.sh org.apache.rocketmq.example.quickstart.Producer 启动MQ生产者
新起一个终端窗口启动MQ消费者
cd /usr/local/source/rocketmq-4.9.2/bin
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
6、RocketMQ中各角色的解读
NameServer:主要的功能是用来收集其它角色的信息,相当于一个中介,维护了一个服务的列表,知道有哪些服务还存活。底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点;
Broker:面向Producer和Consumer接受和发送消息,向NameServer提交自己的信息。是消息中间件的消息存储、转发服务器。每个Broker节点在启动时,都会遍历NameServer列表,于每个NameServer建立长连接,注册自己的信息,之后定时上报;
Producer:消息生产者。通过集群中一个节点建立长连接,获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等;
Consumer:消息消费者。通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
7、RocketMQ的HelloWorld
生产者Producer代码:
package com.feenix.rocketmq.producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.nio.charset.StandardCharsets; /** * @Author: Feenix * @CreateTime: 2022-02-24 16:20 * @Version: 1.0 * @Description: */ public class Producer { private static final String PRODUCER_GROUP = "feenix_group"; private static final String NAMESRV_ADDR = "192.168.159.149:9876"; private static final String TOPIC = "feenix_topic"; public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); // 设置NameServer的地址 producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); // Topic >> 消息将要发送到的地址 // body >> 真正流转的消息内容 String body = "Hello World By RocketMQ!"; Message message = new Message(TOPIC, body.getBytes()); SendResult result = producer.send(message); System.out.println("SendResult:" + result); // producer.shutdown(); } }
消费者Consumer代码:
package com.feenix.rocketmq.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @Author: Feenix * @CreateTime: 2022-02-24 16:44 * @Version: 1.0 * @Description: */ public class Consumer { private static final String CONSUMER_GROUP = "feenix_consumer_group"; private static final String NAMESRV_ADDR = "192.168.159.149:9876"; private static final String TOPIC = "feenix_topic"; private static final String FILTER = "*"; public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR); // 每个consumer只能订阅一个topic consumer.subscribe(TOPIC, FILTER); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : messageExtList) { System.out.println(new String(messageExt.getBody())); } // 默认情况下这条消息只会被一个consumer消费 // 当被消费之后,告诉服务器消费成功,broker会将成功消费的消息剔除掉不再消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("consumer start....."); } }
8、RocketMQ中的事务处理
package com.feenix.rocketmq.transaction; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.nio.charset.StandardCharsets; /** * @Author: Feenix * @CreateTime: 2022-03-01 17:50 * @Version: 1.0 * @Description: */ public class Producer { private static final String PRODUCER_GROUP = "feenix_group_transaction"; private static final String TOPIC = "feenix_topic_transaction"; private static final String NAMESRV_ADDR = "192.168.159.149:9876"; public static void main(String[] args) throws MQClientException { TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(NAMESRV_ADDR); // 回调 producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { // 执行本地事务 System.out.println("----- executeLocalTransaction -----"); System.out.println("message:" + new String(message.getBody())); System.out.println("message:" + message.getTransactionId()); // 事务执行成功 return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { // Broker端回调,检查事务 System.out.println("----- checkLocalTransaction -----"); System.out.println("message:" + new String(messageExt.getBody())); System.out.println("message:" + messageExt.getTransactionId()); // 事务执行成功 return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); TransactionSendResult sendResult = producer.sendMessageInTransaction(new Message(TOPIC, "带事务的消息!".getBytes()), null); System.out.println("sendResult:" + sendResult); /*producer.shutdown(); System.out.println("生产者已停机!");*/ } }
9、RocketMQ中的消息重试机制
// 同步发送时,重试次数,默认2 producer.setRetryTimesWhenSendAsyncFailed(2); // 是否向其它broker发送请求,默认false producer.setRetryAnotherBrokerWhenNotStoreOK(false);
10、RocketMQ中的消息顺序保证
Producer代码:
package com.feenix.rocketmq.queue; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import java.nio.charset.StandardCharsets; import java.util.List; /** * @Author: Feenix * @CreateTime: 2022-03-02 11:37 * @Version: 1.0 * @Description: */ public class Producer { private static final String PRODUCER_GROUP = "feenix_group_queue"; private static final String TOPIC = "feenix_topic_queue"; private static final String NAMESRV_ADDR = "192.168.159.149:9876"; public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i < 20; i++) { Message message = new Message(TOPIC, ("message-orderly-" + i).getBytes(StandardCharsets.UTF_8)); producer.send( // 待发送消息 message, // queue选择器,手动选择一个queue new MessageQueueSelector() { @Override public MessageQueue select( // 当前topic中包含的所有queue List<MessageQueue> list, // 具体要发送的消息 Message message, // 对应到send方法中的参数 Object obj) { // 向固定的一个queue中写消息 MessageQueue messageQueue = list.get((Integer) obj); return messageQueue; } }, 0, 2000); } } }
Consumer代码:
package com.feenix.rocketmq.queue; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @Author: Feenix * @CreateTime: 2022-03-02 13:51 * @Version: 1.0 * @Description: */ public class Consumer { private static final String CONSUMER_GROUP = "feenix_consumer_queue"; private static final String NAMESRV_ADDR = "192.168.159.149:9876"; private static final String TOPIC = "feenix_topic_queue"; private static final String FILTER = "*"; public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR); consumer.subscribe(TOPIC, FILTER); /*consumer.registerMessageListener(new MessageListenerConcurrently() { // 并发消费,开启多个线程 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { return null; } });*/ // 最大开启线程数 // consumer.setConsumeThreadMax(); // 最小开启线程数 // consumer.setConsumeThreadMin(); consumer.registerMessageListener(new MessageListenerOrderly() { // 顺序消费,对每个queue都会开启一个线程 @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { for (MessageExt messageExt : list) { System.out.println(new String(messageExt.getBody()) + Thread.currentThread().getName()); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); } }
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/111929.html