rabbitMQ学习-pom配置和生产者消费者

如果你不相信努力和时光,那么成果就会是第一个选择辜负你的。不要去否定你自己的过去,也不要用你的过去牵扯你现在的努力和对未来的展望。不是因为拥有希望你才去努力,而是去努力了,你才有可能看到希望的光芒。rabbitMQ学习-pom配置和生产者消费者,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

如何使用MQ(java代码实现)

   <!--指定jdk版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>
        <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.7</version>
        </dependency>
    </dependencies>

生产者 – 消息队列 – 消费者

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/*
生产者,目标是发消息
 */
public class Product {
    //队列名称

    public static  final  String QUEUE_NAME="hello";

    //发消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立一个连接方式
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP 连接RabbitMQ的队列
        factory.setHost("www.littlehei.fun");
        //用户名
        factory.setUsername("guest");

        //密码
        factory.setPassword("guest");

        //创建链接
        Connection connection = factory.newConnection();

        //获取信道
        Channel channel = connection.createChannel();

        //生成一个队列
        /*
        生成队列
        参数1,队列名称
        参数2,队列里边的消息是否持久化(磁盘)默认情况消息存储在内存中
        参数3,该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false:只能一个消费者消费
        参数4,是否自动删除,最后一个消费者断开链接以后,该队是否自动删除,true,自动删除,false不删除
        参数5,其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息
        String message = "hello world";//初次使用

        /*
        发送一个消费
        参数1,发送到哪个交换机
        参数2.路由的key值是哪个?  本次是队列的名称
        参数3,其他参数配置
        参数4,发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送成功");
    }

}

在这里插入图片描述
消费者:用来接受生成者产生的消息。

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
消费者,用来消费生成者产生的代码
 */
public class Consumer {
    //队列的名称:
    private static  final String QUEUE_NAME="hello";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("www.littlehei.fun");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection =  factory.newConnection();
        Channel channel =  connection.createChannel();

        //声明
        DeliverCallback deliverCallback = (consumerTag,message)->{
         //  String message = new String("自己手动去创建一个消息,但是不推荐");
            System.out.println(new String(message.getBody()));
        };

        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };

        /*
        消费者接收消息
        参数1,消费哪个队列
        参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是
        参数3,消费者未成功消费的回调
        参数4,消费者取录消费的回到
         */

      channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

工作队列

生成者 —大量发消息– 队列 – 接到消息–工作线程1或者工作线程2.。。。
轮训处理消息,你一个,我下一个,他下下个。

不同工作线程之间的关系是竞争关系

创建链接工具类:

public class GetConnection {
    //建立一个工具类,每次都直接使用,减少代码重复量
    public  static Channel getChannel() throws IOException, TimeoutException {
        //创建一个链接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("ip地址");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

轮训分发代码:

/*
一个工作线程,可以多个创建,可以多线程创建,具体看你自己如何定于
 */
public class WorkThread1 {
    //首先还是创建一个队列名称
    public  static final   String  QUEUE_NAME = "hello";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

          /*
        消费者接收消息
        参数1,消费哪个队列
        参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是
        参数3,消费者未成功消费的回调
        参数4,消费者取录消费的回到
         */
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("接收到的消息为" + new String(message.getBody()));
        };

        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println("消息被取消消费接口回调");
        };
        System.out.println("C1等待接收消息...");

        //消息接收
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

生产者:

/*
生产者,可以发送大量消息
 */
public class Product1 {

    //队列名称
    public  static final  String QUEUE_NAEM="hello";

    //发送大量消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAEM,false,false,false,null);

        //发送消息
        //从控制台中输入
        Scanner sc = new Scanner(System.in);
        //判断是否有下一个消息输入

        while (sc.hasNext()){
            String name = sc.next();
            channel.basicPublish("",QUEUE_NAEM,null,name.getBytes());
            System.out.println("发送完成: "+name);
        }

    }

}
----------结果----------
nihap
发送完成nihap
wp1
发送完成wp1
ci1
发送完成ci1
wooda
发送完成wooda
我喜欢你
发送完成我喜欢你
------------------
C1等待接收消息...
接收到的消息为nihap
接收到的消息为ci1
接收到的消息为我喜欢你
--------------------
C2等待接收消息...
接收到的消息为wp1
接收到的消息为wooda

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/197989.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!