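Learning objectives
- Understand how ZooKeeper creates, refreshes, and expires a Session
- Identify ZooKeeper's core request-processing call chain
Chapter 1 Session creation
The previous article covered how ZooKeeper is used. ZooKeeper is split into two modules, a server and a client: the server implements all of ZooKeeper's business logic, while the client wraps calls to the server. Two modules means network communication. ZooKeeper uses ServerCnxnFactory to manage client connections, and it has two implementations: NIOServerCnxnFactory, built on Java's native NIO, and NettyServerCnxnFactory, built on Netty. A ServerCnxn represents one connection between a client and the server. The standalone startup path shows that ZooKeeper's default communication component is NIOServerCnxnFactory.
1.1 The client sends a request
As covered earlier, a connection is established by calling new ZooKeeper(...). The ZooKeeper constructor creates a ClientCnxn object and calls its start method, which starts two threads: sendThread and eventThread.
sendThread is the core thread for establishing the connection. Its run method is a while loop: on the first pass it creates the connection, and once the state is CONNECTED it sends a Ping request at least once every 10 seconds so that the connection stays open.
The source is long, so some unimportant code is omitted.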
public void run() {
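//Upper bound on the interval between pings
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
while (state.isAlive()) {
try {
//If the socket is not connected yet, create the connection
//(startConnect ends up calling clientCnxnSocket.connect, shown further down)
if (!clientCnxnSocket.isConnected()) {
startConnect(serverAddress);
}
//Once connected, send a heartbeat at least once every 10 seconds
if (state.isConnected()) {
//This logic spaces out the heartbeats so that pings are not sent too often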
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
}
}
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
//Connect to the ZooKeeper server; the session is created over this connection
registerAndConnect(sock, addr);
} catch (IOException e) {
}
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
//Open the connection through NIO (non-blocking connect)
boolean immediateConnect = sock.connect(addr);
}
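The heartbeat rule inside run() above can be isolated into two small functions. This is a minimal sketch: the class PingSchedule and its method names are hypothetical, and only the arithmetic is taken from the excerpt.

```java
/** Hypothetical helper that isolates the ping-timing rule from SendThread.run(). */
class PingSchedule {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in the excerpt

    /** Milliseconds left before the next ping is due; a value <= 0 means "due now". */
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    /** True when a ping should be sent on this pass of the loop. */
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }
}
```

With a readTimeout of 20000 ms, for example, the first condition fires once nothing has been sent for 9000 ms. With a large readTimeout the second condition takes over, which is what caps the interval at 10 seconds.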
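1.2 The server accepts the connection
On the server, NIOServerCnxnFactory starts the threads that receive requests. At startup it starts four kinds of threads:
- AcceptThread: accepts connections from clients and assigns each one to a SelectorThread (one thread of this kind is started).
- SelectorThread: runs select(). Because select() becomes a bottleneck when handling many connections, several SelectorThreads are started. The count is configured with the system property zookeeper.nio.numSelectorThreads and defaults to sqrt(number of cores / 2), with a minimum of 1.
- WorkerThread: performs the basic socket reads and writes. The count is configured with the system property zookeeper.nio.numWorkerThreads and defaults to 2 × the number of cores. If the count is 0, no worker threads are started and the SelectorThread performs the I/O itself; see the WorkerThread section below.
- ConnectionExpirationThread: closes connections that have expired, that is, connections whose session timeout has elapsed without activity.
1.2.1 AcceptThread
This thread accepts incoming client connections.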
public void run() {
while (!stopped && !acceptSocket.socket().isClosed()) {
select();
}
}
private void select() {
try {
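//Wait for ready connections
selector.select();
Iterator<SelectionKey> selectedKeys =
selector.selectedKeys().iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (key.isAcceptable()) {
//1: Accept the connection on this server.
//2: Get the address of the remote client.
//3: Check whether the connection exceeds the configured limit.
//4: Switch the channel to non-blocking mode.
//5: Pick a SelectorThread in round-robin order and assign the connection to it.
//6: Add the connection to that SelectorThread's acceptedQueue and wake the SelectorThread up.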
if (!doAccept()) {
pauseAccept(10);
}
}
}
}
}
Step into the doAccept method:
private boolean doAccept() {
...
try {
//Accept the connection
sc = acceptSocket.accept();
accepted = true;
//Get the remote host's address
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
//Check whether the per-client connection limit is exceeded
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
...
}
LOG.debug("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
//Switch the channel to non-blocking mode
sc.configureBlocking(false);
//Assign the connection to a SelectorThread in round-robin order
if (!selectorIterator.hasNext()) {
selectorIterator = selectorThreads.iterator();
}
SelectorThread selectorThread = selectorIterator.next();
//Add the new connection to the SelectorThread's acceptedQueue and wake the SelectorThread up
if (!selectorThread.addAcceptedConnection(sc)) {
...
}
acceptErrorLogger.flush();
} catch (IOException e) {
...
}
return accepted;
}
}
public boolean addAcceptedConnection(SocketChannel accepted) {
//Add accepted to acceptedQueue
if (stopped || !acceptedQueue.offer(accepted)) {
return false;
}
//Wake up the SelectorThread
wakeupSelector();
return true;
}
addAcceptedConnection wakes up the SelectorThread, so the logic continues in SelectorThread.run.
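The round-robin hand-off in doAccept and addAcceptedConnection can be reproduced without any sockets. In this minimal sketch connections are plain strings and each SelectorThread is reduced to its queue; RoundRobinAcceptor and its methods are hypothetical names, and at least one selector queue is assumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

/** Hypothetical model: one accept loop distributing connections over N selector queues. */
class RoundRobinAcceptor {
    private final List<Queue<String>> selectorQueues = new ArrayList<>();
    private Iterator<Queue<String>> selectorIterator;

    RoundRobinAcceptor(int selectorCount) { // assumes selectorCount >= 1
        for (int i = 0; i < selectorCount; i++) {
            selectorQueues.add(new ArrayDeque<>());
        }
        selectorIterator = selectorQueues.iterator();
    }

    /** Same pattern as doAccept: restart the iterator when it runs out, then take the next queue. */
    boolean accept(String connection) {
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectorQueues.iterator();
        }
        return selectorIterator.next().offer(connection);
    }

    int queued(int selectorIndex) {
        return selectorQueues.get(selectorIndex).size();
    }
}
```

Restarting the iterator instead of keeping a counter avoids any modulo arithmetic and spreads connections evenly, one queue at a time.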
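1.2.2 SelectorThread
This thread picks up ready I/O events from the sockets, wraps each one in a workRequest, and hands the workRequest to the workerPool thread pool. It also drains the connections waiting in acceptedQueue, registers the OP_READ event for each of them, and attaches the corresponding context object, an NIOServerCnxn. The run method of SelectorThread is as follows:
public void run() {
//Pick up ready I/O events and hand them to a worker thread; the data is processed in ZooKeeperServer.processPacket()
select();
//Take the accepted connections out of acceptedQueue, register OP_READ for each,
//and bind an NIOServerCnxn object to the key,
//i.e. attach an NIOServerCnxn (context object) to every connection
processAcceptedConnections();
//Walk updateQueue and update the interest ops of the connections in it
processInterestOpsUpdateRequests();
}
Start with processAcceptedConnections. It creates an NIOServerCnxn for every connection and also triggers the lease-renewal logic (through addCnxn).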
private void processAcceptedConnections() {
SocketChannel accepted;
while (!stopped && (accepted = acceptedQueue.poll()) != null) {
SelectionKey key = null;
key = accepted.register(selector, SelectionKey.OP_READ);
// Create an NIOServerCnxn for each connection
NIOServerCnxn cnxn = createConnection(accepted, key, this);
key.attach(cnxn);
addCnxn(cnxn);
}
}
This part is not critical, so we will not dig deeper. Back to the select method:
private void select() {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
Collections.shuffle(selectedList);
Iterator<SelectionKey> selectedKeys = selectedList.iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selected.remove(key);
if (key.isReadable() || key.isWritable()) {
//Core logic
handleIO(key);
}
}
}
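handleIO() wraps the current SelectorThread and the key in an IOWorkRequest and hands it to workerPool for scheduling. Reading the data only starts once workerPool runs the request. The source is as follows:
private void handleIO(SelectionKey key) {
//Wrap the SelectorThread and the key in a workRequest object
IOWorkRequest workRequest = new IOWorkRequest(this, key);
//The NIOServerCnxn that processAcceptedConnections attached to the key
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
//Renew the connection's lease
touchCnxn(cnxn);
//Hand the workRequest to the thread pool, which reads the client data
workerPool.schedule(workRequest);
}
Look at the renewal method first. This is not the only place that calls NIOServerCnxnFactory.touchCnxn(NIOServerCnxn).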
public void touchCnxn(NIOServerCnxn cnxn) {
cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
}
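update turns out to be a method of ExpiryQueue. As the name suggests, ExpiryQueue is the queue the server uses to manage expiry: here it tracks connections, and SessionTrackerImpl uses the same class for sessions.
// Expiry time of each NIOServerCnxn
private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
// For each expiry time, the bucket of NIOServerCnxn objects that expire at that time
private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();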
private final AtomicLong nextExpirationTime = new AtomicLong();
public Long update(E elem, int timeout) {
Long prevExpiryTime = elemMap.get(elem);//Current expiry time of this NIOServerCnxn
long now = Time.currentElapsedTime();
Long newExpiryTime = roundToNextInterval(now + timeout);//New expiry time, rounded to a bucket boundary
if (newExpiryTime.equals(prevExpiryTime)) {
return null; // No change, so nothing to update
}
// First add the elem to the new expiry time bucket in expiryMap.
Set<E> set = expiryMap.get(newExpiryTime); //Bucket for the new expiry time
if (set == null) {
// Construct a ConcurrentHashSet using a ConcurrentHashMap
set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
// Put the new set in the map, but only if another thread hasn't beaten us to it
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
if (existingSet != null) {
set = existingSet;
}
}
set.add(elem); //Put the NIOServerCnxn into the new bucket
// Map the elem to the new expiry time. If a different previous
// mapping was present, clean up the previous expiry bucket.
prevExpiryTime = elemMap.put(elem, newExpiryTime);
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
Set<E> prevSet = expiryMap.get(prevExpiryTime);
if (prevSet != null) {
prevSet.remove(elem); //Remove it from the previous bucket
}
}
return newExpiryTime;
}
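The bucket movement in update() is easier to follow with concrete numbers. Below is a single-threaded sketch under stated assumptions: MiniExpiryQueue is a hypothetical class, the current time is passed in as a parameter instead of being read from a clock, and roundToNextInterval is assumed to round up to the next multiple of the interval (its body is not shown in the excerpt above).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, non-thread-safe model of the two maps used by update(). */
class MiniExpiryQueue<E> {
    private final Map<E, Long> elemMap = new HashMap<>();        // element -> bucket time
    private final Map<Long, Set<E>> expiryMap = new HashMap<>(); // bucket time -> elements
    private final long expirationInterval;

    MiniExpiryQueue(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    /** Assumption: round up to the next multiple of the interval. */
    long roundToNextInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    /** Returns the new bucket time, or null if elem already sits in that bucket. */
    Long update(E elem, long now, long timeout) {
        Long prev = elemMap.get(elem);
        Long next = roundToNextInterval(now + timeout);
        if (next.equals(prev)) {
            return null; // same bucket, nothing to move
        }
        expiryMap.computeIfAbsent(next, k -> new HashSet<>()).add(elem);
        elemMap.put(elem, next);
        if (prev != null) {
            Set<E> prevSet = expiryMap.get(prev);
            if (prevSet != null) {
                prevSet.remove(elem); // leave the old bucket
            }
        }
        return next;
    }

    Set<E> bucket(long expiryTime) {
        return expiryMap.getOrDefault(expiryTime, new HashSet<>());
    }
}
```

Rounding to bucket boundaries means that many touches within the same interval return null and cost only one map lookup, and expiry can later be processed one bucket at a time.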
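OK, with that brief look at how the expiry time is updated, back to the flow where the worker thread pool processes the workRequest and reads the client data.
1.2.3 WorkerThread
Compared with the threads above, WorkerThread has a more involved call chain that spans several objects. It mainly handles I/O and does not process the data itself; data processing is done by the RequestProcessor chain. The call chain is as follows: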
public void schedule(WorkRequest workRequest) {
schedule(workRequest, 0);
}
public void schedule(WorkRequest workRequest, long id) {
ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
int size = workers.size();
int workerNum = ((int) (id % size) + size) % size;
ExecutorService worker = workers.get(workerNum);
worker.execute(scheduledWorkRequest);
}
WorkerService.ScheduledWorkRequest
private class ScheduledWorkRequest implements Runnable {
@Override
public void run() {
//IOWorkRequest.doWork
workRequest.doWork();
}
}
private class IOWorkRequest extends WorkerService.WorkRequest {
public void doWork() throws InterruptedException {
if (key.isReadable() || key.isWritable()) {
//Perform the I/O
cnxn.doIO(key);
//This method again: it renews the connection's lease
touchCnxn(cnxn);
}
}
}
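The expression ((int) (id % size) + size) % size in schedule(workRequest, id) is worth a second look, because Java's % can return a negative value. A minimal sketch with the expression copied as-is; the class name and the comparison against Math.floorMod are additions for illustration.

```java
/** Hypothetical wrapper around the worker-selection arithmetic. */
class WorkerIndex {
    /** Same expression as in schedule(): always lands in [0, size). */
    static int workerIndex(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    /** Equivalent formulation using the standard library. */
    static int viaFloorMod(long id, int size) {
        return (int) Math.floorMod(id, (long) size);
    }
}
```

Adding size before the second modulo shifts a negative remainder back into range, so a given id always maps to the same worker regardless of its sign.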
We skip the remaining details. doIO eventually calls readPayload.
private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
}
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
packetReceived();
incomingBuffer.flip();
//Not initialized yet (first packet): read the connect request
if (!initialized) {
readConnectRequest();
} else {
readRequest();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
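If initialized is false at this point, this is the first packet on the connection and a Session has to be created (createSession), so readConnectRequest() is called. readConnectRequest() sets initialized to true only after the connect request has been processed; only then can other client commands be handled.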
private void readConnectRequest() throws IOException, InterruptedException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
zkServer.processConnectRequest(this, incomingBuffer);
//Later packets will not take this branch again
initialized = true;
}
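readPayload relies on two buffers: a 4-byte lenBuffer for the length prefix and an incomingBuffer for the payload, with the initialized flag deciding whether a complete frame is the connect request or an ordinary request. The sketch below models only that buffer switching. FrameReader, feed, frame, and the string payloads are hypothetical; the real code reads from a SocketChannel and deserializes jute records.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the lenBuffer/incomingBuffer handling. */
class FrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;
    private boolean initialized = false;
    final List<String> events = new ArrayList<>();

    /** Feed bytes as they arrive; a frame may be split across calls. */
    void feed(byte[] chunk) {
        for (byte b : chunk) {
            incomingBuffer.put(b);
            if (incomingBuffer.hasRemaining()) {
                continue; // current buffer is not full yet
            }
            if (incomingBuffer == lenBuffer) {
                // Length prefix complete: switch to a buffer sized for the payload
                lenBuffer.flip();
                incomingBuffer = ByteBuffer.allocate(lenBuffer.getInt());
                if (incomingBuffer.hasRemaining()) {
                    continue; // payload bytes still to come
                }
            }
            String payload = new String(incomingBuffer.array(), StandardCharsets.UTF_8);
            // First complete frame is the connect request, later ones are ordinary requests
            events.add((initialized ? "request:" : "connect:") + payload);
            initialized = true;
            lenBuffer.clear();
            incomingBuffer = lenBuffer; // go back to reading a length prefix
        }
    }

    /** Builds one frame: 4-byte big-endian length followed by the payload. */
    static byte[] frame(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Feeding the bytes of frame("hello") followed by frame("ping"), split at any position, records "connect:hello" and then "request:ping". The sketch does not validate the length prefix; a negative value makes ByteBuffer.allocate throw.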
The method above also calls processConnectRequest to handle the connect request. On a first connection the sessionId read from the request is 0, so creating the Session is treated as a request of its own and createSession() is called. The key parts of processConnectRequest are as follows:
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) {
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();//Object that will hold the connect request
connReq.deserialize(bia, "connect"); //Deserialize the connect request parameters
long sessionId = connReq.getSessionId(); //Session ID sent by the client (0 for a new session)
int sessionTimeout = connReq.getTimeOut();
byte[] passwd = connReq.getPasswd();
cnxn.setSessionTimeout(sessionTimeout);
if (sessionId == 0) {
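long id = createSession(cnxn, passwd, sessionTimeout); //Create the session
}
}
createSession() first creates a sessionId, then builds a create-session request that carries this sessionId and submits it to the processor chain as an ordinary request. The createSession() source is as follows: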
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
if (passwd == null) {
// Possible since it's just deserialized from a packet on the wire.
passwd = new byte[0];
}
//Ask sessionTracker to create a sessionId
long sessionId = sessionTracker.createSession(timeout);
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
cnxn.setSessionId(sessionId);
//Create an OpCode.createSession request (a create-session operation for this sessionId)
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
setLocalSessionFlag(si);
//Submit the request
submitRequest(si);
return sessionId;
}
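One consequence of seeding Random with sessionId ^ superSecret in createSession is that the same password bytes can be derived again later from the session ID alone. The sketch below illustrates that property only. SessionPassword, checkPasswd, the 16-byte length, and the SUPER_SECRET value are all assumptions made for illustration, not values taken from the ZooKeeper source.

```java
import java.util.Arrays;
import java.util.Random;

/** Hypothetical illustration of deriving a session password from the session ID. */
class SessionPassword {
    // Placeholder secret for illustration only
    static final long SUPER_SECRET = 0x5EC12E7L;

    /** Same seeding pattern as in createSession: Random(sessionId ^ secret). */
    static byte[] generatePasswd(long sessionId) {
        Random r = new Random(sessionId ^ SUPER_SECRET);
        byte[] passwd = new byte[16]; // length is an assumption
        r.nextBytes(passwd);
        return passwd;
    }

    /** A reconnecting client's password can be checked by regenerating it. */
    static boolean checkPasswd(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
    }
}
```

Because the password is a pure function of the session ID and the secret, a server following this pattern does not need to store it per session.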
sessionTracker.createSession(timeout), used above, does two things: it creates the sessionId and registers tracking information for it. The source is as follows:
public long createSession(int sessionTimeout) {
//Get the next sessionId
long sessionId = nextSessionId.getAndIncrement();
//Register the session for tracking
addSession(sessionId, sessionTimeout);
return sessionId;
}
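Tracking a session means adding its information to the tracker's collections so that it can be looked up anywhere by session ID. addSession creates the Session, stores it by ID, and adds it to the expiry queue. The addSession source is as follows: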
public synchronized boolean addSession(long id, int sessionTimeout) {
sessionsWithTimeout.put(id, sessionTimeout);
boolean added = false;
//Look up the Session; if there is none, create one for this sessionId
SessionImpl session = sessionsById.get(id);
if (session == null){
session = new SessionImpl(id, sessionTimeout);
}
// findbugs2.0.3 complains about get after put.
// long term strategy would be use computeIfAbsent after JDK 1.8
//Store the Session in sessionsById so that it can be looked up by ID
SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
if (existedSession != null) {
session = existedSession;
} else {
added = true;
LOG.debug("Adding session 0x" + Long.toHexString(id));
}
if (LOG.isTraceEnabled()) {
String actionStr = added ? "Adding" : "Existing";
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"SessionTrackerImpl --- " + actionStr + " session 0x"
+ Long.toHexString(id) + " " + sessionTimeout);
}
//Add the Session to the expiry queue
updateSessionExpiry(session, sessionTimeout);
return added;
}
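The two steps of createSession (take the next ID, then register it) and the putIfAbsent pattern in addSession can be condensed into a small model. MiniSessionTracker is hypothetical: sessions are represented by strings, the starting ID is passed in by the caller, and the expiry queue is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, reduced model of session ID allocation and registration. */
class MiniSessionTracker {
    private final ConcurrentHashMap<Long, Integer> sessionsWithTimeout = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Long, String> sessionsById = new ConcurrentHashMap<>();
    private final AtomicLong nextSessionId;

    MiniSessionTracker(long firstSessionId) {
        nextSessionId = new AtomicLong(firstSessionId);
    }

    /** Allocate the next ID and register it, as createSession does. */
    long createSession(int timeout) {
        long id = nextSessionId.getAndIncrement();
        addSession(id, timeout);
        return id;
    }

    /** Returns true only when the session was not known before. */
    synchronized boolean addSession(long id, int timeout) {
        sessionsWithTimeout.put(id, timeout);
        String session = "session-0x" + Long.toHexString(id);
        return sessionsById.putIfAbsent(id, session) == null;
    }

    Integer timeoutOf(long id) {
        return sessionsWithTimeout.get(id);
    }
}
```

As in the excerpt, calling addSession for an ID that already exists still updates the stored timeout but reports false, so callers can tell a new session from a re-registration.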
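Chapter 2 Session refresh
This can also be called lease renewal. Besides PING requests, ordinary CRUD requests from the client also renew the session. PING is used as the example here.
ClientCnxn.SendThread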
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
while (state.isAlive()) {
//Once connected, send a PING request at intervals
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend()
- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
//Send the PING request
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
}
}
Send the PING request to the server:
private void sendPing() {
lastPingSentNs = System.nanoTime();
RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
queuePacket(h, null, null, null, null, null, null, null, null);
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
...
//This wakes up the selector that sendThread is blocked on, so the sendThread.run loop continues and sends the queued packet to the server
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
//Next pass of the sendThread.run loop
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
//Send the request
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
}
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
//The socket is now connected: send the ConnectRequest that sets up the session
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
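The server receives the PING on the connection that is already established, so AcceptThread is not involved. The rest follows the same path as Session creation: SelectorThread.run -> select() -> handleIO(key) -> touchCnxn(cnxn). touchCnxn renews the connection's expiry; the session's own expiry is renewed further along, when the request is submitted and ZooKeeperServer calls sessionTracker.touchSession.
Chapter 3 Session expiry
The Session creation walkthrough already shows that SessionTrackerImpl manages session expiry. It is also a thread class that extends ZooKeeperCriticalThread. Its run method gets the next expiry time, sleeps until that time arrives, then fetches the set of expired client sessions and closes them one by one.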
public void run() {
try {
while (running) {
//Time to wait until the next expiry
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0) {
//Sleep
Thread.sleep(waitTime);
continue;
}
//Get the set of expired client sessions
for (SessionImpl s : sessionExpiryQueue.poll()) {
//Set the session's isClosing flag to true
setSessionClosing(s.sessionId);
//Expire the client session
expirer.expire(s);
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
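The run loop above depends on two queue operations, getWaitTime() and poll(), whose bodies are not shown in this article. The following is a guess at the general shape, written as a single-threaded sketch: MiniExpiryPoller is hypothetical, time is passed in explicitly, and buckets are assumed to sit on multiples of a fixed interval.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, non-thread-safe model of "wait for the next bucket, then drain it". */
class MiniExpiryPoller<E> {
    private final long interval;
    private long nextExpirationTime;
    private final Map<Long, Set<E>> expiryMap = new HashMap<>();

    MiniExpiryPoller(long interval, long now) {
        this.interval = interval;
        this.nextExpirationTime = roundToNextInterval(now);
    }

    private long roundToNextInterval(long time) {
        return (time / interval + 1) * interval;
    }

    /** Put elem into the bucket in which it will expire. */
    void track(E elem, long now, long timeout) {
        expiryMap.computeIfAbsent(roundToNextInterval(now + timeout), k -> new HashSet<>()).add(elem);
    }

    /** How long the run loop should sleep; 0 means a bucket is due. */
    long getWaitTime(long now) {
        return now < nextExpirationTime ? nextExpirationTime - now : 0;
    }

    /** Returns the elements of the due bucket and advances to the next one. */
    Set<E> poll(long now) {
        if (now < nextExpirationTime) {
            return Collections.emptySet();
        }
        Set<E> expired = expiryMap.remove(nextExpirationTime);
        nextExpirationTime += interval;
        return expired == null ? Collections.<E>emptySet() : expired;
    }
}
```

In this shape the thread never scans all sessions: it sleeps until one bucket boundary, expires exactly that bucket, and moves on to the next boundary.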
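expirer.expire(s), the call that expires the client, is itself handled as a request. It calls ZooKeeperServer.expire(), which reads the sessionId, creates an OpCode.closeSession request, and hands it to the processor chain. The ZooKeeperServer.expire() source is as follows: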
public void expire(Session session) {
long sessionId = session.getSessionId();
LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
+ ", timeout of " + session.getTimeout() + "ms exceeded");
close(sessionId);
}
private void close(long sessionId) {
//Create an OpCode.closeSession request
Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
setLocalSessionFlag(si);
//Submit it to the processor chain
submitRequest(si);
}
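submitRequest shows up again here. It is covered in detail in the request-processing chapter below; for now it is enough to know that calling it closes the session.
Chapter 4 Request processing
ZooKeeper's request processing works like a workflow; it is essentially a singly linked list. At startup each node's role is determined (leader, follower, or observer), and once the role is settled the node initializes its own chain of responsibility.
4.1 RequestProcessor structure
Whenever a client request arrives and a transaction operation is executed, ZooKeeper runs it through a processing pipeline built from RequestProcessor objects.
Here is how the RequestProcessor chain is initialized. The source of ZooKeeperServer.setupRequestProcessors() is as follows: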
/**
* Initialize the request-processing chain
*/
protected void setupRequestProcessors() {
//Create the FinalRequestProcessor
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
//Create the SyncRequestProcessor with finalProcessor as the next link in the chain
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
//Start syncProcessor
((SyncRequestProcessor)syncProcessor).start();
//Create the PrepRequestProcessor as the first processor, with syncProcessor as its next link
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
//Start firstProcessor
((PrepRequestProcessor)firstProcessor).start();
}
When syncProcessor is created, finalProcessor is passed in as a parameter. The source is as follows:
/**
* Create the SyncRequestProcessor; the next link in the chain is FinalRequestProcessor
* @param zks
* @param nextProcessor
*/
public SyncRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("SyncThread:" + zks.getServerId(), zks
.getZooKeeperServerListener());
this.zks = zks;
//Next link in the chain
this.nextProcessor = nextProcessor;
running = true;
}
When firstProcessor is created, syncProcessor is passed in as a parameter. The source is as follows:
public PrepRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("ProcessThread(sid:" + zks.getServerId() + " cport:"
+ zks.getClientPort() + "):", zks.getZooKeeperServerListener());
this.nextProcessor = nextProcessor;
this.zks = zks;
}
[Figure: PrepRequestProcessor / SyncRequestProcessor class relationship diagram]
PrepRequestProcessor and SyncRequestProcessor have the same structure: both are threads (subclasses of Thread), which is why both are started during initialization.
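The chain built in setupRequestProcessors is a plain chain of responsibility assembled from the tail forward. A minimal synchronous sketch follows; MiniProcessor, StageProcessor, and MiniChain are hypothetical names, and unlike the real Prep and Sync processors these stages run on the caller's thread instead of their own threads.

```java
import java.util.List;

/** Hypothetical stand-in for the RequestProcessor interface. */
interface MiniProcessor {
    void processRequest(String request);
}

/** One link: records that it saw the request, then forwards it. */
class StageProcessor implements MiniProcessor {
    private final String name;
    private final MiniProcessor next; // null for the last link
    private final List<String> trace;

    StageProcessor(String name, MiniProcessor next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void processRequest(String request) {
        trace.add(name + ":" + request);
        if (next != null) {
            next.processRequest(request);
        }
    }
}

class MiniChain {
    /** Built back to front, like setupRequestProcessors: final, then sync, then prep. */
    static MiniProcessor build(List<String> trace) {
        MiniProcessor finalProcessor = new StageProcessor("final", null, trace);
        MiniProcessor syncProcessor = new StageProcessor("sync", finalProcessor, trace);
        return new StageProcessor("prep", syncProcessor, trace);
    }
}
```

Building from the last link backwards is what lets each constructor receive its nextProcessor as an argument, so no link ever has to be patched after construction.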
4.2 PrepRequestProcessor
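PrepRequestProcessor is the first processor in the chain. To connect it with the request handling seen earlier, follow the calls step by step: ZooKeeperServer.processPacket() > submitRequest() > enqueueRequest() > RequestThrottler.submitRequest(). RequestThrottler.submitRequest() adds the request to its submittedRequests queue.
When the request is submitted onward, firstProcessor.processRequest is called, which lands in PrepRequestProcessor.processRequest(request). That method also just queues the request; the processor's run loop takes requests from the queue and passes them to pRequest: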
public void processRequest(Request request) {
submittedRequests.add(request);
}
public void run() {
while (true) {
Request request = submittedRequests.take();
pRequest(request);
}
}
protected void pRequest(Request request) throws RequestProcessorException {
request.setHdr(null);
request.setTxn(null);
switch (request.type) {
case OpCode.createSession: //Handle the connect (create-session) request
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
}
break;
}
}
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
switch (type) {
case OpCode.createSession:
int to = request.request.getInt();
request.setTxn(new CreateSessionTxn(to));
zks.sessionTracker.trackSession(request.sessionId, to);
zks.setOwner(request.sessionId, request.getOwner());
break;
}
}
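The code shows that pRequest2Txn() handles the preparation work: permission checks and recording the transaction information (for createSession, the CreateSessionTxn and the session's tracking entry). No data is modified yet. In other words, PrepRequestProcessor performs the checks and prepares the transaction information before the operation is applied.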
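The processRequest/run pair in PrepRequestProcessor is a producer-consumer hand-off: callers only enqueue, and the processor's own thread does the work. A minimal sketch of that shape follows; QueueingProcessor, the string requests, and the shutdown sentinel are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical processor thread: callers enqueue, the thread dequeues and handles. */
class QueueingProcessor extends Thread {
    // Sentinel compared by identity, so it can never collide with a real request
    private static final String SHUTDOWN = new String("shutdown");
    private final LinkedBlockingQueue<String> submittedRequests = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();

    /** Called from any thread; returns immediately. */
    void processRequest(String request) {
        submittedRequests.add(request);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = submittedRequests.take(); // blocks until a request arrives
                if (request == SHUTDOWN) {
                    break;
                }
                handled.add("prepped:" + request); // stands in for pRequest(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Ask the thread to stop after the queued requests and wait for it. */
    void shutdownAndWait() {
        submittedRequests.add(SHUTDOWN);
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Since a single thread drains the queue, requests are handled strictly in the order they were submitted, which is what lets the processor assign increasing zxids without extra locking.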
4.3 SyncRequestProcessor
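With PrepRequestProcessor covered, next is SyncRequestProcessor. This processor writes requests to disk efficiently, and a request is not forwarded to the next processor until it has been written to disk.
First, the method that adds a request to the queue:
public void processRequest(Request request) {
// request.addRQRec(">sync");
//Add the request to the queuedRequests queue
queuedRequests.add(request);
}
SyncRequestProcessor is also a thread, and the queued requests are executed in that thread. Its run method is as follows: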
public void run() {
try {
int logCount = 0;
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
//Blocking call: wait for a request
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
//Reset the number of txns counted since the last rollLog
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
//Save the snapshot data
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
//Add the request to toFlush, the list of transactions that have been written and are waiting to be flushed to disk
toFlush.add(si);
if (toFlush.size() > 1000) {
//Commit the data
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
} finally{
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
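run takes a request from queuedRequests; while nothing is waiting to be flushed it blocks until a request arrives. It appends the request to the transaction log and, once enough transactions have been logged, starts the Snapshot Thread to write a snapshot of the data to disk. Finally it calls flush() to commit the data. The flush() source is as follows: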
private void flush(LinkedList<Request> toFlush)
throws IOException, RequestProcessorException
{
if (toFlush.isEmpty())
return;
//Commit the data
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
if (nextProcessor != null) {
//Call the next link in the chain
nextProcessor.processRequest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
flush() commits the data and hands each request to the next link in the chain, which is FinalRequestProcessor.
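The interplay between toFlush and flush() is a group commit: requests accumulate while more are waiting, and one commit covers the whole batch. The sketch below models only that batching rule. GroupCommitter is hypothetical, the queue is drained synchronously, the batch threshold is a constructor parameter instead of the fixed 1000, and the commit and the forwarding are replaced by a counter and a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of batching appended requests behind a single commit. */
class GroupCommitter {
    private final int maxBatch;
    private final ArrayDeque<String> toFlush = new ArrayDeque<>();
    final List<String> forwarded = new ArrayList<>();
    int commits = 0;

    GroupCommitter(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    /** Drains everything currently queued, flushing in batches. */
    void drain(Queue<String> queuedRequests) {
        while (true) {
            String si = queuedRequests.poll();
            if (si == null) {
                flush(); // nothing more is waiting: commit what we have
                return;
            }
            toFlush.add(si); // "appended to the log", waiting for the commit
            if (toFlush.size() > maxBatch) {
                flush();
            }
        }
    }

    private void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        commits++; // stands in for zks.getZKDatabase().commit()
        while (!toFlush.isEmpty()) {
            forwarded.add(toFlush.remove()); // stands in for nextProcessor.processRequest(i)
        }
    }
}
```

With a threshold of 2 and five queued requests, the first three share one commit and the last two share another, and all five reach the next stage in their original order. Under load this turns many small disk syncs into a few large ones.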
4.4 FinalRequestProcessor
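After SyncRequestProcessor comes the last processor in the request chain, FinalRequestProcessor. Its main job is to return the Response.
SyncRequestProcessor persists the txn (here, the create-session operation). FinalRequestProcessor then commits the Session, which simply means storing the Session's ID and timeout in sessionsWithTimeout.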
public void processRequest(Request request) {
ProcessTxnResult rc = zks.processTxn(request);
switch (request.type) {
case OpCode.createSession: {
lastOp = "SESS";
updateStats(request, lastOp, lastZxid);
zks.finishSessionInit(request.cnxn, true);
return;
}
}
if (path == null || rsp == null) {
cnxn.sendResponse(hdr, rsp, "response"); //The server sends the response; the client receives it at this point
}
}
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
// register with JMX
if (valid) {
if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
serverCnxnFactory.registerConnection(cnxn);
} else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
secureServerCnxnFactory.registerConnection(cnxn);
}
}
}
Calling sendResponse returns the response to the client.
The client receives the server's response:
ClientCnxnSocketNIO.doIO(Queue<Packet>, ClientCnxn)
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
if (sockKey.isReadable()) {
if (!initialized) {
readConnectResult();
}
}
}
void readConnectResult() throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
this.sessionId = conRsp.getSessionId(); //Connection setup is complete
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
Coming next
- Understand how ZooKeeper's watcher mechanism works
Compiled by 极客之音. Link to this article: https://www.bmabk.com/index.php/post/76687.html