RabbitMQ-RabbitMQ的安装及入门

RabbitMQ

什么是MQ

MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。

RabbitMQ的优点

由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言,社区活跃度高,更新频率快(但是商业版需要收费)

Rabbit的概念

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

四大核心概念

生产者

  • 产生数据发送消息的程序

消费者

  • 消费者大多时候是一个等待接收消息的程序,生产者消费者很多时候都不在同一个机器,同一个机器可以即是生产者又是消费者

交换机

  • 一方面接收生产者的消息,另一方面将消息推送到队列中,可以将消息推送到特定队列或者多个队列,或者将消息丢弃

队列

  • RabbitMQ存储消息的一种数据结构,生产者发送的消息只能存存储在队列中,队列仅仅受主机内存以及磁盘的限制约束,本质上是一个很大的消息缓冲区,许多生产者可以将消息发送到一个队列,许多消费者可以从一个队列接收数据。

RabbitMQ的安装

  1. 官网地址

    • https://www.rabbitmq.com/download.html
  2. 文件上传

    • 上传到/usr/local/software 目录下(如果没有 software 需要自己创建)
  3. 安装文件(分别按照以下顺序安装)

    • rpm -ivh erlang-21.3-1.el7.x86_64.rpm
    • yum install socat -y
    • rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
  4. 常用命令(按照以下顺序执行)

    • 添加开机启动 RabbitMQ 服务 chkconfig rabbitmq-server on
    • 启动服务 /sbin/service rabbitmq-server start
    • 查看服务状态  /sbin/service rabbitmq-server status
    • 停止服务(选择执行) /sbin/service rabbitmq-server stop
    • 开启 web 管理插件(在页面访问RabbitMQ的管理界面)rabbitmq-plugins enable rabbitmq_management
  5. 访问RabbitMQ的管理界面 http://{安装RabbitMQ机器的IP地址}:15672/,用默认的账号密码guest会出现权限问题(关闭防火墙或开放端口)RabbitMQ-RabbitMQ的安装及入门

    不关闭防火墙的话,需要开放15672和5672两个端口,一个是连接控制台,一个是连接服务 firewall-cmd –zone=public –add-port=15672/tcp –permanent firewall-cmd –zone=public –add-port=5672/tcp –permanent firewall-cmd –reload 如果是云服务器的话,还需要在管理端开放两个端口

    • 创建账号  rabbitmqctl add_user admin 123
    • 设置用户角色 rabbitmqctl set_user_tags admin administrator
    • 设置用户权限

      set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
      rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    • 此处添加新的用户访问
    • 用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 当前用户和角色 rabbitmqctl list_users
    • 再次使用admin登陆。RabbitMQ-RabbitMQ的安装及入门
  6. 重置命令

    • 关闭应用的命令为 rabbitmqctl stop_app
    • 清除的命令为 rabbitmqctl reset
    • 重新启动命令为 rabbitmqctl start_app

RabbitMQ的入门程序

1. 导入Maven依赖

 <dependencies>
        <!--rabbitmq 依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

2. 创建消息的生产者

public class Producer {
    // 队列名称
    public static final String QUERE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 工厂IP,连接RabbitMQ队列(安装RabbitMQ机器的IP地址)
        factory.setHost("xxx.xxx.xxx.xxx");
        //用户名
        factory.setUsername("username");
        //密码
        factory.setPassword("password");
        // 创建连接
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();
        /**
         * 声明一个队列
         *  1. 队列名称
         *  2. 队列中的消息是否需要持久话(磁盘),默认情况消息存储在内存中
         *  3. 该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false只能一个消费者消费
         *  4. 是否自动删除,最后一个消费者断开连接后,该队列是否自动删除 true自动删除,false则相反
         *  5. 其他参数
         */

        channel.queueDeclare(QUERE_NAME,false,false,false,null);
        String message = "hello world ";
        /**
         * 发送一个消息
         *  1. 发送到那个哪个交换机
         *  2. 路由的key值是哪个,本次是队列名称
         *  3. 其他参数信息
         *  4. 发送消息的消息体
         */

        channel.basicPublish("",QUERE_NAME,null,message.getBytes());

    }å
}

3. 创建消息的消费者

/**
 * 消费者,接受消息
 */

public class Consumer {

    // 队列名称
    public static final String QUERE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 工厂IP,连接RabbitMQ队列
        factory.setHost("xxx.xxx.xxx.xxx");
        //用户名
        factory.setUsername("username");
        //密码
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者消费消息
         *  1. 消费哪个队列
         *  2. 消费成功之后是否要自动应答 true 代表自动 false 代表手动
         *  3. 推送的消息如何进行消费的接口回调
         *  4. 消费者取消消费的回调
         */

        channel.basicConsume(QUERE_NAME,true,deliverCallback,cancelCallback);
        
    }
}

4. 启动消费者与生产者查看结果

  • 生产者Consumer输出hello worldRabbitMQ-RabbitMQ的安装及入门
  • RabbitMQ管理页面有生成的新队列RabbitMQ-RabbitMQ的安装及入门

RabbitMQ的工作队列

轮流接收消息

  • 仿照上面的程序写两个消费者消费同一个队列或者启动两个工作线程,我这里使用idea启动两个工作线程,勾选下方的标识,允许启动多个线程RabbitMQ-RabbitMQ的安装及入门
/**
 * 测试一个工作线程(相当于一个消费者)
 */

public class Worker01 {
    // 队列名称
    public static final String QUERE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
           xxxxxx上述的方式连接RabbitMQ,生成信道xxxxxxx

        // 推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到的消息:" + new String(message.getBody()));
        };
        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };
        System.out.println("c1等待接收消息。。。。。");
        //消息接收
        channel.basicConsume(QUERE_NAME, true, deliverCallback, cancelCallback);
    }
}

RabbitMQ-RabbitMQ的安装及入门
在这里插入图片描述
  • 编写生产者并启动
/**
 * 生产者 发送大量消息
 */

public class Task01 {

    public static final String QUERE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
          xxxxxx上述的方式连接RabbitMQ,生成信道xxxxxxx
        // 队列的声明
        channel.queueDeclare(QUERE_NAME,false,false,false,null);

        // 从控制台接收信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUERE_NAME,null,message.getBytes());
            System.out.println("发送消息完成:"+message);
        }
    }
}

  • 通过生产者发送多个消息,查看两个消费者的消费情况,通过程序执行发现生产者总共发送 6个消息,消费者 1 和消费者 2 分别分得3个消息,并且是按照有序的一个接收一次消息RabbitMQ-RabbitMQ的安装及入门RabbitMQ-RabbitMQ的安装及入门RabbitMQ-RabbitMQ的安装及入门

尚硅谷B站RabbitMQ教程:尚硅谷RabbitMQ


原文始发于微信公众号(卫颓废):RabbitMQ-RabbitMQ的安装及入门

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/47529.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!