消息中间件之ActiveMQ的入门使用
一、JMS入门
消息中间件
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)
常见消息中间件
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
JMS简介
JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
JMS定义一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。类似于 JDBC,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。
JMS 能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
TextMessage:一个字符串对象
MapMessage:一套名称-值对
ObjectMessage:一个序列化的 Java 对象
BytesMessage:一个字节的数据流
StreamMessage : Java 原始值的数据流
JMS消息传递类型
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
二、ActiveMQ下载与安装
浏览器访问地址:http://IP:8161/,进入ActiveMQ管理页面
点击Manage ActiveMQ broker进入管理页面
用户名和密码均为 admin
主界面
创建一个队列
列表各列信息含义:
Number Of Pending Messages :等待消费的消息 ,是当前未出队列的数量。
Number Of Consumers :消费者,是消费者端的消费者数量
Messages Enqueued :进入队列的消息 ,进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息,是消费者消费掉的数量。
三、ActiveMQ入门
1.引入依赖
<dependencies>
<!--ActiveMQ的依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.16.0</version>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.2.9.RELEASE</version>
</dependency>
<!--Spring需要JMS 2.0版本,但activemq-client依赖1.0版本,spring-jms又没有自动导入2.0版本,于是手动引入-->
<!--不引入则可能报错:Caused by: java.lang.NoClassDefFoundError: Ljavax/jms/Destination;
->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<!-- test -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.2.9.RELEASE</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
2.点对点模式
点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。
消息生产者
//点对点发送消息
public static void main(String[] args) throws Exception{
//创建链接对象工厂ConnectionFactory JMS只定义了规范
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP:61616");
//创建链接对象Connection
Connection connection = connectionFactory.createConnection();
//开启链接对象
connection.start();
/**
* 创建会话对象Session
*
* transacted :是否开启事务
*
* acknowledgeMode: 应答模式
* 1.AUTO_ACKNOWLEDGE:自动应答模式,客户端手动确认
* 2.CLIENT_ACKNOWLEDGE:客户端应答,客户端手动确认,不会有重复数据
* 3.DUPS_OK_ACKNOWLEDGE:客户端应答,自动批量确认,容易产生重复数据
* 4.SESSION_TRANSACTED: 事务提交并确认
*
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Hello ActiveMQ!"+Math.random()*1000);
//指定消息发送的目标地址,发送到指定队列中
Queue queue = session.createQueue("hello_activemq");
//创建消息发送对象
MessageProducer messageProducer = session.createProducer(queue);
//消息发送实现
messageProducer.send(textMessage);
//资源关闭
session.close();
connection.close();
}
通过ActiveMQ管理界面查询
消息消费者
public static void main(String[] args) throws Exception{
//创建链接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://119.23.62.62:61616");
//创建链接对象Connection
Connection connection = connectionFactory.createConnection();
//开启链接
connection.start();
//创建会话对象Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//指定接收的队列地址
Queue queue = session.createQueue("hello_activemq");
//创建消息接收对象
MessageConsumer consumer = session.createConsumer(queue);
//方式一
//监听模式实现消息接收 创建MessageListener的实现类
//监听模式相当于开了一个线程
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message!=null){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("方式一监听收到的消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
});
//让主线程sleep一会儿,保证收到消息后连接才断开
Thread.sleep(8000);
//方式二
/* while (true){
//接收消息,等待10秒,在10秒内一直处于接收消息状态
Message message = consumer.receive(10000);
if(message!=null){
if(message instanceof TextMessage){
//将消息强转TextMessage对象
TextMessage textMessage = (TextMessage) message;
System.out.println("方式二监听收到的消息:"+textMessage.getText());
//跳出循环
break;
}
}
}*/
//关闭资源
session.close();
connection.close();
}
执行测试
当同时开启多个消费者,再运行生产者,只会有一个消费者接收到消息。
3.发布/订阅模式
消息生产者
public static void main(String[] args) throws Exception{
//创建链接对象工厂ConnectionFactory JMS只定义了规范
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://ip:61616");
//创建链接对象Connection
Connection connection = connectionFactory.createConnection();
//开启链接对象
connection.start();
/**
* 创建会话对象Session
*
* transacted :是否开启事务
*
* acknowledgeMode: 应答模式
* 1.AUTO_ACKNOWLEDGE:自动应答模式,客户端手动确认
* 2.CLIENT_ACKNOWLEDGE:客户端应答,客户端手动确认,不会有重复数据
* 3.DUPS_OK_ACKNOWLEDGE:客户端应答,自动批量确认,容易产生重复数据
* 4.SESSION_TRANSACTED: 事务提交并确认
*
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Hello Topic!"+Math.random()*1000);
//指定消息发送的目标地址,发送到指定队列中
Topic topic = session.createTopic("topic_test");
//创建消息发送对象
MessageProducer messageProducer = session.createProducer(topic);
//消息发送实现
messageProducer.send(textMessage);
//资源关闭
session.close();
connection.close();
}
消息消费者
public static void main(String[] args) throws Exception{
//创建链接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://119.23.62.62:61616");
//创建链接对象Connection
Connection connection = connectionFactory.createConnection();
//开启链接
connection.start();
//创建会话对象Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//指定接收的队列地址
Topic topic = session.createTopic("topic_test");
//创建消息接收对象
MessageConsumer consumer = session.createConsumer(topic);
while (true){
//等待10秒,在10秒内一直处于接收消息状态
Message message = consumer.receive(10000);
if(message!=null){
if(message instanceof TextMessage){
//将消息转换成TextMessage
TextMessage textMessage = (TextMessage) message;
System.out.println("监听收到的消息:"+textMessage.getText());
break;
}
}
}
//关闭资源
session.close();
connection.close();
}
执行测试
当同时开启多个消费者,再运行生产者,每个消费者都会接收到消息。
四、Spring整合ActiveMQ
1.spring-mq.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!--包扫描-->
<context:component-scan base-package="cn.ybzy" />
<!-- 产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.211.128:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送,类似RedisTemplate,SolrTemplate,JdbcTemplate,JmsTemplate实现消息发送-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- connectionFactory对应的是定义Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--点对点模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue_text"/>
</bean>
<!--消息监听对象-->
<bean class="cn.ybzy.mq.myMessageListener" id="myMessageListener" />
<!--消息监听容器-->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!--需要工厂包装链接对象-->
<property name="connectionFactory" ref="connectionFactory" />
<!--监听消息的地址-->
<property name="destination" ref="queueDestination" />
<!--监听消息的类-->
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
2.发送简单文本消息
生产者
实现text文本类型数据发送
@Component
public class MessageProducer {
//消息发送对象JmsTemplate
@Autowired
private JmsTemplate jmsTemplate;
//发送地址
@Autowired
private Destination destination;
/**
* 发送消息
* @param msg
*/
public void sendTextMessage(String msg){
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//创建消息对象
TextMessage textMessage = session.createTextMessage();
//设置消息内容
textMessage.setText(msg);
return textMessage;
}
});
}
}
消费者
创建消息监听类
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//文本消息
if(message instanceof TextMessage){
//将消息转成文本消息
TextMessage textMessage = (TextMessage) message;
//获取消息
try {
System.out.println("监听到的消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
单元测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-mq.xml")
public class SpringProducerTest {
@Autowired
private MessageProducer messageProducer;
@Test
public void testSendTextMessage()throws Exception{
messageProducer.sendTextMessage("send message!");
//线程休眠10s,让消费者监听收到消息
Thread.sleep(10000);
}
}
3.发送MapMessage类型消息
生产者
实现Map数据类型发送
@Component
public class MessageProducer {
//消息发送对象JmsTemplate
@Autowired
private JmsTemplate jmsTemplate;
//发送地址
@Autowired
private Destination destination;
public void sendMapMessage(Map<String,String> dataMap){
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//创建一个MapMessage类型
MapMessage mapMessage = session.createMapMessage();
//设置键值对数据 MapMessage:key=value
mapMessage.setObject("dataMap",dataMap);
return mapMessage;
}
});
}
}
消费者
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//接收MapMessage类型
if(message instanceof MapMessage){
//强转MapMessage类型
MapMessage mapMessage = (MapMessage) message;
try {
//获取对应key的值
Map<String,String> dataMap = (Map<String, String>) mapMessage.getObject("dataMap");
System.out.println(dataMap);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
单元测试
/***
* 发送MapMessage类型测试
*/
@Test
public void testSendMapMessage() throws Exception{
Map<String,String> dataMap = new HashMap<String,String>();
dataMap.put("name","小白");
dataMap.put("age","22");
dataMap.put("phone","666666");
messageProducer.sendMapMessage(dataMap);
//线程休眠10s,让消费者监听收到消息
Thread.sleep(10000);
}
4.发送ObjectMessage类型消息
生产者
创建发送对象User
public class User implements Serializable {
private int id;
private String name;
private int age;
private String phone;
}
@Component
public class MessageProducer {
//消息发送对象JmsTemplate
@Autowired
private JmsTemplate jmsTemplate;
//发送地址
@Autowired
private Destination destination;
public void sendObjectMessage(User user){
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//创建ObjectMessage
ObjectMessage objectMessage = session.createObjectMessage();
//设置发送的数据
objectMessage.setObject(user);
return objectMessage;
}
}
消费者
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(message instanceof ObjectMessage){
//强转成ObjectMessage
ObjectMessage objectMessage = (ObjectMessage) message;
try {
//获取数据,并强转成JavaBean
User user = (User) objectMessage.getObject();
System.out.println(user);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
指定受信任的解析包
ActiveMQ中ObjectMessage类型需要指定受信任的消息解析包,若不进行配置指定则对获得的对象进行强转将报错
javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.itheima.domain.User! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
spring-mq.xml配置如下,在ActiveMQConnectionFactory注入两个参数中任选其中一中方式。
<!-- 产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!--设置受信任的包,只有这些包下的类型才能被传输解析-->
<!--<property name="trustedPackages">
<list>
<value>java.lang</value>
<value>java.util</value>
<value>cn.ybzy.model</value>
</list>
</property>-->
<!--所有包全部设置成受信任的包-->
<property name="trustAllPackages" value="true" />
<property name="brokerURL" value="tcp://IP:61616"/>
</bean>
单元测试
@Test
public void testSendObjectMessage() throws Exception{
User user = new User();
user.setName("小黑");
user.setAge(22);
user.setPhone("123456");
messageProducer.sendObjectMessage(user);
//线程休眠10s,让消费者监听收到消息
Thread.sleep(10000);
}
5.使用订阅模式
使用订阅模式只需修改spring-mq.xml两处地方,其余不变。
1.更改队列目标地址为Topic类型
<!--订阅模式-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic_text"/>
</bean>
2.更该消息监听容器监听的消息地址
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerCo
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
执行点对点模式中任意测试,发送/接受正常。
五、ActiveMQ的应用
消息队列主要应用场景:解耦、异步、削峰
这里演示说明“解耦”,多个系统间通信必然存在耦合的情况,引入消息队列就可以解决多系统间的耦合。
如A系统与B系统通信,使用MQ后,A系统只需要向B系统发送消息,B系统接收消息再处理。
消息封装对象
public class MessageInfo implements Serializable {
/**
* 定义不同的业务类型
*/
public static final int USER_SELECT = 1;
public static final int USER_ADD = 2;
public static final int CAR_UPDATE = 3;
public static final int PROESS_DELETE = 4;
/**
* 消息监听器根据type进行不同业务处理
*/
private int type;
/**
* 要发送的内容
*/
private Object context;
public MessageInfo(int type, Object context) {
this.type = type;
this.context = context;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public Object getContext() {
return context;
}
public void setContext(Object context) {
this.context = context;
}
}
A系统创建生产者
@Component
public class MessageProducer {
//消息发送对象JmsTemplate
@Autowired
private JmsTemplate jmsTemplate;
//发送地址
@Autowired
private Destination destination;
public void sendObjectMessage(MessageInfo messageInfo){
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//创建ObjectMessage
ObjectMessage objectMessage = session.createObjectMessage();
//设置发送的数据
objectMessage.setObject(messageInfo);
return objectMessage;
}
});
}
}
B系统创建消费者
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(message instanceof ObjectMessage){
//消息类型强转
ObjectMessage objectMessage = (ObjectMessage) message;
//强转成JavaBean
try {
MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();
//判断对应的操作
if(messageInfo.getType()==MessageInfo.USER_ADD){
//TODO 执行User保存操作
User user = (User) messageInfo.getContext();
System.out.println("save user ...."+user);
}else if(messageInfo.getType()==MessageInfo.CAR_UPDATE){
//TODO 执行Car修改操作
System.out.println("update Car ....");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
执行测试
@Test
public void test()throws Exception{
User user = new User();
user.setName("小黑");
user.setAge(22);
user.setPhone("123456");
//准备发送的消息
MessageInfo messageInfo = new MessageInfo(MessageInfo.USER_ADD,user);
//发送消息
messageProducer.sendObjectMessage(messageInfo);
//线程休眠10s,让消费者监听收到消息
Thread.sleep(10000);
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137048.html