前面文章介绍过使用guava中的eventbus 实现发布订阅功能 ,对于普通项目已经能够很好的实现代码解耦,其实在spring中也提供了类似的功能,可以在spring项目中不需要引入第三方依赖的情况下实现发布订阅功能,在spring中主要通过ApplicationContext方法中的publishEvent()方法发布消息,再通过ApplicationListener的实现类接收消息。下面就介绍一下如何使用:
首先需要引入springboot相关依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.11</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
在主启动类上面开启异步,这样在消费者上添加异步注解时就可以实现与生产者线程的解耦,实现异步消费,否则生产者和消费者都是在同一个线程中,不能达到异步效果:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* @Author xingo
* @Date 2023/11/1
*/
@SpringBootApplication
@EnableAsync
public class ProviderApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}
生产者生产消息可以封装到一个实体类中,这个实体类可以是一个普通的pojo类,这种消息需要通过@EventListener注解实现消息的消费;二是继承ApplicationEvent的实体类,这种消息可以通过@EventListener注解进行消费,也可以通过实现ApplicationListener接口进行消费,下面创建两个用于消息载体的实体类:
import org.springframework.context.ApplicationEvent;
/**
* 事件实体类1
*
* @Author xingo
* @Date 2023/11/1
*/
public class MyEvent1 extends ApplicationEvent {
private String message;
public MyEvent1(Object source) {
super(source);
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
/**
* 事件实体类2
*
* @Author xingo
* @Date 2023/11/1
*/
public class MyEvent2 {
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
消息的消费者有两种:第一种方式是通过实现ApplicationListener接口,在泛型中指定消息类型,通过onApplicationEvent方法接收消息并消费:
import com.alibaba.fastjson.JSONObject;
import org.example.pojo.MyEvent1;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 事件监听方式1
*
* @Author xingo
* @Date 2023/11/1
*/
@Async
@Component
public class MyApplicationListener implements ApplicationListener<MyEvent1> {
@Override
public void onApplicationEvent(MyEvent1 event) {
System.out.println("onApplicationEvent -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
}
}
第二种方式是通过@EventListener注解方式消费数据,它消费的消息不需要继承ApplicationEvent就可以被消费:
import com.alibaba.fastjson.JSONObject;
import org.example.pojo.MyEvent1;
import org.example.pojo.MyEvent2;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 事件监听
*
* @Author xingo
* @Date 2023/11/1
*/
@Component
public class MyEventListener {
@Async
@EventListener(MyEvent1.class)
public void listener01(MyEvent1 event) {
System.out.println("listener01 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
}
@Async
@EventListener(MyEvent2.class)
public void listener02(MyEvent2 event) {
System.out.println("listener02 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
}
@EventListener(MyEvent1.class)
public void listener03(MyEvent1 event) {
System.out.println("listener03 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
}
@EventListener(MyEvent2.class)
public void listener04(MyEvent2 event) {
System.out.println("listener04 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
}
}
上面的方法添加@Async后就可以实现异步消费功能,如果不添加该注解,消费者与生产者会在同一个线程中执行。
生产者就比较简单了,只需要在类中注入ApplicationContext,通过publishEvent()方法发送事件消息:
import org.example.pojo.ApiResult;
import org.example.pojo.MyEvent1;
import org.example.pojo.MyEvent2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author xingo
* @Date 2023/11/1
*/
@RestController
public class MessageController {
@Autowired
private ApplicationContext applicationContext;
@GetMapping("/send/message")
public ApiResult sendMessage(String message) {
System.out.println("sendMessage -> thread : " + Thread.currentThread().getName() + " | message : " + message);
MyEvent1 event1 = new MyEvent1(this);
event1.setMessage(message);
MyEvent2 event2 = new MyEvent2();
event2.setMessage(message);
applicationContext.publishEvent(event1);
applicationContext.publishEvent(event2);
return ApiResult.success(null);
}
}
通过上面简单几步就实现了在同一个进程中的事件订阅发布功能,相对来说还是比较简单的,启动程序调用接口:
可以看到控制台打印如下内容:
sendMessage -> thread : http-nio-9523-exec-1 | message : hello,world
listener03 -> thread : http-nio-9523-exec-1 | receive : {"message":"hello,world","timestamp":1698824483483}
onApplicationEvent -> thread : task-1 | receive : {"message":"hello,world","timestamp":1698824483483}
listener04 -> thread : http-nio-9523-exec-1 | receive : {"message":"hello,world"}
listener01 -> thread : task-2 | receive : {"message":"hello,world","timestamp":1698824483483}
listener02 -> thread : task-3 | receive : {"message":"hello,world"}
可以看到,添加@Async注解的方法接收消息时会开启一个新的线程,没有添加该注解的方法接收消息与发布消息在同一个线程中。如果要实现真正的解耦,那么在方法上添加@Async是必需的。
这种方式实现的订阅发布,并不能替代真正的消息队列的发布订阅,首先这种方式的生产者和消费者都在一个进程中,不能实现扩容,虽然可以通过配置线程池增加线程数量,但也给系统增加了负担;第二、因为这种发布订阅方式是基于内存的,默认并没有提供持久化来保证消息的不丢失,一旦系统崩溃或重启,对于没有消费的数据就会存在丢失的风险,这种情况就需要在业务中自己实现数据的一致性
框架提供的订阅发布模式将相关功能做了封装,使用场景更通用,如果我们不使用框架,一样可以实现相关功能,只是鸡肋了一些,比如我可以基于阻塞队列达到同样的效果:
import com.alibaba.fastjson.JSONObject;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 基于阻塞队列实现发布订阅功能
*
* @Author wangxixin
* @Date 2023/11/1
*/
public class MyPubSub {
private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
/**
* 发布事件
* @param event
*/
public void publish(String event) {
queue.add(event);
}
/**
* 监听事件
*/
public void listener() {
new Thread(() -> {
while (true) {
try {
String take = queue.take();
System.out.println("rec : " + JSONObject.toJSONString(take));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
public static void main(String[] args) {
MyPubSub myPubSub = new MyPubSub();
myPubSub.listener();
myPubSub.publish("Hello,world!");
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/181847.html