1、实现步骤
- jdk1.8
- 构建一个maven工程
- 导入rabbitmq的maven依赖
- 启动rabbitmq-server服务
- 定义生产者
- 定义消费者
- 观察消息的在rabbitmq-server服务中的过程
2、构建一个maven工程
3、导入rabbitmq依赖
1)Java原生依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
2)spring依赖
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-amqp -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.4.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.2</version>
</dependency>
3)springboot依赖
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.4</version>
</dependency>
可以根据自已的项目环境选择上面三种依赖形式
4、启动rabbitmq-server服务
systemctl start rabbitmq-server
# 或者 docker启动
docker start myrabbit
5、定义生产者
package com.tuwer.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 土味儿
* Date 2022/3/23
* @version 1.0
* -------------
* 生产者
* 供应商(provider)
*/
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.19.101");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2、创建连接
Connection connection = null;
// 3、获取通道
Channel channel = null;
try {
connection = connectionFactory.newConnection("生产者");
channel = connection.createChannel();
// 4、通过通道声明队列queue存储消息
String queueName = "queue1";
/**
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他(私有的),如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息
* @param arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
*/
channel.queueDeclare(queueName, false, false, false, null);
// 5、准备消息
String message = "Hello World! " + LocalDateTime.now();
// 6、发送消息
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
6、定义消费者
package com.tuwer.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 土味儿
* Date 2022/3/23
* @version 1.0
* ------------
* 消费者
*/
public class Consumer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.19.101");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
// 2、创建连接
Connection connection = null;
// 3、获取通道
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者");
channel = connection.createChannel();
// 4、通过通道接收消息
String queueName = "queue1";
/**
* @param queue 队列名称
* @param autoAck 自动应答;消费者接收到消息后告知服务器
* @param deliverCallback 接收处理
* @param cancelCallback 接收失败时处理
*/
channel.basicConsume(
queueName,
true,
new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到的消息:" + new String(message.getBody()));
}
},
new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失败!");
}
}
);
// 循环读取
System.out.println("开始接收消息....");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 5、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 6、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
- lambada表达式简便写法
// 省略...
// 4、通过通道接收消息
String queueName = "queue1";
// 接收处理
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("收到的消息:" + new String(message.getBody()));
};
// 接收失败处理
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息接收失败!");
};
/**
* @param queue 队列名称
* @param autoAck 自动应答;消费者接收到消息后告知服务器
* @param deliverCallback 接收处理
* @param cancelCallback 接收失败时处理
*/
channel.basicConsume(
queueName,
true,
deliverCallback,
cancelCallback
);
// 省略...
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/70408.html