前言
尽管我们经过谨慎的评估,仍然不能保证一次计算出来的线程池参数是合适的。那么我们是否可以将修改线程池参数的成本降下来,这样至少在发生故障的时候可以快速调整,从而缩短故障恢复的时间呢?基于这个思考,我们可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效。线程池参数动态化前后的参数修改流程对比如下
实现
分布式配置中心这里使用的是Apollo
核心依赖
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>1.1.0</version>
</dependency>
<!-- apollo动态更新SpringBean需要的依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
</dependency>
配置文件
thread.pool.corePoolSize = 8
thread.pool.maximumPoolSize = 10
实现代码
/**
 * Spring bean that owns a single {@link ThreadPoolExecutor} and lets its core and maximum
 * pool sizes be changed at runtime from configuration-change events.
 *
 * <p>The initial sizes are bound from properties under the {@code thread.pool} prefix. The
 * executor uses a bounded {@link ArrayBlockingQueue} and {@link ThreadPoolExecutor.CallerRunsPolicy},
 * so when the pool is saturated a task runs on the thread that submitted it. For that reason
 * the task wrappers below never leave permanent changes on the executing thread.
 */
@Component(ThreadPoolManager.PACKAGE_BEAN_NAME)
@ConfigurationProperties("thread.pool")
@RefreshScope
@Data
public class ThreadPoolManager implements DisposableBean {
    /** Initial core pool size, bound from {@code thread.pool.corePoolSize}. */
    private int corePoolSize;
    /** Initial maximum pool size, bound from {@code thread.pool.maximumPoolSize}. */
    private int maximumPoolSize;
    /*
     * Capacity of the work queue. The misspelling is kept on purpose: the field name is the
     * bound property name and the name of the generated accessors. It must be configured to a
     * value greater than zero, because ArrayBlockingQueue rejects a capacity below 1.
     */
    private int queueCapactiy;
    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);
    public static final String PACKAGE_BEAN_NAME = "ThreadPoolManager";
    /**
     * The managed thread pool, created in {@link #init()}.
     */
    private ThreadPoolExecutor threadPool;

    /**
     * Builds the executor from the bound properties: 1800 seconds keep-alive, a bounded array
     * queue, non-daemon threads and the caller-runs rejection policy.
     */
    @PostConstruct
    public void init() {
        this.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 1800, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapactiy),
                new ThreadFactoryBuilder().setDaemon(false).build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Runs a task on the pool; a failure is only logged.
     *
     * @param name     thread name to use while the task runs
     * @param runnable the task
     */
    public void execute(final String name, final Runnable runnable) {
        execute(name, runnable, null);
    }

    /**
     * Runs a task on the pool and reports any throwable it raises to a handler.
     *
     * <p>The handler is called directly instead of being installed on the executing thread,
     * and the thread name is restored afterwards. Under the caller-runs policy the executing
     * thread may be the submitter's own thread, which must not be renamed permanently or have
     * its uncaught-exception handler replaced.
     *
     * @param name             thread name to use while the task runs
     * @param runnable         the task
     * @param exceptionHandler receives the executing thread and the throwable; when
     *                         {@code null} the failure is logged at error level
     */
    public void execute(final String name, final Runnable runnable, final Thread.UncaughtExceptionHandler exceptionHandler) {
        final Thread.UncaughtExceptionHandler handler;
        if (Objects.isNull(exceptionHandler)) {
            // Default behaviour: only log the failure.
            handler = (thread, exception) -> logger.error("线程执行异常-{}", thread.getName(), exception);
        } else {
            handler = exceptionHandler;
        }
        threadPool.execute(() -> {
            final Thread currentThread = Thread.currentThread();
            final String previousName = currentThread.getName();
            setName(currentThread, name);
            try {
                runnable.run();
            } catch (Throwable failure) {
                // Reported while the task name is still set, so the handler sees it.
                handler.uncaughtException(currentThread, failure);
            } finally {
                setName(currentThread, previousName);
            }
        });
    }

    /**
     * Shuts the pool down, waiting up to 20 seconds for termination.
     *
     * @throws Exception declared by {@link DisposableBean#destroy()}
     */
    @Override
    public void destroy() throws Exception {
        MoreExecutors.shutdownAndAwaitTermination(threadPool, 20, TimeUnit.SECONDS);
    }

    /**
     * Submits a value-returning task to the pool.
     *
     * <p>An exception thrown by a submitted task is not seen by any exception handler; it is
     * rethrown from {@link Future#get()} wrapped in an {@code ExecutionException}.
     *
     * @param name     thread name to use while the task runs
     * @param callable the task
     * @return a future for the task's result
     */
    public <T> Future<T> submit(String name, final Callable<T> callable) {
        return threadPool.submit(() -> {
            final Thread currentThread = Thread.currentThread();
            final String previousName = currentThread.getName();
            setName(currentThread, name);
            try {
                return callable.call();
            } finally {
                setName(currentThread, previousName);
            }
        });
    }

    /** Renames a thread; a {@code null} name is ignored because {@code Thread.setName} rejects it. */
    private void setName(Thread thread, String name) {
        if (name != null) {
            thread.setName(name);
        }
    }

    /**
     * Applies changed {@code thread.pool.corePoolSize} / {@code thread.pool.maximumPoolSize}
     * values to the running executor.
     *
     * <p>The target pair is validated first. The two setters are then called in an order that
     * keeps {@code core <= max} after each call, because on JDK 9 and later each setter throws
     * {@link IllegalArgumentException} if it would break that invariant.
     *
     * @param changeEvent the configuration change event delivered by Apollo
     */
    @ApolloConfigChangeListener("thread-pool")
    private void configChangeListter(ConfigChangeEvent changeEvent) {
        final int currentCore = threadPool.getCorePoolSize();
        final int currentMax = threadPool.getMaximumPoolSize();
        final int targetCore = resolvePoolSize(changeEvent.getChange("thread.pool.corePoolSize"), currentCore);
        final int targetMax = resolvePoolSize(changeEvent.getChange("thread.pool.maximumPoolSize"), currentMax);
        if (targetCore == currentCore && targetMax == currentMax) {
            return;
        }
        if (targetCore < 0 || targetMax <= 0 || targetMax < targetCore) {
            logger.warn("Rejected thread pool resize: corePoolSize={}, maximumPoolSize={}", targetCore, targetMax);
            return;
        }
        if (targetMax >= currentCore) {
            // Widen (or keep) the ceiling first so the new core size always fits under it.
            threadPool.setMaximumPoolSize(targetMax);
            threadPool.setCorePoolSize(targetCore);
        } else {
            // The new ceiling is below the current core size: lower the core size first.
            threadPool.setCorePoolSize(targetCore);
            threadPool.setMaximumPoolSize(targetMax);
        }
        logger.info("线程池已更新");
    }

    /**
     * Reads the new integer value of a change, falling back to the current value when the key
     * did not change, was removed, or is not a valid integer.
     */
    private int resolvePoolSize(ConfigChange configChange, int currentValue) {
        if (Objects.isNull(configChange)) {
            return currentValue;
        }
        final String newValue = configChange.getNewValue();
        if (newValue == null) {
            logger.warn("Thread pool setting was removed, keeping current value {}", currentValue);
            return currentValue;
        }
        try {
            return Integer.parseInt(newValue.trim());
        } catch (NumberFormatException invalid) {
            logger.warn("Ignoring non-numeric thread pool setting '{}', keeping current value {}", newValue, currentValue, invalid);
            return currentValue;
        }
    }
}
对于动态更新的Queue很遗憾的是JDK原生中并没有相关实现
常用的ArrayBlockingQueue的存储方式是使用一个Object数组,而且是被声明为final,也就是说初始化之后就无法再改变它的大小。LinkedBlockingQueue也是如此。
但是我们可以参考rabbitmq-client的实现VariableLinkedBlockingQueue,这是一个LinkedBlockingQueue类的克隆,增加了一个setCapacity(int)方法,允许在使用的过程中更改容量(源码见GitHub上的rabbitmq-java-client仓库)。
/**
 * A clone of a linked blocking queue whose capacity bound can be changed while the queue is
 * in use, via {@link #setCapacity(int)}.
 *
 * <p>Elements are kept in a singly linked list behind a dummy head node. Insertions are
 * guarded by {@code putLock}, removals by {@code takeLock}; operations that touch interior
 * nodes take both.
 *
 * @param <E> the type of elements held in this queue
 */
public class VariableLinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm. The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts. Similarly for the takeLock. The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Invariant: "last" is always the final node reachable from "head"
     * (and equals "head" when the queue is empty). Every operation that
     * unlinks the tail node must repair "last", otherwise later inserts
     * are linked behind an unreachable node and are lost.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read. */
        volatile E item;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    /**
     * The capacity bound, or Integer.MAX_VALUE if none. Volatile because
     * {@link #setCapacity(int)} writes it without holding a lock while offer/take/poll
     * read it outside their locks.
     */
    private volatile int capacity;

    /** Current number of elements. */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node whose item is always null). */
    private transient Node<E> head;

    /** Tail of linked list. */
    private transient Node<E> last;

    /** Lock held by take, poll, etc. */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes. */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc. */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts. */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called from take/poll and from setCapacity.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     *
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove a node from head of queue. The removed node becomes the new dummy head.
     *
     * @return the item that was stored in the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a queue with a capacity of {@link Integer#MAX_VALUE}.
     */
    public VariableLinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a queue with the given initial capacity.
     *
     * @param capacity the initial capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater than zero
     */
    public VariableLinkedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a queue with a capacity of {@link Integer#MAX_VALUE}, initially containing the
     * elements of the given collection, added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if {@code c} or any element within it is {@code null}
     */
    public VariableLinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();) {
            add(it.next());
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        return count.get();
    }

    /**
     * Set a new capacity for the queue. Increasing the capacity can
     * cause any waiting {@link #put(Object)} invocations to succeed if the new
     * capacity is larger than the queue.
     *
     * <p>Lowering the capacity below the current size removes nothing; inserts simply
     * stay blocked or rejected until enough elements have been taken.
     *
     * @param capacity the new capacity for the queue
     */
    public void setCapacity(int capacity) {
        final int oldCapacity = this.capacity;
        this.capacity = capacity;
        final int size = count.get();
        // Only a queue that was full under the old bound can have blocked producers.
        if (capacity > size && size >= oldCapacity) {
            signalNotFull();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is the current capacity of this queue less its current size,
     * and may be negative after the capacity has been lowered below the size.
     *
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to {@code add} an element will succeed by
     * inspecting {@code remainingCapacity} because it may be the
     * case that a waiting consumer is ready to {@code take} an
     * element out of an otherwise full queue.
     */
    @Override
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if the specified element is {@code null}
     */
    @Override
    public void put(E o) throws InterruptedException {
        if (o == null) {
            throw new NullPointerException();
        }
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() >= capacity) {
                    notFull.await();
                }
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param o       the element to add
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit    a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if the specified element is {@code null}
     */
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (o == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity) {
                        notFull.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add
     * @return {@code true} if it was possible to add the element to
     *         this queue, else {@code false}
     * @throws NullPointerException if the specified element is {@code null}
     */
    @Override
    public boolean offer(E o) {
        if (o == null) {
            throw new NullPointerException();
        }
        final AtomicInteger count = this.count;
        if (count.get() >= capacity) {
            return false;
        }
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock: another producer may have filled the queue meanwhile.
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary until an element
     * becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            x = extract();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // The queue was at (or above) capacity before this take, so a producer may be waiting.
        if (c >= capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the specified time for an
     * element to become available.
     *
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit    a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the wait time elapses first
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1) {
                        notEmpty.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c >= capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    @Override
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0) {
            return null;
        }
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c >= capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    @Override
    public E peek() {
        if (count.get() == 0) {
            return null;
        }
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null) {
                return null;
            }
            return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue, if present.
     *
     * @param o the element to remove
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                // Removing the tail: move "last" back so later inserts stay reachable.
                if (last == p) {
                    last = trail;
                }
                if (count.getAndDecrement() >= capacity) {
                    notFull.signalAll();
                }
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Returns an array containing all of the elements in this queue, in order.
     */
    @Override
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] result = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                result[k++] = p.item;
            }
            return result;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in order, using the
     * supplied array if it is large enough. If the array has room to spare, the slot right
     * after the last element is set to {@code null}.
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size) {
                a = (T[]) java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), size);
            }
            int k = 0;
            for (Node<?> p = head.next; p != null; p = p.next) {
                a[k++] = (T) p.item;
            }
            // Collection.toArray(T[]) contract: null-terminate when the array is oversized.
            if (a.length > k) {
                a[k] = null;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns the string form of the queue, computed while both locks are held.
     */
    @Override
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all elements from this queue.
     */
    @Override
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            // Reset the tail too; otherwise the next insert is linked after a detached node.
            last = head;
            if (count.getAndSet(0) >= capacity) {
                notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException     if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        Node<E> first;
        fullyLock();
        try {
            first = head.next;
            head.next = null;
            // The whole chain is detached, so the tail is the dummy head again.
            last = head;
            if (count.getAndSet(0) >= capacity) {
                notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        return n;
    }

    /**
     * Removes at most the given number of available elements from this queue and adds them to
     * the given collection.
     *
     * @param c           the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException     if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            while (p != null && n < maxElements) {
                c.add(p.item);
                p.item = null;
                p = p.next;
                ++n;
            }
            if (n != 0) {
                head.next = p;
                // Everything was drained, including the tail node.
                if (p == null) {
                    last = head;
                }
                if (count.getAndAdd(-n) >= capacity) {
                    notFull.signalAll();
                }
            }
            return n;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned {@code Iterator} is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator. At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null) {
                    currentElement = current.item;
                }
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public boolean hasNext() {
            return current != null;
        }

        @Override
        public E next() {
            fullyLock();
            try {
                if (current == null) {
                    throw new NoSuchElementException();
                }
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null) {
                    currentElement = current.item;
                }
                return x;
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public void remove() {
            if (lastRet == null) {
                throw new IllegalStateException();
            }
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                // p is null when the node was already removed by someone else.
                if (p == node) {
                    p.item = null;
                    trail.next = p.next;
                    // Removing the tail: move "last" back so later inserts stay reachable.
                    if (last == p) {
                        last = trail;
                    }
                    int c = count.getAndDecrement();
                    if (c >= capacity) {
                        notFull.signalAll();
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();
            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next) {
                s.writeObject(p.item);
            }
            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();
        count.set(0);
        last = head = new Node<E>(null);
        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E) s.readObject();
            if (item == null) {
                break;
            }
            add(item);
        }
    }
}
原文始发于微信公众号(小奏技术):SpringBoot+Apollo配置动态线程池
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/30508.html