一. 简介
springboot提供了对activemq的集成,只需要几个简单的注解就可以使用,方便。
该案例可以同时使用queue和topic, 测试无问题。有疑问欢迎留言,必回复。
activemq的环境搭建可以参考我的另一篇文章: linux下安装activemq
源码已上传到github上,路径为: https://github.com/1956025812/activemqdemo
项目down下来运行的时候记得要改mq连接信息哈~
二. 整合步骤
2.1 pom.xml
需要引入:spring-boot-starter-activemq和activemq-pool和fastjson
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yzx</groupId>
<artifactId>activemqdemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>activemqdemo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<!-- activemq -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.2 application.yml
server:
port: 8080
spring:
activemq:
broker-url: tcp://IP:61616
user: admin
password: admin
pool:
enabled: true
packages:
trust-all: true # 如果使用ObjectMessage传输对象,必须要加上这个信任包,否则会报ClassNotFound异常
jms:
pub-sub-domain: true # 启动主题消息
2.3 ActiveMqConfig
注: 如果需要同时使用queue和topic,则需要引用该配置中指定的containerFactory.
package com.yzx.activemqdemo.demo1;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
@Configuration
public class ActiveMqConfig {
// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
// topic模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
2.4 MqProducer
注: 消息生产类,需要使用的时候直接注入即可。
目前支持:字符串, 字符串集合, 对象, 对象集合的队列和主题
package com.yzx.activemqdemo.demo1;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import java.io.Serializable;
import java.util.List;
@Service
public class MqProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 发送字符串消息队列
*
* @param queueName 队列名称
* @param message 字符串
*/
public void sendStringQueue(String queueName, String message) {
this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);
}
/**
* 发送字符串集合消息队列
*
* @param queueName 队列名称
* @param list 字符串集合
*/
public void sendStringListQueue(String queueName, List<String> list) {
this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list);
}
/**
* 发送对象消息队列
*
* @param queueName 队列名称
* @param obj 对象
*/
public void sendObjQueue(String queueName, Serializable obj) {
this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj);
}
/**
* 发送对象集合消息队列
*
* @param queueName 队列名称
* @param objList 对象集合
*/
public void sendObjListQueue(String queueName, List<Serializable> objList) {
this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList);
}
/**
* 发送字符串消息主题
*
* @param topicName 主题名称
* @param message 字符串
*/
public void sendStringTopic(String topicName, String message) {
this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), message);
}
/**
* 发送字符串集合消息主题
*
* @param topicName 主题名称
* @param list 字符串集合
*/
public void sendStringListTopic(String topicName, List<String> list) {
this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), list);
}
/**
* 发送对象消息主题
*
* @param topicName 主题名称
* @param obj 对象
*/
public void sendObjTopic(String topicName, Serializable obj) {
this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), obj);
}
/**
* 发送对象集合消息主题
*
* @param topicName 主题名称
* @param objList 对象集合
*/
public void sendObjListTopic(String topicName, List<Serializable> objList) {
this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), objList);
}
}
2.5 QueueConsumer
注: 队列消费类,只需要使用@JmsListener注解就可以指定监听的通道,方便。
package com.yzx.activemqdemo.demo1;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.ObjectMessage;
import java.util.List;
@Component
public class QueueConsumer {
@JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")
public void receiveStringQueue(String msg) {
System.out.println("接收到消息...." + msg);
}
@JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue")
public void receiveStringListQueue(List<String> list) {
System.out.println("接收到集合队列消息...." + list);
}
@JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue")
public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
System.out.println("接收到对象队列消息...." + objectMessage.getObject());
}
@JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue")
public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
System.out.println("接收到的对象队列消息..." + objectMessage.getObject());
}
}
2.6 TopicConsumer
注: A主题消费者,用来接收主题类消息。目前搭了俩个消费者,一个A,一个B。
2.6.1 ATopicConsumer
package com.yzx.activemqdemo.demo1;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.ObjectMessage;
import java.util.List;
@Component
public class ATopicConsumer {
@JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
public void receiveStringTopic(String msg) {
System.out.println("ATopicConsumer接收到消息...." + msg);
}
@JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
public void receiveStringListTopic(List<String> list) {
System.out.println("ATopicConsumer接收到集合主题消息...." + list);
}
@JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
System.out.println("ATopicConsumer接收到对象主题消息...." + objectMessage.getObject());
}
@JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
System.out.println("ATopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());
}
}
2.6.2 BTopicConsumer
package com.yzx.activemqdemo.demo1;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.ObjectMessage;
import java.util.List;
@Component
public class BTopicConsumer {
@JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
public void receiveStringTopic(String msg) {
System.out.println("BTopicConsumer接收到消息...." + msg);
}
@JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
public void receiveStringListTopic(List<String> list) {
System.out.println("BTopicConsumer接收到集合主题消息...." + list);
}
@JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
System.out.println("BTopicConsumer接收到对象主题消息...." + objectMessage.getObject());
}
@JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
System.out.println("BTopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());
}
}
2.7 User类
注: 用来测试对象的类,复写了toString方法。必须实现Serializable接口。
package com.yzx.activemqdemo.demo1;
import java.io.Serializable;
public class User implements Serializable {
private String id;
private String name;
private Integer age;
public User() {
}
public User(String id, String name, Integer age) {
this.id = id;
this.name = name;
this.age = age;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
2.8 启动类
package com.yzx.activemqdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ActivemqdemoApplication {
public static void main(String[] args) {
SpringApplication.run(ActivemqdemoApplication.class, args);
}
}
2.9 单元测试类
package com.yzx.activemqdemo;
import com.yzx.activemqdemo.demo1.MqProducer;
import com.yzx.activemqdemo.demo1.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqdemoApplicationTests {
@Autowired
private MqProducer mqProducer;
@Test
public void testStringQueue() {
for (int i = 1; i <= 100; i++) {
System.out.println("第" + i + "次发送字符串队列消息");
mqProducer.sendStringQueue("stringQueue", "消息:" + i);
}
}
@Test
public void testStringListQueue() {
List<String> idList = new ArrayList<>();
idList.add("id1");
idList.add("id2");
idList.add("id3");
System.out.println("正在发送集合队列消息ing......");
mqProducer.sendStringListQueue("stringListQueue", idList);
}
@Test
public void testObjQueue() {
System.out.println("正在发送对象队列消息......");
mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20));
}
@Test
public void testObjListQueue() {
System.out.println("正在发送对象集合队列消息......");
List<Serializable> userList = new ArrayList<>();
userList.add(new User("1", "小明", 21));
userList.add(new User("2", "小雪", 22));
userList.add(new User("3", "小花", 23));
mqProducer.sendObjListQueue("objListQueue", userList);
}
@Test
public void testStringTopic() {
for (int i = 1; i <= 100; i++) {
System.out.println("第" + i + "次发送字符串主题消息");
mqProducer.sendStringTopic("stringTopic", "消息:" + i);
}
}
@Test
public void testStringListTopic() {
List<String> idList = new ArrayList<>();
idList.add("id1");
idList.add("id2");
idList.add("id3");
System.out.println("正在发送集合主题消息ing......");
mqProducer.sendStringListTopic("stringListTopic", idList);
}
@Test
public void testObjTopic() {
System.out.println("正在发送对象主题消息......");
mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20));
}
@Test
public void testObjListTopic() {
System.out.println("正在发送对象集合主题消息......");
List<Serializable> userList = new ArrayList<>();
userList.add(new User("1", "小明", 21));
userList.add(new User("2", "小雪", 22));
userList.add(new User("3", "小花", 23));
mqProducer.sendObjListTopic("objListTopic", userList);
}
}
2.10 最终目录结构
三. 测试
3.1 测试队列
3.1.1 测试字符串队列
运行testStringQueue方法,输出内容如下:
第1次发送字符串队列消息
第2次发送字符串队列消息
接收到消息....消息:1
接收到消息....消息:2
第3次发送字符串队列消息
接收到消息....消息:3
.........
接收到消息....消息:99
第100次发送字符串队列消息
接收到消息....消息:100
3.1.2 测试字符串集合队列
运行testStringListQueue方法,输出内容如下:
正在发送集合队列消息ing......
接收到集合队列消息....[id1, id2, id3]
3.1.3 测试对象队列
运行testObjQueue方法,输出内容如下:
正在发送对象队列消息......
接收到对象队列消息....User{id='1', name='小明', age=20}
3.1.4 测试对象集合队列
运行testObjListQueue方法,输出内容如下:
正在发送对象集合队列消息......
接收到的对象队列消息...[User{id='1', name='小明', age=21}, User{id='2', name='小雪', age=22}, User{id='3', name='小花', age=23}]
3.2 测试主题
3.2.1 测试字符串主题
运行testStringTopic方法,输出内容如下:
第1次发送字符串主题消息
第2次发送字符串主题消息
ATopicConsumer接收到消息....消息:1
BTopicConsumer接收到消息....消息:1
第3次发送字符串主题消息
BTopicConsumer接收到消息....消息:2
ATopicConsumer接收到消息....消息:2
.......
第99次发送字符串主题消息
ATopicConsumer接收到消息....消息:98
BTopicConsumer接收到消息....消息:98
第100次发送字符串主题消息
ATopicConsumer接收到消息....消息:99
BTopicConsumer接收到消息....消息:99
BTopicConsumer接收到消息....消息:100
ATopicConsumer接收到消息....消息:100
3.2.2 测试字符串集合主题
运行testStringListTopic方法,输出内容如下:
正在发送集合主题消息ing......
ATopicConsumer接收到集合主题消息....[id1, id2, id3]
BTopicConsumer接收到集合主题消息....[id1, id2, id3]
3.2.3 测试对象主题
运行testObjTopic方法,输出内容如下:
正在发送对象主题消息......
ATopicConsumer接收到对象主题消息....User{id='1', name='小明', age=20}
BTopicConsumer接收到对象主题消息....User{id='1', name='小明', age=20}
3.2.4 测试对象集合主题
运行testObjListTopic方法,输出内容如下:
正在发送对象集合主题消息......
BTopicConsumer接收到的对象集合主题消息...[User{id='1', name='小明', age=21}, User{id='2', name='小雪', age=22}, User{id='3', name='小花', age=23}]
ATopicConsumer接收到的对象集合主题消息...[User{id='1', name='小明', age=21}, User{id='2', name='小雪', age=22}, User{id='3', name='小花', age=23}]
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/17793.html