消息中间件之ActiveMQ的基本使用
ActiveMQ实现步骤
建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址
通过ConnectionFactory对象创建一个Connection连接
通过Connection对象创建Session会话
通过Session对象创建Destination对象。在点对点模式中,Destination被称作队列(Queue),在Pub/Sub模式中,Destination被称作主题(Topic)
通过Session对象创建消息的发送和接收对象
发送消息
关闭资源
点对点模式
点对点模式的概述
点对点模式涉及:消息队列(Queue) 发送者(Sender) 接收者(Receiver)
每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
比如生产方发了 10条消息到 activeMQ 服务器, 而此时有多个 消费方,那么这些消费方就会瓜分这些10条消息,一条消息只会被一个消费方得到。
点对点的特点
1. 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
2. 发送者和接收者之间在时间上没有依赖性,也就是当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
3. 接收者在成功接收消息之后需向队列应答成功
引入依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
生产者
public class Producer {
private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String QUEUE = "myQueue";
public static void main(String[] args) throws JMSException {
//创建ActiveMQConnectionFactory会话工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//启动连接
connection.start();
//Session:一个发送或接收消息的线程
//false:不开启消息事物,true: 表示以事务提交 设置消息可靠性
//Session.AUTO_ACKNOWLEDGE :表示消息自动签收
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//创建一个队列
Queue queue = session.createQueue(QUEUE);
//MessageProducer:消息生产者
MessageProducer producer = session.createProducer(queue);
//设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//发送消息
send(producer, session);
System.out.println("发送成功!");
session.close();
connection.close();
}
static public void send(MessageProducer producer, Session session) throws JMSException {
for (int i = 1; i <= 5; i++) {
System.out.println("我是消息" + i);
//创建文本消息
TextMessage textMessage = session.createTextMessage("我是消息" + i);
//通过消息生产者发出消息
producer.send(textMessage);
}
}
}
消费者
public class receiver {
private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String QUEUE = "myQueue";
public static void main(String[] args) throws JMSException {
//创建ActiveMQConnectionFactory会话工厂 创建连接
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
Connection connection = activeMQConnectionFactory.createConnection();
//启动连接
connection.start();
//不开消息启事物,自动签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建队列
Queue queue = session.createQueue(QUEUE);
//MessageConsumer:消息消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
if (textMessage != null) {
System.out.println("接受到消息:" + textMessage.getText());
} else {
break;
}
}
session.close();
connection.close();
}
}
发布订阅模式
发布订阅模式概述
发布订阅模式涉及:主题(Topic) 发布者(Publisher) 订阅者(Subscriber)
发布者将消息发送到Topic,系统将这些消息传递给多个订阅者
特点
1.每个消息可以有多个消费者
2.发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
3.为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
4.如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型
引入依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
生产者
public class Producer {
private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String TOPIC = "myTopic";
public static void main(String[] args) throws JMSException {
//创建ActiveMQConnectionFactory会话工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
//创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//启动连接
connection.start();
//Session:一个发送或接收消息的线程
//false:不开启消息事物,true: 表示以事务提交 设置消息可靠性
//Session.AUTO_ACKNOWLEDGE :表示消息自动签收
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//创建一个队列
//Queue queue = session.createQueue(QUEUE);
Topic topic = session.createTopic(TOPIC);
//MessageProducer:消息生产者
MessageProducer producer = session.createProducer(topic);
//设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//发送消息
send(producer, session);
System.out.println("发送成功!");
session.close();
connection.close();
}
static public void send(MessageProducer producer, Session session) throws JMSException {
for (int i = 1; i <= 5; i++) {
System.out.println("我是消息" + i);
//创建文本消息
TextMessage textMessage = session.createTextMessage("我是消息" + i);
//通过消息生产者发出消息
producer.send(textMessage);
}
}
}
消费者
public class receiver {
private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String TOPIC = "myTopic";
public static void main(String[] args) throws JMSException {
//创建ActiveMQConnectionFactory会话工厂 创建连接
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
Connection connection = activeMQConnectionFactory.createConnection();
//启动连接
connection.start();
//不开消息启事物,自动签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//创建队列
//Queue queue = session.createQueue(QUEUE);
Topic topic = session.createTopic(TOPIC);
//MessageConsumer:消息消费者
MessageConsumer consumer = session.createConsumer(topic);
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
if (textMessage != null) {
System.out.println("接受到消息:" + textMessage.getText());
//手动应答
textMessage.acknowledge();
} else {
break;
}
}
session.close();
connection.close();
}
}
消息可靠机制
ActiveMQ消息签收机制:客戶端成功接收一条消息的标志是一条消息被签收,成功应答。
消息的签收情形分两种:带事务的session与不带事务的session
带事务的session
如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。
生成者
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//创建文本消息
TextMessage textMessage = session.createTextMessage("我是消息" + i);
//通过消息生产者发出消息
producer.send(textMessage);
//提交消息
session.commit();
消费者
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
TextMessage textMessage = (TextMessage) consumer.receive();
if (textMessage != null) {
System.out.println("接受到消息:" + textMessage.getText());
session.commit();
}
不带事务的session
不带事务的session的签收方式,取决于session的配置。
1.Session.AUTO_ACKNOWLEDGE
Session.AUTO_ACKNOWLEDGE 消息自动签收
2.Session.CLIENT_ACKNOWLEDGE
Session.CLIENT_ACKNOWLEDGE 客戶端调用acknowledge()方法手动签收
生成者
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
//创建文本消息
TextMessage textMessage = session.createTextMessage("我是消息" + i);
//通过消息生产者发出消息
producer.send(textMessage);
消费者
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
if (textMessage != null) {
System.out.println("接受到消息:" + textMessage.getText());
//手动应答
textMessage.acknowledge();
}
3.Session.DUPS_OK_ACKNOWLEDGE
Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。
在第二次重新传送消息的时候,消息只有在被确认之后,才认为已经被成功地消费了。
ActiveMQ与Spring集成
引入依赖
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
<!--activemq-client中引入的jms是1.x,5.xSpring需要2.x版本的jms-->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
</dependencies>
xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="cn.ybzy"></context:component-scan>
<!-- 产生Connection的ConnectionFactory,由对应的JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
<!-- Spring用于管理ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--点对点模式 -->
<bean id="textDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue"/>
</bean>
<!--发布、订阅模式-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic" />
</bean>
<!-- 监听类 -->
<bean id="myMessageListener" class="cn.ybzy.MyMessageListener"></bean>
<!-- 消息监听容器,会伴随spring的启动 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="textDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
生产者
@Component
public class Producer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination textDestination;
public void sendTextMessage(final String text){
jmsTemplate.send(textDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(text);
}
});
}
}
消费者监听
public class MyMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("消费者接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:spring_jms.xml")
public class TestProducer {
@Autowired
private Producer producer;
@Test
public void testSend(){
for (int i = 0; i < 100; i++) {
producer.sendTextMessage("消息 " + i);
}
}
}
ActiveMQ与SpringBoot集成
引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.8.RELEASE</version>
</parent>
<!-- springboot整合activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 集成发送邮件-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
生产者
##activemq连接信息
spring:
activemq:
broker-url: tcp://localhost:61616
in-memory: true
pool:
enabled: false
##队列
messages:
queue: mail_queue
发送消息
@Service("registerMailboxProducer")
public class RegisterMailboxProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate ;
public void send(Destination destination,String json){
jmsMessagingTemplate.convertAndSend(destination, json);
}
}
发送消息内容封装
@Value("${messages.queue}")
private String MESSAGES_QUEUE;
@Override
public void regist(User user) {
//TODO 注册逻辑,注册成功后发送邮件
// 指定队列
Destination activeMQQueue = new ActiveMQQueue(MESSAGES_QUEUE);
// 消息内容
String mailMessage = mailMessage(user.getEmail(), user.getUserName());
log.info("###regist() 注册发送邮件内容:{}", mailMessage);
// 发送消息
registerMailboxProducer.send(activeMQQueue, mailMessage);
}
private String mailMessage(String email, String userName) {
JSONObject root = new JSONObject();
JSONObject header = new JSONObject();
header.put("interfaceType", "sms_mail");
JSONObject content = new JSONObject();
content.put("mail", email);
content.put("userName", userName);
root.put("header", header);
root.put("content", content);
return root.toJSONString();
}
消费者
spring:
application:
name: message
activemq:
broker-url: tcp://localhost:61616
in-memory: true
pool:
enabled: false
mail:
host: smtp.163.com
username: XXX@163.com
password: 授权码
消息分发接口
public interface MessageAdapter {
/**
* 消息分发
* @param jsonObject
*/
public void distribute(JSONObject jsonObject);
}
消息监听,进行分发消息
@Slf4j
@Component
public class ConsumerDistribute {
@Autowired
private SMSMailboxService smsMailboxService;
@JmsListener(destination = "mail_queue")
public void distribute(String json) {
log.info("###收到消息,消息内容 json:{}", json);
if (StringUtils.isEmpty(json)) {
return;
}
JSONObject jsonObject = JSON.parseObject(json);
JSONObject header = jsonObject.getJSONObject("header");
String interfaceType = header.getString("interfaceType");
MessageAdapter messageAdapter = null;
switch (interfaceType) {
// 发送邮件
case "sms_mail":
messageAdapter=smsMailboxService;
break;
default:
break;
}
JSONObject content=jsonObject.getJSONObject("content");
messageAdapter.distribute(content);
}
}
发送邮件实现
@Slf4j
@Service
public class SMSMailboxService implements MessageAdapter {
@Autowired
private JavaMailSender mailSender;
@Value("${spring.mail.username}")
private String EMAIL_NAME;
@Override
public void distribute(JSONObject jsonObject) {
String mail=jsonObject.getString("mail");
String userName=jsonObject.getString("userName");
log.info("###消费者收到消息... mail:{},userName:{}",mail,userName);
// 发送邮件
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom(EMAIL_NAME);
message.setTo(mail);
message.setSubject("恭喜您注册成功。");
message.setText("恭喜您"+userName+",成为XXX的新用户!");
log.info("###发送短信邮箱 mail:{}", mail);
mailSender.send(message);
}
}
测试
http://127.0.0.1:8762/regist
{
"userName":"小白",
"email":"55005500@qq.com"
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/136937.html