如何使用MQ(java代码实现)
<!--指定jdk版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.7</version>
</dependency>
</dependencies>
生产者 – 消息队列 – 消费者
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/*
生产者,目标是发消息
*/
public class Product {
//队列名称
public static final String QUEUE_NAME="hello";
//发消息
public static void main(String[] args) throws IOException, TimeoutException {
//建立一个连接方式
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接RabbitMQ的队列
factory.setHost("www.littlehei.fun");
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//创建链接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
//生成一个队列
/*
生成队列
参数1,队列名称
参数2,队列里边的消息是否持久化(磁盘)默认情况消息存储在内存中
参数3,该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false:只能一个消费者消费
参数4,是否自动删除,最后一个消费者断开链接以后,该队是否自动删除,true,自动删除,false不删除
参数5,其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发消息
String message = "hello world";//初次使用
/*
发送一个消费
参数1,发送到哪个交换机
参数2.路由的key值是哪个? 本次是队列的名称
参数3,其他参数配置
参数4,发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送成功");
}
}
消费者:用来接受生成者产生的消息。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
消费者,用来消费生成者产生的代码
*/
public class Consumer {
//队列的名称:
private static final String QUEUE_NAME="hello";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("www.littlehei.fun");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明
DeliverCallback deliverCallback = (consumerTag,message)->{
// String message = new String("自己手动去创建一个消息,但是不推荐");
System.out.println(new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
/*
消费者接收消息
参数1,消费哪个队列
参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是
参数3,消费者未成功消费的回调
参数4,消费者取录消费的回到
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
工作队列
生成者 —大量发消息– 队列 – 接到消息–工作线程1或者工作线程2.。。。
轮训处理消息,你一个,我下一个,他下下个。
不同工作线程之间的关系是竞争关系
创建链接工具类:
public class GetConnection {
//建立一个工具类,每次都直接使用,减少代码重复量
public static Channel getChannel() throws IOException, TimeoutException {
//创建一个链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip地址");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
轮训分发代码:
/*
一个工作线程,可以多个创建,可以多线程创建,具体看你自己如何定于
*/
public class WorkThread1 {
//首先还是创建一个队列名称
public static final String QUEUE_NAME = "hello";
//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
/*
消费者接收消息
参数1,消费哪个队列
参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是
参数3,消费者未成功消费的回调
参数4,消费者取录消费的回到
*/
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收到的消息为" + new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println("消息被取消消费接口回调");
};
System.out.println("C1等待接收消息...");
//消息接收
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
生产者:
/*
生产者,可以发送大量消息
*/
public class Product1 {
//队列名称
public static final String QUEUE_NAEM="hello";
//发送大量消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//队列声明
channel.queueDeclare(QUEUE_NAEM,false,false,false,null);
//发送消息
//从控制台中输入
Scanner sc = new Scanner(System.in);
//判断是否有下一个消息输入
while (sc.hasNext()){
String name = sc.next();
channel.basicPublish("",QUEUE_NAEM,null,name.getBytes());
System.out.println("发送完成: "+name);
}
}
}
----------结果----------
nihap
发送完成nihap
wp1
发送完成wp1
ci1
发送完成ci1
wooda
发送完成wooda
我喜欢你
发送完成我喜欢你
------------------
C1等待接收消息...
接收到的消息为nihap
接收到的消息为ci1
接收到的消息为我喜欢你
--------------------
C2等待接收消息...
接收到的消息为wp1
接收到的消息为wooda
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/197989.html