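Redis Source Code Analysis: How Blocking Commands Are Implemented

Blocking commands in Redis let a client wait until a particular event occurs. They are typically used when a client needs to wait for data to become available or for some result to be produced.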

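Overview of blocking commands

Waiting for data

When the requested data does not exist, the client is blocked until data satisfying the request arrives or the timeout expires. This category includes the following commands:

  • BRPOP/BLPOP: pop an element from the tail/head of a list, blocking until an element is available.

  • BRPOPLPUSH: blocking variant of RPOPLPUSH; pops an element from the tail of one list and pushes it onto the head of another.

  • BLMOVE: blocking list move; like BRPOPLPUSH, but both the source end and the destination end can be chosen.

  • XREAD: with the BLOCK option, blocks while reading stream data.

  • XREADGROUP: with the BLOCK option, blocks while reading stream data through a consumer group.

  • BZPOPMIN/BZPOPMAX: block until a sorted set has members, then pop and return the member with the lowest/highest score.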

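How it works
  • The client sends a blocking command to the server, e.g. BLPOP key 1.

  • The server records the client in a dictionary: the dictionary key is the key being blocked on, and the value is a linked list in which each node records one waiting client.

  • When several clients block on the same key, they are all appended to that key's list.

  • The server also adds the client to the timeout table, so that beforeSleep can check periodically whether the block has timed out.

  • When the target key receives new data, the server fetches the list of clients waiting on that key and sends them replies.

  • When beforeSleep detects that a client's block has timed out, it sends the client a timeout reply.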

Using a dictionary of linked lists, the server can efficiently track many clients blocked on the same key and unblock them at the right moment.
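
The dictionary-plus-list structure can be sketched in a few lines of standalone C. This is a simplified model, not Redis code: the fixed-size arrays, the hypothetical `blocked_table`/`waitq` types and the integer client ids are assumptions made to keep the example self-contained. Each key maps to a FIFO of waiting clients, and new data goes to the client that blocked first.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical, simplified stand-in for db->blocking_keys:
 * each key maps to a FIFO of waiting client ids. */
#define MAX_KEYS 8
#define MAX_WAITERS 8

typedef struct {
    char key[32];
    int waiters[MAX_WAITERS]; /* client ids, in the order they blocked */
    int count;
} waitq;

typedef struct {
    waitq slots[MAX_KEYS];
    int nslots;
} blocked_table;

/* Find the queue of a key; create it if asked to (like dictFind + dictAdd). */
waitq *table_lookup(blocked_table *t, const char *key, int create) {
    for (int i = 0; i < t->nslots; i++)
        if (strcmp(t->slots[i].key, key) == 0) return &t->slots[i];
    if (!create || t->nslots == MAX_KEYS) return NULL;
    waitq *q = &t->slots[t->nslots++];
    strncpy(q->key, key, sizeof(q->key) - 1);
    q->key[sizeof(q->key) - 1] = '\0';
    q->count = 0;
    return q;
}

/* Block a client on a key: append it to the tail of that key's queue. */
int block_on_key(blocked_table *t, const char *key, int client_id) {
    waitq *q = table_lookup(t, key, 1);
    if (q == NULL || q->count == MAX_WAITERS) return -1;
    q->waiters[q->count++] = client_id;
    return 0;
}

/* New data for a key: serve the client that blocked first, or return -1. */
int serve_one(blocked_table *t, const char *key) {
    waitq *q = table_lookup(t, key, 0);
    if (q == NULL || q->count == 0) return -1;
    int served = q->waiters[0];
    memmove(q->waiters, q->waiters + 1, (size_t)(q->count - 1) * sizeof(int));
    q->count--;
    assert(q->count >= 0);
    return served;
}
```

First-blocked, first-served is the same order that serveClientsBlockedOnListKey() follows in the source walkthrough.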

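Waiting for replicas to catch up

If not enough replicas have yet applied the client's latest write, the client is blocked until enough replicas have done so or the timeout expires.

  • WAIT numreplicas timeout: block until at least numreplicas replicas have acknowledged all of this client's previous writes, or until the timeout expires.

How it works
  • The client sends the WAIT command.

  • While processing WAIT, the server checks whether enough replicas have already applied the client's last write command; if so, it replies immediately.

  • Otherwise, it appends the client to the server.clients_waiting_acks list, sets the client's blocked flag, adds it to the timeout table, and sets server.get_ack_from_slaves so that beforeSleep asks every replica for the offset it has reached in the replication stream.

  • In beforeSleep, the server walks the server.clients_waiting_acks list; for each client it checks whether enough replicas have now applied that client's last write command, and unblocks the client if so.

  • When beforeSleep detects that a client's block has timed out, it sends the client a timeout reply; for WAIT, the reply is the number of replicas that have acknowledged so far.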
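
The core check behind WAIT is a count: how many replicas have acknowledged an offset at least as large as the client's last write. The sketch below models this with a plain array of acknowledged offsets. The function names are hypothetical; in Redis the equivalent counting over `server.slaves` is done by replicationCountAcksByOffset() in replication.c.

```c
#include <assert.h>
#include <stddef.h>

/* Count replicas whose acknowledged replication offset has reached `target`
 * (the offset of the client's last write). ack_offsets is a hypothetical
 * array holding the last offset each replica reported. */
int count_acks(const long long *ack_offsets, size_t nreplicas, long long target) {
    int acked = 0;
    for (size_t i = 0; i < nreplicas; i++)
        if (ack_offsets[i] >= target) acked++;
    return acked;
}

/* 1: enough replicas have caught up, so the WAIT client can be unblocked;
 * 0: the client stays blocked (until more ACKs arrive or it times out). */
int wait_can_unblock(const long long *ack_offsets, size_t nreplicas,
                     long long target, int numreplicas) {
    assert(numreplicas >= 0);
    return count_acks(ack_offsets, nreplicas, target) >= numreplicas;
}
```

The same count is what a timed-out WAIT returns to the client.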

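Pausing client execution

CLIENT PAUSE and FAILOVER make the server suspend clients, both existing connections and those established afterwards, until the pause ends.

  • CLIENT PAUSE timeout [WRITE|ALL]: suspends clients, existing and newly connected, for timeout milliseconds. With ALL (the default) every client is suspended; with WRITE only clients sending write commands are suspended.

  • CLIENT UNPAUSE: lifts the pause and wakes every paused client.

  • FAILOVER [options]: since Redis 6.2, a manual master/replica switchover can be triggered by running FAILOVER on the master (not to be confused with Sentinel's SENTINEL FAILOVER). The master first pauses clients that send write commands, waits until the replica has processed all of the master's writes, then turns itself into a replica while that replica becomes the new master, and finally unpauses the write clients.

How it works
  • The client sends CLIENT PAUSE; the server sets a global pause flag (server.client_pause_type) and records the time at which the pause ends (server.client_pause_end_time).

  • Whenever a command arrives from any client, the server checks, before executing it, whether the client must be blocked; if not, it executes the command.

  • If all clients are paused, or only writes are paused and this client's command is a write, the client is blocked and appended to the server.paused_clients list.

  • When a client sends CLIENT UNPAUSE, the global pause flag is cleared and every blocked client is woken up.

  • beforeSleep periodically checks whether server.client_pause_end_time has been reached; if so, it clears the global pause flag and wakes every blocked client.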
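
The pause logic reduces to two small decisions: whether an incoming command must be parked, and whether an expired pause should be lifted. This is a hedged sketch. The `pause_state` type, the enum values and the millisecond clock are hypothetical stand-ins for server.client_pause_type and server.client_pause_end_time.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical pause model; names mirror the text, not the real Redis enums. */
typedef enum { PAUSE_OFF, PAUSE_WRITE, PAUSE_ALL } pause_type;

typedef struct {
    pause_type type;   /* like server.client_pause_type */
    long long end_ms;  /* like server.client_pause_end_time */
} pause_state;

/* Periodic check (beforeSleep in the text): lift the pause once its end time
 * is reached. Returns true if it was lifted, i.e. paused clients must be woken. */
bool pause_expire(pause_state *p, long long now_ms) {
    assert(now_ms >= 0);
    if (p->type != PAUSE_OFF && now_ms >= p->end_ms) {
        p->type = PAUSE_OFF;
        return true;
    }
    return false;
}

/* Checked before executing a command: must this client be parked? */
bool must_block(const pause_state *p, bool is_write) {
    switch (p->type) {
    case PAUSE_ALL:   return true;     /* every command waits */
    case PAUSE_WRITE: return is_write; /* only writes wait */
    default:          return false;    /* not paused */
    }
}
```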

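Source code walkthrough

Redis source version: 6.2.5. The code walkthrough covers three parts:

  • BLPOP: blocking while waiting for data

  • Waiting for replicas to catch up

  • Pausing client execution (CLIENT PAUSE/UNPAUSE)

Blocking-related definitions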

```c
// path: src/server.h

/* Client block type (btype field in client structure)
 * if CLIENT_BLOCKED flag is set. */
// Block types of a client
#define BLOCKED_NONE 0    /* Not blocked, no CLIENT_BLOCKED flag set. */
// List blocking commands (waiting for data)
#define BLOCKED_LIST 1    /* BLPOP & co. */
// Waiting for replicas to apply writes (WAIT command)
#define BLOCKED_WAIT 2    /* WAIT for synchronous replication. */
// Blocking driven by a module
#define BLOCKED_MODULE 3  /* Blocked by a loadable module. */
// Stream XREAD command (waiting for data)
#define BLOCKED_STREAM 4  /* XREAD. */
// Sorted-set BZPOP commands (waiting for data)
#define BLOCKED_ZSET 5    /* BZPOP et al. */
// Pause blocking, used for manual failover or to stop serving reads/writes
#define BLOCKED_PAUSE 6   /* Blocked by CLIENT PAUSE */
#define BLOCKED_NUM 7     /* Number of blocked states. */

// Set in client->flags while the client is blocked.
// While it is set, processInputBuffer() does not parse the client's byte
// stream or execute commands; it is cleared on unblock.
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
// Set on unblock, when the client is placed in server.unblocked_clients.
// beforeSleep() calls processUnblockedClients(), which parses the client's
// query buffer and executes its commands.
#define CLIENT_UNBLOCKED (1<<7) /* This client was unblocked and is stored in
                                   server.unblocked_clients */

/* This structure holds the blocking operation state for a client.
 * The fields used depend on client->btype. */
// Records the details of what a client is blocked on
typedef struct blockingState {
    /* Generic fields. */
    // Generic field: block timeout
    mstime_t timeout;       /* Blocking operation timeout. If UNIX current time
                             * is > timeout then the operation timed out. */

    // Used by list, zset and stream (blocked waiting for data)
    /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */
    dict *keys;             /* The keys we are waiting to terminate a blocking
                             * operation such as BLPOP or XREAD. Or NULL. */
    // Destination list of BLMOVE
    robj *target;           /* The key that should receive the element,
                             * for BLMOVE. */
    // Which end of the lists to pop from and push to;
    // used by BLPOP, BRPOP and BLMOVE
    struct listPos {
        int wherefrom;      /* Where to pop from */
        int whereto;        /* Where to push to */
    } listpos;              /* The positions in the src/dst lists
                             * where we want to pop/push an element
                             * for BLPOP, BRPOP and BLMOVE. */

    /* BLOCK_STREAM */
    size_t xread_count;     /* XREAD COUNT option. */
    robj *xread_group;      /* XREADGROUP group name. */
    robj *xread_consumer;   /* XREADGROUP consumer name. */
    int xread_group_noack;

    /* BLOCKED_WAIT */
    // Used by WAIT (block until replicas have applied the writes)
    // Number of replicas that must acknowledge
    int numreplicas;        /* Number of replicas we are waiting for ACK. */
    // Replication offset the replicas must reach
    long long reploffset;   /* Replication offset to reach. */

    /* BLOCKED_MODULE */
    void *module_blocked_handle; /* RedisModuleBlockedClient structure.
                                    which is opaque for the Redis core, only
                                    handled in module.c. */
} blockingState;

// One client structure per client connection
typedef struct client {
    // ...
    // Flag bits
    uint64_t flags;         /* Client flags: CLIENT_* macros. */
    // Details of what the client is blocked on
    blockingState bpop;     /* blocking state */
    // ...
} client;

struct redisServer {
    // ...
    // Used to time out blocked clients
    rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
    // ...
};
```

```c
// path: src/networking.c

/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
// Parses the protocol data in the client's query buffer.
// Whether commands are executed depends on whether we run in an I/O thread:
// in an I/O thread it stops after parsing the first complete command;
// otherwise it keeps parsing and executing commands until no complete
// command is left. While CLIENT_BLOCKED is set, processInputBuffer()
// neither parses the byte stream nor executes commands.
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        // CLIENT_BLOCKED is set: stop parsing and return right away
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        // CLIENT_PENDING_COMMAND means a command has been parsed but cannot be
        // executed yet (e.g. we are in an I/O thread's read context), so stop
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        // Decode one request from the query buffer per the Redis protocol
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. To avoid data race, Redis won't
             * support Gopher if enable io threads to read queries. */
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // A complete command was decoded
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            // CLIENT_PENDING_READ means an I/O thread is reading and parsing;
            // we are inside an I/O thread, so the command cannot run here.
            // Mark it with CLIENT_PENDING_COMMAND (command pending) and stop
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            // Reaching here means the command can be executed right away
            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}

// path: src/blocked.c

/* This function is called in the beforeSleep() function of the event loop
 * in order to process the pending input buffer of clients that were
 * unblocked after a blocking operation. */
// Processes the list of unblocked clients; called from beforeSleep()
void processUnblockedClients(void) {
    listNode *ln;
    client *c;

    // Walk server.unblocked_clients
    while (listLength(server.unblocked_clients)) {
        ln = listFirst(server.unblocked_clients);
        serverAssert(ln != NULL);
        c = ln->value;
        listDelNode(server.unblocked_clients,ln);
        // Clear the unblocked flag
        c->flags &= ~CLIENT_UNBLOCKED;

        /* Process remaining data in the input buffer, unless the client
         * is blocked again. Actually processInputBuffer() checks that the
         * client is not blocked before to proceed, but things may change and
         * the code is conceptually more correct this way. */
        if (!(c->flags & CLIENT_BLOCKED)) {
            /* If we have a queued command, execute it now. */
            // Execute the pending command: one that was parsed
            // earlier but not executed at the time
            if (processPendingCommandsAndResetClient(c) == C_ERR) {
                continue;
            }
            /* Then process client if it has more data in it's buffer. */
            // While CLIENT_BLOCKED was set, TCP bytes were read into the
            // query buffer but not parsed or executed; parse the buffer
            // now and run any further commands
            if (c->querybuf && sdslen(c->querybuf) > 0) {
                processInputBuffer(c);
            }
        }
    }
}
```
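
The interplay between CLIENT_BLOCKED, the query buffer and processUnblockedClients() can be modelled with a toy client. In this sketch (hypothetical types; newline-terminated commands stand in for RESP parsing), bytes keep arriving while the client is blocked, but nothing is parsed until the flag is cleared.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Toy client: newline-terminated commands in a query buffer.
 * A hypothetical model of CLIENT_BLOCKED gating, not Redis code. */
typedef struct {
    char querybuf[256];
    size_t qb_pos;   /* parse position, like c->qb_pos */
    bool blocked;    /* like CLIENT_BLOCKED */
    int executed;    /* commands run so far */
} toy_client;

/* Bytes read from the socket are appended even while the client is blocked. */
void toy_read(toy_client *c, const char *bytes) {
    assert(strlen(c->querybuf) + strlen(bytes) < sizeof(c->querybuf));
    strcat(c->querybuf, bytes);
}

/* Like processInputBuffer(): run complete commands until blocked or out of input. */
void toy_process(toy_client *c) {
    while (!c->blocked) {
        char *start = c->querybuf + c->qb_pos;
        char *nl = strchr(start, '\n');
        if (nl == NULL) break;               /* no complete command yet */
        c->qb_pos = (size_t)(nl - c->querybuf) + 1;
        c->executed++;
        if (strncmp(start, "BLPOP", 5) == 0) /* a blocking command parks the client */
            c->blocked = true;
    }
}

/* Like unblockClient() followed by processUnblockedClients(): clear the flag, resume. */
void toy_unblock(toy_client *c) {
    c->blocked = false;
    toy_process(c);
}
```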

BLPOP: blocking while waiting for data

Handling the BLPOP command
```c
// path: src/t_list.c

/* BLPOP <key> [<key> ...] <timeout> */
// BLPOP command handler
void blpopCommand(client *c) {
    blockingPopGenericCommand(c,LIST_HEAD);
}

/* Blocking RPOP/LPOP */
// Shared implementation of BLPOP/BRPOP
void blockingPopGenericCommand(client *c, int where) {
    robj *o;
    mstime_t timeout;
    int j;

    // Parse the timeout (the last argument, in seconds)
    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
        != C_OK) return;

    for (j = 1; j < c->argc-1; j++) {
        // Look up the value of each key in turn
        o = lookupKeyWrite(c->db,c->argv[j]);
        if (o != NULL) {
            if (checkType(c,o,OBJ_LIST)) {
                // Not a LIST: checkType() has already replied with an error
                return;
            } else {
                // It is a LIST
                if (listTypeLength(o) != 0) {
                    // The list has elements: no need to block, pop and reply
                    /* Non empty list, this is like a normal [LR]POP. */
                    robj *value = listTypePop(o,where);
                    serverAssert(value != NULL);

                    addReplyArrayLen(c,2);
                    addReplyBulk(c,c->argv[j]);
                    addReplyBulk(c,value);
                    decrRefCount(value);
                    listElementsRemoved(c,c->argv[j],where,o,1);

                    /* Replicate it as an [LR]POP instead of B[LR]POP. */
                    rewriteClientCommandVector(c,2,
                        (where == LIST_HEAD) ? shared.lpop : shared.rpop,
                        c->argv[j]);
                    return;
                }
            }
        }
    }

    /* If we are not allowed to block the client, the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
    // The client is not allowed to block: reply as if it timed out
    if (c->flags & CLIENT_DENY_BLOCKING) {
        addReplyNullArray(c);
        return;
    }

    /* If the keys do not exist we must block */
    struct listPos pos = {where};
    // Block the client waiting on the keys;
    // blockForKeys() serves BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM
    blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL);
}

// path: src/blocked.c

// Block the client waiting on some keys;
// used for BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {
    dictEntry *de;
    list *l;
    int j;

    // Record the timeout and the other blocking details
    c->bpop.timeout = timeout;
    // target records the destination list of BLMOVE
    c->bpop.target = target;

    if (listpos != NULL) c->bpop.listpos = *listpos;

    if (target != NULL) incrRefCount(target);

    for (j = 0; j < numkeys; j++) {
        /* Allocate our bkinfo structure, associated to each key the client
         * is blocked for. */
        bkinfo *bki = zmalloc(sizeof(*bki));
        if (btype == BLOCKED_STREAM)
            bki->stream_id = ids[j];

        /* If the key already exists in the dictionary ignore it. */
        // Record in the client that it is blocked waiting for this key
        if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
            zfree(bki);
            continue;
        }
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        // In the per-db dict c->db->blocking_keys, record which clients
        // are blocked on this key
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            incrRefCount(keys[j]);
            serverAssertWithInfo(c,keys[j],retval == DICT_OK);
        } else {
            l = dictGetVal(de);
        }
        // Append the client to the key's waiting list
        listAddNodeTail(l,c);
        bki->listnode = listLast(l);
    }
    // Set the CLIENT_BLOCKED flag and add the client to the timeout table
    blockClient(c,btype);
}

/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
 * flag is set client query buffer is not longer processed, but accumulated,
 * and will be processed when the client is unblocked. */
// Set the CLIENT_BLOCKED flag and add the client to the timeout table
void blockClient(client *c, int btype) {
    // Set the CLIENT_BLOCKED flag
    c->flags |= CLIENT_BLOCKED;
    c->btype = btype;
    server.blocked_clients++;
    server.blocked_clients_by_type[btype]++;
    // Add the client to the timeout table
    addClientToTimeoutTable(c);
    if (btype == BLOCKED_PAUSE) {
        // For a pause, also append the client to server.paused_clients
        // and set CLIENT_PENDING_COMMAND
        listAddNodeTail(server.paused_clients, c);
        c->paused_list_node = listLast(server.paused_clients);
        /* Mark this client to execute its command */
        c->flags |= CLIENT_PENDING_COMMAND;
    }
}

// path: src/timeout.c

/* Add the specified client id / timeout as a key in the radix tree we use
 * to handle blocked clients timeouts. The client is not added to the list
 * if its timeout is zero (block forever). */
// Add the client to the timeout table
void addClientToTimeoutTable(client *c) {
    if (c->bpop.timeout == 0) return;
    uint64_t timeout = c->bpop.timeout;
    unsigned char buf[CLIENT_ST_KEYLEN];
    encodeTimeoutKey(buf,timeout,c);
    // Insert into the timeout table
    if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
        c->flags |= CLIENT_IN_TO_TABLE;
}
```
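
addClientToTimeoutTable() stores each blocked client under a key whose leading bytes encode the timeout, so the radix tree keeps clients ordered by deadline and expired clients can be found by scanning from the smallest key. The sketch below shows such a layout, assuming a 16-byte key made of a big-endian 64-bit timeout followed by a 64-bit client identifier. In Redis the second half identifies the client; the numeric id and the function names here are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define TIMEOUT_KEYLEN 16 /* 8 bytes timeout + 8 bytes client id (assumed layout) */

/* Write v as 8 big-endian bytes, so byte-wise order matches numeric order. */
static void put_be64(unsigned char *p, uint64_t v) {
    for (int i = 7; i >= 0; i--) {
        p[i] = (unsigned char)(v & 0xff);
        v >>= 8;
    }
}

static uint64_t get_be64(const unsigned char *p) {
    uint64_t v = 0;
    for (int i = 0; i < 8; i++) v = (v << 8) | p[i];
    return v;
}

/* Timeout first (the sort key), then the client id to keep keys unique. */
void encode_timeout_key(unsigned char buf[TIMEOUT_KEYLEN], uint64_t timeout_ms, uint64_t client_id) {
    put_be64(buf, timeout_ms);
    put_be64(buf + 8, client_id);
}

void decode_timeout_key(const unsigned char buf[TIMEOUT_KEYLEN], uint64_t *timeout_ms, uint64_t *client_id) {
    assert(timeout_ms != 0 && client_id != 0); /* output pointers must be valid */
    *timeout_ms = get_be64(buf);
    *client_id = get_be64(buf + 8);
}
```

Big-endian order is what makes this work: memcmp() on the raw keys then agrees with numeric comparison of the timeouts (255 sorts before 256), which little-endian bytes would not guarantee.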

Call chain
```text
blpopCommand(client *c)                           // BLPOP command handler
- blockingPopGenericCommand(client *c, int where) // the actual BRPOP/BLPOP logic
  - iterate over the keys in the command
    - the key's value is not a LIST: reply with an error and return
    - the LIST has elements: pop one and reply
  - the client is not allowed to block: reply as if it timed out
  - blockForKeys(client *c, ...)                  // block the client on the keys
    - the blocking details are stored in c->bpop
    - iterate over the keys in the command
      - c->bpop.keys records which keys the client is waiting on
      - the client is appended to the key's list in the per-db dict
        c->db->blocking_keys, meaning it waits for data on that key
    - blockClient(client *c, int btype)           // set CLIENT_BLOCKED, add to the timeout table
      - c->flags |= CLIENT_BLOCKED;               // set the CLIENT_BLOCKED flag
      - addClientToTimeoutTable(client *c)        // add to the timeout table
        - raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)
```

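In summary:

  • If the LIST has data, the element is returned immediately.

  • If the LIST does not exist (there is no data), the client is flagged as blocked, and the keys it waits on are recorded in both c->bpop.keys and c->db->blocking_keys.

  • So that the block can end on a timeout, the client is also stored in server.clients_timeout_table.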

Unblocking when data is pushed onto a LIST
```c
// path: src/t_list.c

/* LPUSH <key> <element> [<element> ...] */
// LPUSH command handler
void lpushCommand(client *c) {
    pushGenericCommand(c,LIST_HEAD,0);
}

/*-----------------------------------------------------------------------------
 * List Commands
 *----------------------------------------------------------------------------*/

/* Implements LPUSH/RPUSH/LPUSHX/RPUSHX.
 * 'xx': push if key exists. */
// Push elements onto a list
void pushGenericCommand(client *c, int where, int xx) {
    int j;

    // Look up the value
    robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
    // Check that it is a LIST
    if (checkType(c,lobj,OBJ_LIST)) return;
    if (!lobj) {
        // The key does not exist
        if (xx) {
            addReply(c, shared.czero);
            return;
        }

        // Create the LIST object
        lobj = createQuicklistObject();
        quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
        // Add the key/value pair.
        // Clients only block on a LIST that does not exist; an existing LIST
        // holds at least one element, so no client is blocked on it.
        // Calling signalKeyAsReady() from dbAdd() is therefore enough to
        // wake blocked clients promptly when elements are pushed.
        dbAdd(c->db,c->argv[1],lobj);
    }

    // Push the elements onto the LIST object
    for (j = 2; j < c->argc; j++) {
        listTypePush(lobj,c->argv[j],where);
        server.dirty++;
    }

    // Reply to the client
    addReplyLongLong(c, listTypeLength(lobj));

    char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
    signalModifiedKey(c,c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}

// path: src/db.c

/* Add the key to the DB. It's up to the caller to increment the reference
 * counter of the value if needed.
 *
 * The program is aborted if the key already exists. */
// Add a key/value pair
void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);
    int retval = dictAdd(db->dict, copy, val);

    serverAssertWithInfo(NULL,key,retval == DICT_OK);
    // After adding the pair, signalKeyAsReady() queues the key on
    // server.ready_keys; beforeSleep() later processes server.ready_keys
    // and unblocks the relevant clients.
    // Clients only block on a LIST or ZSET that does not exist; an existing
    // one holds at least one element, so no client is blocked on it.
    // Calling signalKeyAsReady() here is therefore enough to wake blocked
    // clients promptly when elements are added to a LIST or ZSET.
    signalKeyAsReady(db, key, val->type);
    if (server.cluster_enabled) slotToKeyAdd(key->ptr);
}
```

```c
// path: src/blocked.c

/* If the specified key has clients blocked waiting for list pushes, this
 * function will put the key reference into the server.ready_keys list.
 * Note that db->ready_keys is a hash table that allows us to avoid putting
 * the same key again and again in the list in case of multiple pushes
 * made by a script or in the context of MULTI/EXEC.
 *
 * The list will be finally processed by handleClientsBlockedOnKeys() */
void signalKeyAsReady(redisDb *db, robj *key, int type) {
    readyList *rl;

    /* Quick returns. */
    int btype = getBlockedTypeByType(type);
    // Only OBJ_LIST, OBJ_ZSET, OBJ_STREAM and OBJ_MODULE values can cause blocking
    if (btype == BLOCKED_NONE) {
        // This value type never blocks anyone: return
        /* The type can never block. */
        return;
    }
    if (!server.blocked_clients_by_type[btype] &&
        !server.blocked_clients_by_type[BLOCKED_MODULE]) {
        // No client is blocked on this type: return
        /* No clients block on this type. Note: Blocked modules are represented
         * by BLOCKED_MODULE, even if the intention is to wake up by normal
         * types (list, zset, stream), so we need to check that there are no
         * blocked modules before we do a quick return here. */
        return;
    }

    /* No clients blocking for this key? No need to queue it. */
    // No client is blocked on this key: return
    if (dictFind(db->blocking_keys,key) == NULL) return;

    /* Key was already signaled? No need to queue it again. */
    // Already in db->ready_keys: return
    if (dictFind(db->ready_keys,key) != NULL) return;

    /* Ok, we need to queue this key into server.ready_keys. */
    // Queue it on server.ready_keys, to be handled in beforeSleep()
    rl = zmalloc(sizeof(*rl));
    rl->key = key;
    rl->db = db;
    incrRefCount(key);
    listAddNodeTail(server.ready_keys,rl);

    /* We also add the key in the db->ready_keys dictionary in order
     * to avoid adding it multiple times into a list with a simple O(1)
     * check. */
    incrRefCount(key);
    serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
```
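
signalKeyAsReady() pairs an ordered list (server.ready_keys) with a membership dictionary (db->ready_keys), so a key that is pushed to many times within one script or MULTI/EXEC is queued only once. The sketch below reproduces that pattern with hypothetical fixed-size types; a linear scan stands in for the dictionary lookup, which is O(1) in Redis.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_READY 16

/* Ordered queue of ready keys, like server.ready_keys. */
typedef struct {
    const char *list[MAX_READY];
    int len;
} ready_queue;

/* Toy set of key names; a linear scan stands in for an O(1) dict lookup. */
typedef struct {
    const char *keys[MAX_READY];
    int n;
} key_set;

bool set_contains(const key_set *s, const char *k) {
    for (int i = 0; i < s->n; i++)
        if (strcmp(s->keys[i], k) == 0) return true;
    return false;
}

void set_add(key_set *s, const char *k) {
    if (!set_contains(s, k)) {
        assert(s->n < MAX_READY);
        s->keys[s->n++] = k;
    }
}

/* Like signalKeyAsReady(): queue the key only if a client is blocked on it and
 * it has not been queued yet. Returns true if the key was queued. */
bool signal_ready(ready_queue *q, key_set *queued, const key_set *blocking, const char *key) {
    if (!set_contains(blocking, key)) return false; /* nobody waits (db->blocking_keys) */
    if (set_contains(queued, key)) return false;    /* already signaled (db->ready_keys) */
    assert(q->len < MAX_READY);
    q->list[q->len++] = key;
    set_add(queued, key);
    return true;
}
```

The membership set only prevents duplicates while a key is waiting to be served; handleClientsBlockedOnKeys() removes the key from db->ready_keys before serving it, so a later push can signal it again.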

```c
// Called from beforeSleep().
// Walks server.ready_keys and serves the clients blocked on those keys
// (i.e. blocked waiting for data)
void handleClientsBlockedOnKeys(void) {
    // server.ready_keys holds every key that changed while commands were
    // executed since the last epoll_wait(); loop while it is non-empty
    while(listLength(server.ready_keys) != 0) {
        list *l;

        /* Point server.ready_keys to a fresh list and save the current one
         * locally. This way as we run the old list we are free to call
         * signalKeyAsReady() that may push new elements in server.ready_keys
         * when handling clients blocked into BLMOVE. */
        l = server.ready_keys;
        server.ready_keys = listCreate();

        // Walk the saved list
        while(listLength(l) != 0) {
            listNode *ln = listFirst(l);
            readyList *rl = ln->value;

            /* First of all remove this key from db->ready_keys so that
             * we can safely call signalKeyAsReady() against this key. */
            // Remove the key being processed from db->ready_keys
            dictDelete(rl->db->ready_keys,rl->key);

            /* Even if we are not inside call(), increment the call depth
             * in order to make sure that keys are expired against a fixed
             * reference time, and not against the wallclock time. This
             * way we can lookup an object multiple times (BLMOVE does
             * that) without the risk of it being freed in the second
             * lookup, invalidating the first one.
             * See https://github.com/redis/redis/pull/6554. */
            server.fixed_time_expire++;
            updateCachedTime(0);

            /* Serve clients blocked on the key. */
            // Look up the key's value
            robj *o = lookupKeyWrite(rl->db,rl->key);

            if (o != NULL) {
                // Dispatch on the value type: LIST, ZSET or STREAM
                if (o->type == OBJ_LIST)
                    serveClientsBlockedOnListKey(o,rl);
                else if (o->type == OBJ_ZSET)
                    serveClientsBlockedOnSortedSetKey(o,rl);
                else if (o->type == OBJ_STREAM)
                    serveClientsBlockedOnStreamKey(o,rl);
                /* We want to serve clients blocked on module keys
                 * regardless of the object type: we don't know what the
                 * module is trying to accomplish right now. */
                serveClientsBlockedOnKeyByModule(rl);
            }
            server.fixed_time_expire--;

            /* Free this item. */
            decrRefCount(rl->key);
            zfree(rl);
            listDelNode(l,ln);
        }
        listRelease(l); /* We have the new list on place at this point. */
    }
}
```
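
handleClientsBlockedOnKeys() swaps server.ready_keys for a fresh list because serving one key (for example, a BLMOVE pushing into another list) can make new keys ready while the loop is running. Below is a minimal model of that swap-and-drain loop. It uses hypothetical integer keys, where serving key k makes key k+1 ready.

```c
#include <assert.h>

#define QCAP 16

typedef struct { int items[QCAP]; int len; } int_queue;

/* Hypothetical server state: `ready` plays server.ready_keys. */
typedef struct {
    int_queue *ready;
    int served[QCAP];  /* keys in the order they were served */
    int nserved;
    int chain_limit;   /* serving key k readies k+1 while k+1 < chain_limit */
} toy_server;

void push_key(int_queue *q, int v) {
    assert(q->len < QCAP);
    q->items[q->len++] = v;
}

static void serve_key(toy_server *s, int key) {
    assert(s->nserved < QCAP);
    s->served[s->nserved++] = key;
    if (key + 1 < s->chain_limit)
        push_key(s->ready, key + 1);   /* new signal lands in the fresh list */
}

/* Swap in an empty list, drain the old batch, repeat while serving produced more. */
void handle_ready(toy_server *s) {
    while (s->ready->len != 0) {
        int_queue batch = *s->ready;   /* keep the current batch locally */
        s->ready->len = 0;             /* ready list is now a fresh, empty list */
        for (int i = 0; i < batch.len; i++)
            serve_key(s, batch.items[i]);
    }
}
```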

```c
/* Helper function for handleClientsBlockedOnKeys(). This function is called
 * when there may be clients blocked on a list key, and there may be new
 * data to fetch (the key is ready). */
// Serves the clients blocked on a LIST after new elements were added to it
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
    /* We serve clients in the same order they blocked for
     * this key, from the first blocked to the last. */
    // Get the list of clients blocked on the key
    dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
    if (de) {
        list *clients = dictGetVal(de);
        int numclients = listLength(clients);

        // Walk the blocked clients (each one is visited at most once)
        while(numclients--) {
            listNode *clientnode = listFirst(clients);
            client *receiver = clientnode->value;

            // Not BLOCKED_LIST: skip it
            if (receiver->btype != BLOCKED_LIST) {
                /* Put at the tail, so that at the next call
                 * we'll not run into it again. */
                listRotateHeadToTail(clients);
                continue;
            }

            robj *dstkey = receiver->bpop.target;
            int wherefrom = receiver->bpop.listpos.wherefrom;
            int whereto = receiver->bpop.listpos.whereto;
            // Pop an element from the list
            robj *value = listTypePop(o, wherefrom);

            if (value) {
                /* Protect receiver->bpop.target, that will be
                 * freed by the next unblockClient()
                 * call. */
                if (dstkey) incrRefCount(dstkey);

                monotime replyTimer;
                elapsedStart(&replyTimer);
                // Complete the blocked list command: BLMOVE, BLPOP or BRPOP
                if (serveClientBlockedOnList(receiver,
                    rl->key,dstkey,rl->db,value,
                    wherefrom, whereto) == C_ERR)
                {
                    /* If we failed serving the client we need
                     * to also undo the POP operation. */
                    // Failed: push the value back onto the list
                    listTypePush(o,value,wherefrom);
                }
                updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
                // Unblock the client
                unblockClient(receiver);

                if (dstkey) decrRefCount(dstkey);
                decrRefCount(value);
            } else {
                // The LIST has no more elements: stop
                break;
            }
        }
    }

    // The LIST is now empty: delete the key
    if (listTypeLength(o) == 0) {
        dbDelete(rl->db,rl->key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
    }
    /* We don't call signalModifiedKey() as it was already called
     * when an element was pushed on the list. */
}
```
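
serveClientsBlockedOnListKey() visits each waiting client at most once. It rotates clients blocked for other types to the tail and stops as soon as the list runs out of elements. The sketch below mirrors that loop on plain arrays; the `waiter` types and integer elements are hypothetical simplifications.

```c
#include <assert.h>

#define WCAP 8

typedef enum { WAIT_LIST, WAIT_OTHER } wait_kind;
typedef struct { int id; wait_kind kind; } waiter;
typedef struct { waiter w[WCAP]; int n; } waiter_queue;

/* Move the head to the tail, like listRotateHeadToTail(). */
static void rotate_head_to_tail(waiter_queue *q) {
    if (q->n < 2) return;
    waiter head = q->w[0];
    for (int i = 1; i < q->n; i++) q->w[i - 1] = q->w[i];
    q->w[q->n - 1] = head;
}

/* Drop the head, like unblockClient() removing it from the waiting list. */
static void remove_head(waiter_queue *q) {
    assert(q->n > 0);
    for (int i = 1; i < q->n; i++) q->w[i - 1] = q->w[i];
    q->n--;
}

/* elems[0..*nelems) is the list, popped from the head. Fills served_ids/got
 * with who received which element; returns how many clients were served. */
int serve_list_waiters(waiter_queue *q, int *elems, int *nelems, int *served_ids, int *got) {
    int served = 0;
    int visits = q->n;                   /* like numclients = listLength(clients) */
    while (visits--) {
        waiter head = q->w[0];
        if (head.kind != WAIT_LIST) {    /* blocked for another type: skip it */
            rotate_head_to_tail(q);
            continue;
        }
        if (*nelems == 0) break;         /* list is empty: the rest stay blocked */
        int value = elems[0];            /* pop from the head */
        for (int i = 1; i < *nelems; i++) elems[i - 1] = elems[i];
        (*nelems)--;
        served_ids[served] = head.id;
        got[served] = value;
        served++;
        remove_head(q);
    }
    return served;
}
```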

```c
// Called when a LIST changes (elements were added): completes a BLPOP,
// BRPOP or BLMOVE for one blocked client and sends it the reply
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto)
{
    robj *argv[5];

    if (dstkey == NULL) {
        // No dstkey: this is BLPOP or BRPOP
        /* Propagate the [LR]POP operation. */
        argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop :
                                             shared.rpop;
        argv[1] = key;
        propagate((wherefrom == LIST_HEAD) ?
            server.lpopCommand : server.rpopCommand,
            db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);

        /* BRPOP/BLPOP */
        // Send the popped element to the client
        addReplyArrayLen(receiver,2);
        addReplyBulk(receiver,key);
        addReplyBulk(receiver,value);

        /* Notify event. */
        char *event = (wherefrom == LIST_HEAD) ? "lpop" : "rpop";
        notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
    } else {
        /* BLMOVE */
        // Push the popped element onto another list:
        // first look up the destination key's value
        robj *dstobj =
            lookupKeyWrite(receiver->db,dstkey);
        if (!(dstobj &&
             checkType(receiver,dstobj,OBJ_LIST)))
        {
            // The destination does not exist, or it exists and is a LIST
            lmoveHandlePush(receiver,dstkey,dstobj,value,whereto);
            /* Propagate the LMOVE/RPOPLPUSH operation. */
            int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand);
            argv[0] = isbrpoplpush ? shared.rpoplpush : shared.lmove;
            argv[1] = key;
            argv[2] = dstkey;
            argv[3] = getStringObjectFromListPosition(wherefrom);
            argv[4] = getStringObjectFromListPosition(whereto);
            propagate(isbrpoplpush ? server.rpoplpushCommand : server.lmoveCommand,
                db->id,argv,(isbrpoplpush ? 3 : 5),
                PROPAGATE_AOF|
                PROPAGATE_REPL);

            /* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */
            notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop",
                                key,receiver->db->id);
        } else {
            // The destination is not a LIST: all we can do is fail
            /* BLMOVE failed because of wrong
             * destination type. */
            return C_ERR;
        }
    }
    return C_OK;
}
```
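
Because a blocked command is only served later, the AOF and the replicas receive the equivalent non-blocking command instead of the blocking one, so replicas never block. The sketch below mirrors the branches of serveClientBlockedOnList() using plain strings. The function name and the LIST_HEAD_POS/LIST_TAIL_POS constants are hypothetical stand-ins.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for Redis's LIST_HEAD / LIST_TAIL. */
enum { LIST_HEAD_POS = 0, LIST_TAIL_POS = 1 };

/* Fill argv with the command that gets propagated once a blocked list client
 * is served, and return argc. dstkey == NULL means BLPOP/BRPOP. */
int propagated_argv(const char **argv, const char *key, const char *dstkey,
                    int wherefrom, int whereto, int is_brpoplpush) {
    static const char *pos[] = { "LEFT", "RIGHT" };
    assert(wherefrom == LIST_HEAD_POS || wherefrom == LIST_TAIL_POS);
    if (dstkey == NULL) {              /* BLPOP/BRPOP -> LPOP/RPOP key */
        argv[0] = (wherefrom == LIST_HEAD_POS) ? "LPOP" : "RPOP";
        argv[1] = key;
        return 2;
    }
    argv[0] = is_brpoplpush ? "RPOPLPUSH" : "LMOVE";
    argv[1] = key;
    argv[2] = dstkey;
    if (is_brpoplpush) return 3;       /* RPOPLPUSH src dst */
    assert(whereto == LIST_HEAD_POS || whereto == LIST_TAIL_POS);
    argv[3] = pos[wherefrom];          /* LMOVE src dst LEFT|RIGHT LEFT|RIGHT */
    argv[4] = pos[whereto];
    return 5;
}
```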

```c
/* Unblock a client calling the right function depending on the kind
 * of operation the client is blocking for. */
// Unblock a client, dispatching on its block type (the reason it blocked)
void unblockClient(client *c) {
    if (c->btype == BLOCKED_LIST ||
        c->btype == BLOCKED_ZSET ||
        c->btype == BLOCKED_STREAM) {
        // Blocked waiting for data
        unblockClientWaitingData(c);
    } else if (c->btype == BLOCKED_WAIT) {
        // Blocked waiting for replicas
        unblockClientWaitingReplicas(c);
    } else if (c->btype == BLOCKED_MODULE) {
        // Blocked by a module
        if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
        unblockClientFromModule(c);
    } else if (c->btype == BLOCKED_PAUSE) {
        // Blocked by a pause
        listDelNode(server.paused_clients,c->paused_list_node);
        c->paused_list_node = NULL;
    } else {
        serverPanic("Unknown btype in unblockClient().");
    }

    /* Reset the client for a new query since, for blocking commands
     * we do not do it immediately after the command returns (when the
     * client got blocked) in order to be still able to access the argument
     * vector from module callbacks and updateStatsOnUnblock. */
    if (c->btype != BLOCKED_PAUSE) {
        // Command-level blocking (i.e. not a pause): free the original argv
        // and reset the client here. A paused client's command has not run
        // yet (it is marked CLIENT_PENDING_COMMAND), so it is kept.
        freeClientOriginalArgv(c);
        resetClient(c);
    }

    /* Clear the flags, and put the client in the unblocked list so that
     * we'll process new commands in its query buffer ASAP. */
    server.blocked_clients--;
    server.blocked_clients_by_type[c->btype]--;
    // Clear the blocked flag
    c->flags &= ~CLIENT_BLOCKED;
    c->btype = BLOCKED_NONE;
    // Remove the client from the timeout table
    removeClientFromTimeoutTable(c);
    // Set CLIENT_UNBLOCKED and append the client to server.unblocked_clients;
    // beforeSleep() then processes it (the pending command and later ones)
    queueClientForReprocessing(c);
}

/* Unblock a client that's waiting in a blocking operation such as BLPOP.
 * You should never call this function directly, but unblockClient() instead. */
// Only called through unblockClient().
// Walks c->bpop.keys, removes this client from each key's list in
// c->db->blocking_keys, then empties c->bpop.keys
void unblockClientWaitingData(client *c) {
    dictEntry *de;
    dictIterator *di;
    list *l;

    serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
    di = dictGetIterator(c->bpop.keys);
    /* The client may wait for multiple keys, so unblock it for every key. */
    // A client may wait on several keys, so unblock it from every one of them
    while((de = dictNext(di)) != NULL) {
        robj *key = dictGetKey(de);
        bkinfo *bki = dictGetVal(de);

        /* Remove this client from the list of clients waiting for this key. */
        // Find the key's list in c->db->blocking_keys
        l = dictFetchValue(c->db->blocking_keys,key);
        serverAssertWithInfo(c,key,l != NULL);
        // Delete this client's node from the list
        listDelNode(l,bki->listnode);
        /* If the list is empty we need to remove it to avoid wasting memory */
        if (listLength(l) == 0)
            // The list is empty: remove the key from c->db->blocking_keys
            dictDelete(c->db->blocking_keys,key);
    }
    dictReleaseIterator(di);

    /* Cleanup the client structure */
    // Reset the blocking-related fields of the client
    dictEmpty(c->bpop.keys,NULL);
    if (c->bpop.target) {
        decrRefCount(c->bpop.target);
        c->bpop.target = NULL;
    }
    if (c->bpop.xread_group) {
        decrRefCount(c->bpop.xread_group);
        decrRefCount(c->bpop.xread_consumer);
        c->bpop.xread_group = NULL;
        c->bpop.xread_consumer = NULL;
    }
}
```

// Flag the client as unblocked (CLIENT_UNBLOCKED) and append it to server.unblocked_clients;
// beforeSleep processes it later (the command it was blocked on, or the commands that follow)
void queueClientForReprocessing(client *c) {
    /* The client may already be into the unblocked list because of a previous
     * blocking operation, don't add back it into the list multiple times. */
    if (!(c->flags & CLIENT_UNBLOCKED)) {
        c->flags |= CLIENT_UNBLOCKED;
        listAddNodeTail(server.unblocked_clients,c);
    }
}

// path: src/server.c
// Called after all client events (commands) have been processed,
// right before entering epoll_wait
void beforeSleep(struct aeEventLoop *eventLoop) {
    // ...
    /* Try to process blocked clients every once in while. Example: A module
     * calls RM_SignalKeyAsReady from within a timer callback (So we don't
     * visit processCommand() at all). */
    // Serve the clients that are blocked on keys
    handleClientsBlockedOnKeys();
    // ...
}

// path: src/timeout.c
/* Remove the client from the table when it is unblocked for reasons
 * different than timing out. */
// Remove the client from the TimeoutTable
void removeClientFromTimeoutTable(client *c) {
    if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
    c->flags &= ~CLIENT_IN_TO_TABLE;
    uint64_t timeout = c->bpop.timeout;
    unsigned char buf[CLIENT_ST_KEYLEN];
    encodeTimeoutKey(buf,timeout,c);
    raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);
}

Call chain
void lpushCommand(client *c) // LPUSH command handler
- void pushGenericCommand(client *c, int where, int xx) // push elements onto a list
  - void dbAdd(redisDb *db, robj *key, robj *val) // the list does not exist yet, so add the key and value; clients can only be blocked while the list does not exist
    - void signalKeyAsReady(redisDb *db, robj *key, int type) // calling signalKeyAsReady here ensures blocked clients are woken promptly when elements are added to the list
      - add the list key to server.ready_keys, to be handled in one batch in beforeSleep
void beforeSleep(struct aeEventLoop *eventLoop) // called after all client events (commands) are processed, before entering epoll_wait
- void handleClientsBlockedOnKeys(void) // walk server.ready_keys and serve the clients blocked on those keys (i.e. waiting for data)
  - while(listLength(server.ready_keys) != 0) // server.ready_keys holds every key changed by the commands executed since the last epoll_wait
    - void serveClientsBlockedOnListKey(robj *o, readyList *rl) // serve the clients blocked on a list that has received new elements
      - look up the list of clients blocked on the key (rl->db->blocking_keys)
      - iterate over that client list:
        - pop one element from the list
        - if an element was obtained:
          - int serveClientBlockedOnList(client *receiver...) // when the list changes (elements added), serve a BLPOP, BRPOP or BLMOVE for one blocked client and send it the reply
          - void unblockClient(client *c) // the client got its data; unblock it
            - void unblockClientWaitingData(client *c) // walk c->bpop.keys, remove the client from the list under each key in c->db->blocking_keys, then empty c->bpop.keys
            - c->flags &= ~CLIENT_BLOCKED; // clear the blocked flag
            - void removeClientFromTimeoutTable(client *c) // remove the client from the block-timeout table
            - void queueClientForReprocessing(client *c) // flag the client as unblocked and append it to server.unblocked_clients for further processing in beforeSleep
        - if the list is empty:
          - break // stop iterating over the client list

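In summary:

  • When data is pushed to a list that goes from empty (nonexistent) to non-empty, and clients are blocked on that key, the key is recorded in server.ready_keys.

  • handleClientsBlockedOnKeys, called from beforeSleep (and also right after a command runs in processCommand), processes server.ready_keys: it pops the data, sends it to the clients waiting for it, and unblocks those clients.

  • Unblocked clients are added to server.unblocked_clients; beforeSleep then calls processUnblockedClients, which parses whatever remains in each client's query buffer and executes those commands.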
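
The list-blocking flow summarized above (a per-key FIFO of waiting clients, a ready_keys list filled when data arrives, and a batched serving pass in beforeSleep) can be illustrated with a toy model. This is a minimal sketch, not Redis code. The names toy_block, toy_push and toy_before_sleep, the fixed-size arrays and the integer client ids are all hypothetical. Timeouts, multiple databases, BLMOVE targets and the real dict/list types are left out. For brevity, toy_push appends at the tail and the serving loop pops from the head (RPUSH/BLPOP style).

```c
#include <assert.h>
#include <string.h>

/* Toy model of db->blocking_keys plus server.ready_keys (hypothetical names).
 * Each slot holds one key: a FIFO of blocked client ids and the list values. */
#define MAXK 8
#define MAXQ 16

typedef struct {
    char name[16];
    int waiters[MAXQ];
    int nwait;            /* like blocking_keys[key]: clients in blocking order */
    int values[MAXQ];
    int nval;             /* the list value itself */
    int ready;            /* already queued in ready_keys during this batch? */
} toy_key;

static toy_key keys[MAXK];
static int nkeys;
static int ready_keys[MAXK], nready;                         /* like server.ready_keys */
static int served_client[MAXQ], served_value[MAXQ], nserved; /* replies "sent" */

/* Find a key slot, creating an empty one on first use. */
static toy_key *lookup(const char *name) {
    for (int i = 0; i < nkeys; i++)
        if (strcmp(keys[i].name, name) == 0) return &keys[i];
    assert(nkeys < MAXK);
    toy_key *k = &keys[nkeys++];
    memset(k, 0, sizeof(*k));
    strncpy(k->name, name, sizeof(k->name) - 1);
    return k;
}

/* BLPOP on an empty key: the client joins the tail of the key's FIFO. */
void toy_block(int client, const char *name) {
    toy_key *k = lookup(name);
    assert(k->nval == 0 && k->nwait < MAXQ);
    k->waiters[k->nwait++] = client;
}

/* Push a value; if clients are waiting, queue the key in ready_keys once. */
void toy_push(const char *name, int value) {
    toy_key *k = lookup(name);
    assert(k->nval < MAXQ);
    k->values[k->nval++] = value;
    if (k->nwait > 0 && !k->ready) {
        k->ready = 1;
        ready_keys[nready++] = (int)(k - keys);
    }
}

/* beforeSleep-like pass: serve waiters in FIFO order while values remain. */
void toy_before_sleep(void) {
    for (int r = 0; r < nready; r++) {
        toy_key *k = &keys[ready_keys[r]];
        k->ready = 0;
        while (k->nwait > 0 && k->nval > 0) {
            served_client[nserved] = k->waiters[0];
            served_value[nserved++] = k->values[0];
            /* pop the head value and unblock the head client */
            memmove(k->values, k->values + 1, (size_t)(k->nval - 1) * sizeof(int));
            k->nval--;
            memmove(k->waiters, k->waiters + 1, (size_t)(k->nwait - 1) * sizeof(int));
            k->nwait--;
        }
    }
    nready = 0;
}
```

Suppose clients 1 and 2 are blocked on q, and 10, 20 and 30 are then pushed. The key q is queued in ready_keys only once. The next toy_before_sleep() hands 10 to client 1 and 20 to client 2, in blocking order, and leaves 30 in the list. The ready flag stands in for the check signalKeyAsReady performs so that a key is queued only once per batch.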

Timeout handling
// path: src/server.c
// Called after all client events (commands) have been processed,
// right before entering epoll_wait
void beforeSleep(struct aeEventLoop *eventLoop) {
    // ...
    // Detect and handle blocked clients whose timeout has expired
    handleBlockedClientsTimeout();
    // ...
}

// path: src/timeout.c
/* This function is called in beforeSleep() in order to unblock clients
 * that are waiting in blocking operations with a timeout set. */
// Detect and handle blocked clients whose timeout has expired;
// called from beforeSleep
void handleBlockedClientsTimeout(void) {
    if (raxSize(server.clients_timeout_table) == 0) return;
    uint64_t now = mstime();
    raxIterator ri;
    raxStart(&ri,server.clients_timeout_table);
    raxSeek(&ri,"^",NULL,0);
    // Walk the TimeoutTable in key order (earliest timeout first)
    while(raxNext(&ri)) {
        uint64_t timeout;
        client *c;
        // Decode the timeout and the client pointer from the key
        decodeTimeoutKey(ri.key,&timeout,&c);
        // First entry that has not timed out yet: stop
        if (timeout >= now) break; /* All the timeouts are in the future. */
        c->flags &= ~CLIENT_IN_TO_TABLE;
        // Handle the timed-out client:
        // send the timeout reply and unblock it
        checkBlockedClientTimeout(c,now);
        // Remove the timed-out client from the TimeoutTable
        raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
        raxSeek(&ri,"^",NULL,0);
    }
    raxStop(&ri);
}

/* Check if this blocked client timedout (does nothing if the client is
 * not blocked right now). If so send a reply, unblock it, and return 1.
 * Otherwise 0 is returned and no operation is performed. */
// Handle a client whose block has timed out:
// send the timeout reply and unblock the client
int checkBlockedClientTimeout(client *c, mstime_t now) {
    if (c->flags & CLIENT_BLOCKED &&
        c->bpop.timeout != 0
        && c->bpop.timeout < now)
    {
        // Blocked and timed out
        /* Handle blocking operation specific timeout. */
        // Send the timeout reply to the client
        replyToBlockedClientTimedOut(c);
        // Unblock the client
        unblockClient(c);
        return 1;
    } else {
        return 0;
    }
}

// path: src/blocked.c
/* This function gets called when a blocked client timed out in order to
 * send it a reply of some kind. After this function is called,
 * unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
    if (c->btype == BLOCKED_LIST ||
        c->btype == BLOCKED_ZSET ||
        c->btype == BLOCKED_STREAM) {
        addReplyNullArray(c);
    } else if (c->btype == BLOCKED_WAIT) {
        addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
    } else if (c->btype == BLOCKED_MODULE) {
        moduleBlockedClientTimedOut(c);
    } else {
        serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
    }
}

Call chain
void beforeSleep(struct aeEventLoop *eventLoop) // called after all client events (commands) are processed, before entering epoll_wait
- void handleBlockedClientsTimeout(void) // detect and handle blocked clients whose timeout has expired
  - int checkBlockedClientTimeout(client *c, mstime_t now) // handle a timed-out client: send the timeout reply and unblock it
    - void replyToBlockedClientTimedOut(client *c) // send the timeout reply to the timed-out client
    - void unblockClient(client *c) // unblock the client, dispatching to a different handler depending on its block type (reason)
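
handleBlockedClientsTimeout can stop at the first entry whose timeout is in the future because the rax key starts with the timeout in big-endian byte order (this is what encodeTimeoutKey, called in removeClientFromTimeoutTable, builds). As a result, lexicographic key order matches timeout order. The sketch below illustrates that property, with a sorted array standing in for the rax. The names encode_key, decode_u64 and expire_before are hypothetical, and so is the 16-byte layout that uses a numeric client id instead of a client pointer.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define KEYLEN 16  /* 8-byte timeout + 8-byte client id (hypothetical layout) */

/* Big-endian encoding: memcmp order on keys == numeric order on timeouts. */
void encode_key(unsigned char *buf, uint64_t timeout, uint64_t client_id) {
    for (int i = 0; i < 8; i++) {
        buf[i]     = (unsigned char)(timeout   >> (56 - 8 * i));
        buf[8 + i] = (unsigned char)(client_id >> (56 - 8 * i));
    }
}

uint64_t decode_u64(const unsigned char *p) {
    uint64_t v = 0;
    for (int i = 0; i < 8; i++) v = (v << 8) | p[i];
    return v;
}

static int cmp_key(const void *a, const void *b) {
    return memcmp(a, b, KEYLEN);
}

/* Sort the table (a rax keeps this order implicitly), then walk it from the
 * smallest key and stop at the first timeout that is not yet due.
 * Expired client ids are written to out[]; returns how many expired. */
int expire_before(unsigned char (*table)[KEYLEN], int n, uint64_t now,
                  uint64_t *out) {
    qsort(table, (size_t)n, KEYLEN, cmp_key);
    int expired = 0;
    for (int i = 0; i < n; i++) {
        uint64_t timeout = decode_u64(table[i]);
        if (timeout >= now) break;   /* every later key is in the future too */
        out[expired++] = decode_u64(table[i] + 8);
    }
    return expired;
}
```

With a little-endian prefix the early break would be wrong. For example, 256 would sort before 100 because its first byte is 0x00. The strict `timeout >= now` break also matches the `c->bpop.timeout < now` test in checkBlockedClientTimeout.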

Waiting for replicas to sync

//path: src/replication.c
/* WAIT for N replicas to acknowledge the processing of our latest
 * write command (and all the previous commands). */
// WAIT command handler:
// block until N replicas have processed this client's latest write command
void waitCommand(client *c) {
    mstime_t timeout;
    long numreplicas, ackreplicas;
    // Replication offset produced by this client's latest write command
    long long offset = c->woff;
    if (server.masterhost) {
        // WAIT is not supported on a replica
        addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated.");
        return;
    }
    /* Argument parsing. */
    // Parse the number of replicas that must acknowledge
    if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
        return;
    // Wait (block) timeout
    if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
        != C_OK) return;
    /* First try without blocking at all. */
    // Count the replicas that have already processed the replication offset this client needs
    ackreplicas = replicationCountAcksByOffset(c->woff);
    if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
        // Enough replicas (or called inside MULTI): reply immediately
        addReplyLongLong(c,ackreplicas);
        return;
    }
    /* Otherwise block the client and put it into our list of clients
     * waiting for ack from slaves. */
    // Fill in the bpop fields of the client
    c->bpop.timeout = timeout; // timeout
    c->bpop.reploffset = offset; // replication offset to wait for
    c->bpop.numreplicas = numreplicas; // required number of replicas
    listAddNodeHead(server.clients_waiting_acks,c); // add the client to the global list server.clients_waiting_acks
    // Block the client
    blockClient(c,BLOCKED_WAIT);
    /* Make sure that the server will send an ACK request to all the slaves
     * before returning to the event loop. */
    // Set server.get_ack_from_slaves = 1,
    // so that beforeSleep (before entering epoll_wait) asks the replicas
    // to report the replication offset they have processed so far
    replicationRequestAckFromSlaves();
}

/* This just set a flag so that we broadcast a REPLCONF GETACK command
 * to all the slaves in the beforeSleep() function. Note that this way
 * we "group" all the clients that want to wait for synchronous replication
 * in a given event loop iteration, and send a single GETACK for them all. */
// Set the global flag server.get_ack_from_slaves; beforeSleep acts on it
void replicationRequestAckFromSlaves(void) {
    server.get_ack_from_slaves = 1;
}

// path: src/server.c
// Called after all client events (commands) have been processed,
// right before entering epoll_wait
void beforeSleep(struct aeEventLoop *eventLoop) {
    // ...
    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    // Handle clients blocked waiting for replication progress
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();
    // ...
    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. Note that we do this after
     * processUnblockedClients(), so if there are multiple pipelined WAITs
     * and the just unblocked WAIT gets blocked again, we don't have to wait
     * a server cron cycle in absence of other event loop events. See #6623.
     *
     * We also don't send the ACKs while clients are paused, since it can
     * increment the replication backlog, they'll be sent after the pause
     * if we are still the master. */
    // get_ack_from_slaves == 1 means some client issued WAIT and replica ACKs are needed
    // checkClientPauseTimeoutAndReturnIfPaused() returns true while a pause is in effect, false otherwise
    if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
        robj *argv[3];
        argv[0] = shared.replconf;
        argv[1] = shared.getack;
        argv[2] = shared.special_asterick; /* Not used argument. */
        // Send REPLCONF GETACK to ask each replica for its latest processed replication offset
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        server.get_ack_from_slaves = 0;
    }
    // ...
}

//path: src/replication.c
/* Check if there are clients blocked in WAIT that can be unblocked since
 * we received enough ACKs from slaves. */
// Handle clients blocked on replication (called from beforeSleep)
void processClientsWaitingReplicas(void) {
    long long last_offset = 0;
    int last_numreplicas = 0;
    listIter li;
    listNode *ln;
    listRewind(server.clients_waiting_acks,&li);
    // Walk the blocked clients
    while((ln = listNext(&li))) {
        client *c = ln->value;
        /* Every time we find a client that is satisfied for a given
         * offset and number of replicas, we remember it so the next client
         * may be unblocked without calling replicationCountAcksByOffset()
         * if the requested offset / replicas were equal or less. */
        // Check whether the number of replicas whose acked offset >= c->bpop.reploffset
        // is >= c->bpop.numreplicas, reusing the last satisfied result when it suffices
        if (last_offset && last_offset >= c->bpop.reploffset &&
                           last_numreplicas >= c->bpop.numreplicas)
        {
            unblockClient(c);
            addReplyLongLong(c,last_numreplicas);
        } else {
            int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
            if (numreplicas >= c->bpop.numreplicas) {
                last_offset = c->bpop.reploffset;
                last_numreplicas = numreplicas;
                unblockClient(c);
                addReplyLongLong(c,numreplicas);
            }
        }
    }
}

// Handler for REPLCONF, the command used for master/replica interaction
void replconfCommand(client *c) {
    // ...
        else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
            // Handle a replica's ACK (its reply to GETACK):
            // record the latest replication offset the replica has processed
            /* REPLCONF ACK is used by slave to inform the master the amount
             * of replication stream that it processed so far. It is an
             * internal only command that normal clients should never use. */
            long long offset;
            if (!(c->flags & CLIENT_SLAVE)) return;
            if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
                return;
            if (offset > c->repl_ack_off)
                // Record the latest replication offset processed by the replica
                c->repl_ack_off = offset;
            c->repl_ack_time = server.unixtime;
            /* If this was a diskless replication, we need to really put
             * the slave online when the first ACK is received (which
             * confirms slave is online and ready to get more data). This
             * allows for simpler and less CPU intensive EOF detection
             * when streaming RDB files.
             * There's a chance the ACK got to us before we detected that the
             * bgsave is done (since that depends on cron ticks), so run a
             * quick check first (instead of waiting for the next ACK. */
            if (server.child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
                checkChildrenDone();
            if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
                putSlaveOnline(c);
            /* Note: this command does not reply anything! */
            return;
        } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
            /* REPLCONF GETACK is used in order to request an ACK ASAP
             * to the slave. */
            // Handle the master's REPLCONF GETACK:
            // send back the replication offset processed so far
            if (server.masterhost && server.master) replicationSendAck();
            return;
        } else if (!strcasecmp(c->argv[j]->ptr,"rdb-only")) {
            // ...
        }
    // ...
}

Call chain
void waitCommand(client *c) // WAIT command: block until N replicas have processed this client's latest write command
- if enough replicas have already synced (processed the client's latest write command)
  - reply immediately
- else
  - add the client to the global list server.clients_waiting_acks
  - blockClient(c,BLOCKED_WAIT); // block the client
  - void replicationRequestAckFromSlaves(void) // set the global flag server.get_ack_from_slaves, handled in beforeSleep
void beforeSleep(struct aeEventLoop *eventLoop) // called after all client events (commands) are processed, before entering epoll_wait
- if (listLength(server.clients_waiting_acks)) // there are clients blocked on replication progress
  - void processClientsWaitingReplicas(void) // unblock the clients for which enough replicas have caught up
    - void unblockClient(client *c) // unblock the client, dispatching to a different handler depending on its block type (reason)
- if server.get_ack_from_slaves == 1
  - send REPLCONF GETACK to every replica
void replconfCommand(client *c) // handler for REPLCONF, the master/replica interaction command
- if REPLCONF ACK (reply to GETACK) // this node is the master
  - record the latest replication offset processed by that replica
- else if REPLCONF GETACK // this node is a replica
  - send back the replication offset processed so far

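In summary:

  • The WAIT handler first checks whether enough replicas have already synced; if so, it replies at once without blocking the client.

  • Otherwise it appends the client to server.clients_waiting_acks, sets server.get_ack_from_slaves to 1, and blocks the client.

  • On each beforeSleep pass, if server.get_ack_from_slaves is 1 and clients are not paused, the master sends REPLCONF GETACK to the replicas to ask for their latest processed replication offset; each replica answers with REPLCONF ACK <offset>.

  • On each beforeSleep pass, the master also checks the clients in server.clients_waiting_acks and unblocks each one that has enough replicas at or beyond its offset.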
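
The WAIT bookkeeping above boils down to three rules:
1. A replica's acknowledged offset only moves forward (the `offset > c->repl_ack_off` check in replconfCommand).
2. A replica counts toward a client when its acknowledged offset is >= the client's offset (what replicationCountAcksByOffset computes).
3. processClientsWaitingReplicas reuses the last satisfied (offset, count) pair to skip rescans.

The toy program below is a hedged sketch of those rules only. The arrays, the waiter struct and the function names are hypothetical. It omits replica state checks (Redis only counts online replicas), timeouts, and the actual unblocking.

```c
#include <assert.h>

/* Hypothetical stand-ins for server.slaves[i]->repl_ack_off and for the
 * entries of server.clients_waiting_acks. Replica state checks are omitted. */
#define NREPL 3
long long repl_ack_off[NREPL];

typedef struct {
    long long reploffset;  /* like c->bpop.reploffset (the client's c->woff) */
    int numreplicas;       /* like c->bpop.numreplicas */
    int blocked;           /* 1 while still waiting */
    int reply;             /* ack count sent when unblocked */
} waiter;

/* REPLCONF ACK <offset>: only ever move a replica's acked offset forward. */
void on_replconf_ack(int replica, long long offset) {
    if (offset > repl_ack_off[replica]) repl_ack_off[replica] = offset;
}

/* Same rule as replicationCountAcksByOffset: acked offset >= requested. */
int count_acks(long long offset) {
    int count = 0;
    for (int i = 0; i < NREPL; i++)
        if (repl_ack_off[i] >= offset) count++;
    return count;
}

/* processClientsWaitingReplicas-like pass with the last_offset cache.
 * Returns how many times count_acks() was actually called. */
int process_waiting(waiter *w, int n) {
    long long last_offset = 0;
    int last_numreplicas = 0, scans = 0;
    for (int i = 0; i < n; i++) {
        if (!w[i].blocked) continue;
        if (last_offset && last_offset >= w[i].reploffset &&
            last_numreplicas >= w[i].numreplicas) {
            w[i].blocked = 0;              /* reuse the earlier result */
            w[i].reply = last_numreplicas;
        } else {
            int acks = count_acks(w[i].reploffset);
            scans++;
            if (acks >= w[i].numreplicas) {
                last_offset = w[i].reploffset;
                last_numreplicas = acks;
                w[i].blocked = 0;
                w[i].reply = acks;
            }
        }
    }
    return scans;
}
```

Take replicas acknowledged at offsets 100, 80 and 120.
- The first waiter (offset 100, 2 replicas) is counted normally.
- The second (offset 90) reuses that result without a rescan. Any replica at or past offset 100 is also at or past 90, so the cached count is a lower bound that already satisfies it.
- The third (offset 110) needs a rescan and stays blocked.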

Pausing client execution (CLIENT PAUSE/UNPAUSE)

Handling CLIENT PAUSE
// path: src/networking.c
// CLIENT command handler
void clientCommand(client *c) {
    // ...
    else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
        // UNPAUSE subcommand
        /* CLIENT UNPAUSE */
        // Stop pausing and unblock every paused client (server.paused_clients)
        unpauseClients();
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
                                                        c->argc == 4))
    {
        /* CLIENT PAUSE TIMEOUT [WRITE|ALL] */
        // PAUSE subcommand
        mstime_t end;
        int type = CLIENT_PAUSE_ALL;
        if (c->argc == 4) {
            // Decide whether to pause all clients or only clients running write commands
            if (!strcasecmp(c->argv[3]->ptr,"write")) {
                type = CLIENT_PAUSE_WRITE;
            } else if (!strcasecmp(c->argv[3]->ptr,"all")) {
                type = CLIENT_PAUSE_ALL;
            } else {
                addReplyError(c,
                    "CLIENT PAUSE mode must be WRITE or ALL");
                return;
            }
        }
        // Parse the time at which the pause ends (i.e. when clients are unpaused)
        if (getTimeoutFromObjectOrReply(c,c->argv[2],&end,
            UNIT_MILLISECONDS) != C_OK) return;
        // Start pausing clients.
        // This only sets server.client_pause_type (and the end time);
        // clients are actually blocked in processCommand
        pauseClients(end, type);
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
        // ...
    }
    // ...
}

/* Pause clients up to the specified unixtime (in ms) for a given type of
 * commands.
 *
 * A main use case of this function is to allow pausing replication traffic
 * so that a failover without data loss to occur. Replicas will continue to receive
 * traffic to faciliate this functionality.
 *
 * This function is also internally used by Redis Cluster for the manual
 * failover procedure implemented by CLUSTER FAILOVER.
 *
 * The function always succeed, even if there is already a pause in progress.
 * In such a case, the duration is set to the maximum and new end time and the
 * type is set to the more restrictive type of pause. */
// Start pausing clients.
// This only sets server.client_pause_type (and server.client_pause_end_time);
// clients are actually blocked in processCommand
void pauseClients(mstime_t end, pause_type type) {
    // Keep the more restrictive pause type
    if (type > server.client_pause_type) {
        server.client_pause_type = type;
    }
    // Keep the later pause end time
    if (end > server.client_pause_end_time) {
        server.client_pause_end_time = end;
    }
    /* We allow write commands that were queued
     * up before and after to execute. We need
     * to track this state so that we don't assert
     * in propagate(). */
    if (server.in_exec) {
        server.client_pause_in_transaction = 1;
    }
}

Call chain
void clientCommand(client *c) // CLIENT command handler
- if CLIENT PAUSE
  - void pauseClients(mstime_t end, pause_type type) // start pausing clients
    - set server.client_pause_type // clients are actually blocked in processCommand
    - set server.client_pause_end_time; the pause ends at end_time // the end time is checked for expiry in serverCron (and beforeSleep); once it has passed, the pause is lifted

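In summary, CLIENT PAUSE only sets the server.client_pause_type flag (plus server.client_pause_end_time); by itself it does not block any client.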
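
pauseClients never shortens or weakens a pause that is already in effect. It keeps the later end time, and it keeps the more restrictive type, because the pause_type values are ordered OFF < WRITE < ALL. Below is a minimal sketch of just that merge rule. The toy_* names are hypothetical stand-ins for the server fields.

```c
#include <assert.h>

/* Same ordering as Redis 6.2's pause_type: larger means more restrictive. */
typedef enum { PAUSE_OFF = 0, PAUSE_WRITE, PAUSE_ALL } toy_pause_type;

typedef struct {
    toy_pause_type type;   /* like server.client_pause_type */
    long long end_ms;      /* like server.client_pause_end_time */
} toy_pause_state;

/* pauseClients-like merge: keep the stricter type and the later end time. */
void toy_pause_clients(toy_pause_state *s, long long end_ms, toy_pause_type type) {
    if (type > s->type) s->type = type;
    if (end_ms > s->end_ms) s->end_ms = end_ms;
}
```

Consider a CLIENT PAUSE ... WRITE issued while an ALL pause is active. It extends the end time if its end time is later, but leaves the type at ALL. The state only returns to OFF through CLIENT UNPAUSE or expiry.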

Blocking the client that issues a command
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
// Process a client command
int processCommand(client *c) {
    // ...
    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // ...
    /* If the server is paused, block the client until
     * the pause has ended. Replicas are never paused. */
    // Check whether server.client_pause_type requires this command to be paused
    if (!(c->flags & CLIENT_SLAVE) &&
        ((server.client_pause_type == CLIENT_PAUSE_ALL) ||
        (server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
    {
        c->bpop.timeout = 0;
        // Block the client: blockClient sets CLIENT_BLOCKED and, for a pause,
        // appends the client to server.paused_clients; because bpop.timeout is 0,
        // the client is not inserted into the TimeoutTable
        blockClient(c,BLOCKED_PAUSE);
        return C_OK;
    }
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
        c->cmd->proc != resetCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        // Remember the replication offset after this command (used later by WAIT)
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

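From the code above:

  • Before executing a command, processCommand checks server.client_pause_type against the command about to run; if the command must be paused, it calls blockClient.

  • blockClient sets the CLIENT_BLOCKED flag and appends the client to the server.paused_clients list. Because processCommand first sets c->bpop.timeout to 0, the client is not actually inserted into the TimeoutTable; the pause ends through CLIENT UNPAUSE or the pause end-time check instead.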
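
The pause check in processCommand is a pure predicate over three inputs:
- whether the client is a replica;
- whether the command may replicate (is_may_replicate_command, computed earlier in processCommand and elided in the excerpt);
- the current pause type.

The sketch below restates that predicate, plus a toy parking step that stands in for blockClient(c, BLOCKED_PAUSE). The names toy_client, paused[], should_pause and toy_process_command are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Same ordering as Redis 6.2's pause_type: larger means more restrictive. */
typedef enum { PAUSE_OFF = 0, PAUSE_WRITE, PAUSE_ALL } toy_pause_type;

typedef struct { int id; bool blocked; } toy_client;

#define MAX_PAUSED 16
toy_client *paused[MAX_PAUSED];   /* stands in for server.paused_clients */
int npaused;

/* The condition from processCommand(): replicas are never paused, ALL pauses
 * every command, WRITE pauses only commands that may replicate. */
bool should_pause(bool is_replica, bool may_replicate, toy_pause_type type) {
    if (is_replica) return false;
    return type == PAUSE_ALL || (type == PAUSE_WRITE && may_replicate);
}

/* Returns true if the command would run now, false if the client is parked. */
bool toy_process_command(toy_client *c, bool is_replica, bool may_replicate,
                         toy_pause_type type) {
    if (should_pause(is_replica, may_replicate, type)) {
        assert(npaused < MAX_PAUSED);
        c->blocked = true;          /* like blockClient(c, BLOCKED_PAUSE) */
        paused[npaused++] = c;      /* appended to the paused list */
        return false;               /* the command stays pending */
    }
    return true;                    /* call(c, CMD_CALL_FULL) would run here */
}
```

Under a WRITE pause, a read-only command still runs and a write is parked. A replica's command runs even under an ALL pause.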

Handling CLIENT UNPAUSE
// path: src/networking.c
// CLIENT command handler
void clientCommand(client *c) {
    // ...
    else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
        // UNPAUSE subcommand
        /* CLIENT UNPAUSE */
        // Stop pausing and unblock every paused client (server.paused_clients)
        unpauseClients();
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
                                                        c->argc == 4))
    {
        // ...
    }
    // ...
}

/* Unpause clients and queue them for reprocessing. */
// Stop pausing and unblock every paused client (server.paused_clients)
void unpauseClients(void) {
    listNode *ln;
    listIter li;
    client *c;
    // Turn the pause off
    server.client_pause_type = CLIENT_PAUSE_OFF;
    /* Unblock all of the clients so they are reprocessed. */
    // Walk the paused list and unblock each client
    // (unblockClient removes the client's own node; listNext has already advanced past it)
    listRewind(server.paused_clients,&li);
    while ((ln = listNext(&li)) != NULL) {
        c = listNodeValue(ln);
        unblockClient(c);
    }
}

Call chain
void clientCommand(client *c) // CLIENT command handler
- if CLIENT UNPAUSE
  - void unpauseClients(void) // stop pausing and unblock every paused client (server.paused_clients)
    - set server.client_pause_type to CLIENT_PAUSE_OFF // checked in processCommand
    - walk the server.paused_clients list and unblock each client

In summary, CLIENT UNPAUSE simply sets server.client_pause_type to CLIENT_PAUSE_OFF and unblocks the clients in the server.paused_clients list.
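
unpauseClients calls unblockClient on each node it visits. For a BLOCKED_PAUSE client, unblockClient deletes that very node from server.paused_clients (through c->paused_list_node). This is safe because Redis's listNext saves the next pointer before returning the current node, so deleting the returned node does not derail the loop. The sketch below reproduces that pattern with a minimal list. The names list_add_tail, list_del_node, list_next and unpause_all are illustrative, not Redis's adlist API.

```c
#include <assert.h>
#include <stdlib.h>

/* Minimal doubly linked list (hypothetical names). The iterator saves `next`
 * before returning a node, so the caller may delete the node it received. */
typedef struct node { struct node *prev, *next; int client_id; } node;
typedef struct { node *head, *tail; int len; } list;
typedef struct { node *next; } list_iter;

void list_add_tail(list *l, int client_id) {
    node *n = malloc(sizeof(*n));
    assert(n != NULL);
    n->client_id = client_id;
    n->next = NULL;
    n->prev = l->tail;
    if (l->tail) l->tail->next = n; else l->head = n;
    l->tail = n;
    l->len++;
}

void list_del_node(list *l, node *n) {
    if (n->prev) n->prev->next = n->next; else l->head = n->next;
    if (n->next) n->next->prev = n->prev; else l->tail = n->prev;
    free(n);
    l->len--;
}

node *list_next(list_iter *it) {
    node *cur = it->next;
    if (cur) it->next = cur->next;   /* advance BEFORE handing out cur */
    return cur;
}

/* unpauseClients-like drain: "unblocking" a client removes its own node,
 * as unblockClient() does via c->paused_list_node. Returns clients woken. */
int unpause_all(list *paused, int *woken) {
    list_iter it = { paused->head };
    node *n;
    int count = 0;
    while ((n = list_next(&it)) != NULL) {
        woken[count++] = n->client_id;
        list_del_node(paused, n);    /* safe: it.next was saved already */
    }
    return count;
}
```

Deleting any node other than the one just returned would not be safe with this kind of iterator, because the saved next pointer could then dangle.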

Unblocking when the pause expires
// path: src/server.c
// Timer event handler
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    /* Clear the paused clients state if needed. */
    // Check whether a pause is active (server.client_pause_type) and has expired (server.client_pause_end_time);
    // if it has expired, stop pausing and unblock the paused clients.
    // Returns true if clients are still paused, false otherwise
    checkClientPauseTimeoutAndReturnIfPaused();
    // ...
}

// path: src/networking.c
/* Returns true if clients are paused and false otherwise. */
// Returns true if a pause is set, false otherwise
int areClientsPaused(void) {
    return server.client_pause_type != CLIENT_PAUSE_OFF;
}
/* Checks if the current client pause has elapsed and unpause clients
 * if it has. Also returns true if clients are now paused and false
 * otherwise. */
// Check whether the pause has reached its end time (expired);
// if so, stop pausing and unblock the paused clients
int checkClientPauseTimeoutAndReturnIfPaused(void) {
    // Is a pause flag set in server.client_pause_type?
    if (!areClientsPaused())
        // Not paused: return false
        return 0;
    if (server.client_pause_end_time < server.mstime) {
        // Paused and the end time has passed: unpause clients
        unpauseClients();
    }
    // Return whether clients are (still) paused
    return areClientsPaused();
}

/* Unpause clients and queue them for reprocessing. */
// Stop pausing and unblock every paused client (server.paused_clients)
void unpauseClients(void) {
    listNode *ln;
    listIter li;
    client *c;
    // Turn the pause off
    server.client_pause_type = CLIENT_PAUSE_OFF;
    /* Unblock all of the clients so they are reprocessed. */
    // Walk the paused list and unblock each client
    listRewind(server.paused_clients,&li);
    while ((ln = listNext(&li)) != NULL) {
        c = listNodeValue(ln);
        unblockClient(c);
    }
}

Call chain
int serverCron(struct aeEventLoop *eventLoop, ...) // timer event handler
- int checkClientPauseTimeoutAndReturnIfPaused(void) // check whether the pause has expired; if so, stop pausing and unblock the paused clients
  - void unpauseClients(void) // stop pausing and unblock every paused client (server.paused_clients)
    - set server.client_pause_type to CLIENT_PAUSE_OFF // checked in processCommand
    - walk the server.paused_clients list and unblock each client

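In summary, pause expiry is handled in serverCron (beforeSleep also calls the same check, as shown in the WAIT section). checkClientPauseTimeoutAndReturnIfPaused tests whether the current time is already past the end time (server.client_pause_end_time < server.mstime). If it is, the function calls unpauseClients, which sets server.client_pause_type to CLIENT_PAUSE_OFF and walks the server.paused_clients list to unblock each client.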
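
Because the comparison is strict, a pause is still in effect at exactly its end time and is lifted only once the clock has moved past it. Below is a small sketch of that check. The toy_* names are hypothetical, and the unpause_calls counter is added only to show that the unpause step runs once.

```c
#include <assert.h>
#include <stdbool.h>

/* Same ordering as Redis 6.2's pause_type: larger means more restrictive. */
typedef enum { PAUSE_OFF = 0, PAUSE_WRITE, PAUSE_ALL } toy_pause_type;

typedef struct {
    toy_pause_type type;   /* like server.client_pause_type */
    long long end_ms;      /* like server.client_pause_end_time */
    int unpause_calls;     /* demo only: how often the unpause step ran */
} toy_pause_state;

static void toy_unpause(toy_pause_state *s) {
    s->type = PAUSE_OFF;   /* the real unpauseClients() also unblocks paused clients */
    s->unpause_calls++;
}

/* Same shape as checkClientPauseTimeoutAndReturnIfPaused(): the pause ends
 * only once end_ms < now_ms, i.e. strictly after the end time. */
bool toy_check_pause(toy_pause_state *s, long long now_ms) {
    if (s->type == PAUSE_OFF) return false;
    if (s->end_ms < now_ms) toy_unpause(s);
    return s->type != PAUSE_OFF;
}
```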


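Originally published on the WeChat official account 吃瓜技术派 as "Redis源码剖析-阻塞命令的实现机制" (Redis Source Code Analysis: How Blocking Commands Are Implemented).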


Compiled by 极客之音; article link: https://www.bmabk.com/index.php/post/236033.html
