目录
四、JMS的可靠性 (重要)
一、JMS是什么
Java平台上的专业技术规范
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
那么什么是Java消息服务?
Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
二、MQ中间件的其他落地产品
(一)种类
(二)对比
三、JMS的组成结构和特点
(一)JMS provider
实现JMS接口和规范的消息中间件,也就是我们的MQ服务器
(二)JMS producer
消息生产者,创建和发送JMS消息的客户端应用
(三)JMS consumer
消息消费者,接收和处理JMS消息的客户端应用
(四)JMS message
(1)消息头
- JMSDestination:消息目的地
- JMSDeliveryMode:消息持久化模式
- JMSExpiration:消息过期时间
- JMSPriority:消息的优先级
- JMSMessageID:消息的唯一标识符。
-
package com.at.activemq.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JmsProduce_topic { public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626"; public static final String TOPIC_NAME = "topic01"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageProducer messageProducer = session.createProducer(topic); for (int i = 1; i < 4 ; i++) { TextMessage textMessage = session.createTextMessage("topic_name--" + i); // 这里可以指定每个消息的目的地 textMessage.setJMSDestination(topic); /* 持久模式和非持久模式。 一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。 一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。 */ textMessage.setJMSDeliveryMode(0); /* 可以设置消息在一定时间后过期,默认是永不过期。 消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。 如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。 如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。 */ textMessage.setJMSExpiration(1000); /* 消息优先级,从0-9十个级别,0-4是普通消息5-9是加急消息。 JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。 */ textMessage.setJMSPriority(10); // 唯一标识每个消息的标识。MQ会给我们默认生成一个,我们也可以自己指定。 textMessage.setJMSMessageID("ABCD"); // 上面有些属性在send方法里也能设置 messageProducer.send(textMessage); } messageProducer.close(); session.close(); connection.close(); System.out.println(" **** TOPIC_NAME消息发送到MQ完成 ****"); } }
(2)消息体
TextMessage ——普通字符串消息,包含一个string
MapMessage ——Map类型的消息,key为string类型,而值为Java的基本类型
BytesMessage ——二进制数组消息,包含一个byte[]
StreamMessage ——Java数据流消息,用标准流操作来顺序的填充和读取。
ObjectMessage ——对象消息,包含一个可序列化的Java对象
尝试TextMessage和MapMessage
生产者
//6.通过MessageProducer生成3条消息到MQ队列中
for(int i = 1 ; i <= 3; i ++ ) {
TextMessage textMessage = session.createTextMessage("MessageListener--" + i);
producer.send(textMessage);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("k1","v"+i);
producer.send(mapMessage);
}
消费者
//通过监听方式读取消息
/*
异步非阻塞方式(监听器onMessage())
订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器
当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。*/
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("监听到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
if(null != message && message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("监听到消息:"+mapMessage.getString("k1"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
(3)消息属性
如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。
他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
尝试StringProperty、
生产者
消费者
注:setStringProperty对应getStringProperty
四、JMS的可靠性
(一)PERSISTENT:持久性
(1)参数说明
①持久化(服务器宕机后,消息依旧存在,activemq默认选择)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
②非持久化(服务器宕机后,消息依旧存在)
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
注:测试的时候人为的关闭虚拟机中的activemq
(2)持久化的topic
topic默认就是非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。
topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。
生产者
public static final String ACTIVEMQ_URL = "tcp://192.179.123.10:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for(int i = 1 ; i <= 3; i ++ ) {
TextMessage textMessage = session.createTextMessage("TextMessage--" + i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("topicMessage消息发布到MQ完成……");
}
消费者
public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
// 设置客户端ID。向MQ服务器注册自己的名称
connection.setClientID("marrry");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
// 创建一个topic订阅者对象。一参是topic,二参是订阅者名称
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
// 之后再开启连接
connection.start();
Message message = topicSubscriber.receive();
while (null != message){
TextMessage textMessage = (TextMessage)message;
System.out.println(" 收到的持久化 topic :"+textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
}
订阅者z3接收消息后下线(offline)
z3下线期间再发送3条topic
z3上线后收到消息
注意:
- 一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。
- 然后再运行生产者发送消息。
- 之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来。
(二)事务
(1)生产者
①true
创建会话时选择开启事务,则消息会被放入队列中,只有当执行了session.commit后才会发送到activemq
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
session.commit();
②false
创建会话时选择关闭事务,则当消息生产者调用send方法后,消息会立即发送到activemq
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
③好处
当业务复杂时,如果在程序执行过程中出错可以选择rollback,保证了消息的可靠性和程序的事务性
try {
//程序正常 session.commit()
} catch (){
//异常回滚 session.rollback()
}finally {
if(null != session)
session.close();
}
(2)消费者
①true
如果没有加上session.commit(), 那么消息会被重复消费
②false
无需commit,不会被重复消费
(三)Acknowledge:签收
(1)事务
Session.SESSION_TRANSACTED 与上面事务选择true搭配
(2)非事务
①自动签收
Session.AUTO_ACKNOWLEDGE
②手动签收
Session.CLIENT_ACKNOWLEDGE 需要客户端调用acknowledge()方法,不然会重复消费
//3.创建会话session //3.1 两个参数 ①事务 ②签收 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); //4.创建目的地(queue or topic) Queue queue = session.createQueue(QUEUE_NAME); //5.创建消费者 MessageConsumer consumer = session.createConsumer(queue); while(true) { //2000ms timeout时间 ActiveMQ有五种消息类型,这里发送时是TextMessage,接收时强转一下 TextMessage textMessage = (TextMessage) consumer.receive(2000); if(null != textMessage) { System.out.println("接收到Tx消息:" + textMessage.getText()); textMessage.acknowledge(); } else { break; } }
③允许重复消息(了解即可)
Session.DUPS_OK_ACKNOWLEDGE
生产者开启事务后,执行commit方法,这批消息才真正的被提交。不执行commit方法,这批消息不会提交。执行rollback方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。
消费者开启事务后,执行commit方法,这批消息才算真正的被消费。不执行commit方法,这些消息不会标记已消费,下次还会被消费。执行rollback方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。
消费者不开启事务,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)
问:消费者和生产者需要同时操作事务才行吗?
答:消费者和生产者的事务,完全没有关联,各自是各自的事务。
五、JMS点对点总结
六、JMS发布订阅总结
(一)非持久化订阅
(二) 持久化订阅
(三)用哪个
当所有的消息都必须被接收,则使用持久化订阅。当丢失消息能够被容忍,则使用非持久化订阅
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/118475.html