ScheduledThreadPoolExecutor 线程池
链接: ScheduledThreadPoolExecutor.
import java.util.concurrent.Future;
public class Entity {
/**
* 订单到期时间
*/
private String orderExpirationTime;
/**
* 定时器Future
*/
private Future future;
/**
* @param orderExpirationTime 订单到期时间
* @param future 定时器
*/
public Entity(String orderExpirationTime, Future future) {
this.orderExpirationTime = orderExpirationTime;
this.future = future;
}
/**
* 获取值
*/
public String getOrderExpirationTime() {
return orderExpirationTime;
}
/**
* 获取Future对象
*/
public Future getFuture() {
return future;
}
}
import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import java.util.Date;
import java.util.concurrent.*;
/**
* 取消订单定时器缓存
*/
public class CancelOrderTimer {
/**
* key为orderNumber
*/
private final static ConcurrentHashMap<String, Entity> map = new ConcurrentHashMap<>();
/**
* 线程池大小
*/
private static final int POOL_SIZE = 5;
/**
* 过期时间:一分钟
*/
public static final int EXPIRE = 1;
private final static ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(POOL_SIZE,
new BasicThreadFactory.Builder().
namingPattern("cancelOrder-schedule-pool-%d")
.daemon(true)
.build());
public static ScheduledExecutorService getExecutor() {
return executor;
}
/**
* 读取缓存
*/
public static String get(String key) {
Entity entity = map.get(key);
return entity == null ? null : entity.getOrderExpirationTime();
}
/**
* 放入缓存
*/
public static void put(String orderNumber,Future future) {
Date newDate = DateUtil.offset(DateUtil.date(), DateField.MINUTE, EXPIRE);
String orderExpirationTime = DateUtil.formatDateTime(newDate);
map.put(orderNumber, new Entity(orderExpirationTime, future));
}
/**
* 清除缓存并取消定时任务
* @param orderNumber 订单号
* @param mayInterruptIfRunning 是否中断正在执行的任务
*/
public static void remove(String orderNumber, boolean mayInterruptIfRunning) {
Entity entity = map.remove(orderNumber);
if (entity == null) {
return;
}
Future future = entity.getFuture();
if (future != null) {
// 注意:传入true会中断正在执行的任务
future.cancel(mayInterruptIfRunning);
}
}
/**
* 获取订单剩余取消时间
*
* @param orderNumber 订单号
*/
public static String getOrderExpirationTime(String orderNumber) {
long remainMinute = 0;
long remainSecond = 0;
String orderExpirationTime = get(orderNumber);
if (StringUtils.isNotBlank(orderExpirationTime)) {
long s = DateUtil.parse(orderExpirationTime).getTime();
long between = (s - System.currentTimeMillis()) / 1000;
remainMinute = between / 60 % 60;
remainSecond = between % 60;
}
return "订单过期还剩:" + remainMinute + "分钟" + remainSecond + "秒";
}
}
import cn.hutool.core.lang.Console;
import cn.hutool.core.thread.ConcurrencyTester;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.IdUtil;
import com.zm.demo.util.CancelOrderTimer;
import com.zm.demo.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
@Validated
public class DelayTestController {
public static Logger log = LoggerFactory.getLogger(CancelOrderTimer.class);
@Autowired
private RedisUtil RedisUtil;
@GetMapping("/put_timer")
public String hello1(@RequestParam String orderNumber){
ConcurrencyTester tester = ThreadUtil.concurrencyTest(100, () -> {
// 测试的逻辑内容
putTimer(IdUtil.simpleUUID());
});
// 获取总的执行时间,单位毫秒
Console.log(tester.getInterval());
return "ok";
}
@GetMapping("/expiration_time")
public String hello2(@RequestParam String orderNumber){
return CancelOrderTimer.getOrderExpirationTime(orderNumber);
}
public void putTimer(String orderNumber) {
CancelOrderTimer.remove(orderNumber,false);
ScheduledExecutorService executor = CancelOrderTimer.getExecutor();
Future future = executor.schedule(() -> {
try {
synchronized (CancelOrderTimer.class) {
// 这里用 redis计录 任务执行成功的次数
RedisUtil.incrBy("one",1);
log.info(orderNumber + "订单号成功执行取消订单");
}
} catch (Exception e) {
e.printStackTrace();
log.error(orderNumber + "订单号执行取消订单失败");
}
}, CancelOrderTimer.EXPIRE, TimeUnit.MINUTES);
CancelOrderTimer.put(orderNumber,future);
log.info( orderNumber + "订单号启动取消订单定时器");
}
}
redis
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.3</version>
</dependency>
spring:
redis:
host: xxxxxxx
port: 6379
timeout: 5000
password: xxxxxx
import java.io.Serializable;
public class TaskBodyDTO implements Serializable {
private String orderNumber;
public String getOrderNumber() {
return orderNumber;
}
public void setOrderNumber(String orderNumber) {
this.orderNumber = orderNumber;
}
}
/**
* 队列事件监听接口,需要实现这个方法
*
* @param <T>
*/
public interface RedisDelayedQueueListener<T> {
/**
* 执行方法
*
* @param t
*/
void invoke(T t);
}
import com.zm.demo.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TestListener1 implements RedisDelayedQueueListener<TaskBodyDTO>{
public static Logger log = LoggerFactory.getLogger(TestListener1.class);
@Autowired
private RedisUtil RedisUtil;
@Override
public void invoke(TaskBodyDTO taskBodyDTO) {
RedisUtil.incrBy("one",1);
log.info( taskBodyDTO.getOrderNumber() + "订单号成功执行取消订单");
}
}
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.*;
/**
* 初始化队列监听
*/
@Component
public class RedisDelayedQueueInit implements ApplicationContextAware {
private static Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);
@Autowired
RedissonClient redissonClient;
/**
* corePoolSize:核心线程数
*
* maximumPoolSize:最大线程数
*
* keepAliveTime + unit:线程回收时间
*
* workQueue:任务较多时暂存到队列
*
* threadFactory:执行程序创建新线程时使用的工厂
*
* handler:超出线程池容量以及队列长度后拒绝任务的策略
*/
private final static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("judge-pool-%d")
.setUncaughtExceptionHandler((thread, throwable)->
logger.error("ThreadPool {} got exception", thread,throwable))
.build();
// 创建线程池,使⽤有界阻塞队列防⽌内存溢出
private final static ExecutorService statsThreadPool = new ThreadPoolExecutor(5, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100), namedThreadFactory);
/**
* 获取应用上下文并获取相应的接口实现类并启动对应的监听线程
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
String listenerName = taskEventListenerEntry.getValue().getClass().getName();
startThread(listenerName, taskEventListenerEntry.getValue());
}
}
/**
* 启动线程获取队列*
*
* @param queueName queueName
* @param redisDelayedQueueListener 任务回调监听
* @param <T> 泛型
* @return
*/
private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
//由于此线程需要常驻,可以新建线程,不用交给线程池管理
Thread thread = new Thread(() -> {
logger.info("启动监听队列线程" + queueName);
while (true) {
try {
T t = blockingFairQueue.take();
statsThreadPool.execute(() -> {
redisDelayedQueueListener.invoke(t);
});
} catch (Exception e) {
logger.info("监听队列线程错误,", e);
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
}
}
}
});
thread.setName(queueName);
thread.start();
}
}
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class RedisDelayedQueue {
@Autowired
RedissonClient redissonClient;
private static Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);
/**
* 添加队列
*
* @param t DTO传输类
* @param delay 时间数量
* @param timeUnit 时间单位
* @param <T> 泛型
*/
public <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
}
}
@Autowired
RedisDelayedQueue redisDelayedQueue;
@GetMapping("/redis/put_timer")
public String hello(){
ConcurrencyTester tester = ThreadUtil.concurrencyTest(100, () -> {
TaskBodyDTO taskBody = new TaskBodyDTO();
taskBody.setOrderNumber(IdUtil.simpleUUID());
redisDelayedQueue.addQueue(taskBody, CancelOrderTimer.EXPIRE, TimeUnit.MINUTES, TestListener1.class.getName());
});
Console.log("总的执行时间:"+tester.getInterval());
return "ok";
}
消息中间件
RabbitMQ 实现延迟队列
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
绑定交换机和队列的关系
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
public static final String exchange_name = "fanout_order_exchange";
public static final String dead_exchange_name = "dead_order_exchange";
public static final String dead_rout_key = "dead_order";
/**
* 配置交换机
*/
@Bean
public FanoutExchange fanoutOrderExchange() {
return new FanoutExchange(exchange_name, true, false);
}
/**
* 配置ttl队列 存放订单 设置1分钟投入 死信队列
*/
@Bean
public Queue ttlQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//一般设置一下队列的持久化就好,其余两个就是默认false
Map<String,Object> args = new HashMap<>();
// 1000/秒
args.put("x-message-ttl",60000);
args.put("x-dead-letter-exchange",dead_exchange_name);
args.put("x-dead-letter-routing-key",dead_rout_key);
return new Queue("cancel.fanout.queue", true, false ,false, args);
}
/**
* 将队列和交换机绑定
*/
@Bean
public Binding bindingFanout() {
return BindingBuilder.bind(ttlQueue()).to(fanoutOrderExchange());
}
/**
* 配置死信交换机
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(dead_exchange_name, true, false);
}
/**
* 配置死信队列
*/
@Bean
public Queue cancelQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("cancel.direct.queue", true);
}
/**
* 将队列和交换机绑定
* @return
*/
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(cancelQueue()).to(deadExchange()).with(dead_rout_key);
}
}
import cn.hutool.core.util.IdUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderService {
public static Logger log = LoggerFactory.getLogger(OrderService .class);
@Autowired
private RabbitTemplate rabbitTemplate;
public void makeOrder() {
String orderNumber = IdUtil.simpleUUID();
// convertSendAndReceive(…):可以同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才触发另一个convertSendAndReceive(…),也就是才会接收下一条消息。RPC调用方式。
// convertAndSend(…):使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。
rabbitTemplate.convertAndSend(RabbitConfig.exchange_name, "", orderNumber);
log.info(orderNumber + "订单号启动取消订单定时器");
}
}
import com.zm.demo.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CancelOrderService {
public static Logger log = LoggerFactory.getLogger(CancelOrderService .class);
@Autowired
private RedisUtil RedisUtil;
/**
* @RabbitListener 监听队列
* @RabbitHandler 代表此方法是一个消息接收的方法;该不要有返回值
*/
@RabbitListener(queues = "cancel.direct.queue")
@RabbitHandler
public void invoke(String message){
RedisUtil.incrBy("one",1);
log.info(message + "订单号成功执行取消订单");
}
}
@Autowired
OrderService OrderService;
@GetMapping("/mq/put_timer")
public String mq(){
ConcurrencyTester tester = ThreadUtil.concurrencyTest(1000, () -> {
OrderService.makeOrder();
});
Console.log("总的执行时间:"+tester.getInterval());
return "ok";
}
使用死信队列实现延时消息的缺点:
1) 如果统一用队列来设置消息的TTL,当梯度非常多的情况下,比如1分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要创建很多交换机和队列来路由消息。
2) 如果单独设置消息的TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递。
3) 可能存在一定的时间误差。
并发测试对比:服务器性能 1核2G
定时器线程池 100并发
添加任务:87ms
执行任务:8秒
redis 100并发
添加任务:698ms
执行任务:8秒
RabbitMQ 100并发
添加任务:6秒
执行任务:8秒
定时器线程池 1000并发
添加任务:202ms
执行任务:1分钟30秒
redis 1000并发
添加任务:3秒
执行任务:1分钟30秒
RabbitMQ 1000并发
添加任务:1分钟多
执行任务:2分钟多
RabbitMQ 为什么这么慢 代码有问题还是什么原因 有待研究~
具体选择看具体需求及场景
定时器线程池
优点:
使用简单
支持停止任务
执行时间较为准时
缺点:
任务数量大时,占用大量内存
一旦宕机或者执行任务失败,无法重新执行任务,需要写补偿机制
存在较小时间误差
redis 延迟队列
优点:
redis 执行效率快
支持分布式
消息持久化
缺点:
编码实现稍微复杂
没有确认消息可靠消费机制,需要写补偿机制
无法取消任务
存在较小时间误差
RabbitMQ 延迟队列
优点:
解耦
通过生产者可靠消息投递和消费者可靠消息确认机制能确保任务稳定执行
消息持久化
支持分布式
能接收大量的消息
缺点:
编码实现复杂
无法取消任务
存在时间误差
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/133943.html