要实现延时处理消息,常用的选型方案有消息队列、Redis中的zset。这两种方案都项目解耦,并且能够很好的进行分布式扩展,对于大型项目是首选的方案。而小项目中我们不必要为了一个小需求而搭建消息队列或Redis,如果只有几条需要延迟处理的消息,我们可以选择jdk提供的DelayQueue队列,非常小巧而且能够满足我们大部分的业务需求。
要使用延迟队列,首先我们要定义一个实体类,这个实体类必须实现java.util.concurrent.Delayed
接口:
import lombok.Data;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延迟消息事件
*
* @Author xingo
* @Date 2023/11/17
*/
@Data
public class DelayEvent implements Delayed {
/**
* 触发时间戳
*/
private long triggerTime;
/**
* 数据内容
*/
private String data;
/**
* 构建延迟消息事件
* @param data 事件内容
* @param timeout 触发时间:当前时间距离触发事件的剩余时间
* @param unit 时间格式
*/
public DelayEvent(String data, int timeout, TimeUnit unit) {
this.data = data;
this.triggerTime = unit.toMillis(timeout) + System.currentTimeMillis();
}
/**
* 构建延迟消息事件
* @param data 事件内容
* @param triggerTime 触发时间:事件发生的毫秒时间戳
*/
public DelayEvent(String data, long triggerTime) {
this.data = data;
this.triggerTime = triggerTime;
}
/**
* 返回延迟事件的剩余时间,如果结果返回0或者负数,表示延迟时间已经到了,事件会被触发
* @param unit 数据需要被转换的时间类型
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
// 通过与当前时间比较来判断是否到达执行时间
long diff = this.triggerTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
/**
* 队列排序比较器,用来对入队数据进行排序,通常都是升序排列:
* 当此对象小于、等于或大于指定对象时,返回负整数、零或正整数。
* @param obj 被比较对象
* @return
*/
@Override
public int compareTo(Delayed obj) {
DelayEvent data = (DelayEvent) obj;
long diff = this.triggerTime - data.triggerTime;
return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
}
}
这样当对象加入DelayQueue队列时就可以自动实现按照触发时间进行排序,DelayQueue底层通过调用getDelay()方法来判断是否到达执行时间来决定是否要返回队列头部对象。
定义一个DelayQueue对象,用于存放延迟执行的任务,同时需要开启一个线程异步消费到期需要执行的数据:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.DelayQueue;
/**
* 事件延时队列
*
* @Author xingo
* @Date 2023/11/17
*/
@Slf4j
public class DelayEventList {
/**
* 延迟队列
*/
private final DelayQueue<DelayEvent> queue = new DelayQueue<>();
/**
* 将延迟执行的事件加入队列
* @param event 处理事件对象
*/
public void add(DelayEvent event) {
try {
// 添加数据到延迟队列
this.queue.add(event);
log.info("添加延迟处理数据|{}|{}", event.getTriggerTime(), event.getData());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 开启一个线程异步消费到期数据
*/
public void run() {
new Thread(() -> {
while (true) {
try {
// 阻塞等待数据,有数据时会被唤醒
DelayEvent event = queue.take();
log.info("消费延迟处理数据|{}|{}", event.getTriggerTime(), event.getData());
} catch (Exception e) {
e.printStackTrace();
}
}
}, "delay-consumer").start();
}
}
上面两个类定义好后就实现了一个小型的延迟消息处理功能,下面测试一下延迟处理功能:
import java.util.concurrent.TimeUnit;
public class DelayEventTest {
public static void main(String[] args) {
DelayEventList delay = new DelayEventList();
delay.run();
// 1分钟后触发事件
delay.add(new DelayEvent("Hello,world1!-1", 1, TimeUnit.MINUTES));
// 2秒后触发事件
long triggerTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS);
delay.add(new DelayEvent("Hello,world!-2", triggerTime));
}
}
执行这个main方法后可以看到输出如下内容:
15:00:43.979 [main] INFO org.example.utils.DelayEventList - 添加延迟处理数据|1700204503978|Hello,world1!-1
15:00:43.982 [main] INFO org.example.utils.DelayEventList - 添加延迟处理数据|1700204445981|Hello,world!-2
15:00:45.981 [delay-consumer] INFO org.example.utils.DelayEventList - 消费延迟处理数据|1700204445981|Hello,world!-2
15:01:43.979 [delay-consumer] INFO org.example.utils.DelayEventList - 消费延迟处理数据|1700204503978|Hello,world1!-1
添加的两个数据分别在2s后和1min后执行,达到了延迟消费的目的。
这种延迟数据的处理方式非常适合小型项目,不用额外的组件,而且代码也简洁易懂,我们可以通过定时任务+延迟队列的方式来满足场景需求:通过定时任务将数据库中要执行的任务放入延迟队列,延迟时间可以根据订单时间来指定。这种方案降低了应用服务缓存过多数据导致的内存压力,同时也把延迟时间降低到了可接受的范围。
但这种延迟队列不方便扩展,对于集群式部署还是要选择第三方支持的延迟队列,不但达到了服务解耦而且更容易扩容。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/181832.html