在讲述Netty的高性能特性之前,基于之前的epoll技术分析中谈到C10K问题,其实是属于一个性能优化问题,目的是为了能够在单台机器上支撑更多的并发连接调度所做的性能优化,为了达到上述目标,需要要求我们设计的web服务采用合理的IO模型,并在对应的IO模型基础上引入多线程与并发库技术的使用来支撑更多的连接调度,同时考虑到计算机资源的限制,我们需要在设计web服务的时候合理对资源进行分配优化,比如内存,网络带宽以及CPU核数的充分利用,也就是说我们还需要考虑到可伸缩性的问题,通过增加资源来使得我们的web服务能够得到线性提升效果.接下来我们就来结合部分源码分析Netty技术是如何体现高性能这一个特性.
C10K&C10M问题分析
C10K&C10M解决方案
C10K问题
-
采用线程连接架构TBA模型,也就是1个客户端连接对应1个线程,那么对于内核而言,假设这个时候需要10k个连接,那么也就意味着要10k个线程,此时内核需要从这个10k个线程中轮询遍历哪个线程是有数据流量进来的,对于服务器本身而言,不论线程数量多少,线程上下文切换的时间是恒定的,即使再多的连接分配给再多的线程,其性能也不会上去,线程调度仍然无法扩展,除了本身线程资源的瓶颈之外,我们可以看到的一个现象就是线程调度无法扩展. -
相对地,采用选择/轮询来处理连接事件,也就是面向事件驱动设计EDA模式,我们在分析select/poll/epoll技术中讲到,它们都是对一个socket集合fds进行监听,每个数据包都会经过socket套接字,即使套接字增加,我们同样可以通过选择和轮询的方式来遍历socket数据流量进来的事件,这个时候单线程是可以完成一个选择和轮询就绪事件的操作,同时还可以实现连接的扩展性,随着IO技术的发展,现代服务器都会引入可扩展的epoll技术与异步IO Compeletion Port在指定时间内查询就绪的socket集合并返回给应用程序.
-
选用的IO模型能够支持web实现可伸缩性 -
结合IO模型设计的线程模型,能够通过增加适当的线程数量来支撑web服务更多的并发连接 -
最后一个可以理解为性能问题,一个Web服务的性能可以参考以下几个因素: 数据复制问题/线程上下文切换问题/内存分配问题以及锁争用(无锁编程是一个我们理想的选择)
C10M问题
-
支撑1000w/s的并发连接 -
支撑一个持续时间约为10s的100w并发连接 -
一个连接要处理接近1000M/s大小的数据包,意味着能够快速与互联网建立连接 -
1000w个数据包/秒-而当前的服务器每秒处理5w个数据包,这将达到更高的水平。过去服务器每秒能够处理100K次中断,每个数据包都会引起中断。 -
10微秒延迟-可伸缩服务器可能会处理规模,但延迟会增加。 -
10微秒抖动-限制最大延迟 -
10个连贯的CPU内核-软件应扩展到更大数量的内核。通常,软件只能轻松扩展到四个内核。服务器可以扩展到更多的内核,因此需要重写软件以支持更大的内核计算机(在C10K的基础上)
-
数据包可扩展: 编写一个自定义驱动程序以绕过TCP堆栈,直接将数据包发送到应用程序.如PF_RING,Netmap,Intel DPDK -
多核可扩展: 多核编码并不是多线程编码,而是让我们的应用程序分布在每个CPU核心上,保证我们能够随着内核的增加以线性扩展我们应用程序的处理能力.即一个是保持每个cpu核心数的数据结构,一个是每个cpu保证原子性操作,一个是使用无锁技术的数据结构,一个是使用线程模型完成流水工作,最后一个是利用处理器的亲和力,即保持运行在每个cpu核数上分配的线程是固定的,即每个cpu对应着专有的线程来完成工作. -
内存可扩展: 一个是使用连续内存分配技术,增加数据的缓存命中率,一个是分页表运用高效的缓存数据结果并对数据压缩,一个是使用池化技术管理内存,一个是合理分配线程以降低内存访问延迟,最后一个是使用预分配的内存技术.
高并发问题
高并发关注指标
-
响应时间(Response Time):发起一个request请求,执行这个request请求从开始到最后返回响应结果所花费的总体时间,也就是客户端发起请求到最后收到服务端返回响应结果的时间.比如http请求响应时间为200ms,200ms表示RT. -
每秒并发连接(并发用户数): 每秒可支撑的连接调度/同时承载正常使用系统功能的用户数量,并发连接/用户数更关注的是能够处理调度连接而不在于处理速度. -
QPS/TPS(每秒查询量/每秒事物处理量): 比如现在客户端发起一个下单操作(用户鉴权/订单校验/下单操作三个步骤),这个下单操作形成一个TPS,而下单里的每个步骤形成一个QPS,也就是说TPS包含3个QPS操作,因而对于TPS理解是一个完整的事物请求的操作结果,而QPS是针对一个request请求的操作结果,对此TPS是衡量软件测试结果的度量单位,而QPS是特定的查询服务器在指定的时间段内处理流量度量标准的数量,域名服务器的机器性能通常用QPS来衡量,QPS与TPS更关注处理速度. -
吞吐量(Throughput): 取决于我们关注系统的业务指标,比如我们关注的是软件测量结果相关的处理能力(处理速度),那么这个时候的吞吐量我们需要关注的是TPS,如果是关注机器性能的流量,那么我们关注的吞吐量是QPS,如果我们对接的是接入层的服务,那么我们可能需要关注的是并发连接的调度,此时关注的吞吐量是支撑的并发连接调度数量.
并发连接/QPS/TPS
基于上述的高并发指标的理解,现将并发连接/QPS/TPS的区分通过以下图解的方式展开:
-
并发连接: 主要体现在服务端程序高效的连接调度机制上,也就是说服务端能够在一定的时间段内能够正确地响应给每个连接的请求即可,至于何时响应以及如何响应不是并发连接关注的事情. -
QPS/TPS: 主要体现在处理速度上,要求能够正常完成对请求响应的处理,不仅是要对请求结果正确响应,同时还要求处理能力能够尽可能快速.
IO与线程模型实现高并发连接调度
-
基于先前的高性能IO编程设计并结合上述的C10K与C10M问题,实现一个支撑高并发连接调度的web服务需要借助具备可伸缩性的NIO或者AIO技术完成,通过监听socket的数据流量出入事件来响应给应用程序,并且轮询事件通过单线程的方式也能够处理,还能实现扩展,只要操作系统的fd资源配置足够大即可. -
其次,为了支撑更多更快的响应连接调度处理,我们可以适当地加入多线程处理方式来扩展上述单线程处理连接事件的能力.同时也会看到在IO相关设计,基于事件的编程,为了简化应用开发者编写代码的复杂度以及具备更好的扩展性,引入了基于EDA的Reactor与Proactor的模式设计.
C10K与C10M提升性能优化因素
数据包的收发
-
socket接收数据流量的时候我们要考虑如何将数据包直接传输到应用程序,尽量避免数据的拷贝问题. -
应用程序接收数据包的时候能不能缓存起来,同时如果加入缓存的话,有没有办法提高命中率. -
数据存储的区域能否重复利用,即使用池化技术进行管理分配,减少向计算机申请资源的性能.
应用程序的处理能力
-
充分利用CPU资源,避免CPU一直处于空闲假死状态(线程阻塞/空轮询/线程过多) -
在先前的文章高性能IO设计有说明到,我们可以在竞争环境下使用并发库,底层原子操作等手段有助于提升IO的吞吐量, -
同步环境下能够使用无锁来处理任务.
Netty高并发机制
可伸缩的IO模型
-
NIO多路复用技术具备可伸缩性,通过C10K问题的分析,我们知道单线程能够处理更多的socket就绪事件,也就是说单线程面向事件驱动设计的复用技术实现可扩展性且能支撑更多并发连接的请求调度处理,这里与线程连接不同的是我们关注的是事件而不是线程本身,因而不会受限于线程资源以及线程的调度分配问题. -
其次Netty框架是基于Reactor模式进行演变,但与Reactor模式不同的是Netty是多线程异步处理,更像是Proactor模式,只不过异步处理是在应用程序通过回调的方式完成的,而Proactor是基于AIO的方式将异步操作传输到内核并在内核中进行回调返回.
Reactor模式
关于Netty处理的单线程无锁串行化的流水工作流程示意图如下:
-
使用NIO模型实现多连接的可伸缩性扩展,同时引入Reactor模式以及责任链设计提升web服务的扩展性(程序扩展性,服务伸缩性),能够支撑更多的并发连接调度.
-
其次,Netty设计通过为每个执行的事件轮询EventLoop分配独有的线程,保证了每个事件轮询器之间处理的流水工作相互独立,同时也保证了在当前EventLoop下执行的所有流水工作都是专属于专有的线程,不存在资源竞争以及锁争用的情况,基于此,在多核环境下我们可以充分利用多核技术进一步去提升我们的并发连接调度处理能力.
-
最后一个就是Netty通过EventLoopGroup的“集群”手段来分担我们web服务的并发连接调度处理能力,有效缓解对单个线程处理并发连接的压力,提升并发连接调度的处理能力.
高性能的ByteBuf
支持堆外内存读写
网卡设备接收到数据包流量事件,内核将数据块加载到内核缓冲区中,并且通过socket传输数据到用户空间的缓冲区,最后JVM要操作socket缓冲区的数据,需要将其读取到JVM堆中存储,这个时候需要再JVM堆中申请一个内存区域用于存放数据包数据,而如果直接通过堆外内存读取数据,则可以减少一次数据的拷贝以及内存资源的损耗,如下图所示:
分片&复合缓存的零拷贝机制
// 假设buffer1以及buffer2都存储在堆外内存,堆内内存同理(只是在JVM中)
ByteBuf httpHeader = buffer1.silice(OFFSET_PAYLOAD, buffer1.readableBytes() - OFFSET_PAYLOAD);
ByteBuf httpBody = buffer2.silice(OFFSET_PAYLOAD, buffer2.readableBytes() - OFFSET_PAYLOAD);
// 逻辑上的复制,header与body仍然存储在原有的内存区域中,http为JVM在堆中创建的对象,指向一个逻辑结构上的ByteBuf
ByteBuf http = ChannelBuffers.wrappedBuffer(httpHeader, httpBody);
动态扩容
核心源代码
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
// > 4M
// cap / 4 * 4 + 4
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
// < 4M 进行以2的倍数进行增长
// 2096,此时分配的内存为3072byte
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
通过上述源码可知:
-
当写出的数据不足4M的时候,将以64byte为起始值,以2的倍数进行增长扩容 -
当写出的数据大于4M的时候,将以一个公式 newCapacity = capacity/4*4+4
进行计算 -
当写出的数据为4M的时候,直接返回4M预定的默认空间大小
引用计数器与资源管理
PooledByteBufAllocator
而言,使用引用计数能够降低内存分配的开销,有助于优化内存使用和性能的提升.-
ByteBuf的实现接口ReferenceCounted
// ByteBuf.java
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>{
boolean isAccessible() {
return refCnt() != 0;
}
}
//AbstractByteBuf.java
public abstract class AbstractByteBuf extends ByteBuf {
// 对于引用计数为0的实例将无法访问,会抛出异常IllegalReferenceCountException
protected final void ensureAccessible() {
if (checkAccessible && !isAccessible()) {
throw new IllegalReferenceCountException(0);
}
}
}
//ReferenceCounted.java
public interface ReferenceCounted {
// 调用retain(increament) 将会增加引用计数increament
// 调用release(increament)将会减少引用计数increament
}
-
ChannelHandler的资源管理
// 对于入站事件,如果当前消费入站数据并且没有事件进行传播的话,那么就需要手动释放资源
public void channelRead(ChannelHandlerContext ctx, Object msg){
// ...
// not call fireChannelRead,事件传播在当前handler终止,这个时候需要手动清除
ReferenceCountUtil.release(msg);
// SimpleChannelInboundHandler能够手动清除,但是一般入站事件我个人习惯用ChannelInboundHandlerAdapter并且自己手动管理,方法单一,处理简单,可以手动管理,同理出站事件也是用Adapter
}
// 对于出站事件,如果当前需要对非法消息采取丢弃操作,则也需要手动进行处理释放资源
public void channelWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise){
ReferenceCountUtil.release(msg);
promise.setSuccess(); // 丢弃消息意味着不会将数据传输到出站事件的责任链上,这个时候FutureListener无法监听到消息处理情况,需要手动通知处理结果
}
-
Netty的资源监控类ResourceLeakDetector
## 关于监控类的级别详细可查看Netty类下的ResourceLeakDetector
## 通过java配置并执行可以查看资源泄漏情况以及输出报告
java -Dio.netty.leakDetection.level=ADVANCED
内存分配算法
内存分配策略
-
内存分配策略源码
// buf = directArena.allocate(cache, initialCapacity, maxCapacity);
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
// 创建池化的ByteBuf
// 核心处理流程: 先从线程缓存获取栈,从栈获取buf,如果不存在则将创建ByteBuf并存储栈中,最后更新栈数据并一并更新到到线程的cache中最后返回的时候会重置buf的引用计数器
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
// 从内存中申请资源存储数据
allocate(cache, buf, reqCapacity);
return buf;
}
// 分配策略
// PoolArena.java
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 计算合适的一个区域
// 比如现在申请一个资源为19byte,则会为其创建一个2的临近整数方,这个时候会分配一个32byte的数据
final int normCapacity = normalizeCapacity(reqCapacity);
// 申请的容量小于8kb
if (isTinyOrSmall(normCapacity)) {
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
// 容量小于512byte
if (tiny) {
// 从缓存中获取
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
// 分配的容量大于512byte小于8kb
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
// 如果线程缓存没有数据则直接申请OS内存资源,存在竞争,需要同步加锁保证线程安全
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
// 并发原子操作更新数据
incTinySmallAllocation(tiny);
return;
}
// 容量大于等于8kb小于16M
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// > 16M,直接从操作系统中申请资源并且不做缓存和池化处理,于是不会添加到arena中
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
// 最后使用ByteBuf进行回收添加到cache中
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
// 可以看到这里将数据添加到线程缓存中
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass, nioBuffer, false);
}
}
-
核心流程
Netty内存分配逻辑结构视图
-
从宏观上看,线程与Arena之间的关系:
-
从微观上看每个arena存储数据过程,在上述源码中我们看到在没有使用线程缓存的时候,会创建一个PoolChunk对象,在这个PoolChunk中对于小于8kb的数据会通过维护着一个subpage类型的数组来组成一个page,我们可以认为把存储数据的buffer存放在一个chunk的一个page,并且每个page的容量都是2幂次方且单位为byte,在chunk为了便于搜索可用的page,于是在逻辑上将page以完全二叉树的数据结构进行存储,方便进行搜索查询,每个二叉树节点存储对应一个可分配的容量,根容量为16M,深度每增加1,容量就减半.如下图所示:
-
接着我们看下线程缓存存储的逻辑结构(基于可伸缩性的jemalloc算法):
上述的Tiny MemoryRegionCache对应于TinySubPageCache,Small MemoryRegionCache对应于SmallSubPageCache,而Normal MemoryRegionCache对应于NormalCache.
-
我们根据源码将内存分配策略如下:
高效的程序处理能力
Netty高效处理机制
解决空轮询的源码
// NioEventLoop.java
// 仅摘录部分代码
static{
// 可配置select的循环次数,当网络数据包一直不可达的时候,通过次数控制减少当前selector不断无结果的空轮询,一旦超过次数将会重建selector,将原有的selector关闭,避免cpu飙升.
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
}
void run(){
for(;;){
try{
select();
}catch(){}
selectCnt++;
// 处理就绪事件
processSelectedKeys();
// 执行任务
ranTasks = runAllTasks();
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug(...);
selectCnt = 0;
// unexpectedSelectorWakeup处理空轮询
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
}
}
// unexpectedSelectorWakeup方法
// 解决空轮询根据轮询次数判断是否重建selector,丢弃原有的selector,降低CPU的负载
private boolean unexpectedSelectorWakeup(int selectCnt) {
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 超过一定的次数之后重建selector,如何重建这里不贴代码
rebuildSelector();
return true;
}
return false;
}
使用责任链机制实现无锁串行化任务
使用并发库
在先前的高性能IO设计一文中有说到,在资源竞争的环境下,使用并发库甚至是无锁编程能够提升程序的性能,避免锁的争抢与等待.
参考资料
C10K & C10M
-
http://highscalability.com/blog/2013/5/13/the-secret-to-10-million-concurrent-connections-the-kernel-i.html
-
https://en.wikipedia.org/wiki/C10k_problem
高性能
-
http://pl.atyp.us/content/tech/servers.html
基于可伸缩性的jemalloc算法
-
http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf
-
https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919
至此,我们对Netty的高性能有了一个新的认知,后续分享以交叉方式输出,不限领域写作,最后感谢花时间阅读,如果有收获欢迎动一动小手指转发或者好看,谢谢!

老铁们关注走一走,不迷路
原文始发于微信公众号(疾风先生):深入分析Netty的高性能
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/25734.html