做一个消息延时发送的需求,类似于订单超时取消,设定好时间,定时发送短信,因为没有什么并发,也不需要一些其他的中间件,像 MQ 这些增加复杂度,所以就使用 JDK 的提供的延时队列功能。
如果项目中需要延时处理的功能或需求还比较多,并且有大量的请求,建议还是使用中间件,引入适合项目的 MQ 来处理。
JDK(Java Development Kit)中的延时队列(DelayQueue)是一个实现了Delayed接口的无界阻塞队列。它允许我们在一定的延时时间之后获取元素,通常用于实现定时任务调度或按照延时顺序处理任务的场景。
延时队列的实现方案如下:
-
实现 Delayed 接口:Delayed 接口是一个用于定义延时元素的接口,它包含两个方法: getDelay(TimeUnit unit)
和compareTo(Delayed o)
。在实现该接口时,需要根据自己的需求来定义延时时间和比较逻辑。 -
创建延时元素类:延时元素类是实现了Delayed接口的具体类,它表示队列中的每个延时元素。延时元素类需要实现接口中的方法,并提供相应的构造方法来初始化延时时间和其他相关信息。 -
创建延时队列对象:使用DelayQueue类来创建延时队列对象,该类是JDK提供的一个线程安全的无界阻塞队列实现。 -
将延时元素添加到延时队列:通过调用 offer(E e)
或put(E e)
方法将延时元素添加到延时队列中。这些方法会根据延时时间自动将元素放置在合适的位置上。 -
获取延时元素:使用 take()
或poll()
方法从延时队列中获取延时元素。这些方法会在元素到达延时时间之前一直阻塞,并在延时时间到达后返回元素。 -
处理延时任务:获取延时元素后,可以执行相应的延时任务逻辑。延时任务的处理方式因具体需求而异,可以在获取到元素后执行相应的操作或将元素传递给其他模块进行处理。
需要注意的是,延时队列中的延时时间是相对时间,通常使用纳秒作为单位。在实际应用中,可以根据具体需求进行单位转换和逻辑处理。另外,为了使延时队列正常工作,需要保证延时元素的延时时间是正确设置的,否则可能导致延时队列无法按预期工作。
实现步骤:
定义短信消息实体
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* 短信消息发送参数
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SmsSendReq {
/**
* 短信文本内容
*/
private String message;
/**
* 手机号
*/
private String phone;
/**
* 发功时间(yyyy-MM-dd HH:mm:ss)
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date sendTime;
}
定义 DelayTask 类,实现 Delayed 接口,代码如下
import lombok.ToString;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延时执行任务
*/
@ToString
public class DelayTask implements Delayed {
private final SmsSendReq data;
//任务延时时间(ms)从加入到延迟队列时开始计时,多少毫秒后执行该任务
private final long expireTime;
public long getExpireTime() {
return expireTime;
}
public SmsSendReq getData() {
return data;
}
/**
* 构造延时任务
*
* @param data 业务数据
* @param expireTime 任务延时时间(ms)
*/
public DelayTask(SmsSendReq data, long expireTime) {
super();
this.data = data;
this.expireTime = (expireTime > 0 ? expireTime : 0) + System.currentTimeMillis();
}
// 获取延时时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expireTime - System.currentTimeMillis(), unit);
}
// 对延时队列中的元素进行排序
@Override
public int compareTo(Delayed o) {
long delta = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return (int) delta;
}
}
延时队列任务发布
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
/**
* 延时发布队列任务
*/
@Component
@Slf4j
public class QueueTask implements ApplicationRunner {
private static final DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
/**
* 加入到延时队列中
*
* @param task
*/
public static void put(DelayTask task) {
log.info("加入延时任务,{}", task.getData().getPhone());
delayQueue.put(task);
}
/**
* 稿件任务加入到延时队列中
*/
public static void putTask(SmsSendReq req) {
// 毫秒
log.info("加入延时任务,手机号:{}", req.getPhone());
delayQueue.put(new DelayTask(req, req.getSendTime().getTime() - System.currentTimeMillis()));
}
/**
* 清除延时队列
*/
public static void clearQueue() {
log.info("清除延时队列");
delayQueue.clear();
}
/**
* 取消延时任务
*
* @param task
* @return
*/
public static boolean remove(DelayTask task) {
log.info("取消延时任务:{}", task);
return delayQueue.remove(task);
}
@Override
public void run(ApplicationArguments args) {
log.info("初始化延时队列");
Executors.newSingleThreadExecutor().execute(new Thread(this::executeThread));
initQueueTask();
log.info("队列任务数量:" + delayQueue.size());
}
/**
* 初始化延时队列
*/
public void initQueueTask() {
// TODO 读取未执行的延时任务,业务逻辑处理
}
/**
* 延时任务执行线程
*/
private void executeThread() {
while (true) {
try {
DelayTask task = delayQueue.take();
processTask(task);
log.info("待执行任务队列数量:{}", delayQueue.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("当前线程执行异常,{}", e.getMessage());
}
}
}
/**
* 队列中未执行任务数量
*
* @return
*/
public static int getQueueSize() {
return delayQueue.size();
}
/**
* 内部执行发送短信队列
*
* @param task
*/
private void processTask(DelayTask task) {
log.info("执行延时任务:发送手机号:{}", task.getData().getPhone());
// TODO 待执行业务逻辑
}
}
Controller 方法
import com.ss.common.task.QueueTask;
import com.ss.common.task.SmsSendReq;
@RestController
public class TestController {
@GetMapping("/sendSms")
public void sendSms(SmsSendReq req){
QueueTask.putTask(req);
}
}
请求地址:http://127.0.0.1:8080/sendSms?phone=15000000000&message=你好&sendTime=2023-06-25 16:10:00
注意 url 中的中文字符和日期中间的空格会进行编译
执行结果
原文始发于微信公众号(师小师):JDK 延时队列实现消息定时发送
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/226348.html