Unveiling the Logic Behind Redis's Speed – A Source-Level Analysis of the Async IO Mechanism

Async IO optimizations

To achieve its very high performance, Redis relies on lazy free (lazy deletion), asynchronous file-descriptor closing, and asynchronous AOF fsync. All three are built on top of Redis's background async IO mechanism.

  • Lazy free does not release the value immediately when a key is deleted; a background thread frees it asynchronously. This keeps the deleting command from being blocked by a large free operation.

  • Asynchronous file-descriptor closing moves close() calls to a background thread, so a slow close cannot block the main thread.

  • Asynchronous AOF fsync accumulates AOF writes in an in-memory buffer, writes them to the file in the main thread, and delegates the fsync to a background thread. This greatly reduces the impact of flushing to disk on write performance.

The key to these optimizations is Redis's background async IO service. It spawns three background threads — bio_close_file, bio_aof_fsync and bio_lazy_free — dedicated to deferred I/O work: closing file descriptors, fsyncing the AOF, and lazy freeing. Pushing this work off the main thread keeps it free of blocking operations, and is a large part of why a single Redis instance can reach 100,000+ QPS, far beyond traditional disk-based databases.

This article walks through the Redis background async IO code and analyzes how these mechanisms are implemented.

Async IO implementation

Data structures

// path:src/bio.h
/* Background job opcodes */
// The different background IO job types
// Deferred close of a file descriptor
#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
// Asynchronous AOF fsync
#define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
// Lazy free
#define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
#define BIO_NUM_OPS 3 /* Three job types in total (file close, AOF fsync, lazy free) */

// path:src/bio.c
// One background IO thread per job type (file close, AOF fsync, lazy free)
// Thread handles of the background IO threads
static pthread_t bio_threads[BIO_NUM_OPS];
// One mutex per background IO thread
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
// Condition variable used to signal each thread that a new job has arrived
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
// Condition variable signalled every time an IO thread finishes a job
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
// Job lists (queues), one per job type
static list *bio_jobs[BIO_NUM_OPS];
/* The following array is used to hold the number of pending jobs for every
 * OP type. This allows us to export the bioPendingJobsOfType() API that is
 * useful when the main thread wants to perform some operation that may involve
 * objects shared with the background thread. The main thread will just wait
 * that there are no longer jobs of this type to be executed before performing
 * the sensible operation. This data is also useful for reporting. */
// Number of pending jobs per job type
static unsigned long long bio_pending[BIO_NUM_OPS];

/* This structure represents a background Job. It is only used locally to this
 * file as the API does not expose the internals at all. */
// Background job descriptor; one instance per job
struct bio_job {
    // Timestamp at which the job was created
    time_t time; /* Time at which the job was created. */
    /* Job specific arguments.*/
    // Fd to close asynchronously, or fd of the AOF file to fsync
    int fd; /* Fd for file based background jobs */
    // Free function used by lazy-free jobs
    lazy_free_fn *free_fn; /* Function that will free the provided arguments */
    // Arguments passed to the free function
    void *free_args[]; /* List of arguments to be passed to the free function */
};

Background IO thread processing

// path:src/bio.c
/* Initialize the background system, spawning the thread. */
// Initialize the background IO threads.
// Called at server startup, from InitServerLast().
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    // Initialize the mutexes, condition variables and job queues
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    // Spawn BIO_NUM_OPS (3) background IO threads,
    // one each for file close, AOF fsync and lazy free
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

// Entry point of the background IO threads
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    // Unsupported job type: log a warning and return
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    // Set the thread name according to its job type
    switch (type) {
    case BIO_CLOSE_FILE:
        redis_set_thread_title("bio_close_file");
        break;
    case BIO_AOF_FSYNC:
        redis_set_thread_title("bio_aof_fsync");
        break;
    case BIO_LAZY_FREE:
        redis_set_thread_title("bio_lazy_free");
        break;
    }

    // Set the CPU affinity of the thread
    redisSetCpuAffinity(server.bio_cpulist);
    // Make the thread killable at any time
    makeThreadKillable();

    // Lock the mutex of this job type
    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    // Process jobs forever
    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        // Queue empty: wait to be woken up by the new-job condition variable
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        // Take the job at the head of the queue
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        // Release the lock as soon as we hold a standalone job
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        // Dispatch on the job type
        if (type == BIO_CLOSE_FILE) {
            // Deferred close of a file descriptor
            close(job->fd);
        } else if (type == BIO_AOF_FSYNC) {
            // Asynchronous AOF fsync: call redis_fsync and record the result
            /* The fd may be closed by main thread and reused for another
             * socket, pipe, or file. We just ignore these errno because
             * aof fsync did not really fail. */
            if (redis_fsync(job->fd) == -1 &&
                errno != EBADF && errno != EINVAL)
            {
                int last_status;
                atomicGet(server.aof_bio_fsync_status,last_status);
                atomicSet(server.aof_bio_fsync_status,C_ERR);
                atomicSet(server.aof_bio_fsync_errno,errno);
                if (last_status == C_OK) {
                    serverLog(LL_WARNING,
                        "Fail to fsync the AOF file: %s",strerror(errno));
                }
            } else {
                atomicSet(server.aof_bio_fsync_status,C_OK);
            }
        } else if (type == BIO_LAZY_FREE) {
            // Lazy free: call the free_fn stored in the job to release the memory
            job->free_fn(job->free_args);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        // Remove the finished job from the queue
        listDelNode(bio_jobs[type],ln);
        // Decrement the pending job counter
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        // After every completed job, broadcast bio_step_cond to wake up waiters
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}

  • At server startup, InitServerLast() calls void bioInit(void) to initialize the background IO threads.
  • The real worker function is void *bioProcessBackgroundJobs(void *arg); the job type passed as its argument decides which kind of background job the thread handles.
  • The core of bioProcessBackgroundJobs is a loop that keeps popping jobs from the queue of its type and processing them (a condensed standalone sketch of this pattern follows below).
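To separate the synchronization pattern from the Redis-specific details, here is a minimal, self-contained sketch of the same producer/consumer idea: one mutex, one new-job condition variable, and a worker thread that sleeps while its queue is empty. This is an illustration only, not Redis code; names such as worker_main and submit_job are made up.

/* Minimal sketch of the bio.c pattern: a worker thread that waits on a
 * condition variable and pops jobs from a mutex-protected queue.
 * Build with: gcc sketch.c -lpthread */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

struct job { struct job *next; int payload; };

static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t newjob_cond = PTHREAD_COND_INITIALIZER;
static struct job *queue_head = NULL;        /* singly-linked job queue */

static void *worker_main(void *arg) {
    (void)arg;
    pthread_mutex_lock(&queue_mutex);
    while (1) {
        /* The loop always starts with the lock held, like bioProcessBackgroundJobs(). */
        if (queue_head == NULL) {
            pthread_cond_wait(&newjob_cond, &queue_mutex);
            continue;
        }
        struct job *j = queue_head;          /* pop the head job */
        queue_head = j->next;
        pthread_mutex_unlock(&queue_mutex);  /* release the lock while working */

        printf("processing job %d\n", j->payload);  /* stand-in for close()/fsync()/free */
        free(j);

        pthread_mutex_lock(&queue_mutex);    /* re-acquire before checking the queue again */
    }
    return NULL;
}

static void submit_job(int payload) {
    struct job *j = malloc(sizeof(*j));
    j->payload = payload;
    pthread_mutex_lock(&queue_mutex);
    j->next = queue_head;                    /* prepend for brevity; bio.c appends to the tail */
    queue_head = j;
    pthread_cond_signal(&newjob_cond);       /* wake the worker, like bioSubmitJob() */
    pthread_mutex_unlock(&queue_mutex);
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, worker_main, NULL);
    for (int i = 0; i < 3; i++) submit_job(i);
    sleep(1);                                /* give the worker time to drain the queue */
    return 0;
}

The point of the pattern is the same as in bio.c: the lock is held only around queue manipulation, never around the actual work, so submitting a job from the main thread stays cheap.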

Creating background jobs

// path:src/bio.c
/* Return the number of pending jobs of the specified type. */
// Returns the number of pending jobs of the given type
unsigned long long bioPendingJobsOfType(int type) {
    unsigned long long val;

    pthread_mutex_lock(&bio_mutex[type]);
    val = bio_pending[type];
    pthread_mutex_unlock(&bio_mutex[type]);
    return val;
}

/* If there are pending jobs for the specified type, the function blocks
 * and waits that the next job was processed. Otherwise the function
 * does not block and returns ASAP.
 *
 * The function returns the number of jobs still to process of the
 * requested type.
 *
 * This function is useful when from another thread, we want to wait
 * a bio.c thread to do more work in a blocking way.
 */
// If there are pending jobs of this type, wait on bio_step_cond,
// i.e. wait until the IO thread has finished one more job, then return.
// If there are no pending jobs, return immediately.
unsigned long long bioWaitStepOfType(int type) {
    unsigned long long val;
    pthread_mutex_lock(&bio_mutex[type]);
    val = bio_pending[type];
    if (val != 0) {
        pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]);
        val = bio_pending[type];
    }
    pthread_mutex_unlock(&bio_mutex[type]);
    return val;
}

// Submit a background job
void bioSubmitJob(int type, struct bio_job *job) {
    job->time = time(NULL);
    // Lock the mutex of this job type
    pthread_mutex_lock(&bio_mutex[type]);
    // Append the job to the tail of the queue
    listAddNodeTail(bio_jobs[type],job);
    // Increment the pending job counter
    bio_pending[type]++;
    // Signal the new-job condition variable
    pthread_cond_signal(&bio_newjob_cond[type]);
    // Unlock
    pthread_mutex_unlock(&bio_mutex[type]);
}

// Create a lazy-free job
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
    va_list valist;
    /* Allocate memory for the job structure and all required
     * arguments */
    // Allocate the bio_job structure plus room for arg_count argument pointers
    struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
    // Store the free function
    job->free_fn = free_fn;
    // Store its arguments
    va_start(valist, arg_count);
    for (int i = 0; i < arg_count; i++) {
        job->free_args[i] = va_arg(valist, void *);
    }
    va_end(valist);
    // Hand the job to the BIO_LAZY_FREE queue via bioSubmitJob
    bioSubmitJob(BIO_LAZY_FREE, job);
}

// Create a deferred file-close job
void bioCreateCloseJob(int fd) {
    // Allocate the bio_job structure
    struct bio_job *job = zmalloc(sizeof(*job));
    // Store the file descriptor
    job->fd = fd;
    // Hand the job to the BIO_CLOSE_FILE queue via bioSubmitJob
    bioSubmitJob(BIO_CLOSE_FILE, job);
}

// Create an asynchronous AOF fsync job
void bioCreateFsyncJob(int fd) {
    // Allocate the bio_job structure
    struct bio_job *job = zmalloc(sizeof(*job));
    // Store the file descriptor
    job->fd = fd;
    // Hand the job to the BIO_AOF_FSYNC queue via bioSubmitJob
    bioSubmitJob(BIO_AOF_FSYNC, job);
}

  • bioCreateLazyFreeJob creates a lazy-free job (its variadic argument packing is illustrated in the sketch after this list).
  • bioCreateCloseJob creates a deferred file-close job.
  • bioCreateFsyncJob creates an asynchronous AOF fsync job.
  • All three ultimately call bioSubmitJob, which appends the job to the queue of the corresponding type.
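bioCreateLazyFreeJob is the only constructor that takes a variable number of arguments; it sizes the allocation so that the trailing flexible array member free_args[] can hold all of them in the same block as the job header. The following standalone sketch (hypothetical names, not Redis code) shows the same "zmalloc(sizeof(*job) + n * sizeof(void *)) plus va_arg" idiom in isolation:

/* Standalone illustration of the flexible-array-member + va_list idiom
 * used by bioCreateLazyFreeJob(). */
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>

struct demo_job {
    void (*free_fn)(void **args);
    void *free_args[];            /* flexible array member, sized at allocation time */
};

static void print_args(void **args) {
    printf("%s %s\n", (char *)args[0], (char *)args[1]);
}

static struct demo_job *make_job(void (*fn)(void **), int arg_count, ...) {
    /* One allocation holds the header plus arg_count pointers. */
    struct demo_job *job = malloc(sizeof(*job) + sizeof(void *) * arg_count);
    job->free_fn = fn;

    va_list ap;
    va_start(ap, arg_count);
    for (int i = 0; i < arg_count; i++)
        job->free_args[i] = va_arg(ap, void *);   /* copy each pointer argument */
    va_end(ap);
    return job;
}

int main(void) {
    struct demo_job *job = make_job(print_args, 2, "hello", "bio");
    job->free_fn(job->free_args);   /* in Redis, the bio thread performs this call */
    free(job);
    return 0;
}

In Redis itself the same single struct bio_job carries either the fd (for close and fsync jobs) or the free function plus its packed arguments (for lazy-free jobs).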

Lazy free

// path:src/db.c
/* This command implements DEL and LAZYDEL. */
// The function that actually performs the deletion
void delGenericCommand(client *c, int lazy) {
    int numdel = 0, j;

    for (j = 1; j < c->argc; j++) {
        // Expiration check first; its implementation may also delete asynchronously
        expireIfNeeded(c->db,c->argv[j]);
        // Depending on 'lazy', delete asynchronously or synchronously
        int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                             dbSyncDelete(c->db,c->argv[j]);
        if (deleted) {
            signalModifiedKey(c,c->db,c->argv[j]);
            notifyKeyspaceEvent(NOTIFY_GENERIC,
                "del",c->argv[j],c->db->id);
            server.dirty++;
            numdel++;
        }
    }
    addReplyLongLong(c,numdel);
}

// DEL command handler
void delCommand(client *c) {
    // Deletes keys via delGenericCommand;
    // 'lazy' depends on the lazyfree_lazy_user_del config option
    delGenericCommand(c,server.lazyfree_lazy_user_del);
}

// UNLINK command handler
void unlinkCommand(client *c) {
    // UNLINK deletes keys via delGenericCommand;
    // 'lazy' is fixed to 1, i.e. always lazy free
    delGenericCommand(c,1);
}

// path:src/lazyfree.c
/* Delete a key, value, and associated expiration entry if any, from the DB */
// Synchronously delete a key and its value
int dbSyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    // Remove the key from the expires dict
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    // Unlink the entry from the main dict
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        /* Tells the module that the key has been unlinked from the database. */
        moduleNotifyKeyUnlink(key,val);
        // Free the key and the value
        dictFreeUnlinkedEntry(db->dict,de);
        if (server.cluster_enabled) slotToKeyDel(key->ptr);
        return 1;
    } else {
        return 0;
    }
}

/* Delete a key, value, and associated expiration entry if any, from the DB.
 * If there are enough allocations to free the value object may be put into
 * a lazy free list instead of being freed synchronously. The lazy free list
 * will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
// Asynchronously delete a key
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    // Remove the key from the expires dict
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    // Unlink the entry from the main dict
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        /* Tells the module that the key has been unlinked from the database. */
        moduleNotifyKeyUnlink(key,val);
        // Estimate the cost of freeing the value from the key type and value size
        size_t free_effort = lazyfreeGetFreeEffort(key,val);

        /* If releasing the object is too much work, do it in the background
         * by adding the object to the lazy free list.
         * Note that if the object is shared, to reclaim it now it is not
         * possible. This rarely happens, however sometimes the implementation
         * of parts of the Redis core may call incrRefCount() to protect
         * objects, and then call dbDelete(). In this case we'll fall
         * through and reach the dictFreeUnlinkedEntry() call, that will be
         * equivalent to just calling decrRefCount(). */
        // Cost above LAZYFREE_THRESHOLD and refcount 1: lazy free the value
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            atomicIncr(lazyfree_objects,1);
            bioCreateLazyFreeJob(lazyfreeFreeObject,1, val);
            dictSetVal(db->dict,de,NULL);
        }
    }

    /* Release the key-val pair, or just the key if we set the val
     * field to NULL in order to lazy free it later. */
    if (de) {
        // Free the key/value pair.
        // Note: if the value was handed to lazy free above, it has been set
        // to NULL, so only the key is freed here.
        dictFreeUnlinkedEntry(db->dict,de);
        if (server.cluster_enabled) slotToKeyDel(key->ptr);
        return 1;
    } else {
        return 0;
    }
}

  • Both unlinkCommand and delCommand delete keys through delGenericCommand.
  • delGenericCommand uses the 'lazy' argument to decide between asynchronous deletion (dbAsyncDelete) and synchronous deletion (dbSyncDelete).
  • dbAsyncDelete estimates the cost of freeing the value and, above the threshold, calls bioCreateLazyFreeJob.
  • Lazy free is not limited to UNLINK and DEL; it is also used when deleting expired keys, when evicting keys under memory pressure, and in several other places (the configuration options that control these paths are shown below).
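Whether these deletion paths actually take the lazy-free route is governed by configuration. As a rough reference (directive names as found in redis.conf of Redis 6.x; defaults may differ between versions), the relevant knobs look like this:

# redis.conf (Redis 6.x) – lazy freeing options (illustrative values)
lazyfree-lazy-eviction no      # free evicted values in a bio thread
lazyfree-lazy-expire no        # free expired values in a bio thread
lazyfree-lazy-server-del no    # implicit deletes, e.g. RENAME overwriting a key
lazyfree-lazy-user-del no      # makes DEL behave like UNLINK

With lazyfree-lazy-user-del left at no, clients can still opt in per command by issuing UNLINK instead of DEL.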

Asynchronous file-descriptor close

// path:src/db.c
/* Plain unlink() can block for quite some time in order to actually apply
 * the file deletion to the filesystem. This call removes the file in a
 * background thread instead. We actually just do close() in the thread,
 * by using the fact that if there is another instance of the same file open,
 * the foreground unlink() will only remove the fs name, and deleting the
 * file's storage space will only happen once the last reference is lost. */
// Delete a file asynchronously
int bg_unlink(const char *filename) {
    int fd = open(filename,O_RDONLY|O_NONBLOCK);
    if (fd == -1) {
        /* Can't open the file? Fall back to unlinking in the main thread. */
        // Could not get an fd: unlink directly in the main thread
        return unlink(filename);
    } else {
        /* The following unlink() removes the name but doesn't free the
         * file contents because a process still has it open. */
        // unlink() only removes the name here; because we still hold an fd,
        // the storage is not reclaimed yet. The real deletion happens when the
        // background thread closes the fd, so unlink() returns quickly.
        int retval = unlink(filename);
        if (retval == -1) {
            /* If we got an unlink error, we just return it, closing the
             * new reference we have to the file. */
            int old_errno = errno;
            close(fd);  /* This would overwrite our errno. So we saved it. */
            errno = old_errno;
            return -1;
        }
        // Create a deferred close job; the file's storage is reclaimed
        // only once the background thread closes the fd
        bioCreateCloseJob(fd);
        return 0; /* Success. */
    }
}

  • bg_unlink exploits the fact that the filesystem only reclaims a file's storage once the last open descriptor is closed: the name disappears immediately, and the real deletion happens when the background thread closes the fd, so the main thread is never blocked (a small demo of this behaviour follows below).
  • bg_unlink is used in several places, such as removing AOF and RDB files.
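The POSIX behaviour bg_unlink relies on can be demonstrated with a small standalone program (illustration only, not Redis code): after unlink() the name is gone, but the data is still reachable through the open descriptor, and the space is only reclaimed when that descriptor is closed.

/* Demonstrates unlink-while-open: the file name disappears immediately,
 * but the contents live on until the last open fd is closed. */
#include <fcntl.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

int main(void) {
    const char *path = "/tmp/bg_unlink_demo";
    int wfd = open(path, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    write(wfd, "hello\n", 6);
    close(wfd);

    int fd = open(path, O_RDONLY);          /* keep a reference, like bg_unlink() */
    unlink(path);                           /* removes the name only */

    printf("open(%s) after unlink: %s\n", path,
           open(path, O_RDONLY) == -1 ? "fails (name is gone)" : "unexpectedly works");

    char buf[16];
    ssize_t n = read(fd, buf, sizeof(buf)); /* data still readable via the old fd */
    printf("read %zd bytes through the surviving fd\n", n);

    close(fd);                              /* now the kernel reclaims the storage */
    return 0;
}

This is exactly why the foreground unlink() in bg_unlink returns quickly: the potentially slow work of releasing the blocks is deferred to the close() performed by the bio_close_file thread.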

Asynchronous AOF fsync

// path:src/aof.c
/* Return true if an AOf fsync is currently already in progress in a
 * BIO thread. */
// Is the number of pending AOF fsync jobs non-zero,
// i.e. has the previous asynchronous fsync not finished yet?
int aofFsyncInProgress(void) {
    return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
}

/* Starts a background task that performs fsync() against the specified
 * file descriptor (the one of the AOF file) in another thread. */
// Submit an asynchronous AOF fsync job
void aof_background_fsync(int fd) {
    bioCreateFsyncJob(fd);
}

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
// Before the reply of a write command is sent to the client, its AOF record
// must first be written to the filesystem (kernel page cache).
// flushAppendOnlyFile writes the contents of aof_buf to the filesystem and,
// depending on the appendfsync policy and related conditions, fsyncs the file
// (either synchronously or asynchronously).
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) {
        /* Check if we need to do fsync even the aof buffer is empty,
         * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
         * called only when aof buffer is not empty, so if users
         * stop write commands before fsync called in one second,
         * the data in page cache cannot be flushed in time. */
        // In everysec mode, even though aof_buf is empty,
        // aof_fsync_offset != aof_current_size means data has been written
        // to the filesystem but not yet fsynced; if more than a second has
        // passed and no asynchronous fsync job is currently pending,
        // jump to the fsync logic.
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.aof_fsync_offset != server.aof_current_size &&
            server.unixtime > server.aof_last_fsync &&
            !(sync_in_progress = aofFsyncInProgress())) {
            goto try_fsync;
        } else {
            return;
        }
    }

    // In everysec mode, check whether an asynchronous fsync is still pending
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = aofFsyncInProgress();

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        // everysec mode
        if (sync_in_progress) {
            // A background fsync job is still pending
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                // First postponement: remember when it started and return
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                // Postponed for less than 2 seconds: keep waiting for the
                // background fsync to finish
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            // Postponed for more than 2 seconds: count it in
            // aof_delayed_fsync, log a notice, and go ahead with the write
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
        usleep(server.aof_flush_sleep);
    }

    latencyStartMonitor(latency);
    // Call aofWrite to write aof_buf (the buffer) to the filesystem (kernel)
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    // Measure the time spent writing to the filesystem
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    // Depending on the situation, account the time to different latency events
    if (sync_in_progress) {
        // A background fsync job is still pending
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (hasActiveChildProcess()) {
        // A child is doing RDB or AOF rewrite work
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        // The buffer was not fully written to the kernel
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        // Rate-limit the error logging
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            // Nothing was written to the filesystem
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            // Only part of the buffer was written
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                     "the AOF file: (nwritten=%lld, "
                                     "expected=%lld)",
                                     (long long)nwritten,
                                     (long long)sdslen(server.aof_buf));
            }
            // Truncate the file to drop the partial write
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file. Redis may refuse "
                             "to load the AOF the next time it starts. "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            // With the 'always' policy a failed write cannot be recovered:
            // exit the process
            /* We can't recover when the fsync policy is ALWAYS since the reply
             * for the client is already in the output buffers (both writes and
             * reads), and the changes to the db can't be rolled back. Since we
             * have a contract with the user that on acknowledged or observed
             * writes are is synced on disk, we must exit. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;
            // Other policies tolerate a temporarily failed write, but the
            // error status stops new write commands until the write succeeds
            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        // The whole buffer was written: clear any previous write error state
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    // Depending on the buffer size, either reuse it or free and recreate it
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

// Try to fsync
try_fsync:
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
        return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        /* Let's try to get this data on the disk. To guarantee data safe when
         * the AOF fsync policy is 'always', we should exit if failed to fsync
         * AOF (see comment next to the exit(1) after write error above). */
        // 'always' policy: fsync synchronously in the main thread
        if (redis_fsync(server.aof_fd) == -1) {
            serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
                "AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
            // fsync failed: exit immediately
            exit(1);
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_fsync_offset = server.aof_current_size;
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        // everysec policy and more than one second since the last fsync
        if (!sync_in_progress) {
            // The previous asynchronous fsync job has finished:
            // create a new asynchronous AOF fsync job
            aof_background_fsync(server.aof_fd);
            server.aof_fsync_offset = server.aof_current_size;
        }
        server.aof_last_fsync = server.unixtime;
    }
}

// path:src/server.c
// Called after all client events (commands) have been processed,
// just before re-entering epoll_wait
void beforeSleep(struct aeEventLoop *eventLoop) {
    // ...
    /* Write the AOF buffer on disk */
    // Write aof_buf to the filesystem
    if (server.aof_state == AOF_ON)
        flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    // Send the replies back to the clients
    handleClientsWithPendingWritesUsingThreads();
    // ...
}

  • With AOF enabled, Redis requires the AOF record of every write command to be written to the filesystem before the reply is sent to the client.
  • In beforeSleep, flushAppendOnlyFile is called first to write the contents of the AOF buffer to the filesystem.
  • Only then does handleClientsWithPendingWritesUsingThreads send the replies to the clients.
  • flushAppendOnlyFile writes aof_buf to the filesystem and, depending on the appendfsync policy and related conditions, fsyncs the file either synchronously or asynchronously.
  • When the fsync is asynchronous, aof_background_fsync is called to create the background job (the configuration directives that select the policy are shown below).
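Which branch of flushAppendOnlyFile runs is decided by the appendfsync policy. For reference (directive names as found in redis.conf; the values shown are illustrative), the relevant settings are:

# redis.conf – AOF flushing policy (illustrative values)
appendonly yes
appendfsync everysec             # always | everysec | no
no-appendfsync-on-rewrite no     # if yes, skip fsync while a rewrite/RDB child is running

With everysec, the main thread only writes aof_buf and submits a BIO_AOF_FSYNC job at most once per second; with always, it calls redis_fsync itself before replying; with no, it never fsyncs explicitly and leaves flushing entirely to the operating system.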

Summary

This article first introduced the key asynchronous I/O optimizations in Redis: lazy free, asynchronous file-descriptor closing, and asynchronous AOF fsync. By handling this work asynchronously, Redis keeps I/O operations from blocking the main thread, which underpins its very high concurrency. It then went down to the source level and examined how these optimizations are implemented by Redis's Background I/O Service: bio threads take over the deferred I/O operations, so the main thread can focus on serving command requests without being blocked. Finally, a few typical code paths showed these mechanisms in action, such as lazy free in the DEL/UNLINK commands and the background fsync that keeps AOF writes fast. Overall, these background I/O optimizations let Redis keep the simplicity of its single-threaded command model while offloading slow operations, which is a large part of how a single instance can deliver more than 100,000 QPS.


Originally published on the WeChat official account 吃瓜技术派.
