交换机(exchanges)
当使用到交换机的时候,我们用的就不是普通的模式了,而是发布订阅模式了。
生产者生成的消息不会直接发送到队列,而是直接将消息先发送到交换机,并且只能发送到交换机,之钱的我们可以直接发送到队列(事实上我们走的是默认交换机),然后队列交给消费者,现在不行了,改成先发送给交换机,交换机在发给队列。
交换机的工作方式很简单,他接收来自生产者的消息,另一方面将他们推送到队列。
交换机的类型
直接(direct),主题(topic) ,标题(heads),扇出(fanout)
无名exchanges
事实上就是默认类型,我们通过空字符(“”)串进行标识。
channel.basicPublic("","hello",null,message.getBytes());
实际上第一个参数就是交换机的名字,空字符串表示默认或者无名称交换机,消息能路由发送到队列中,其实是由routing(bindingkey)绑定key指定的,如果它存在的话。
临时队列
实际上是自定义的,一旦断开链接,这个队列就会被删除。
绑定
实际上就是交换机和队列的桥梁,他告诉我们交换机和那个队列进行绑定关系
Fanout
fanout是一种交换机的类型,这种类型非常简单,正如名称中猜想的那样,他是将接收的所有消息广播到他知道的所有队列中,系统中默认有exchanges交换机类型。
/*
发消息 交换机
*/
public class EmitLog {
//交换机的名称
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息:" + message);
}
}
}
/*
消息接收,客户端
*/
public class ReceiveLog {
//交换机的名称
public static final String EXCHANG_NAME="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANG_NAME,"fanout");
//声明一个队列,来一个临时队列
/*
队列的名称是随机的。
当消费者断开与队列的连接的时候,队列就可以自动删除
*/
String queueName = channel.queueDeclare().getQueue();
/*
绑定交换机和队列
*/
channel.queueBind(queueName,EXCHANG_NAME,"");
System.out.println("等待接受消息,把消息显示在屏幕上");
//接受消息
DeliverCallback deliverCallback = (consumer,message)->{
System.out.println("Receive控制台打印接受到的消息" + new String(message.getBody(),"UTF-8") );
};
//消费者取消消息时回调接口
channel.basicConsume(queueName,true,deliverCallback,consumer->{
});
}
}
Direct
从图上我们可以看到,X绑定了两个队列,绑定类型是direct,队列Q1绑定建为orange,队列2绑定键有两个,一个为black,另一个为green.
在这种情况下,发布这发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1绑定键为blackgreen的消息会被发布到队列Q2,其他消息类型就被丢弃。
多重绑定
当然如果exchange的绑定类型是direct,但是他绑定的多个队列的key如果多相同在这种情况下虽然绑定类型是direct,但是他表现的就和fanout有点想类似了,就和广播差不多了。
这边就可以指定发给谁了,我不发给谁他就收不到消息,只有我允许他接收他才能得到接收。
public class DirectLogs {
//交换机的名称
public static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息:" + message);
}
}
}
public class ReceiveLogDirect01 {
public static final String EXCHANGE_NAME ="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个交换机
channel.queueDeclare("console",false,false,false,null);
/*
绑定交换机和队列
*/
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
//接受消息
DeliverCallback deliverCallback = (consumer, message)->{
System.out.println("ReceiveLogsDirect01控制台打印接受到的消息" + new String(message.getBody(),"UTF-8") );
};
//消费者取消消息时回调接口
channel.basicConsume("console",true,deliverCallback,consumer->{});
}
}
public class ReceiveLogDirect02 {
public static final String EXCHANGE_NAME ="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个交换机
channel.queueDeclare("disk",false,false,false,null);
/*
绑定交换机和队列
*/
channel.queueBind("disk",EXCHANGE_NAME,"error");
//接受消息
DeliverCallback deliverCallback = (consumer, message)->{
System.out.println("ReceiveLogsDirect01控制台打印接受到的消息" + new String(message.getBody(),"UTF-8") );
};
//消费者取消消息时回调接口
channel.basicConsume("disk",true,deliverCallback,consumer->{});
}
}
Topic
对比上面两种交换机更加完美,当存在我们要接受日志类型有info.base和info.advantage,某个队列只想info.base的消息,那这个·时候上边两种交换机就做不到了。此时我们采用的就是topic交换机类型了。
发送类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,他必须是一个单纯列表,以点号隔开,这个单次可以是任意单词。”nysc.xxx”等。限制要求单词列表不能超过255个字节。
需要注意的是
‘ * ’ 代表的是一个单词
‘#’ 代表的是可以替换零个或多个单词。
下图绑定关系如下
Q1–> 绑定的是
中间代orange带3个单词的字符串(* .orange. *)
Q2–> 绑定的是
最后一个单词是rabbit的3个单词(* . *rabbit)
第一个单词是lazy的多个单词(lazy.#)
quick.orange.rabbit 被队列Q1Q2接受到
lazy.orange.elephant 被队列Q1Q2接受到
quick.orange.fox 被队列Q1接受到
lazy.brown.fox 被队列Q2接受到
lazy.pink.rabbit 虽然满足两个绑定但只被队列Q2接受一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配Q2
上述加粗的是符合TOPIC类型交换机的单词指令
是最强大的,也是使用最广的
- 当队列绑定关系是#,那么这个队列将接收所有的数据,有点像Fanout了。
- 如果队列绑定的键中没有#和*出现,那么该队列绑定类类型就是direct了。
消费者1
/*
声明主题交换机
*/
public class ReceiveLogsTopic01 {
//老样子定义一个交换机名
public static final String EXCHANGE_NAME= "topic_logs";
//接受消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//声明队列
String queueName = "Q1";
channel.queueDeclare(queueName,false,false,false,null);
//绑定
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接受消息");
DeliverCallback deliverCallback = (consumer,message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接受队列:——> " +queueName +"绑定键:"+ message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(queueName,true,deliverCallback,consumer->{});
}
}
等待接受消息
被队列Q1Q2接受到
接受队列:——> Q1绑定键:lazy.orange.elephant
被队列Q1Q2接受到
接受队列:——> Q1绑定键:quick.orange.rabbit
被队列Q1接受到
接受队列:——> Q1绑定键:quick.orange.fox
消费者2
/*
声明主题交换机
*/
public class ReceiveLogsTopic02 {
//老样子定义一个交换机名
public static final String EXCHANGE_NAME= "topic_logs";
//接受消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//声明队列
String queueName = "Q2";
channel.queueDeclare(queueName,false,false,false,null);
//绑定
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接受消息");
DeliverCallback deliverCallback = (consumer,message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接受队列:——> " +queueName +"绑定键:"+ message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(queueName,true,deliverCallback,consumer->{});
}
}
等待接受消息
被队列Q1Q2接受到
接受队列:——> Q2绑定键:lazy.orange.elephant
被队列Q2接受到
接受队列:——> Q2绑定键:lazy.brown.fox
被队列Q1Q2接受到
接受队列:——> Q2绑定键:quick.orange.rabbit
虽然满足两个绑定但只被队列Q2接受一次
接受队列:——> Q2绑定键:lazy.pink.rabbit
是四个单词但匹配Q2
接受队列:——> Q2绑定键:lazy.orange.male.rabbit
生产者
/*
生产者
*/
public class EmitLogTopic {
//老样子,还是定义一个交换机名字,注意这个交换
// 机名字需要和消费者里边定义的交换机一样,避免出错
private static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道链接
Channel channel = GetConnection.getChannel();
/*
Q1--> 绑定的是
中间代orange带3个单词的字符串(* .orange. *)
Q2--> 绑定的是
最后一个单词是rabbit的3个单词(* . *rabbit)
第一个单词是lazy的多个单词(lazy.#)
quick.orange.rabbit 被队列Q1Q2接受到
lazy.orange.elephant 被队列Q1Q2接受到
quick.orange.fox 被队列Q1接受到
lazy.brown.fox 被队列Q2接受到
lazy.pink.rabbit 虽然满足两个绑定但只被队列Q2接受一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配Q2
*/
Map<String,String> map = new HashMap<>();
map.put("quick.orange.rabbit","被队列Q1Q2接受到");
map.put("lazy.orange.elephant","被队列Q1Q2接受到");
map.put("quick.orange.fox","被队列Q1接受到");
map.put("lazy.brown.fox","被队列Q2接受到");
map.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接受一次");
map.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
map.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
map.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");
for (Map.Entry<String, String> mapEntry : map.entrySet()) {
String routingKey = mapEntry.getKey();
String message = mapEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息: " + message);
}
}
}
生产者发送消息: 是四个单词不匹配任何绑定会被丢弃
生产者发送消息: 不匹配任何绑定不会被任何队列接收到会被丢弃
生产者发送消息: 被队列Q1Q2接受到
生产者发送消息: 被队列Q2接受到
生产者发送消息: 被队列Q1Q2接受到
生产者发送消息: 被队列Q1接受到
生产者发送消息: 虽然满足两个绑定但只被队列Q2接受一次
生产者发送消息: 是四个单词但匹配Q2
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/197985.html