传入自定义的监听类,该类继承DefaultConsumer类,重写handleDelivery方法就行。
channel.basicConsume(quequName,false,new MyConsumer(channel));
生产端:
package com.wy.testrabbitmq.zidingyiconsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-12 17:20
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingkey = "test.consumer";
String msg = "test consumer message";
for (int i = 0; i < 7; i++) {
channel.basicPublish(exchangeName, routingkey, true, null, msg.getBytes());
}
}
}
消费端:
package com.wy.testrabbitmq.zidingyiconsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-12 17:21
*/
public class Consumer {
public static void main(String[] args)throws Exception {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName="test_consumer_exchange";
String routingkey="test.*";
String quequName="test_consumer_queueName";
channel.exchangeDeclare(exchangeName,"topic",true,false,null);
channel.queueDeclare(quequName,true,false,false,null);
channel.queueBind(quequName,exchangeName,routingkey);
channel.basicConsume(quequName,true,new TestConsumer(channel));
}
}
自定义类:
/**自定义消费端监听类
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-18 15:31
*/
public class TestConsumer extends DefaultConsumer {
public TestConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
System.out.println("s:"+s);
System.out.println("envelope:"+envelope);
System.out.println("basicProperties:"+basicProperties);
System.out.println("body:"+new String(body));
}
}
测试如下:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/115811.html