异步IO优化机制
Redis为了实现极高的性能,在设计中采用了惰性删除、异步关闭文件描述符和异步刷盘AOF。这些机制都依赖于Redis的后台异步IO机制来提供基础支撑。
-
惰性删除机制并不会在del命令时立即删除value, 而是后台线程进行异步删除。这避免了del命令的执行被大量删除操作阻塞。
-
异步关闭文件描述符机制将文件关闭操作分离到后台线程完成,避免close调用阻塞主线程。
-
异步刷盘AOF机制将AOF文件内容的写入缓存到内存,由后台线程定期刷新到磁盘。这大幅减少了同步刷盘对写入性能的影响。
实现这些优化的关键是Redis的后台异步IO。它创建了bioclosefiles、bioaoffsync和biolazyfree 三个后台线程,专门处理被延迟的I/O操作,包括文件fd关闭、AOF刷盘和惰性删除。这种将I/O操作异步化的设计极大地提升了Redis的并发性能。主线程可以避免被阻塞操作影响,使得Redis可以大幅超越传统数据库,达到10万+ QPS的惊人性能。
本文带着大家一起走读Redis后台异步IO的代码,剖析实现机制。
异步IO代码实现
数据结构
// path:src/bio.h
/* Background job opcodes */
// 定义不同的异步io操作
// 异步关闭文件fd
#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
// AOF异步刷盘
#define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
// 惰性删除
#define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
#define BIO_NUM_OPS 3 //总共三种异步io任务(文件fd关闭、AOF刷盘和惰性删除)
// path:src/bio.c
// 每种任务类型(文件fd关闭、AOF刷盘和惰性删除)一个异步io线程
// 保存异步io线程
static pthread_t bio_threads[BIO_NUM_OPS];
// 每个异步io线程互斥锁
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
// 每个异步io线程新任务通知条件变量
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
// io线程处理完每个任务通知条件变量
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
// 保存io任务链表(队列)
static list *bio_jobs[BIO_NUM_OPS];
/* The following array is used to hold the number of pending jobs for every
* OP type. This allows us to export the bioPendingJobsOfType() API that is
* useful when the main thread wants to perform some operation that may involve
* objects shared with the background thread. The main thread will just wait
* that there are no longer jobs of this type to be executed before performing
* the sensible operation. This data is also useful for reporting. */
// 记录每种异步io任务的数量
static unsigned long long bio_pending[BIO_NUM_OPS];
/* This structure represents a background Job. It is only used locally to this
* file as the API does not expose the internals at all. */
// io任务结构体
// 每个任务一个结构体对象
struct bio_job {
// 任务创建时间戳
time_t time; /* Time at which the job was created. */
/* Job specific arguments.*/
// 保存异步关闭文件的fd, 或者异步刷 AOF 的fd
int fd; /* Fd for file based background jobs */
// 惰性删除任务的free函数
lazy_free_fn *free_fn; /* Function that will free the provided arguments */
// 惰性删除任务的参数
void *free_args[]; /* List of arguments to be passed to the free function */
};
异步io线程处理
// path:src/bio.c
/* Initialize the background system, spawning the thread. */
// 初始化io线程
// 在redis服务器启动时被调用, 即void InitServerLast()
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* Initialization of state vars and objects */
// 初始化条件变量和互斥锁
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_newjob_cond[j],NULL);
pthread_cond_init(&bio_step_cond[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of. */
// 创建BIO_NUM_OPS个io线程,也即3个
// 分别负责 文件fd关闭、AOF刷盘和惰性删除
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
// 异步io线程处理函数
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;
/* Check that the type is within the right interval. */
// 判断是否是不支持类型,直接报错返回
if (type >= BIO_NUM_OPS) {
serverLog(LL_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}
// 根据不同类型设置线程名字
switch (type) {
case BIO_CLOSE_FILE:
redis_set_thread_title("bio_close_file");
break;
case BIO_AOF_FSYNC:
redis_set_thread_title("bio_aof_fsync");
break;
case BIO_LAZY_FREE:
redis_set_thread_title("bio_lazy_free");
break;
}
// 设置cpu亲和性
redisSetCpuAffinity(server.bio_cpulist);
// 设置线程可以被随时kill
makeThreadKillable();
// 加锁(对应任务类型)
pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
* receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
serverLog(LL_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
// 一直循环处理任务
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
// 等任务创建条件变量通知唤醒
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
// 获取队列头部任务
/* Pop the job from the queue. */
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
// 从队列获取任务后,先释放锁
pthread_mutex_unlock(&bio_mutex[type]);
// 根据不同任务类型执行不同处理
/* Process the job accordingly to its type. */
if (type == BIO_CLOSE_FILE) {
// 异步关闭文件fd, 调用close关闭
close(job->fd);
} else if (type == BIO_AOF_FSYNC) {
// AOF 异步刷盘
// 调用 redis_fsync
// 根据结果设置相关状态
/* The fd may be closed by main thread and reused for another
* socket, pipe, or file. We just ignore these errno because
* aof fsync did not really fail. */
if (redis_fsync(job->fd) == -1 &&
errno != EBADF && errno != EINVAL)
{
int last_status;
atomicGet(server.aof_bio_fsync_status,last_status);
atomicSet(server.aof_bio_fsync_status,C_ERR);
atomicSet(server.aof_bio_fsync_errno,errno);
if (last_status == C_OK) {
serverLog(LL_WARNING,
"Fail to fsync the AOF file: %s",strerror(errno));
}
} else {
atomicSet(server.aof_bio_fsync_status,C_OK);
}
} else if (type == BIO_LAZY_FREE) {
// 惰性删除,调用job里设置的free_fn释放内存
job->free_fn(job->free_args);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
// 执行完任务后删除
listDelNode(bio_jobs[type],ln);
// 减少pending任务数量
bio_pending[type]--;
/* Unblock threads blocked on bioWaitStepOfType() if any. */
// 没完成一个异步任务, 设置io_step_cond条件变量,广播通知等待的线程
pthread_cond_broadcast(&bio_step_cond[type]);
}
}
-
redis服务启动时(InitServerLast)调用void bioInit(void) 初始化io异步线程
-
io异步线程真正处理函数为void *bioProcessBackgroundJobs(void *arg) , 根据线程不同的标号,处理不同异步任务
-
bioProcessBackgroundJobs 核心是循环 从对应任务队列获取任务处理
异步任务创建
// path:src/bio.c
/* Return the number of pending jobs of the specified type. */
// 用户获取对应类型的pinding任务数量
unsigned long long bioPendingJobsOfType(int type) {
unsigned long long val;
pthread_mutex_lock(&bio_mutex[type]);
val = bio_pending[type];
pthread_mutex_unlock(&bio_mutex[type]);
return val;
}
/* If there are pending jobs for the specified type, the function blocks
* and waits that the next job was processed. Otherwise the function
* does not block and returns ASAP.
*
* The function returns the number of jobs still to process of the
* requested type.
*
* This function is useful when from another thread, we want to wait
* a bio.c thread to do more work in a blocking way.
*/
// 假如存在未处理(pending)任务, 等待bio_step_cond条件变量,也即io现场处理完一个异步任务,再返回
// 不存在未处理(pending)任务,直接返回
unsigned long long bioWaitStepOfType(int type) {
unsigned long long val;
pthread_mutex_lock(&bio_mutex[type]);
val = bio_pending[type];
if (val != 0) {
pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]);
val = bio_pending[type];
}
pthread_mutex_unlock(&bio_mutex[type]);
return val;
}
// 添加异步任务
void bioSubmitJob(int type, struct bio_job *job) {
job->time = time(NULL);
// 对应类型的任务互斥锁加锁
pthread_mutex_lock(&bio_mutex[type]);
// 添加到任务队列尾部
listAddNodeTail(bio_jobs[type],job);
// 任务数量+1
bio_pending[type]++;
// 唤醒新任务条件变量
pthread_cond_signal(&bio_newjob_cond[type]);
// 解锁
pthread_mutex_unlock(&bio_mutex[type]);
}
// 创建惰性删除任务
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
va_list valist;
/* Allocate memory for the job structure and all required
* arguments */
// 分配bio_job 结构体
struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
// 保存释放用的free 函数
job->free_fn = free_fn;
// 保存free 参数
va_start(valist, arg_count);
for (int i = 0; i < arg_count; i++) {
job->free_args[i] = va_arg(valist, void *);
}
va_end(valist);
// 调用bioSubmitJob 保存到 BIO_LAZY_FREE对应的任务队列
bioSubmitJob(BIO_LAZY_FREE, job);
}
// 创建异步关闭句柄的操作
void bioCreateCloseJob(int fd) {
// 分配bio_job 结构体
struct bio_job *job = zmalloc(sizeof(*job));
// 保存文件句柄
job->fd = fd;
// 调用bioSubmitJob 保存到 BIO_CLOSE_FILE对应的任务队列
bioSubmitJob(BIO_CLOSE_FILE, job);
}
// 创建AOF异步刷盘操作
void bioCreateFsyncJob(int fd) {
// 分配bio_job 结构体
struct bio_job *job = zmalloc(sizeof(*job));
// 保存文件句柄
job->fd = fd;
// 调用bioSubmitJob 保存到 BIO_AOF_FSYNC对应的任务队列
bioSubmitJob(BIO_AOF_FSYNC, job);
}
-
创建惰性删除任务函数为:bioCreateLazyFreeJob
-
创建异步关闭文件fd任务函数为: bioCreateCloseJob
-
创建AOF异步刷盘任务函数为:bioCreateFsyncJob
-
三个函数最终都是通过bioSubmitJob将任务添加到对应类型的任务队列中
惰性删除
// path:src/db.c
// 真正执行删除函数
/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;
for (j = 1; j < c->argc; j++) {
// 判断是否过期删除, 里面的实现也存在异步删除
expireIfNeeded(c->db,c->argv[j]);
// 根据lazy, 觉得是异步删除还是直接删除
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
signalModifiedKey(c,c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
numdel++;
}
}
addReplyLongLong(c,numdel);
}
// 删除命令处理函数
void delCommand(client *c) {
// 调用delGenericCommand删除key
// lazy传值取决于 lazyfree_lazy_user_del 配置
delGenericCommand(c,server.lazyfree_lazy_user_del);
}
// unlike 命令处理函数
void unlinkCommand(client *c) {
// unlike 调用delGenericCommand删除key
// lazy传值为1, 代表惰性删除
delGenericCommand(c,1);
}
// path:src/lazyfree.c
// 同步删除key val
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
// 删除超时dict中的key
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
// 从dict删除获取对象
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val);
// 删除key value
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
}
}
/* Delete a key, value, and associated expiration entry if any, from the DB.
* If there are enough allocations to free the value object may be put into
* a lazy free list instead of being freed synchronously. The lazy free list
* will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
// 异步删除某个key
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
// 在超时expires的dict中删除对应key
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
// 在dict中删掉对应的key
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val);
// 根据key的类型和val大小,计算删除key需要的时间成本
size_t free_effort = lazyfreeGetFreeEffort(key,val);
/* If releasing the object is too much work, do it in the background
* by adding the object to the lazy free list.
* Note that if the object is shared, to reclaim it now it is not
* possible. This rarely happens, however sometimes the implementation
* of parts of the Redis core may call incrRefCount() to protect
* objects, and then call dbDelete(). In this case we'll fall
* through and reach the dictFreeUnlinkedEntry() call, that will be
* equivalent to just calling decrRefCount(). */
// 成本大于LAZYFREE_THRESHOLD, 走惰性删除 key val
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateLazyFreeJob(lazyfreeFreeObject,1, val);
dictSetVal(db->dict,de,NULL);
}
}
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (de) {
// 删除key/val
// 这里需要注意如果已经做惰性删除
// 此处已经把val值设置为NULL,即指free key
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
}
}
-
unlikeCommand 和 delCommand都是调用delGenericCommand删除key
-
delGenericCommand根据参数lazy 决定是否使用异步删除(dbAsyncDelete)还是同步删除(dbSyncDelete)
-
dbAsyncDelete根据val大小判断是否使用惰性删除,如果是调用bioCreateLazyFreeJob
-
Redis惰性删除不仅仅用在unlike命令和del命令处理, 还用在key过期删除, 内存不足淘汰key等多个地方
异步关闭句柄
// path:src/db.c
/* Plain unlink() can block for quite some time in order to actually apply
* the file deletion to the filesystem. This call removes the file in a
* background thread instead. We actually just do close() in the thread,
* by using the fact that if there is another instance of the same file open,
* the foreground unlink() will only remove the fs name, and deleting the
* file's storage space will only happen once the last reference is lost. */
// 异步删除文件
int bg_unlink(const char *filename) {
int fd = open(filename,O_RDONLY|O_NONBLOCK);
if (fd == -1) {
// 获取不到fd, 在当前(主线程)直接删除
/* Can't open the file? Fall back to unlinking in the main thread. */
return unlink(filename);
} else {
/* The following unlink() removes the name but doesn't free the
* file contents because a process still has it open. */
// 调用 unlink删除文件, 此时因为还打开一个fd,系统并未真正执行操作
// 而是等到异步线程关闭fd 才真正执行删除, 因此unlike很快就返回了
int retval = unlink(filename);
if (retval == -1) {
/* If we got an unlink error, we just return it, closing the
* new reference we have to the file. */
int old_errno = errno;
close(fd); /* This would overwrite our errno. So we saved it. */
errno = old_errno;
return -1;
}
// 创建异步关闭fd任务
// 异步线程关闭fd 才真正执行删除
bioCreateCloseJob(fd);
return 0; /* Success. */
}
}
-
bg_unlink利用fd被异步线程关闭系统才真正删除文件的特性实现异步删除文件功能而不阻塞主线程
-
bg_unlink用在删除AOF 和RDB 文件等多个场景
AOF异步刷盘
// path:src/aof.c
/* Return true if an AOf fsync is currently already in progress in a
* BIO thread. */
int aofFsyncInProgress(void) {
// aof异步刷盘任务数是否为0
// 即上次异步刷盘任务是否完成
return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
}
/* Starts a background task that performs fsync() against the specified
* file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) {
// 添加AOF异步刷盘任务
bioCreateFsyncJob(fd);
}
/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is entering when the
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
// 在写命令回复给客户端之前, 需要将aof写到文件系统(内核)
// 在进入
// flush aof_buf的内容到文件系统
// 并且根据模式和相关条件进行 文件刷盘(可能是同步或者异步)
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
// 如果是每秒刷盘模式
// 虽然aof_buf没内容
// 但是(server.aof_fsync_offset != server.aof_current_size),即还有一部分内容写到文件系统但是还没刷盘
// 并且时间已经超过一秒
// 并且不存在aof异步刷盘中(没有对应异步任务)
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_fsync_offset != server.aof_current_size &&
server.unixtime > server.aof_last_fsync &&
!(sync_in_progress = aofFsyncInProgress())) {
// goto 到刷盘逻辑处
goto try_fsync;
} else {
return;
}
}
// 每秒刷盘模式,获取现在是否在异步刷盘中
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = aofFsyncInProgress();
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
// 处于每秒刷盘模式
if (sync_in_progress) {
// 并且现在还有异步任务未完成(在异步刷盘中)
if (server.aof_flush_postponed_start == 0) {
// aof_flush_postponed_start为0,说明从未执行过异步刷盘
// 更新最新异步刷盘时间戳为当前,然后直接返回
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
// 距离上次创建异步刷盘任务小于 2秒
// 直接返回,等待目前还未执行异步任务刷盘就行
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall trough, and go write since we can't wait
* over two seconds. */
// 距离上次创建异步刷盘任务 大于2秒(任务目前还未真正执行)
// 统计aof_delayed_fsync(刷盘延时)次数
// 打印日志
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
usleep(server.aof_flush_sleep);
}
latencyStartMonitor(latency);
// 调用aofWrite 将aof_buf(缓冲器)内容写到文件系统(内核)
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
// 统计写到文件系统耗时
latencyEndMonitor(latency);
/* We want to capture different events for delayed writes:
* when the delay happens with a pending fsync, or with a saving child
* active, and when the above two conditions are missing.
* We also use an additional event name to save all samples which is
* useful for graphing / monitoring purposes. */
// 根据不同情况,把耗时统计到不同监控项
if (sync_in_progress) {
// 如果处于还有异步刷盘任务未被执行
// 统计时延耗时
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
} else if (hasActiveChildProcess()) {
// 有子进程在进行RDB 或者AOF,统计耗时
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
}
latencyAddSampleIfNeeded("aof-write",latency);
/* We performed the write so reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
// 未将缓存区内容全部写到内核
static time_t last_write_error_log = 0;
int can_log = 0;
/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
// 打日志限流
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
// 打印异常日志
/* Log the AOF write error and record the error code. */
if (nwritten == -1) {
// 全部都没有写到文件系统
if (can_log) {
serverLog(LL_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
// 写一部分
if (can_log) {
serverLog(LL_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
// 截断文件
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftruncate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
/* Handle the AOF write error. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
// 处于每次刷盘,当时无法写到文件系统(内核)
// 服务退出
/* We can't recover when the fsync policy is ALWAYS since the reply
* for the client is already in the output buffers (both writes and
* reads), and the changes to the db can't be rolled back. Since we
* have a contract with the user that on acknowledged or observed
* writes are is synced on disk, we must exit. */
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = C_ERR;
// 其他模式可以接受暂时没写到文件系统
// 但是设置状态,在没有写入成功前阻止再次尝试写入aof buf(即不能执行写命令)
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
// 全部写入成功, 清除写入失败状态
if (server.aof_last_write_status == C_ERR) {
serverLog(LL_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = C_OK;
}
}
server.aof_current_size += nwritten;
// 更加buf大小决定回收buf还是复用buf
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
// 尝试刷盘
try_fsync:
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
return;
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
latencyStartMonitor(latency);
/* Let's try to get this data on the disk. To guarantee data safe when
* the AOF fsync policy is 'always', we should exit if failed to fsync
* AOF (see comment next to the exit(1) after write error above). */
// 每次都刷盘模式
// 直接同步刷盘
if (redis_fsync(server.aof_fd) == -1) {
serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
"AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
// 刷盘异常,直接退出
exit(1);
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
//每秒刷盘模式,并且已经超过1秒
if (!sync_in_progress) {
// 并且之前的异步刷盘任务已经完成
// 再次创建AOF 异步刷盘任务
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}
// 处理完所有客户端事件(命令)
// 进入epoll_wait之前
void beforeSleep(struct aeEventLoop *eventLoop) {
// ...
// 将aofbuf 写到文件系统
/* Write the AOF buffer on disk */
if (server.aof_state == AOF_ON)
flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */
// 向客户端发送回复包
handleClientsWithPendingWritesUsingThreads();
// ...
}
-
开启aof时,redis要求说有写命令的aof日志必须先写入文件系统,才能想客户端发送回复包
-
在beforeSleep中, 先调用flushAppendOnlyFile将aof缓冲器内容先写到文件系统
-
再调用handleClientsWithPendingWritesUsingThreads向客户端发送回复包
-
flushAppendOnlyFile flush aof_buf的内容到文件系统,并且根据模式和相关条件进行 文件刷盘(可能是同步或者异步)
-
异步刷盘时调用aofbackgroundfsync创建任务
小结
本文首先介绍了Redis中几种关键的异步I/O优化手段:惰性删除、异步关闭文件描述符和异步刷盘AOF。这些手段可通过异步处理避免I/O操作阻塞主线程,提供了Redis的超高并发能力。 然后,文中深入源码层面,剖析了这些异步I/O优化的实现机制,即Redis的Background I/O Service。该服务通过bio线程来负责处理被延迟的I/O操作,使主线程可以专注在处理命令请求上,避免被阻塞。 最后,通过分析几个典型场景的代码,让读者更直观地了解这些异步I/O机制的运用,如惰性删除在del命令中的应用,以及异步刷盘对AOF写入性能的提升。 总体上,本文系统地介绍了Redis为追求极致性能而设计的各种后台异步I/O优化手段,以及这些手段的实现原理。这些手段充分利用了Redis的单线程模型下的并发处理优势,使得Redis可以提供超过10万QPS的惊人性能,成为高性能数据库的典范。读者可以通过本文全面了解Redis的异步I/O优化设计。
原文始发于微信公众号(吃瓜技术派):揭开Redis极速底层逻辑 – 异步IO机制源码剖析
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/236041.html