在springboot中使用事件监听实现消息订阅发布

不管现实多么惨不忍睹,都要持之以恒地相信,这只是黎明前短暂的黑暗而已。不要惶恐眼前的难关迈不过去,不要担心此刻的付出没有回报,别再花时间等待天降好运。真诚做人,努力做事!你想要的,岁月都会给你。在springboot中使用事件监听实现消息订阅发布,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

前面文章介绍过使用guava中的eventbus 实现发布订阅功能 ,对于普通项目已经能够很好的实现代码解耦,其实在spring中也提供了类似的功能,可以在spring项目中不需要引入第三方依赖的情况下实现发布订阅功能,在spring中主要通过ApplicationContext方法中的publishEvent()方法发布消息,再通过ApplicationListener的实现类接收消息。下面就介绍一下如何使用:

首先需要引入springboot相关依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.11</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.7</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

在主启动类上面开启异步,这样在消费者上添加异步注解时就可以实现与生产者线程的解耦,实现异步消费,否则生产者和消费者都是在同一个线程中,不能达到异步效果:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * @Author xingo
 * @Date 2023/11/1
 */
@SpringBootApplication
@EnableAsync
public class ProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }
}

生产者生产消息可以封装到一个实体类中,这个实体类可以是一个普通的pojo类,这种消息需要通过@EventListener注解实现消息的消费;二是继承ApplicationEvent的实体类,这种消息可以通过@EventListener注解进行消费,也可以通过实现ApplicationListener接口进行消费,下面创建两个用于消息载体的实体类:

import org.springframework.context.ApplicationEvent;

/**
 * 事件实体类1
 *
 * @Author xingo
 * @Date 2023/11/1
 */
public class MyEvent1 extends ApplicationEvent {

    private String message;

    public MyEvent1(Object source) {
        super(source);
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
/**
 * 事件实体类2
 *
 * @Author xingo
 * @Date 2023/11/1
 */
public class MyEvent2 {

    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

消息的消费者有两种:第一种方式是通过实现ApplicationListener接口,在泛型中指定消息类型,通过onApplicationEvent方法接收消息并消费:

import com.alibaba.fastjson.JSONObject;
import org.example.pojo.MyEvent1;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * 事件监听方式1
 *
 * @Author xingo
 * @Date 2023/11/1
 */
@Async
@Component
public class MyApplicationListener implements ApplicationListener<MyEvent1> {

    @Override
    public void onApplicationEvent(MyEvent1 event) {
        System.out.println("onApplicationEvent -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }
}

第二种方式是通过@EventListener注解方式消费数据,它消费的消息不需要继承ApplicationEvent就可以被消费:

import com.alibaba.fastjson.JSONObject;
import org.example.pojo.MyEvent1;
import org.example.pojo.MyEvent2;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * 事件监听
 *
 * @Author xingo
 * @Date 2023/11/1
 */
@Component
public class MyEventListener {

    @Async
    @EventListener(MyEvent1.class)
    public void listener01(MyEvent1 event) {
        System.out.println("listener01 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }

    @Async
    @EventListener(MyEvent2.class)
    public void listener02(MyEvent2 event) {
        System.out.println("listener02 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }

    @EventListener(MyEvent1.class)
    public void listener03(MyEvent1 event) {
        System.out.println("listener03 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }

    @EventListener(MyEvent2.class)
    public void listener04(MyEvent2 event) {
        System.out.println("listener04 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }
}

上面的方法添加@Async后就可以实现异步消费功能,如果不添加该注解,消费者与生产者会在同一个线程中执行。

生产者就比较简单了,只需要在类中注入ApplicationContext,通过publishEvent()方法发送事件消息:

import org.example.pojo.ApiResult;
import org.example.pojo.MyEvent1;
import org.example.pojo.MyEvent2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author xingo
 * @Date 2023/11/1
 */
@RestController
public class MessageController {

    @Autowired
    private ApplicationContext applicationContext;

    @GetMapping("/send/message")
    public ApiResult sendMessage(String message) {
        System.out.println("sendMessage -> thread : " + Thread.currentThread().getName() + " | message : " + message);

        MyEvent1 event1 = new MyEvent1(this);
        event1.setMessage(message);
        MyEvent2 event2 = new MyEvent2();
        event2.setMessage(message);

        applicationContext.publishEvent(event1);
        applicationContext.publishEvent(event2);
        return ApiResult.success(null);
    }
}

通过上面简单几步就实现了在同一个进程中的事件订阅发布功能,相对来说还是比较简单的,启动程序调用接口:

可以看到控制台打印如下内容:

sendMessage -> thread : http-nio-9523-exec-1 | message : hello,world
listener03 -> thread : http-nio-9523-exec-1 | receive : {"message":"hello,world","timestamp":1698824483483}
onApplicationEvent -> thread : task-1 | receive : {"message":"hello,world","timestamp":1698824483483}
listener04 -> thread : http-nio-9523-exec-1 | receive : {"message":"hello,world"}
listener01 -> thread : task-2 | receive : {"message":"hello,world","timestamp":1698824483483}
listener02 -> thread : task-3 | receive : {"message":"hello,world"}

可以看到,添加@Async注解的方法接收消息时会开启一个新的线程,没有添加该注解的方法接收消息与发布消息在同一个线程中。如果要实现真正的解耦,那么在方法上添加@Async是必需的。

这种方式实现的订阅发布,并不能替代真正的消息队列的发布订阅,首先这种方式的生产者和消费者都在一个进程中,不能实现扩容,虽然可以通过配置线程池增加线程数量,但也给系统增加了负担;第二、因为这种发布订阅方式是基于内存的,默认并没有提供持久化来保证消息的不丢失,一旦系统崩溃或重启,对于没有消费的数据就会存在丢失的风险,这种情况就需要在业务中自己实现数据的一致性

框架提供的订阅发布模式将相关功能做了封装,使用场景更通用,如果我们不使用框架,一样可以实现相关功能,只是鸡肋了一些,比如我可以基于阻塞队列达到同样的效果:

import com.alibaba.fastjson.JSONObject;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 基于阻塞队列实现发布订阅功能
 *
 * @Author wangxixin
 * @Date 2023/11/1
 */
public class MyPubSub {

    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /**
     * 发布事件
     * @param event
     */
    public void publish(String event) {
        queue.add(event);
    }

    /**
     * 监听事件
     */
    public void listener() {
        new Thread(() -> {
            while (true) {
                try {
                    String take = queue.take();
                    System.out.println("rec : " + JSONObject.toJSONString(take));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    public static void main(String[] args) {
        MyPubSub myPubSub = new MyPubSub();
        myPubSub.listener();

        myPubSub.publish("Hello,world!");
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/181847.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!