Blocking commands in Redis let a client block while waiting for a specific event, and are commonly used when waiting for data to become available or for a result to be ready.
Overview of blocking commands
Waiting for data
When the requested data does not exist, the client blocks until data matching the condition arrives or the call times out. The commands involved are:
- BRPOP/BLPOP: block while popping an element from the tail/head of a list, until an element is available.
- BRPOPLPUSH: blocking list-element move.
- BLMOVE: blocking list-element move (the successor of BRPOPLPUSH).
- XREAD: blocking read of Stream data.
- XREADGROUP: blocking read of Stream data through a consumer group.
- BZPOPMIN/BZPOPMAX: block and pop the minimum/maximum element of a sorted set.
Implementation
- The client sends a blocking command to the server, e.g. BLPOP key 1.
- The server records the command and the client in a dict whose key is the blocked-on key and whose value is a linked list; each list node records one client.
- When several clients block on the same key, they are appended to the same list.
- The server also adds the client to the timeout table, so that the beforeSleep function can periodically check whether the client's block has timed out.
- When the target key is modified, the server fetches the list of clients waiting on that key and sends them replies.
- When beforeSleep detects that a blocked client has timed out, it sends the client a timeout reply.
With this dict-plus-linked-list structure, the server can efficiently manage many clients blocked on the same key and unblock them at the right moment.
Waiting for replica synchronization
When not enough replicas have applied the client's latest write command, the client blocks until enough replicas have applied it or the call times out.
- WAIT: block until the client's preceding write commands have been acknowledged by the given number of replicas.
Implementation
- The client sends the WAIT command to the server.
- While processing WAIT, the server checks whether enough replicas have already applied this client's last write command; if so, it replies immediately.
- Otherwise, the client is added to the server.clients_waiting_acks list, its block flag is set, and it is added to the timeout table; server.get_ack_from_slaves is also set so that beforeSleep asks every replica for the replication-log offset it has applied.
- In beforeSleep, the server walks the server.clients_waiting_acks list; for each client it checks whether enough replicas have applied that client's last write command, and if so it unblocks the client.
- When beforeSleep detects that a blocked client has timed out, it sends the client a timeout reply.
Pausing client execution
Executing CLIENT PAUSE or FAILOVER makes the server suspend all clients (both existing connections and ones established afterwards) until the pause ends.
- CLIENT PAUSE milliseconds: suspend all clients (existing and newly connected) for the given number of milliseconds.
- CLIENT UNPAUSE: cancel the pause and wake up all paused clients.
- FAILOVER: Redis provides the FAILOVER command to trigger a master/replica switchover manually. The switchover first pauses clients executing write commands, waits until the replica has applied all of the master's writes, then turns the master into a replica and promotes the chosen replica to master, and finally unpauses the write clients.
Implementation
- A client sends CLIENT PAUSE; the server sets the global pause flag (server.client_pause_type) and records when the pause ends (server.client_pause_end_time).
- When the server receives a command from any client, it checks before execution whether the client must be paused; if not, the command runs normally.
- If all clients must be paused, or only writes are paused and this command is a write, the client is blocked and added to the server.paused_clients list.
- A client sends CLIENT UNPAUSE: the global pause flag is cleared and all paused clients are woken up.
- beforeSleep periodically checks whether server.client_pause_end_time has been reached; if so, it clears the global pause flag and wakes up all paused clients.
Source code walkthrough
Redis source version: 6.2.5. This article walks through the code from the following angles:
- BLPOP, blocking while waiting for data
- Waiting for replica synchronization (WAIT)
- Pausing client execution (CLIENT PAUSE/UNPAUSE)
Related data structures
//path:src/server.h
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
// client block types
#define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */
// list blocking commands (block waiting for data)
#define BLOCKED_LIST 1 /* BLPOP & co. */
// waiting for replicas to apply commands (WAIT command)
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
// module-related block
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
// stream XREAD command (block waiting for data)
#define BLOCKED_STREAM 4 /* XREAD. */
// zset BZPOP commands (block waiting for data)
#define BLOCKED_ZSET 5 /* BZPOP et al. */
// pause block, used for manual failover or to suspend reads/writes on the server
#define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */
#define BLOCKED_NUM 7 /* Number of blocked states. */
// set on a blocked client's flags;
// while set, processInputBuffer neither parses the client's byte stream nor runs commands;
// cleared when the client is unblocked
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
// set on unblock, when the client is placed on the server.unblocked_clients list;
// beforeSleep then calls processUnblockedClients to parse the buffered byte stream and run the commands
#define CLIENT_UNBLOCKED (1<<7) /* This client was unblocked and is stored in
                                   server.unblocked_clients. */
/* This structure holds the blocking operation state for a client.
* The fields used depend on client->btype. */
// records the details of a client's blocking state
typedef struct blockingState {
/* Generic fields. */
// generic field: the block timeout
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
* is > timeout then the operation timed out. */
// used by list, zset and stream blocks (blocking while waiting for data)
/* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */
dict *keys; /* The keys we are waiting to terminate a blocking
* operation such as BLPOP or XREAD. Or NULL. */
// destination list for BLMOVE
robj *target; /* The key that should receive the element,
* for BLMOVE. */
// records which end of the src/dst lists to pop from / push to;
// used by BLPOP, BRPOP and BLMOVE
struct listPos {
int wherefrom; /* Where to pop from */
int whereto; /* Where to push to */
} listpos; /* The positions in the src/dst lists
* where we want to pop/push an element
* for BLPOP, BRPOP and BLMOVE. */
/* BLOCK_STREAM */
size_t xread_count; /* XREAD COUNT option. */
robj *xread_group; /* XREADGROUP group name. */
robj *xread_consumer; /* XREADGROUP consumer name. */
int xread_group_noack;
/* BLOCKED_WAIT */
// used by WAIT (which blocks until replication catches up):
// how many replicas must have applied the command
int numreplicas; /* Number of replicas we are waiting for ACK. */
// the replication-log offset the replicas must reach
long long reploffset; /* Replication offset to reach. */
/* BLOCKED_MODULE */
void *module_blocked_handle; /* RedisModuleBlockedClient structure.
which is opaque for the Redis core, only
handled in module.c. */
} blockingState;
// each client connection has one client structure
typedef struct client {
// ...
// flag bits
uint64_t flags; /* Client flags: CLIENT_* macros. */
// records the details of this client's blocking state
blockingState bpop; /* blocking state */
// ...
} client;
struct redisServer {
// ...
// radix tree used for blocked-client timeout handling
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
// ...
};
// path: src/networking.c
/* This function is called every time, in the client structure 'c', there is
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */
// parses protocol packets from the client buffer;
// whether commands are executed depends on the context:
// in an I/O-thread context, stop after parsing the first complete packet and return;
// otherwise keep parsing and executing commands until no complete packet remains.
// once CLIENT_BLOCKED is set, processInputBuffer stops parsing the byte stream and running commands
void processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
// if CLIENT_BLOCKED is set, return without parsing the client's protocol
if (c->flags & CLIENT_BLOCKED) break;
/* Don't process more buffers from clients that have already pending
* commands to execute in c->argv. */
// CLIENT_PENDING_COMMAND means a runnable command was already parsed but cannot
// be executed yet (e.g. inside an I/O-thread read), so stop here
if (c->flags & CLIENT_PENDING_COMMAND) break;
/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands).
*
* The same applies for clients we want to terminate ASAP. */
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
/* Determine request type when unknown. */
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
// unpack from querybuf according to the Redis protocol
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
/* If the Gopher mode and we got zero or one argument, process
* the request in Gopher mode. To avoid data race, Redis won't
* support Gopher if enable io threads to read queries. */
if (server.gopher_enabled && !server.io_threads_do_reads &&
((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
c->argc == 0))
{
processGopherRequest(c);
resetClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
break;
}
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
// a full command was parsed successfully
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
// CLIENT_PENDING_READ means an I/O thread is doing the read and parse;
// we are still parsing inside the I/O thread, so the command cannot run here.
// set CLIENT_PENDING_COMMAND (a command is pending) and return
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
// at this point the command can be executed directly
/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return;
}
}
}
/* Trim to pos */
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
}
// path :src/blocked.c
/* This function is called in the beforeSleep() function of the event loop
* in order to process the pending input buffer of clients that were
* unblocked after a blocking operation. */
// processes the unblocked-clients list;
// called from beforeSleep
void processUnblockedClients(void) {
listNode *ln;
client *c;
// iterate over the server.unblocked_clients list
while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients);
serverAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients,ln);
// clear the unblocked flag
c->flags &= ~CLIENT_UNBLOCKED;
/* Process remaining data in the input buffer, unless the client
* is blocked again. Actually processInputBuffer() checks that the
* client is not blocked before to proceed, but things may change and
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
/* If we have a queued command, execute it now. */
// run the pending command,
// i.e. one parsed earlier that could not be executed at the time
if (processPendingCommandsAndResetClient(c) == C_ERR) {
continue;
}
/* Then process client if it has more data in it's buffer. */
// while CLIENT_BLOCKED was set, the TCP byte stream was read into the buffer
// but never parsed; parse the (read) buffer now and process more commands
if (c->querybuf && sdslen(c->querybuf) > 0) {
processInputBuffer(c);
}
}
}
}
BLPOP: blocking until data arrives
Handling the BLPOP command
// path:src/t_list.c
/* BLPOP <key> [<key> ...] <timeout> */
// BLPOP command handler
void blpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_HEAD);
}
/* Blocking RPOP/LPOP */
// implements the blocking BLPOP/BRPOP logic
void blockingPopGenericCommand(client *c, int where) {
robj *o;
mstime_t timeout;
int j;
// parse the timeout
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
!= C_OK) return;
for (j = 1; j < c->argc-1; j++) {
// iterate over the keys, looking up each value
o = lookupKeyWrite(c->db,c->argv[j]);
if (o != NULL) {
if (checkType(c,o,OBJ_LIST)) {
// not a LIST: an error reply was already sent, return
return;
} else {
// it is a LIST
if (listTypeLength(o) != 0) {
// the LIST is non-empty: no need to block, pop an element and reply
/* Non empty list, this is like a normal [LR]POP. */
robj *value = listTypePop(o,where);
serverAssert(value != NULL);
addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
listElementsRemoved(c,c->argv[j],where,o,1);
/* Replicate it as an [LR]POP instead of B[LR]POP. */
rewriteClientCommandVector(c,2,
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
c->argv[j]);
return;
}
}
}
}
/* If we are not allowed to block the client, the only thing
 * we can do is treating it as a timeout (even with timeout 0). */
// the client is flagged as non-blockable: treat it as an immediate timeout
if (c->flags & CLIENT_DENY_BLOCKING) {
addReplyNullArray(c);
return;
}
/* If the keys do not exist we must block */
struct listPos pos = {where};
// block the client until one of the keys is ready;
// used for BLOCKED_LIST, BLOCKED_ZSET or BLOCKED_STREAM
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL);
}
// path:src/blocked.c
// block the client until one of the keys is ready;
// used for BLOCKED_LIST, BLOCKED_ZSET or BLOCKED_STREAM
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {
dictEntry *de;
list *l;
int j;
// record the block timeout and related info
c->bpop.timeout = timeout;
// target records the destination list for BLMOVE
c->bpop.target = target;
if (listpos != NULL) c->bpop.listpos = *listpos;
if (target != NULL) incrRefCount(target);
for (j = 0; j < numkeys; j++) {
/* Allocate our bkinfo structure, associated to each key the client
* is blocked for. */
bkinfo *bki = zmalloc(sizeof(*bki));
if (btype == BLOCKED_STREAM)
bki->stream_id = ids[j];
/* If the key already exists in the dictionary ignore it. */
// record in the client which keys it is blocked on
if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
zfree(bki);
continue;
}
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
// record in the global c->db->blocking_keys which clients are blocked on which key
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
// append the client to the key's waiter list
listAddNodeTail(l,c);
bki->listnode = listLast(l);
}
// set the CLIENT_BLOCKED flag and add the client to the timeout table
blockClient(c,btype);
}
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
* flag is set client query buffer is not longer processed, but accumulated,
* and will be processed when the client is unblocked. */
// set the CLIENT_BLOCKED flag and add the client to the timeout table
void blockClient(client *c, int btype) {
// set the CLIENT_BLOCKED flag
c->flags |= CLIENT_BLOCKED;
c->btype = btype;
server.blocked_clients++;
server.blocked_clients_by_type[btype]++;
// add the client to the timeout table
addClientToTimeoutTable(c);
if (btype == BLOCKED_PAUSE) {
// a PAUSE block is also appended to server.paused_clients,
// and the CLIENT_PENDING_COMMAND flag is set
listAddNodeTail(server.paused_clients, c);
c->paused_list_node = listLast(server.paused_clients);
/* Mark this client to execute its command */
c->flags |= CLIENT_PENDING_COMMAND;
}
}
/* Add the specified client id / timeout as a key in the radix tree we use
* to handle blocked clients timeouts. The client is not added to the list
* if its timeout is zero (block forever). */
// add the client to the timeout table
void addClientToTimeoutTable(client *c) {
if (c->bpop.timeout == 0) return;
uint64_t timeout = c->bpop.timeout;
unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf,timeout,c);
// insert into the timeout table
if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
c->flags |= CLIENT_IN_TO_TABLE;
}
Call chain
void blpopCommand(client *c) // BLPOP command handler
- void blockingPopGenericCommand(client *c, int where) // the real BLPOP/BRPOP logic
- iterate over the keys given in the command
- if a key's value is not a LIST, reply with an error and return
- if a key's LIST has elements, pop one and reply
- if the client is flagged as non-blockable, treat it as an immediate timeout
- void blockForKeys(client *c, ...) // block the client until one of the keys is ready
- record the block details in c->bpop
- iterate over the keys given in the command
- record in c->bpop.keys which keys this client is waiting on
- append the client to the key's list in the global dict c->db->blocking_keys, marking that the client waits for data on this key
- void blockClient(client *c, int btype) // set the CLIENT_BLOCKED flag and add the client to the timeout table
- c->flags |= CLIENT_BLOCKED; // set the CLIENT_BLOCKED flag
- void addClientToTimeoutTable(client *c) // add the client to the timeout table
- raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)
In summary:
- when the LIST has data, the data is returned immediately
- when the LIST does not exist (i.e. has no data), the client's block flag is set, and which keys the client waits on is recorded in c->bpop.keys and c->db->blocking_keys
- so that the block can be lifted on timeout, the client is also stored in the server.clients_timeout_table structure
Unblocking when the LIST gets new data
// path:src/t_list.c
/* LPUSH <key> <element> [<element> ...] */
// LPUSH command handler
void lpushCommand(client *c) {
pushGenericCommand(c,LIST_HEAD,0);
}
/*-----------------------------------------------------------------------------
* List Commands
*----------------------------------------------------------------------------*/
/* Implements LPUSH/RPUSH/LPUSHX/RPUSHX.
* 'xx': push if key exists. */
// implements pushing elements onto a list
void pushGenericCommand(client *c, int where, int xx) {
int j;
// look up the value for the key
robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
// check that it is a LIST
if (checkType(c,lobj,OBJ_LIST)) return;
if (!lobj) {
// the key does not exist
if (xx) {
addReply(c, shared.czero);
return;
}
// create the LIST object
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
// add the key/value pair.
// a client only blocks when the LIST does not exist;
// an existing LIST has at least one element and never blocks a client,
// so calling signalKeyAsReady inside dbAdd is enough to wake clients promptly when elements are added
dbAdd(c->db,c->argv[1],lobj);
}
// push the elements onto the LIST object
for (j = 2; j < c->argc; j++) {
listTypePush(lobj,c->argv[j],where);
server.dirty++;
}
// queue the reply for the client
addReplyLongLong(c, listTypeLength(lobj));
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
// path:src/db.c
/* Add the key to the DB. It's up to the caller to increment the reference
* counter of the value if needed.
*
* The program is aborted if the key already exists. */
// add the key/value pair
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK);
// after adding the kv pair, signalKeyAsReady puts the key on server.ready_keys;
// beforeSleep later walks server.ready_keys and unblocks the related clients.
// a client only blocks when the LIST or ZSET does not exist;
// an existing one has at least one element and never blocks a client,
// so calling signalKeyAsReady inside dbAdd is enough to wake clients promptly when elements are added
signalKeyAsReady(db, key, val->type);
if (server.cluster_enabled) slotToKeyAdd(key->ptr);
}
// path:src/blocked.c
/* If the specified key has clients blocked waiting for list pushes, this
* function will put the key reference into the server.ready_keys list.
* Note that db->ready_keys is a hash table that allows us to avoid putting
* the same key again and again in the list in case of multiple pushes
* made by a script or in the context of MULTI/EXEC.
*
* The list will be finally processed by handleClientsBlockedOnKeys() */
void signalKeyAsReady(redisDb *db, robj *key, int type) {
readyList *rl;
/* Quick returns. */
int btype = getBlockedTypeByType(type);
// only OBJ_LIST, OBJ_ZSET, OBJ_STREAM and OBJ_MODULE values can block clients
if (btype == BLOCKED_NONE) {
// this value type can never cause a block, return immediately
/* The type can never block. */
return;
}
if (!server.blocked_clients_by_type[btype] &&
!server.blocked_clients_by_type[BLOCKED_MODULE]) {
// no clients are blocked on this type, return immediately
/* No clients block on this type. Note: Blocked modules are represented
* by BLOCKED_MODULE, even if the intention is to wake up by normal
* types (list, zset, stream), so we need to check that there are no
* blocked modules before we do a quick return here. */
return;
}
/* No clients blocking for this key? No need to queue it. */
// no client is blocked on this key, return immediately
if (dictFind(db->blocking_keys,key) == NULL) return;
/* Key was already signaled? No need to queue it again. */
// the key is already in db->ready_keys, return immediately
if (dictFind(db->ready_keys,key) != NULL) return;
/* Ok, we need to queue this key into server.ready_keys. */
// queue the key on server.ready_keys, processed in one batch in beforeSleep
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
incrRefCount(key);
// append to server.ready_keys, processed in one batch in beforeSleep
listAddNodeTail(server.ready_keys,rl);
/* We also add the key in the db->ready_keys dictionary in order
* to avoid adding it multiple times into a list with a simple O(1)
* check. */
incrRefCount(key);
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
// called from beforeSleep:
// walk server.ready_keys and serve the clients blocked on those keys (i.e. on data)
void handleClientsBlockedOnKeys(void) {
// server.ready_keys holds every key modified by commands run since the last epoll_wait
// loop while the server.ready_keys list is non-empty
while(listLength(server.ready_keys) != 0) {
list *l;
/* Point server.ready_keys to a fresh list and save the current one
* locally. This way as we run the old list we are free to call
* signalKeyAsReady() that may push new elements in server.ready_keys
* when handling clients blocked into BLMOVE. */
l = server.ready_keys;
server.ready_keys = listCreate();
// iterate over the saved ready-keys list
while(listLength(l) != 0) {
listNode *ln = listFirst(l);
readyList *rl = ln->value;
/* First of all remove this key from db->ready_keys so that
* we can safely call signalKeyAsReady() against this key. */
// remove the key being processed from db->ready_keys
dictDelete(rl->db->ready_keys,rl->key);
/* Even if we are not inside call(), increment the call depth
* in order to make sure that keys are expired against a fixed
* reference time, and not against the wallclock time. This
* way we can lookup an object multiple times (BLMOVE does
* that) without the risk of it being freed in the second
* lookup, invalidating the first one.
* See https://github.com/redis/redis/pull/6554. */
server.fixed_time_expire++;
updateCachedTime(0);
/* Serve clients blocked on the key. */
// look up the key's value
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL) {
// dispatch on the value's type
if (o->type == OBJ_LIST)
// LIST
serveClientsBlockedOnListKey(o,rl);
else if (o->type == OBJ_ZSET)
// ZSET
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
// OBJ_STREAM
serveClientsBlockedOnStreamKey(o,rl);
/* We want to serve clients blocked on module keys
* regardless of the object type: we don't know what the
* module is trying to accomplish right now. */
serveClientsBlockedOnKeyByModule(rl);
}
server.fixed_time_expire--;
/* Free this item. */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l); /* We have the new list on place at this point. */
}
}
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* when there may be clients blocked on a list key, and there may be new
* data to fetch (the key is ready). */
// serves clients blocked on a LIST when new elements were pushed to it
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
// get the list of clients blocked on this key
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
// iterate over the blocked-client list
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
// skip clients whose block type is not BLOCKED_LIST
if (receiver->btype != BLOCKED_LIST) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listRotateHeadToTail(clients);
continue;
}
robj *dstkey = receiver->bpop.target;
int wherefrom = receiver->bpop.listpos.wherefrom;
int whereto = receiver->bpop.listpos.whereto;
// pop an element from the list
robj *value = listTypePop(o, wherefrom);
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);
monotime replyTimer;
elapsedStart(&replyTimer);
// handle the specific blocking LIST command:
// BLMOVE, BLPOP or BRPOP
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
wherefrom, whereto) == C_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation. */
// on failure, push the value back onto the list
listTypePush(o,value,wherefrom);
}
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
// unblock the client
unblockClient(receiver);
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
// the LIST has no elements left, break
break;
}
}
}
// the LIST is now empty: delete the key
if (listTypeLength(o) == 0) {
dbDelete(rl->db,rl->key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
}
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}
// handles a BLPOP, BRPOP or BLMOVE when the LIST changed (gained elements):
// re-processes the command for one blocked client and sends it the data reply
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto)
{
robj *argv[5];
if (dstkey == NULL) {
// dstkey is NULL, so this is BLPOP or BRPOP
/* Propagate the [LR]POP operation. */
argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop :
shared.rpop;
argv[1] = key;
propagate((wherefrom == LIST_HEAD) ?
server.lpopCommand : server.rpopCommand,
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
/* BRPOP/BLPOP */
// send the popped element to the client
addReplyArrayLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,value);
/* Notify event. */
char *event = (wherefrom == LIST_HEAD) ? "lpop" : "rpop";
notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
} else {
/* BLMOVE */
// push the popped element onto the destination list:
// first look up the LIST value for dstkey
robj *dstobj =
lookupKeyWrite(receiver->db,dstkey);
if (!(dstobj &&
checkType(receiver,dstobj,OBJ_LIST)))
{
// it does not exist, or it exists and is a LIST
lmoveHandlePush(receiver,dstkey,dstobj,value,whereto);
/* Propagate the LMOVE/RPOPLPUSH operation. */
int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand);
argv[0] = isbrpoplpush ? shared.rpoplpush : shared.lmove;
argv[1] = key;
argv[2] = dstkey;
argv[3] = getStringObjectFromListPosition(wherefrom);
argv[4] = getStringObjectFromListPosition(whereto);
propagate(isbrpoplpush ? server.rpoplpushCommand : server.lmoveCommand,
db->id,argv,(isbrpoplpush ? 3 : 5),
PROPAGATE_AOF|
PROPAGATE_REPL);
/* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */
notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop",
key,receiver->db->id);
} else {
// not a LIST: nothing to do but report an error
/* BLMOVE failed because of wrong
* destination type. */
return C_ERR;
}
}
return C_OK;
}
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
// unblocks a client,
// dispatching to a different handler depending on the block type (reason)
void unblockClient(client *c) {
if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM) {
// block waiting for data
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
// block waiting for replica acknowledgement
unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) {
// module-related block
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else if (c->btype == BLOCKED_PAUSE) {
// pause-type block
listDelNode(server.paused_clients,c->paused_list_node);
c->paused_list_node = NULL;
} else {
serverPanic("Unknown btype in unblockClient().");
}
/* Reset the client for a new query since, for blocking commands
* we do not do it immediately after the command returns (when the
* client got blocked) in order to be still able to access the argument
* vector from module callbacks and updateStatsOnUnblock. */
if (c->btype != BLOCKED_PAUSE) {
// command-type block (i.e. not a pause):
// release or reset the related state here
freeClientOriginalArgv(c);
resetClient(c);
}
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
server.blocked_clients--;
server.blocked_clients_by_type[c->btype]--;
// clear the blocked flag
c->flags &= ~CLIENT_BLOCKED;
c->btype = BLOCKED_NONE;
// remove the client from the block-timeout structure
removeClientFromTimeoutTable(c);
// set the unblocked flag on the client and append it to server.unblocked_clients;
// beforeSleep will process it later (handling the unblocking command and any follow-up commands)
queueClientForReprocessing(c);
}
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
* You should never call this function directly, but unblockClient() instead. */
// called via unblockClient():
// walks c->bpop.keys and removes the client from the corresponding lists in c->db->blocking_keys,
// then empties c->bpop.keys
void unblockClientWaitingData(client *c) {
dictEntry *de;
dictIterator *di;
list *l;
serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
di = dictGetIterator(c->bpop.keys);
/* The client may wait for multiple keys, so unblock it for every key. */
// a client may be waiting on several keys, so unblock it for every one of them:
// iterate over c->bpop.keys
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
bkinfo *bki = dictGetVal(de);
/* Remove this client from the list of clients waiting for this key. */
// find the key's client list in c->db->blocking_keys
l = dictFetchValue(c->db->blocking_keys,key);
serverAssertWithInfo(c,key,l != NULL);
// delete this client's node from the list
listDelNode(l,bki->listnode);
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
// if the list is now empty, remove the key from c->db->blocking_keys
dictDelete(c->db->blocking_keys,key);
}
dictReleaseIterator(di);
/* Cleanup the client structure */
// clean up the block-related fields in the client structure
dictEmpty(c->bpop.keys,NULL);
if (c->bpop.target) {
decrRefCount(c->bpop.target);
c->bpop.target = NULL;
}
if (c->bpop.xread_group) {
decrRefCount(c->bpop.xread_group);
decrRefCount(c->bpop.xread_consumer);
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
}
}
// set the unblocked flag on the client and append it to server.unblocked_clients;
// beforeSleep will process it later (handling the unblocking command and any follow-up commands)
void queueClientForReprocessing(client *c) {
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
if (!(c->flags & CLIENT_UNBLOCKED)) {
c->flags |= CLIENT_UNBLOCKED;
listAddNodeTail(server.unblocked_clients,c);
}
}
// path:src/server.c
// runs after all client events (commands) have been handled,
// right before entering epoll_wait
void beforeSleep(struct aeEventLoop *eventLoop) {
// ...
/* Try to process blocked clients every once in while. Example: A module
* calls RM_SignalKeyAsReady from within a timer callback (So we don't
* visit processCommand() at all). */
// serve the clients blocked on keys that became ready
handleClientsBlockedOnKeys();
// ...
}
// path:src/timeout.c
/* Remove the client from the table when it is unblocked for reasons
* different than timing out. */
// remove the client from the timeout table
void removeClientFromTimeoutTable(client *c) {
if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
c->flags &= ~CLIENT_IN_TO_TABLE;
uint64_t timeout = c->bpop.timeout;
unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf,timeout,c);
raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);
}
Call chain
void lpushCommand(client *c) // LPUSH command handler
- void pushGenericCommand(client *c, int where, int xx) // implements pushing elements onto a list
- void dbAdd(redisDb *db, robj *key, robj *val) // when the LIST does not exist, add the key/value; only a missing LIST can have blocked clients
- void signalKeyAsReady(redisDb *db, robj *key, int type) // calling signalKeyAsReady here is enough to wake clients promptly when elements are added
- add the LIST's key to server.ready_keys, processed in one batch in beforeSleep
void beforeSleep(struct aeEventLoop *eventLoop) // after all client events (commands) are handled, before entering epoll_wait
- void handleClientsBlockedOnKeys(void) // walk server.ready_keys and serve the clients blocked on those keys (i.e. on data)
- while(listLength(server.ready_keys) != 0) // server.ready_keys holds every key modified by commands run since the last epoll_wait
- void serveClientsBlockedOnListKey(robj *o, readyList *rl) // serves clients blocked on a LIST when new elements were pushed to it
- look up the key's blocked-client list (rl->db->blocking_keys)
- iterate over the client list
- pop an element from the LIST
- if a value was popped
- int serveClientBlockedOnList(client *receiver...) // handles BLPOP, BRPOP or BLMOVE when the LIST gained elements: re-processes the command for one blocked client and sends the data reply
- void unblockClient(client *c) // the client got its data and was served, so unblock it
- void unblockClientWaitingData(client *c) // walk c->bpop.keys, remove the client from the corresponding lists in c->db->blocking_keys, then empty c->bpop.keys
- c->flags &= ~CLIENT_BLOCKED; // clear the blocked flag
- void removeClientFromTimeoutTable(client *c) // remove the client from the block-timeout structure
- void queueClientForReprocessing(client *c) // set the unblocked flag, append the client to server.unblocked_clients; processed further in beforeSleep
- if the LIST is empty
- break // out of the client-list loop
In summary:
- when a push turns the LIST from empty (non-existent) to non-empty, the key is recorded in server.ready_keys
- in beforeSleep, handleClientsBlockedOnKeys processes server.ready_keys, pops the data, sends it to the waiting clients and unblocks them
- unblocked clients are appended to server.unblocked_clients; beforeSleep then calls processUnblockedClients to keep parsing each client's buffer and execute its commands
Timeout handling
// path:src/server.c
// runs after all client events (commands) have been handled,
// right before entering epoll_wait
void beforeSleep(struct aeEventLoop *eventLoop) {
// ...
// detect and handle blocked-client timeouts
handleBlockedClientsTimeout();
// ...
}
// path:src/timeout.c
/* This function is called in beforeSleep() in order to unblock clients
* that are waiting in blocking operations with a timeout set. */
// detects and handles blocked-client timeouts;
// called from beforeSleep
void handleBlockedClientsTimeout(void) {
if (raxSize(server.clients_timeout_table) == 0) return;
uint64_t now = mstime();
raxIterator ri;
raxStart(&ri,server.clients_timeout_table);
raxSeek(&ri,"^",NULL,0);
// iterate over the timeout table
while(raxNext(&ri)) {
uint64_t timeout;
client *c;
// decode the timeout and the client pointer
decodeTimeoutKey(ri.key,&timeout,&c);
// this timeout (and all later ones) lies in the future, break
if (timeout >= now) break; /* All the timeouts are in the future. */
c->flags &= ~CLIENT_IN_TO_TABLE;
// handle the timed-out blocked client:
// send a timeout reply and unblock the client
checkBlockedClientTimeout(c,now);
// remove the timed-out client from the timeout table
raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
raxSeek(&ri,"^",NULL,0);
}
raxStop(&ri);
}
/* Check if this blocked client timedout (does nothing if the client is
* not blocked right now). If so send a reply, unblock it, and return 1.
* Otherwise 0 is returned and no operation is performed. */
// handles a timed-out blocked client:
// sends a timeout reply and unblocks it
int checkBlockedClientTimeout(client *c, mstime_t now) {
if (c->flags & CLIENT_BLOCKED &&
c->bpop.timeout != 0
&& c->bpop.timeout < now)
{
// blocked and timed out
/* Handle blocking operation specific timeout. */
// send the timeout reply to the client
replyToBlockedClientTimedOut(c);
// unblock the client
unblockClient(c);
return 1;
} else {
return 0;
}
}
// path: src/blocked.c
/* This function gets called when a blocked client timed out in order to
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM) {
addReplyNullArray(c);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
} else if (c->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
}
调用链
void beforeSleep(struct aeEventLoop *eventLoop) // 处理完所有客户端事件(命令), 进入epoll_wait之前
- void handleBlockedClientsTimeout(void) // 检测处理block client timeout
- int checkBlockedClientTimeout(client *c, mstime_t now) //处理block timeout 客户端, 回复超时,并且unblock客户端
- void replyToBlockedClientTimedOut(client *c) // 给block timeout 客户端发送超时回复包
- void unblockClient(client *c) // 取消客户端block, 根据客户端不同的block类型(原因)调用不同函数进行处理
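TimeoutTable 之所以能像上面那样按超时时间升序遍历,关键在于encodeTimeoutKey 把 (timeout, client指针) 编码成按字节序可比较的key: timeout 以大端字节序放在key前8字节,这样基数树(rax)按字节序的有序遍历就等价于按超时时间升序。下面是一段独立的C示意代码(非Redis源码, 函数名为演示而设):

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* 简化示意: 将 (timeout, client指针) 编码为16字节key。
 * timeout 按大端字节序写入前8字节, 于是按memcmp比较key
 * 就等价于按timeout数值比较, rax的有序遍历即按超时时间升序 */
static void encode_timeout_key(unsigned char *buf, uint64_t timeout, void *client) {
    for (int i = 0; i < 8; i++)               /* 大端: 高位字节在前 */
        buf[i] = (unsigned char)(timeout >> (8 * (7 - i)));
    memcpy(buf + 8, &client, sizeof(client)); /* 指针保证key唯一 */
}

static void decode_timeout_key(const unsigned char *buf, uint64_t *timeout, void **client) {
    uint64_t t = 0;
    for (int i = 0; i < 8; i++) t = (t << 8) | buf[i];
    *timeout = t;
    memcpy(client, buf + 8, sizeof(*client));
}
```

这也解释了handleBlockedClientsTimeout 中为何一旦遇到 timeout >= now 就可以直接break: 后面的key超时时间一定更晚。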
等待从节点数据同步
//path: src/replication.c
/* WAIT for N replicas to acknowledge the processing of our latest
* write command (and all the previous commands). */
// wait 命令处理函数
// wait 命令: block等待n个从节点执行完本客户端最后的写命令
void waitCommand(client *c) {
mstime_t timeout;
long numreplicas, ackreplicas;
// 当前客户端写命令产生的最新副本复制日志 offset
long long offset = c->woff;
if (server.masterhost) {
// 如果是从节点,不支持wait命令
addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated.");
return;
}
/* Argument parsing. */
// 解析参数, 获取需要等待副本节点同步完成的数量
if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
return;
// 等待(block超时)时间
if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
!= C_OK) return;
/* First try without blocking at all. */
// 判断 已经完成 执行完 客户端要求的 副本复制log offset的从节点数量
ackreplicas = replicationCountAcksByOffset(c->woff);
if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
// 如果数量符合要求,直接返回
addReplyLongLong(c,ackreplicas);
return;
}
/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
    // 设置client 中 bpop 相关变量
c->bpop.timeout = timeout; // 超时时间
c->bpop.reploffset = offset; // 副本复制日志log offset
c->bpop.numreplicas = numreplicas; // 要求从节点数量
listAddNodeHead(server.clients_waiting_acks,c); // 将客户端添加到全局变量server.clients_waiting_acks 列表
// 调用block 函数
blockClient(c,BLOCKED_WAIT);
/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
// 设置server.get_ack_from_slaves = 1;
// 即在beforesleep(进入epoll wait之前)会向从节点发送命令,要求他们回复当前执行完成的副本复制日志 offset
replicationRequestAckFromSlaves();
}
/* This just set a flag so that we broadcast a REPLCONF GETACK command
* to all the slaves in the beforeSleep() function. Note that this way
* we "group" all the clients that want to wait for synchronous replication
* in a given event loop iteration, and send a single GETACK for them all. */
// 设置全局变量server.get_ack_from_slaves, 在beforesleep进行处理
void replicationRequestAckFromSlaves(void) {
server.get_ack_from_slaves = 1;
}
// path:src/server.c
// 处理完所有客户端事件(命令)
// 进入epoll_wait之前
void beforeSleep(struct aeEventLoop *eventLoop) {
// ...
/* Unblock all the clients blocked for synchronous replication
* in WAIT. */
// 处理副本同步进度而block的客户端
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
// ...
/* Send all the slaves an ACK request if at least one client blocked
* during the previous event loop iteration. Note that we do this after
* processUnblockedClients(), so if there are multiple pipelined WAITs
* and the just unblocked WAIT gets blocked again, we don't have to wait
* a server cron cycle in absence of other event loop events. See #6623.
*
* We also don't send the ACKs while clients are paused, since it can
* increment the replication backlog, they'll be sent after the pause
* if we are still the master. */
// get_ack_from_slaves 为1代表有客户端使用wait命令 需要获取副本ack
    // checkClientPauseTimeoutAndReturnIfPaused(),如果当前处于pause阶段,return true, 否则false
if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
robj *argv[3];
argv[0] = shared.replconf;
argv[1] = shared.getack;
argv[2] = shared.special_asterick; /* Not used argument. */
        // 发送获取当前从节点最新执行命令(即副本复制log offset)的命令
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
server.get_ack_from_slaves = 0;
}
// ...
}
//path: src/replication.c
/* Check if there are clients blocked in WAIT that can be unblocked since
* we received enough ACKs from slaves. */
// 处理同步副本而block的客户端(beforeSleep调用)
void processClientsWaitingReplicas(void) {
long long last_offset = 0;
int last_numreplicas = 0;
listIter li;
listNode *ln;
listRewind(server.clients_waiting_acks,&li);
// 遍历block 客户端
while((ln = listNext(&li))) {
client *c = ln->value;
/* Every time we find a client that is satisfied for a given
* offset and number of replicas, we remember it so the next client
* may be unblocked without calling replicationCountAcksByOffset()
* if the requested offset / replicas were equal or less. */
// 判断从节点offset>c->bpop.reploffset的数量是否>=c->bpop.numreplicas
if (last_offset && last_offset >= c->bpop.reploffset &&
last_numreplicas >= c->bpop.numreplicas)
{
unblockClient(c);
addReplyLongLong(c,last_numreplicas);
} else {
int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
if (numreplicas >= c->bpop.numreplicas) {
last_offset = c->bpop.reploffset;
last_numreplicas = numreplicas;
unblockClient(c);
addReplyLongLong(c,numreplicas);
}
}
}
}
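processClientsWaitingReplicas 中 last_offset/last_numreplicas 的缓存优化(上一个客户端满足条件时, 要求更宽松的下一个客户端可直接解除阻塞, 免去一次遍历从节点计数)可用下面一段独立的C代码示意(非Redis源码, 结构与函数名为演示而设):

```c
#include <assert.h>

/* 统计已确认offset >= target 的从节点数量
 * (对应replicationCountAcksByOffset) */
static int count_acks(const long long *ack_off, int nslaves, long long target) {
    int n = 0;
    for (int i = 0; i < nslaves; i++)
        if (ack_off[i] >= target) n++;
    return n;
}

typedef struct { long long reploffset; int numreplicas; } waiter;

/* 简化示意: 返回可解除阻塞的客户端数量;
 * 缓存上一次满足条件的 (offset, 副本数), 要求不更严的客户端直接命中 */
static int process_waiters(const waiter *ws, int nw,
                           const long long *ack_off, int nslaves) {
    long long last_offset = 0;
    int last_numreplicas = 0, unblocked = 0;
    for (int i = 0; i < nw; i++) {
        if (last_offset && last_offset >= ws[i].reploffset &&
            last_numreplicas >= ws[i].numreplicas) {
            unblocked++;               /* 缓存命中, 免去一次计数 */
        } else {
            int n = count_acks(ack_off, nslaves, ws[i].reploffset);
            if (n >= ws[i].numreplicas) {
                last_offset = ws[i].reploffset;
                last_numreplicas = n;
                unblocked++;
            }
        }
    }
    return unblocked;
}
```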
// 主从节点交互命令 replconf处理函数
void replconfCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
// 处理从节点getack 回复包
// 保存从节点当前最新执行完的 副本复制log offset
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
* internal only command that normal clients should never use. */
long long offset;
if (!(c->flags & CLIENT_SLAVE)) return;
if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
return;
if (offset > c->repl_ack_off)
// 保存从节点当前最新执行完的 副本复制log offset
c->repl_ack_off = offset;
c->repl_ack_time = server.unixtime;
/* If this was a diskless replication, we need to really put
* the slave online when the first ACK is received (which
* confirms slave is online and ready to get more data). This
* allows for simpler and less CPU intensive EOF detection
* when streaming RDB files.
* There's a chance the ACK got to us before we detected that the
* bgsave is done (since that depends on cron ticks), so run a
* quick check first (instead of waiting for the next ACK. */
if (server.child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
checkChildrenDone();
if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
putSlaveOnline(c);
/* Note: this command does not reply anything! */
return;
} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
/* REPLCONF GETACK is used in order to request an ACK ASAP
* to the slave. */
// 处理master 节点的replconf getack 命令
        // 发送当前执行完命令的副本复制log offset
if (server.masterhost && server.master) replicationSendAck();
return;
} else if (!strcasecmp(c->argv[j]->ptr,"rdb-only")) {
// ...
}
// ...
}
调用链
void waitCommand(client *c) //wait 命令: block等待n个从节点执行完本客户端最后的写命令
- if 已经有足够数量的从节点同步完数据(执行完client最后的写命令)
- 返回成功
- else
- 将client 添加到全局变量server.clients_waiting_acks 列表
- blockClient(c,BLOCKED_WAIT); // 调用block 函数
- void replicationRequestAckFromSlaves(void) // 设置全局变量server.get_ack_from_slaves, 在beforesleep进行处理
void beforeSleep(struct aeEventLoop *eventLoop) // 处理完所有客户端事件(命令), 进入epoll_wait之前
- if (listLength(server.clients_waiting_acks)) // 处理副本同步进度而block的客户端
- void processClientsWaitingReplicas(void) //处理同步副本而block的客户端, 判断有足够从节点同步完数据时unblock相关客户端
- void unblockClient(client *c) // 取消客户端block, 根据客户端不同的block类型(原因)调用不同函数进行处理
- if server.get_ack_from_slaves == 1
- 给所有从节点发送replconf getack 命令
// 主从节点交互命令 replconf处理函数
void replconfCommand(client *c) // 主从节点交互命令 replconf处理函数
- if 是replconf ack 回复包 // 本节点是主节点
- // 保存从节点当前最新执行完的 副本复制log offset
- elseif replconf getack // 本节点是从节点
  - 发送当前执行完命令的副本复制log offset
综上所述:
-
wait命令处理函数检查是否已经有足够数量从节点同步,如果是直接返回不阻塞客户端
-
如果不满足, 将客户端添加到server.clients_waiting_acks 列表, 设置server.get_ack_from_slaves为1,并且阻塞客户端
-
beforesleep 定时检测server.get_ack_from_slaves为1, 发送replconf getack 命令给从节点要求获取其最新执行的命令(副本复制log offset)
-
beforesleep 定时检测server.clients_waiting_acks 列表中客户端是否已经有足够数量的从节点满足条件,是的话unblock客户端
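其中server.get_ack_from_slaves 标记的作用是把同一轮事件循环中多个WAIT客户端的ACK请求合并为一次GETACK广播(源码注释中所说的"group"), 可用下面一段独立的C代码示意(非Redis源码, 变量与函数名为演示而设):

```c
#include <assert.h>

/* 简化示意: 多个WAIT客户端在同一轮事件循环中只触发一次GETACK广播 */
static int g_get_ack_from_slaves = 0;  /* 对应 server.get_ack_from_slaves */
static int g_getack_sent = 0;          /* 统计实际广播次数 */

/* 对应replicationRequestAckFromSlaves: 只置标记, 不立即发送 */
static void request_ack_from_slaves(void) { g_get_ack_from_slaves = 1; }

/* 对应beforeSleep 中的处理: 有标记才广播, 广播后清除标记 */
static void before_sleep(void) {
    if (g_get_ack_from_slaves) {
        g_getack_sent++;               /* 向所有从节点广播一次REPLCONF GETACK */
        g_get_ack_from_slaves = 0;
    }
}
```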
暂停客户端执行(client PAUSE/UNPAUSE )
client PAUSE命令处理
// path: src/networking.c
// client 命令处理函数
void clientCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
        // unpause 命令
/* CLIENT UNPAUSE */
// 停止pause客户端 并且 unblock 所有被paused 的客户端(server.paused_clients)
unpauseClients();
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
c->argc == 4))
{
/* CLIENT PAUSE TIMEOUT [WRITE|ALL] */
// pause 命令
mstime_t end;
int type = CLIENT_PAUSE_ALL;
if (c->argc == 4) {
            // 判断是pause所有客户端还是只pause 执行写命令的客户端
if (!strcasecmp(c->argv[3]->ptr,"write")) {
type = CLIENT_PAUSE_WRITE;
} else if (!strcasecmp(c->argv[3]->ptr,"all")) {
type = CLIENT_PAUSE_ALL;
} else {
addReplyError(c,
"CLIENT PAUSE mode must be WRITE or ALL");
return;
}
}
// 获取取消pause(也即unpause) 时间
if (getTimeoutFromObjectOrReply(c,c->argv[2],&end,
UNIT_MILLISECONDS) != C_OK) return;
// 启动 pause(暂停) 客户端
// 此函数只是设置server.client_pause_type标记
// 真正block客户端是在processCommand函数中
pauseClients(end, type);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
// ...
}
// ...
}
/* Pause clients up to the specified unixtime (in ms) for a given type of
* commands.
*
* A main use case of this function is to allow pausing replication traffic
* so that a failover without data loss to occur. Replicas will continue to receive
* traffic to faciliate this functionality.
*
* This function is also internally used by Redis Cluster for the manual
* failover procedure implemented by CLUSTER FAILOVER.
*
* The function always succeed, even if there is already a pause in progress.
* In such a case, the duration is set to the maximum and new end time and the
* type is set to the more restrictive type of pause. */
// 启动 pause(暂停) 客户端
// 此函数只是设置server.client_pause_type标记
// 真正block客户端是在processCommand函数中
void pauseClients(mstime_t end, pause_type type) {
// 设置pause_type
if (type > server.client_pause_type) {
server.client_pause_type = type;
}
// 设置pause 的超时时间
if (end > server.client_pause_end_time) {
server.client_pause_end_time = end;
}
/* We allow write commands that were queued
* up before and after to execute. We need
* to track this state so that we don't assert
* in propagate(). */
if (server.in_exec) {
server.client_pause_in_transaction = 1;
}
}
调用链
void clientCommand(client *c) // client 命令处理函数
- if client pause
- void pauseClients(mstime_t end, pause_type type) // 启动 pause(暂停) 客户端,
- 设置server.client_pause_type标记; // 真正block客户端是在processCommand函数中
- 设置server.client_pause_end_time, 即end_time时结束pause// end_time在beforesleep判断超时处理,为0即beforesleep 不处理(不超时)
综上, client pause 仅设置server.client_pause_type 标记和server.client_pause_end_time 超时时间
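pauseClients 对重复执行 pause 的合并规则(类型取更严格的一种、结束时间取更晚的一个)可用下面一段独立的C代码示意(非Redis源码, 全局变量与枚举名为演示而设):

```c
#include <assert.h>

/* 简化示意: 重复执行CLIENT PAUSE时的合并规则 --
 * 类型取更严格的一种(ALL > WRITE > OFF), 结束时间取更晚的一个 */
typedef enum { PAUSE_OFF = 0, PAUSE_WRITE = 1, PAUSE_ALL = 2 } pause_t;

static pause_t g_pause_type = PAUSE_OFF;   /* 对应 server.client_pause_type */
static long long g_pause_end = 0;          /* 对应 server.client_pause_end_time */

static void pause_clients(long long end, pause_t type) {
    if (type > g_pause_type) g_pause_type = type;  /* 更严格才覆盖, 不降级 */
    if (end > g_pause_end) g_pause_end = end;      /* 只延长不缩短 */
}
```

这样即使多个来源(CLIENT PAUSE、failover、cluster manual failover)先后触发pause, 限制也只会收紧不会放松。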
执行命令的client 被阻塞
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* If C_OK is returned the client is still alive and valid and
* other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
// 处理客户端命令
int processCommand(client *c) {
// ...
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// ...
/* If the server is paused, block the client until
* the pause has ended. Replicas are never paused. */
// 判断server.client_pause_type 设置了标记
if (!(c->flags & CLIENT_SLAVE) &&
((server.client_pause_type == CLIENT_PAUSE_ALL) ||
(server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
{
c->bpop.timeout = 0;
        // 阻塞客户端,设置CLIENT_BLOCKED标记,添加到TimeoutTable中(timeout为0时不加入)
        // pause 类型还会添加到server.paused_clients中
blockClient(c,BLOCKED_PAUSE);
return C_OK;
}
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
c->cmd->proc != resetCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
通过走读代码可知:
-
processCommand 在执行命令前根据server.client_pause_type 和当前将执行的命令判断是否需要pause, 如果需要则调用blockClient
-
blockClient 设置CLIENT_BLOCKED标记, client 被添加到server.paused_clients 列表中
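processCommand 中"是否因pause阻塞当前客户端"的判断条件可以抽取为下面的C示意函数(非Redis源码, 函数与参数名为演示而设): 从节点连接永不阻塞; PAUSE_ALL 阻塞所有命令; PAUSE_WRITE 只阻塞可能产生复制流量的(写)命令。

```c
#include <assert.h>

typedef enum { PAUSE_OFF = 0, PAUSE_WRITE = 1, PAUSE_ALL = 2 } pause_t;

/* 简化示意: 对应processCommand 中的pause判断
 * is_slave       -- 客户端是否带CLIENT_SLAVE标记
 * may_replicate  -- 当前命令是否可能产生复制流量(写命令) */
static int should_block_for_pause(pause_t type, int is_slave, int may_replicate) {
    if (is_slave) return 0;                    /* 从节点连接永不pause */
    if (type == PAUSE_ALL) return 1;           /* 阻塞所有命令 */
    return type == PAUSE_WRITE && may_replicate;
}
```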
client UNPAUSE命令处理
// path: src/networking.c
// client 命令处理函数
void clientCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
        // unpause 命令
/* CLIENT UNPAUSE */
// 停止pause客户端 并且 unblock 所有被paused 的客户端(server.paused_clients)
unpauseClients();
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
c->argc == 4))
// ...
}
// ....
}
/* Unpause clients and queue them for reprocessing. */
// 停止pause客户端 并且 unblock 所有被paused 的客户端(server.paused_clients)
void unpauseClients(void) {
listNode *ln;
listIter li;
client *c;
// 设置pause off 标记
server.client_pause_type = CLIENT_PAUSE_OFF;
/* Unblock all of the clients so they are reprocessed. */
// 遍历paused 列表, unblock 客户端
listRewind(server.paused_clients,&li);
while ((ln = listNext(&li)) != NULL) {
c = listNodeValue(ln);
unblockClient(c);
}
}
调用链
void clientCommand(client *c) // client 命令处理函数
- if client unpause
- void unpauseClients(void) // 停止pause客户端 并且 unblock 所有被paused 的客户端(server.paused_clients)
- 设置server.client_pause_type标记 为 CLIENT_PAUSE_OFF; // processCommand函数中使用
- 遍历server.paused_clients列表, unblock 客户端
综上, client unpause 仅设置server.client_pause_type 为CLIENT_PAUSE_OFF, 并且unblock server.paused_clients 列表中的客户端
超时解除阻塞
// path:src/server.c
// 定时器执行处理函数
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
/* Clear the paused clients state if needed. */
// 检测当前是否处于pause(server.client_pause_type) 并且是否超时(server.client_pause_end_time)
// 如果超时, 停止pause并且unblock 被pause的客户端
    // 如果当前处于pause阶段,return true, 否则false
checkClientPauseTimeoutAndReturnIfPaused();
// ...
}
// path:src/networking.c
/* Returns true if clients are paused and false otherwise. */
// 如果设置了pause, 返回true; 否则返回false
int areClientsPaused(void) {
return server.client_pause_type != CLIENT_PAUSE_OFF;
}
/* Checks if the current client pause has elapsed and unpause clients
* if it has. Also returns true if clients are now paused and false
* otherwise. */
// 判断并处理 pause 是否到时间结束(超时)
// 如果是, 停止pause并且unblock 被pause的客户端
int checkClientPauseTimeoutAndReturnIfPaused(void) {
// 判断server.client_pause_type 是否设置 pause 标记
if (!areClientsPaused())
// 没有paused, 直接返回false
return 0;
if (server.client_pause_end_time < server.mstime) {
// paused 并且 已经超时, unpause client
unpauseClients();
}
// 返回是否paused
return areClientsPaused();
}
/* Unpause clients and queue them for reprocessing. */
// 停止pause客户端 并且 unblock 所有被paused 的客户端(server.paused_clients)
void unpauseClients(void) {
listNode *ln;
listIter li;
client *c;
// 设置pause off 标记
server.client_pause_type = CLIENT_PAUSE_OFF;
/* Unblock all of the clients so they are reprocessed. */
// 遍历paused 列表, unblock 客户端
listRewind(server.paused_clients,&li);
while ((ln = listNext(&li)) != NULL) {
c = listNodeValue(ln);
unblockClient(c);
}
}
调用链
int serverCron(struct aeEventLoop *eventLoop, // 定时器执行处理函数
- int checkClientPauseTimeoutAndReturnIfPaused(void) //判断并处理pause是否到时间结束(超时);如果是,停止pause并且unblock 被pause的客户端
- void unpauseClients(void) // 停止pause客户端 并且 unblock 所有被paused 的客户端(server.paused_clients)
- 设置server.client_pause_type标记 为 CLIENT_PAUSE_OFF; // processCommand函数中使用
- 遍历server.paused_clients列表, unblock 客户端
综上, pause超时实际上是在serverCron 中调用checkClientPauseTimeoutAndReturnIfPaused 判断当前时间是否已超过server.client_pause_end_time,如果是则调用unpauseClients 将server.client_pause_type 设置为CLIENT_PAUSE_OFF, 并遍历server.paused_clients 列表unblock 客户端
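这一超时检测逻辑可用下面一段独立的C代码示意(非Redis源码, 全局变量初始值为演示而设, 对应checkClientPauseTimeoutAndReturnIfPaused 的返回语义):

```c
#include <assert.h>

typedef enum { PAUSE_OFF = 0, PAUSE_WRITE = 1, PAUSE_ALL = 2 } pause_t;

/* 演示初始状态: 处于PAUSE_ALL, 结束时间为1000ms */
static pause_t g_pause_type = PAUSE_ALL;
static long long g_pause_end = 1000;

/* 简化示意: 返回当前是否仍处于pause; 若已超时则先自动unpause */
static int check_pause_timeout(long long now) {
    if (g_pause_type == PAUSE_OFF) return 0;
    if (g_pause_end < now)
        g_pause_type = PAUSE_OFF;   /* 真实实现中这里调用unpauseClients() */
    return g_pause_type != PAUSE_OFF;
}
```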
原文始发于微信公众号(吃瓜技术派):Redis源码剖析-阻塞命令的实现机制