你需要什么
- 大约 15 + 15 分钟
- IntelliJ IDEA或其他编辑器
- JDK 1.8或更高版本
- Maven 3.2+
- RabbitMQ服务器
你会建立什么
您将构建一个使用Spring AMQP
的RabbitTemplate
发布消息并使用MessageListenerAdapter
在POJO
上订阅消息的应用程序。
构建步骤
1、安装 RabbitMQ
。
2、添加 Maven
依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
同时配置一把application.properties
:
spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=192.168.56.101
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
3、创建一个RabbitMQ消息接收器。
import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
Receiver是一个简单的POJO,它定义了一种接收消息的方法。当你注册它接收消息时,你可以任意命名它。
4、注册侦听器
Spring AMQP
的RabbitTemplate
提供了用RabbitMQ
发送和接收消息所需的一切。具体来说,你需要配置:
- 消息侦听器容器(message listener container)
- 声明队列,交换机和它们之间的绑定(Declare the queue, the exchange, and the binding between them)
import com.sqlb.guiderabbitmq.bean.Receiver;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
public static final String topicExchangeName = "spring-boot-exchange";
public static final String queueName = "spring-boot";
/** 队列*/
@Bean
Queue queue() {
return new Queue(queueName, false);
}
/**交换机*/
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
/**绑定队列和交换机*/
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
/**消息监听器*/
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
在listenerAdapter()
方法中定义的bean
在container()
中定义的容器中注册为消息侦听器。它将侦听“spring-boot”
队列中的消息。由于Receiver
类是POJO
,因此需要将其包装在MessageListenerAdapter
中,在其中指定它以调用receiveMessage
方法。
在这种情况下,我们使用TopicExchange,队列绑定到路由键 foo.bar.#
。 这意味着任何使用以 foo.bar.
开头的路由键发送的消息,将被路由到队列中。
发送测试消息
import java.util.concurrent.TimeUnit;
import com.sqlb.guiderabbitmq.bean.Receiver;
import com.sqlb.guiderabbitmq.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class Runner implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(RabbitMqConfig.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
测试结果
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/16046.html