源码解析 synchronized
前置文章看图学源码之 synchronized 源码分析一 : 锁的介绍和锁优化
写文目的:以最直观的方式阅读源码,深入了解底层实现原理
重量级锁 Monitor
的数据结构
ObjectMonitor()
// 结构体
ObjectMonitor() {
_header = NULL;
// 用来表示该线程的获取锁的次数
_count = 0;
// 等待中的线程数
_waiters = 0,
// 线程的重入次数
_recursions = 0;
_object = NULL;
// 标识拥有该Monitor的线程
_owner = NULL;
// 等待线程组成的双向循环链表
_WaitSet = NULL;
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
// 多线程竞争锁进入时的单向链表
_cxq = NULL ;
FreeNext = NULL ;
// _owner从该双向循环链表中唤醒线程节点
_EntryList = NULL ;
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}
主要变量
-
_cxq : 竞争队列,所有
请求锁的线程
首先被放在这个竞争队列
中所有线程会先 cas 的方式获取锁,但是要是获取不到就会进入cxq (竞争队列)
-
_EntryList :_cxq中那些有资格成为候选资源的线程被移动到
EntryList
中Owner 线程释放锁的时候,JVM 会将 cxq 中的
线程
放到 EntryList 的集合中,随后再将 EntryList 中的某个线程指定为Ready Thread
。 -
_WaitSet:某个拥有
ObjectMonitor
的线程在调用Object.wait()
方法之后将被阻塞,然后该线程将被放置在WaitSet
链表中。Owner
线程在调用Object.wait()
方法之后将被阻塞,此时就会进入WaitSet,直到某个时刻通过Object.notify()
或者Object.notifyAll()
唤醒,该线程就会重新进入EntryList
中 -
_owner:标识
拥有该Monitor的线程
-
_recursions:线程的重入次数
流程:
- 当线程刚进来时,会进入 cxq 的队列中
- 当多个线程同时访问一段同步代码时,首先会进入
_EntryList
集合。 - 当线程获取到对象的
monitor
后进入_Owner
区域,并把monitor
中的owner
变量设置为当前线程
- 当我们的 owner
释放锁
时,会将 cxq 里面的线程放到 EntryList 中 - 这个时候由
OnDeck Thread
去进行锁竞争,竞争失败的则继续留在EntryList
中 - 当调用
Object.wait()
会进入 _WaitSet 队列,owner的变量就会恢复成null, ;只有被唤醒时,才会重新进入 EntryList 中
值得注意的是:唤醒操作
涉及到操作系统调度会有额外的开销
CASE(_monitorenter)
从字节码的mointerenter
到 源码的 mointerenter
openjdk/hotspot/src/share/vm/interpreter/bytecodeInterpreter.cpp
简化版本
CASE(_monitorenter): { // 执行的是 monitorenter指令
oop lockee = STACK_OBJECT(-1); //它从操作数栈中取出栈顶的对象引用,将其赋值给lockee变量
// 通过CHECK_NULL宏对lockee进行空指针检查。
CHECK_NULL(lockee);
BasicObjectLock* limit = istate->monitor_base(); //从istate中获取监视器的基地址
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();//从istate中获取堆栈的基地址
BasicObjectLock* entry = NULL;
//循环查找一个 空闲的监视器 或者 已经为该对象分配的监视器 然后在上述的两个地址之间进行遍历。
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent; //如果找到一个空闲的监视器(most_recent->obj() == NULL),则将其赋值给entry变量
else if (most_recent->obj() == lockee) break; // 如果找到一个与lockee相匹配的监视器(most_recent->obj() == lockee),则跳出循环。
most_recent++;
}
if (entry != NULL) {
entry->set_obj(lockee); //如果找到了一个空闲的监视器(entry != NULL),则将该监视器的对象设置为lockee。
int success = false; //定义了一个布尔变量success来表示加锁是否成功。
uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;//将值转换为uintptr_t类型
markOop mark = lockee->mark(); //mark markOop 类型的变量,表示对象的标记信息--> 获取到 lockee 对象的标记信息,并将其赋值给 mark 变量
intptr_t hash = (intptr_t) markOopDesc::no_hash;
//处理偏向锁的情况
//代码中的mark是一个指针,mark->has_bias_pattern()用来判断该指针是否具有偏向锁模式。
if (mark->has_bias_pattern()) {
//如果具有偏向锁模式
uintptr_t thread_ident;// 当前线程的标识符thread_ident
uintptr_t anticipated_bias_locking_value;// 用于生成偏向锁的锁定值
thread_ident = (uintptr_t)istate->thread();
anticipated_bias_locking_value = // 生成偏向锁的锁定值计算的过程
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markOopDesc::age_mask_in_place);
// 要是 偏向锁的锁定值 是 0 ,如果是的话,表示 已经偏向了当前线程
if (anticipated_bias_locking_value == 0) {
// 如果启用了偏向锁统计功能,并且 成功进入了一个偏向锁 ,就会增加偏向锁进入计数的值
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
// 不需要进行任何操作,直接将success设置为true。
success = true;
}
// 如果结果不等于0,表示anticipated_bias_locking_value中的偏向锁标志位被设置,即对象已经被偏向于某个线程
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
// 尝试撤销偏向锁
markOop header = lockee->klass()->prototype_header(); // 获取lockee对象的原型头部header
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash); // 根据给定的hash值设置新的头部
}
//使用原子操作cmpxchg_ptr比较并交换lockee对象的标记地址上的值,如果交换成功,表示偏向锁撤销成功,将success设置为true。
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
//cas 成功并且 如果启用了 偏向锁统计功能 ,并且 成功撤销了一个偏向锁 ,就会增加偏向锁撤销计数的值。
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
}
// 如果 epoch 标志位不为0 ,就是说 anticipated_bias_locking_value中的epoch 标志位被设置
// epoch_mask_in_place 意思就是 ===> 从标记(mark)中提取出对象的时间戳(epoch)
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
//尝试 重偏向
markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident); // 构造新的头部new_header,将lockee对象的原型头部与当前线程的标识进行位操作合并
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);//根据给定的hash值设置新的头部
}
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
// 如果cas 交换成功,表示偏向锁重新偏向成功,将success设置为true。
if (PrintBiasedLockingStatistics)
//如果启用了偏向锁统计功能,并且 成功重偏向 ,就会增加 重偏向锁进入计数的值
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
// 如果cas 交换失败,则调用虚拟机的monitorenter方法来获取锁。
else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
//设置对象的偏向锁标记,并在成功设置后增加匿名偏向锁的计数。
else {
//
/*
匿名偏向锁是一种优化手段,用于在没有竞争的情况下提高单线程执行同步代码块的性能。
在正常情况下,偏向锁会将锁的持有者的 线程ID 记录在对象头中,以便在下次获取锁时进行快速检查。
但是,匿名偏向锁 不会记录具体的线程ID ,而是将偏向锁标记 设置为一个特殊的值 ,表示该对象的偏向锁是匿名的。
匿名偏向锁的使用场景是当对象的锁 在短时间内被多个线程获取和释放,或者在高度并发的情况下。
这种情况下,具体的线程ID记录可能会导致频繁的 偏向撤销和重新偏向 ,而匿名偏向锁则可以避免这种开销。
通过使用匿名偏向锁,Java虚拟机可以在高并发环境下更好地适应对象锁的竞争情况,提高程序的执行效率。
*/
markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place | (uintptr_t)markOopDesc::age_mask_in_place | epoch_mask_in_place)); // 计算得到 header
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash); //根据给定的hash值设置新的头部
}
markOop new_header = (markOop) ((uintptr_t) header | thread_ident); //计算得到 new_header
// 用于调试目的,它在非调试模式下被忽略。
DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
//使用原子操作cmpxchg_ptr将new_header和lockee对象的mark_addr()地址处的值进行比较并交换
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
// 如果交换成功,说明该对象的偏向锁标记已经被设置为new_header。
if (PrintBiasedLockingStatistics)
//如果启用了打印偏向锁统计信息的选项,则会增加匿名偏向锁的计数
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
// 如果cas 交换失败,则调用虚拟机的monitorenter方法来获取锁。
else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}
// 获取锁失败
if (!success) {
markOop displaced = lockee->mark()->set_unlocked();// 获取lockee对象的标记(mark)。将displaced标记设置为未锁定状态。
entry->lock()->set_displaced_header(displaced); // 将displaced标记设置为entry的锁(lock)的替代标头(displaced header)
// 如果指定了-XX:+UseHeavyMonitors,则call_vm=true,代表禁用偏向锁和轻量级锁
bool call_vm = UseHeavyMonitors;
//根据条件是否调用虚拟机(call_vm) 或 使用原子操作(cmpxchg_ptr)将entry的锁的地址与displaced进行比较和交换。如果比较结果不等于displaced
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 检查是否是简单的递归情况。是的话,当线程已经拥有了被替换的标记所对应的锁时
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
entry->lock()->set_displaced_header(NULL); // 将锁的替代标头设置为NULL,可以避免重复获取锁的操作,提高性能。
} else {// 如果不是简单的递归情况,则调用虚拟机的monitorenter方法,该方法用于获取监视器锁。
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
//更新程序计数器(PC)和栈顶指针(TOS),并继续执行
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
//Entry == null
else {
//处理在多线程环境中出现竞争条件时的情况,通过重新执行代码来尝试解决竞争问题
istate->set_msg(more_monitors); // istate 的消息设置为 more_monitors
UPDATE_PC_AND_RETURN(0); // 更新程序计数器(PC)并返回,以重新执行代码
}
}
mointerenter
虚拟机的monitorenter方法
openjdk根路径/hotspot/src/share/vm/interpreter
路径下的interpreterRuntime.cpp
文件中对mointerenter
/*
流程为:
1. 验证监视器:在执行获取锁操作之前,会验证监视器的状态,确保操作的正确性。
2. 如果启用了偏向锁打印统计信息,会增加慢路径计数,用于统计偏向锁的使用情况。
3. 获取对象:从BasicObjectLock中获取需要获取锁的对象。
4. 确保对象是在堆中或为null:校验需要获取锁的对象是否在堆中,或者是否为null。如果不满足条件,会抛出IllegalMonitorStateException异常。
5. 根据是否启用了偏向锁选择路径:根据是否启用了偏向锁(UseBiasedLocking)来选择使用快速路径还是慢速路径来获取锁。
5.1如果启用了偏向锁,会调用ObjectSynchronizer::fast_enter方法,尝试快速获取锁。
5.2如果未启用偏向锁,会调用ObjectSynchronizer::slow_enter方法,使用慢速路径获取锁。
6. 再次确保对象是在堆中或为null:在获取锁操作完成后,再次校验获取锁的对象是否在堆中,或者是否为null。
7. 再次验证监视器:在获取锁操作完成后,再次验证监视器的状态,确保操作的正确性。
InterpreterRuntime::monitorenter方法的作用是进入监视器(获取锁);它会验证监视器的状态,获取需要获取锁的对象,并根据是否启用了偏向锁选择相应的路径来获取锁。获取锁完成后,再次校验对象的状态和监视器的状态,确保操作的正确性。
*/
// 进入监视器(获取锁)
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
// 验证监视器是否正确
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
// 如果启用了偏向锁打印统计信息,原子类的方式 增加 偏向锁统计数据的计数器。
if (PrintBiasedLockingStatistics) {
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
// 从BasicObjectLock中获取对象
Handle h_obj(thread, elem->obj()); //创建了一个名为h_obj的句柄对象
// 确保对象是在堆中或者为null
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),"must be NULL or an object");
// 如果启用了偏向锁,尝试快速获取锁进入同步块
if (UseBiasedLocking) {
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {// 如果没有启用了偏向锁,使用慢路径获取锁进入同步块
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
// 再次确保对象是在堆中或者为null
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()), "must be NULL or an object");
#ifdef ASSERT
// 再次验证监视器
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
fast_enter
/*
用于在快速路径下获取对象的锁。
流程如下:
1. 判断是否启用偏向锁:首先,会判断是否启用了偏向锁(UseBiasedLocking)。
2. 处理偏向锁的情况:如果启用了偏向锁,会进行以下操作:
- 确保当前不在安全点:由于偏向锁的撤销和重偏向操作不能在安全点进行,需要确保当前线程不在安全点。
- 尝试撤销并重偏向偏向锁:调用BiasedLocking::revoke_and_rebias方法,尝试撤销并重偏向对象的偏向锁。
- 如果偏向锁已经被撤销并且重偏向成功,直接返回,无需进行后续操作。
- 如果当前线程在安全点,无法向**VM线程**重偏向,此时只进行偏向锁的撤销操作。
- 确保对象没有偏向锁的标记:确保对象的标记字段没有偏向锁的标记,表示偏向锁已经被撤销。
- 使用慢路径获取锁:如果快速获取锁失败(无论是因为偏向锁处理或其他原因),会调用slow_enter方法来使用慢路径获取对象的锁。
如果启用了偏向锁,会尝试撤销并重偏向偏向锁。无论是否成功,都会调用slow_enter方法进行锁的获取,以确保能够成功获取锁。
这样可以尽量使用快速路径来获取锁,提高性能。
流程简单来说就是:
如果启用了偏向锁,并且当前不在安全点,那么会尝试撤销并重偏向该对象的偏向锁。如果偏向锁已经被撤销并且重偏向成功,那么就直接返回,
否则,如果当前在安全点,就在安全点撤销偏向锁。
在这之后,会检查对象是否已经没有偏向锁的标记。如果快速获取锁失败,那么就使用慢路径获取锁。
*/
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
// 判断是否启用偏向锁
if (UseBiasedLocking) {
// 启用偏向锁 并且 当前不在安全点, 因为偏向锁的 撤销和重偏向 不能在安全点进行
if (!SafepointSynchronize::is_at_safepoint()) {
// 尝试撤销并重偏向该对象的偏向锁
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
/*
enum Condition {
NOT_BIASED = 1, 没有偏向
BIAS_REVOKED = 2, 撤销偏向
BIAS_REVOKED_AND_REBIASED = 3 撤销偏向 并 重偏向
};
*/
// 如果偏向锁已经被撤销并且重偏向成功,直接返回,不需要进入同步块
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
// 如果当前在安全点,断言不能尝试向VM线程重新偏向
assert(!attempt_rebias, "can not rebias toward VM thread");
// 在安全点撤销偏向锁
BiasedLocking::revoke_at_safepoint(obj);
}
// 断言此时对象的标记不应该有偏向模式
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
// 如果快速获取锁失败,使用慢路径获取锁
slow_enter (obj, lock, THREAD);
}
revoke_and_rebias
// 这段代码的作用是根据对象的标记信息和偏向锁的启发式结果,执行相应的撤销和重新调整偏向锁的操作,并返回相应的状态码。
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
// 检查是否在 安全点 之外调用,如果在安全点,则会触发断言错误
/*
安全点(Safepoint)是指程序执行的一种状态,在该状态下,所有线程都被暂停,以确保线程之间的一致性和正确性
*/
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
markOop mark = obj->mark();
// 撤销匿名偏向对象的偏向性
//检查对象的标记(mark)是否是匿名偏向状态,并且不需要重新调整偏向锁,那么就 撤销偏向锁
if (mark->is_biased_anonymously() && !attempt_rebias) {
// if 成立
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());// 创建一个未偏向的原型对象
// 通过使用原子操作比较和交换(cmpxchg)将 未偏向的标记 设置到 对象的标记位置实现 。
// 尝试在不在 安全点的情况下撤销对象的偏向锁
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
// 如果成功,则返回BIAS_REVOKED。
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
}
// 如果对象的标记具有偏向模式
/*
1. 如果对象的原型标记没有偏向模式,说明该对象的偏向已经过时,直接通过原子操作cmpxchg_ptr将对象的标记更新为原型标记,然后返回BIAS_REVOKED。
2. 如果对象的原型标记的偏向epoch与对象的标记的偏向epoch不相同,说明该对象的偏向已经过期,根据需要进行撤销或重新调整偏向锁。代码使用原子操作cmpxchg_ptr尝试撤销或重新调整偏向锁,然后返回相应的状态码。
*/
else if (mark->has_bias_pattern()) {
Klass* k = obj->klass(); // 获取obj对象的类
markOop prototype_header = k->prototype_header(); // 获取类的原型头部
// 检查原型标记(prototype_header)是否具有偏向模式,
if (!prototype_header->has_bias_pattern()) {
//如果没有偏向模式,表示该对象的偏向已经过时,需要更新标记头
markOop biased_value = mark;
// 使用CAS(比较和交换)更新对象的标记
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
// 确保在比较并交换操作之后,无论是否发生竞争,对象的偏向模式都被成功撤销
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
return BIAS_REVOKED;
}
//原型标记的偏向 epoch 与对象标记的偏向 epoch 不匹配,说明对象的偏向已过期,需要根据尝试重新偏向的标志进行相应处理。
else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
// 如果需要重新偏向,创建一个重新偏向的原型对象,并使用原子操作将其与obj对象的标记地址进行比较并交换
if (attempt_rebias) {
assert(THREAD->is_Java_thread(), "");//检查当前线程是否为Java线程
markOop biased_value = mark;//将mark赋值给biased_value变量
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());//使用mark的年龄和prototype_header的偏向周期来编码一个新的mark对象,赋值给rebiased_prototype变量。
// 比较 rebiased_prototype 指向的内存地址上的值是否等于 obj 对象的 mark 属性的值。如果相等,则将 mark 的值更新到 rebiased_prototype 指向的内存地址上,否则不进行任何操作。
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
//如果比较交换成功,即res_mark等于biased_value,则返回BIAS_REVOKED_AND_REBIASED。
if (res_mark == biased_value) {
return BIAS_REVOKED_AND_REBIASED;
}
}
//如果不需要重新偏向,创建一个未偏向的原型对象,则将并使用原子操作将其与obj对象的标记地址进行比较并交换,并返回BIAS_REVOKED。
else {
markOop biased_value = mark; //将mark赋值给biased_value变量。
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());//创建一个无偏向的原型mark对象,赋值给unbiased_prototype变量。
//使用原子操作cmpxchg_ptr将unbiased_prototype与obj的mark_addr()位置的值进行比较并交换。
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
//如果比较交换成功,即res_mark等于biased_value,则返回BIAS_REVOKED
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
}
}
}
//根据更新后的偏向信息,代码会调用相应的方法来 更新偏向锁的启发式结果。
/*
1. 如果启发式结果为HR_NOT_BIASED,表示对象没有偏向锁,直接返回NOT_BIASED。
2. 如果启发式结果为HR_SINGLE_REVOKE,表示需要单独撤销对象的偏向锁。代码根据不同情况进行处理,可能会涉及到撤销偏向锁的操作和相关事件的记录。
3. 如果启发式结果为HR_BULK_REVOKE或HR_BULK_REBIAS,表示需要批量撤销或重新调整对象的偏向锁。代码创建相应的事件对象,并调用VM_BulkRevokeBias对象的execute方法进行操作,然后根据事件的条件决定是否提交事件记录。
*/
HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
// 如果更新后的结果是HR_NOT_BIASED,则返回NOT_BIASED。 ---- 不偏向
if (heuristics == HR_NOT_BIASED) {
return NOT_BIASED;
}
// 如果结果是HR_SINGLE_REVOKE
else if (heuristics == HR_SINGLE_REVOKE) { ------ 单个撤销
Klass *k = obj->klass(); //获取对象的类信息,
markOop prototype_header = k->prototype_header(); // 获取对象的原型头部并赋值给变量"prototype_header"。
//对象的偏向锁状态是否为THREAD,并且检查偏向锁的epoch是否与当前线程的epoch相同。
if (mark->biased_locker() == THREAD && prototype_header->bias_epoch() == mark->bias_epoch()) {
/*
一个线程正在尝试撤销对一个偏向于它自己的对象的偏向锁,这通常是由于计算标识哈希码而引起的。
在这种情况下,我们可以再次避免进行安全点,因为我们只需要遍历自己的堆栈。
在撤销路径中没有其他线程进行撤销操作的竞争,因为我们没有到达安全点。
同时还需要检查偏向 epoch ,因为即使线程匹配,另一个线程也可能使用CAS操作来窃取具有过时 epoch 的对象的偏向锁。
*/
ResourceMark rm; // 并赋值给变量"prototype_header"。
if (TraceBiasedLocking) { //如果启用了TraceBiasedLocking,则打印一条调试信息。
tty->print_cr("Revoking bias by walking my own stack:");
}
EventBiasedLockSelfRevocation event; //创建一个事件。
//调用revoke_bias函数来 撤销对象的偏向锁
BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD, NULL);
//清空当前线程的cached_monitor_info。
((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
assert(cond == BIAS_REVOKED, "why not?");
//如果事件需要提交(commit),则设置事件的锁类 为变量"k"。
if (event.should_commit()) {
event.set_lockClass(k);
event.commit();
}
// 返回 revoke_bias 的返回值
return cond;
}
// //对象的偏向锁状态 不是 THREAD,或者 检查偏向锁的epoch与当前线程的epoch 不相同。
else {
EventBiasedLockRevocation event;
VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
VMThread::execute(&revoke);
// 如果事件需要提交(commit)且revoke的状态码不等于NOT_BIASED,
if (event.should_commit() && (revoke.status_code() != NOT_BIASED)) {
event.set_lockClass(k); //则设置事件的锁类为变量"k",
event.set_safepointId(SafepointSynchronize::safepoint_counter() - 1);// 设置事件的安全点ID为当前安全点计数器减1,
event.set_previousOwner(revoke.biased_locker());// 设置事件的前一个所有者为revoke的biased_locker()。
event.commit();
}
//返回revoke的状态码。
return revoke.status_code();
}
}
assert((heuristics == HR_BULK_REVOKE) || (heuristics == HR_BULK_REBIAS), "?");
EventBiasedLockClassRevocation event;
VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
(heuristics == HR_BULK_REBIAS),
attempt_rebias);
// 批量撤销锁,在特定的时刻执行
VMThread::execute(&bulk_revoke);
if (event.should_commit()) {
event.set_revokedClass(obj->klass());
event.set_disableBiasing((heuristics != HR_BULK_REBIAS));
// Subtract 1 to match the id of events committed inside the safepoint
event.set_safepointId(SafepointSynchronize::safepoint_counter() - 1);
event.commit();
}
return bulk_revoke.status_code();
}
revoke_bias
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread, JavaThread** biased_locker) {
markOop mark = obj->mark();
if (!mark->has_bias_pattern()) { //检查对象的标记是否具有偏向锁模式
//如果没有,则直接返回NOT_BIASED。
if (TraceBiasedLocking) {
ResourceMark rm;
tty->print_cr(" (Skipping revocation of object of type %s because it's no longer biased)",
obj->klass()->external_name());
}
return BiasedLocking::NOT_BIASED;
}
// 获取对象的年龄,并根据年龄创建 偏向锁原型 和 无锁原型 。
uint age = mark->age();
markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);
//如果需要打印调试信息,则打印相关信息。
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
ResourceMark rm;
tty->print_cr("Revoking bias of object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s , prototype header " INTPTR_FORMAT " , allow rebias %d , requesting thread " INTPTR_FORMAT,
p2i((void *)obj), (intptr_t) mark, obj->klass()->external_name(), (intptr_t) obj->klass()->prototype_header(), (allow_rebias ? 1 : 0), (intptr_t) requesting_thread);
}
//获取持有偏向锁的线程
JavaThread* biased_thread = mark->biased_locker();
//如果为空,则表示对象是匿名偏向的,(可能的情况是,由于计算了对象的标识哈希码,导致偏向锁被撤销。)
if (biased_thread == NULL) {
// 根据是否 允许重新偏向 来设置对象的标记,并返回BIAS_REVOKED。
//如果不允许重新偏向(allow_rebias为false),则将对象的标记设置为无偏向原型(unbiased_prototype)。
if (!allow_rebias) {
obj->set_mark(unbiased_prototype);
}
//如果启用了偏向锁跟踪并且满足跟踪条件,则打印一条相关信息。
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of anonymously-biased object");
}
//返回BiasedLocking::BIAS_REVOKED表示偏向锁被撤销
return BiasedLocking::BIAS_REVOKED;
}
//判断一个对象所偏向的线程是否仍然存活
bool thread_is_alive = false;
//请求的线程是否与偏向的线程相同,如果相同,则将 thread_is_alive 设置为 true
if (requesting_thread == biased_thread) {
thread_is_alive = true;
} else {
// 如果不相同,则遍历所有的 Java 线程,逐个与偏向的线程进行比较
for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
//如果找到相同的线程,则将 thread_is_alive 设置为 true,并且跳出循环
if (cur_thread == biased_thread) {
thread_is_alive = true;
break;
}
}
}
//如果线程已经死亡
if (!thread_is_alive) {
//则根据是否允许重新偏向来设置对象的标记。
if (allow_rebias) {
//如果允许重新偏向,则将对象的标记设置为biased_prototype
obj->set_mark(biased_prototype);
} else {
//否则,将对象的标记设置为unbiased_prototype。
obj->set_mark(unbiased_prototype);
}
// 如果启用了跟踪偏向锁并且(冗长模式或者不是批量模式),则会打印一条消息。
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of object biased toward dead thread");
}
// 最后返回BIAS_REVOKED。
return BiasedLocking::BIAS_REVOKED;
}
//线程拥有偏向锁仍然活动。 检查它是否当前拥有锁,如果是,则将所需的替代头写入该线程的堆栈。 否则,将对象的头恢复为未锁定或无偏向状态。
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);//获取或计算监视器信息
BasicLock* highest_lock = NULL;
// 遍历一个可增长数组 cached_monitor_info,查找其中与给定对象 obj 拥有者相同的监视器信息,并对最高锁进行修复。
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);//获取当前索引对应的监视器信息
if (mon_info->owner() == obj) {//如果 mon_info 的拥有者与给定的对象 obj 相同
if (TraceBiasedLocking && Verbose) {
//打印日志信息,显示 mon_info 的拥有者和 obj 的指针地址。
tty->print_cr(" mon_info->owner (" PTR_FORMAT ") == obj (" PTR_FORMAT ")",
p2i((void *) mon_info->owner()),
p2i((void *) obj));
}
// 将一个空指针转换为 markOop 类型,并将结果赋值给 mark。
markOop mark = markOopDesc::encode((BasicLock*) NULL);
highest_lock = mon_info->lock();//将 mon_info 的锁赋值给 highest_lock。
highest_lock->set_displaced_header(mark);//将 mark 设置为 highest_lock 位移头。
}
//如果 mon_info 的拥有者与给定的对象 obj 不相同
else {
if (TraceBiasedLocking && Verbose) {
//打印日志信息
tty->print_cr(" mon_info->owner (" PTR_FORMAT ") != obj (" PTR_FORMAT ")",
p2i((void *) mon_info->owner()),
p2i((void *) obj));
}
}
}
// 根据对象的锁状态来执行偏向锁的撤销操作
// 检查highest_lock是否为NULL,如果不是NULL,说明对象当前被偏向锁所拥有
if (highest_lock != NULL) {
highest_lock->set_displced_header(unbiased_prototype); // 将被偏向锁替换为普通对象头。
obj->release_set_mark(markOopDesc::encode(highest_lock)); // 将对象的头部指向displaced mark
assert(!obj->mark()->has_bias_pattern(), "illegal mark state: stack lock used bias bit");//检查标记状态是否合法
//如果启用了TraceBiasedLocking且Verbose为真,或者不是批量操作,则打印
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of currently-locked object");
}
}
//如果highest_lock为NULL,说明对象当前没有被偏向锁所拥有
else {
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of currently-unlocked object");
}
//如果 允许重新偏向,则将对象的标记设置为 biased_prototype。
if (allow_rebias) {
obj->set_mark(biased_prototype);
}
// 否则,将未锁定的值存储到对象的头部。
else {
obj->set_mark(unbiased_prototype);
}
}
#if INCLUDE_JFR
// 如果有需要,返回持有偏向的线程的信息。
if (biased_locker != NULL) {
*biased_locker = biased_thread;
}
#endif // INCLUDE_JFR
return BiasedLocking::BIAS_REVOKED;
}
slow_enter
/*
流程:
1. 获取对象的标记字段:首先,会获取对象的标记字段,用于判断对象的锁状态。
2. 处理中立状态的情况:如果对象的标记字段是中立状态,说明对象还没有被加锁。此时,会尝试使用CAS操作将锁设置为轻量级锁(自旋锁)。
- 设置displaced header:将锁的displaced header设置为对象的标记字段。
- 尝试使用CAS操作设置锁:使用CAS操作将对象的标记字段替换为锁对象。如果CAS操作成功,表示成功获取了轻量级锁,直接返回。
- 如果CAS操作失败,进入下一步锁膨胀的处理。
- 处理已持有锁的情况:如果对象的标记字段中有锁定者,并且当前线程已经持有该锁,说明当前线程已经拥有锁,直接返回。
- 设置displaced header为unused_mark:如果以上条件都不满足,说明当前线程需要进行锁膨胀。将锁的displaced header设置为unused_mark,表示该锁不再可用。
- 锁膨胀:调用ObjectSynchronizer::inflate方法来进行锁的膨胀处理,将轻量级锁膨胀为重量级锁。
slow_enter方法的作用是在**慢速路径下获取对象的锁**。
通过CAS操作将锁设置为轻量级锁,如果CAS操作失败,则进行锁膨胀处理。这样可以尽量使用快速路径来获取锁,提高性能。
*/
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
// 获取对象的标记字段
markOop mark = obj->mark();
// 确保此时没有偏向锁的标记
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
/*
检查对象的标记是否是中性状态。如果是中性状态,表示对象没有被锁定。在这种情况下,代码会使用CAS操作将锁对象的标记替换为当前对象的标记,并将当前对象的标记设置为锁对象的displaced_header。如果CAS操作成功,即替换标记的ST必须是可见的,那么代码会释放堆栈锁并返回。如果CAS操作失败,则会进入锁膨胀的流程。
第二个条件分支是检查对象的标记是否具有锁拥有者(locker),并且当前线程拥有该锁。如果满足这些条件,表示当前线程已经拥有了锁,那么代码会避免重新锁定相同的锁,并将锁对象的displaced_header设置为NULL,并返回。
*/
if (mark->is_neutral()) {
// 预期成功的CAS --> 替换标记的ST必须是可见的 <= CAS执行的ST。优先使用轻量级锁(又叫:自旋锁)
lock->set_displaced_header(mark); //设置被替换的标记为锁对象(lock)的displaced_header。
// 如果CAS操作成功,即将锁对象的标记地址(obj()->mark_addr())的值替换为标记(mark)
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
// 释放堆栈锁并返回
TEVENT (slow_enter: release stacklock) ;
return ;
}
// Fall through to inflate() ... 如果CAS操作失败,进入锁膨胀,只能向下执行inflate()锁膨胀方法了
}
//如果标记具有锁拥有者(locker)并且当前线程(THREAD)拥有该锁
else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
// 避免重新锁定相同的锁。
assert(lock != mark->locker(), "must not re-lock the same lock");
// 避免使用相同的BasicLock重新锁定。
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
//设置displaced_header为NULL,并返回。
lock->set_displaced_header(NULL);
return;
}
#if 0
// 如果标记具有监视器(monitor)并且监视器已经被当前线程(THREAD)进入,则进行以下操作(被注释掉的优化代码):设置displaced_header为NULL,并返回。
if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) {
lock->set_displaced_header (NULL) ;
return ;
}
#endif
// 设置displaced_header为未使用的标记(unused_mark()),避免看起来像是可重入锁,并且不能看起来已经被锁定。
lock->set_displaced_header(markOopDesc::unused_mark());
// 锁膨胀
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
inflate
/*
用于将锁从`轻量级锁或中立状态`膨胀`为重量级锁`。
流程:
1. 检查`对象的锁状态`:首先,会检查对象的标记(mark)字段,判断对象当前的锁状态。
2. 处理`已经是重量级锁`的情况:如果对象的标记字段中已经有了重量级锁的信息,说明对象已经是重量级锁,直接返回。
3. 处理`正在膨胀`的情况:如果对象的标记字段中是`INFLATING`状态,表示锁正在膨胀过程中。此时,会自旋等待膨胀完成。
4. 处理`轻量级锁或中立状态`的情况:如果对象的标记字段中是轻量级锁或中立状态,说明需要将锁膨胀为重量级锁。
- 为对象分配ObjectMonitor:首先,会分配一个ObjectMonitor对象,用于存储重量级锁的信息。
- 设置ObjectMonitor的字段:然后,会设置ObjectMonitor对象的字段,如`_Responsible、OwnerIsThread、_recursions`等。
- 使用CAS操作将对象的标记字段设置为ObjectMonitor的指针:通过CAS操作,将对象的标记字段从轻量级锁或中立状态设置为指向ObjectMonitor对象的指针。
- 处理CAS操作失败的情况:如果CAS操作失败,说明有其他线程正在操作该对象,会释放刚分配的ObjectMonitor对象,并重新尝试整个膨胀过程。
5. 返回ObjectMonitor对象:膨胀成功后,返回分配的ObjectMonitor对象。
`inflate`方法的`作用`是将对象的锁从`轻量级锁或中立状态膨胀为重量级锁`。在膨胀过程中,会分配ObjectMonitor对象,并将对象的标记字段设置为指向`ObjectMonitor`的指针。这样可以确保对象在并发情况下能够正确地进行同步操作。
在inflate方法中,首先会检查对象的锁状态。如果已经是重量级锁,直接返回。如果正在膨胀,进行自旋等待膨胀完成。如果是轻量级锁或中立的,会开始膨胀锁。膨胀过程中,需要分配一个ObjectMonitor,设置好其字段,然后使用CAS操作将对象的标记字段设置为指向ObjectMonitor的指针。如果CAS操作失败,说明有其他线程正在操作该对象,释放刚才分配的ObjectMonitor,然后重试整个过程。
*/
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
// 确保当前不在安全点,因为在安全点时不能膨胀锁
assert (Universe::verify_in_progress() ||!SafepointSynchronize::is_at_safepoint(), "invariant") ;
for (;;) {
const markOop mark = object->mark() ;
// 确保此时没有偏向锁的标记
assert (!mark->has_bias_pattern(), "invariant") ;
//mark 可以处于以下状态之一:
// * Inflated - just return 仅仅返回
// * Stack-locked - coerce it to inflated 轻量级锁,需强迫它膨胀
// * INFLATING - busy wait for conversion to complete 膨胀中,需自旋等待转换完成
// * Neutral - aggressively inflate the object. 积极地使object发生膨胀
// * BIASED - Illegal. We should never see this 进入此方法必定不是偏向锁状态,直接忽略即可
// CASE: inflated 如果对象已经是重量级锁(inflated),则直接返回对象的监视器。
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");//确保监视器对象的头部是中性状态。
assert (inf->object() == object, "invariant") ;//确保监视器对象所监视的对象与给定的object对象相同。
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");//确保监视器对象在对象池中是有效的。
return inf ; //返回监视器对象inf
}
// CASE: INFLATING 锁膨胀正在进行中,膨胀的堆栈锁(轻量级锁)
// 其他线程正在从堆栈锁(轻量级锁)定转换为膨胀。只有那个线程才能完成膨胀——其他线程必须等待。
// INFLATING状态是暂时的、并发地,我们 spin/yield/park和poll的markword,等待inflation结束。
// 我们总是可以通过将线程停在某个辅助列表上来消除轮询。
// 如果正在膨胀,自旋等待膨胀完成 如果对象处于膨胀中(inflating)的状态,则自旋等待膨胀完成。
if (mark == markOopDesc::INFLATING()) {
// 表示正在进行膨胀操作
TEVENT (Inflate: spin while INFLATING) ;
// 读取稳定的标记
ReadStableMark(object) ;
continue ;
}
// CASE: stack-locked 此时锁为:轻量级锁,需强迫它膨胀为重量级锁
// Could be stack-locked either by this thread or by some other thread. 可能被此线程或其他线程堆栈锁定
//
// Note that we allocate the objectmonitor speculatively, _before_ attempting
// to install INFLATING into the mark word. We originally installed INFLATING,
// allocated the objectmonitor, and then finally STed the address of the
// objectmonitor into the mark. This was correct, but artificially lengthened
// the interval in which INFLATED appeared in the mark, thus increasing
// the odds of inflation contention.
// 我们大胆地分配objectmonitor,在此之前尝试将INFLATING状态先设置到mark word。
// 我们先设置了INFLATING状态标记,然后分配了objectmonitor,最后将objectmonitor的地址设置到mark word中。
// 这是正确的,但人为地延长了INFLATED出现在mark上的时间间隔,从而增加了锁膨胀的可能性。
// 意思就是:markword设置状态INFLATING -> 分配锁 -> markword设置状态INFLATED(膨胀重量级锁成功)
// 如果是轻量级锁,开始膨胀 检查一个标记对象(mark)是否拥有一个锁 如果对象是轻量级锁(stack-locked),则尝试将其膨胀为重量级锁。
if (mark->has_locker()) {
// 将对象监视器重置为初始状态
//分配一个对象监视器(ObjectMonitor),然后通过CAS(Compare and Swap)操作将对象的标记状态改为INFLATING,表示正在进行锁膨胀。
ObjectMonitor * m = omAlloc (Self) ; // 一个指向对象监视器的指针
// 乐观地准备对象监视器 - 预期CAS成功。我们在CAS之前这样做是为了最小化INFLATING(膨胀中)在标记中出现的时间长度。
m->Recycle(); // 对象监视器的状态被重置为初始值
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
// 使用原子操作更新对象的标记。
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
// 代码检查交换的结果是否与mark相等。
// 如果不相等,说明在更新过程中发生了干扰,
if (cmp != mark) {
omRelease (Self, m, true) ; // 释放资源
continue ; //并继续循环执行以重试更新操作。
}
markOop dmw = mark->displaced_mark_helper() ; // 获取被锁定对象的标记(mark)。
//确保被锁定对象的标记是中性状态。
assert (dmw->is_neutral(), "invariant") ;
// Setup monitor fields to proper values -- prepare the monitor
m->set_header(dmw) ; // 设置监视器的头部为displaced_mark_helper()返回的值。
// Optimization: if the mark->locker stack address is associated
// with this thread we could simply set m->_owner = Self and
// m->OwnerIsThread = 1. Note that a thread can inflate an object
// that it has stack-locked -- as might happen in wait() -- directly
// with CAS. That is, we can avoid the xchg-NULL .... ST idiom.
m->set_owner(mark->locker()); //设置监视器的所有者为mark->locker()返回的值。
m->set_object(object); // 设置监视器的对象为给定的object。
// TODO-FIXME: assert BasicLock->dhw != 0.
// Must preserve store ordering. The monitor state must
// be stable at the time of publishing the monitor address.
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ; //使用断言(guarantee)来验证对象的当前标记是否为markOopDesc::INFLATING(),如果不是,则会触发断言失败
object->release_set_mark(markOopDesc::encode(m)); // 释放之前的标记
// 希望性能计数器在不同的缓存行上分配,以避免在多处理器系统上的伪共享问题。
// ObjectMonitor::_sync_Inflations 是一个静态变量,用于记录对象的膨胀次数。
// ObjectMonitor::_sync_Inflations->inc() 是对 _sync_Inflations 的计数加一操作。
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite stacklock) ; // 事件记录,用于追踪对象膨胀的过程。
if (TraceMonitorInflation) {
if (object->is_instance()) { // 用于判断对象是否为实例对象。
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s", (void *) object, (intptr_t) object->mark(),object->klass()->external_name()); // 获取对象所属类的外部名称
}
}
return m ;
}
// 如果对象是中性状态(neutral),则将其积极膨胀为重量级锁
// CASE: neutral
// TODO-FIXME: 对于进入(entry),我们目前是先膨胀(inflate),然后尝试CAS _owner。 如果我们知道我们是为了进入而膨胀,最好的方式是通过将一个预先锁定的objectMonitor指针摆动到对象头来进行膨胀。成功的CAS膨胀了对象,并将所有权授予了正在膨胀的线程。 在当前实现中,我们使用了一个两步机制,首先使用CAS()进行膨胀,然后再次使用CAS()尝试将_owner从NULL摆动到Self。 一个我们可以从fast_enter()和slow_enter()中调用的inflateTry()方法将会很有用。
assert (mark->is_neutral(), "invariant"); // 确保给定的mark是中性的
ObjectMonitor * m = omAlloc (Self) ; // 分配一个ObjectMonitor对象,并将其赋值给变量m。
// prepare m for installation - set monitor to initial state
// 对m进行初始化操作,设置monitor的初始状态。
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class
// cas 的 方式进行 将m对象的编码值与object的mark地址进行比较交换,如果比较交换失败,则重试。
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
// 如果比较交换成功,代码执行一些性能计数器的操作,并输出一些跟踪信息
m->set_object (NULL) ;
m->set_owner (NULL) ;
m->OwnerIsThread = 0 ;
m->Recycle() ;
omRelease (Self, m, true) ;
m = NULL ;
continue ;
// interference - the markword changed - just retry.
// The state-transitions are one-way, so there's no chance of
// live-lock -- "Inflated" is an absorbing state.
// 干扰 - 标记字(markword)已更改 - 只需重试。 状态转换是单向的,因此不会发生活锁(live-lock)- "Inflated"是一个吸收状态。
}
// Hopefully the performance counters are allocated on distinct
// cache lines to avoid false sharing on MP systems ...
// 检查ObjectMonitor::_sync_Inflations是否为NULL,如果不为NULL,则调用inc()方法增加其计数
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite neutral) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
}
omAlloc
// 进行 分配对象监视器(ObjectMonitor)
ObjectMonitor * ATTR ObjectSynchronizer::omAlloc (Thread * Self) {
// A large MAXPRIVATE value reduces both list lock contention
// and list coherency traffic, but also tends to increase the
// number of objectMonitors in circulation as well as the STW
// scavenge costs. As usual, we lean toward time in space-time
// tradeoffs.
// 较大的MAXPRIVATE值可以减少列表锁争用和列表一致性流量,但也倾向于增加正在使用的objectMonitor数量以及STW(停顿式垃圾回收)的成本。通常情况下,我们在时间和空间之间进行权衡。
const int MAXPRIVATE = 1024 ;
for (;;) {
ObjectMonitor * m ;
/*
1:尝试从线程的本地omFreeList分配。
线程首先尝试从其本地列表分配,然后从全局列表分配,只有在这些尝试失败后,线程才会尝试实例化新的监视器。
尝试从线程的本地空闲列表(omFreeList)中分配对象监视器。如果本地列表不为空,则从本地列表中取出一个对象监视器,并更新相关的指针和计数
线程本地的空闲列表减轻了ListLock的负担,提高了分配延迟,并减少了共享全局列表上的一致性流量。
*/
m = Self->omFreeList ;
if (m != NULL) {
Self->omFreeList = m->FreeNext ; // 自由列表的头指针更新为 m 的下一个节点,即移除了一个对象
Self->omFreeCount -- ; // 自由列表中可用对象的数量减一
// CONSIDER: set m->FreeNext = BAD -- diagnostic hygiene
guarantee (m->object() == NULL, "invariant") ;
if (MonitorInUseLists) { // 如果开启了监视器使用列表
// 则将对象 m 添加到使用列表中,并更新使用列表的头指针和计数。
m->FreeNext = Self->omInUseList;
Self->omInUseList = m;
Self->omInUseCount ++;
// verifyInUse(Self);
} else {
//如果没有开启监视器使用列表,则将对象 m 的 FreeNext 属性设置为 NULL。
m->FreeNext = NULL;
}
return m ;
}
/*
2:尝试从全局gFreeList分配。 考虑使用muxTry()而不是muxAcquire()。
如果muxTry()失败,则立即进入第3种情况。
如果我们正在使用线程本地的空闲列表,则尝试重新为调用者提供空闲列表。
如果本地列表为空,再尝试从全局空闲列表(gFreeList)中分配对象监视器。如果全局列表不为空,则获取一个互斥锁(ListLock),从全局列表中取出一定数量的对象监视器,并进行相应的处理。
*/
if (gFreeList != NULL) {
// 重新为线程提供omFreeList。 使用批量传输来减少分配速率和对各种锁的负荷。
Thread::muxAcquire (&ListLock, "omAlloc") ;
// 释放一定数量的对象监视器
for (int i = Self->omFreeProvision; --i >= 0 && gFreeList != NULL; ) {
//首先将MonitorFreeCount减1,表示释放了一个对象监视器
MonitorFreeCount --;
//从gFreeList中取出一个对象监视器,并将gFreeList指向下一个对象监视器。
ObjectMonitor * take = gFreeList ;
gFreeList = take->FreeNext ;
// 验证取出的对象监视器的状态是否符合预期
guarantee (take->object() == NULL, "invariant") ;
guarantee (!take->is_busy(), "invariant") ;
// 对取出的对象监视器进行回收
take->Recycle() ;
// 调用omRelease(Self, take, false)方法释放对象监视器。
omRelease (Self, take, false) ;
}
// 释放一个互斥锁(ListLock)
Thread::muxRelease (&ListLock) ;
//然后对Self->omFreeProvision进行增加操作,增加的值是 1加上Self->omFreeProvision除以2的结果
Self->omFreeProvision += 1 + (Self->omFreeProvision/2) ;
// 如果Self->omFreeProvision的值超过了MAXPRIVATE,就将其设置为MAXPRIVATE。
if (Self->omFreeProvision > MAXPRIVATE ) Self->omFreeProvision = MAXPRIVATE ;
// 触发一个名为omFirst - reprovision的事件(TEVENT)。
TEVENT (omFirst - reprovision) ;
const int mx = MonitorBound ;
if (mx > 0 && (MonitorPopulation-MonitorFreeCount) > mx) {
//由于我们的线程状态可能不适合进行STW(停顿式垃圾回收)安全点的操作,并且调用者可能持有裸露的oops(对象指针),因此我们不能在omAlloc()中安全地引发STW安全点,所以我们推迟了这个操作。
InduceScavenge (Self, "omAlloc") ;
}
continue;
}
/*
3:分配一块新的ObjectMonitors。
本地和全局的空闲列表都为空 - 使用malloc()进行分配。
在当前的实现中,objectMonitors是TSM(Thread-Specific Memory) - 不可释放的。
如果全局列表也为空,则通过new操作符分配一块新的对象监视器内存块(ObjectMonitor[_BLOCKSIZE]),并进行相应的初始化操作
*/
assert (_BLOCKSIZE > 1, "invariant") ;
ObjectMonitor * temp = new ObjectMonitor[_BLOCKSIZE];
// 注意:如果分配失败,几乎没有办法恢复。 我们可能能够引发STW安全点并回收足够的objectMonitors以允许进展。
if (temp == NULL) {
// 在虚拟机中分配一块内存来存储ObjectMonitor对象
vm_exit_out_of_memory (sizeof (ObjectMonitor[_BLOCKSIZE]), OOM_MALLOC_ERROR,
"Allocate ObjectMonitors");
}
// 将新分配的内存块添加到全局块列表(gBlockList)中,并将其链接到全局空闲列表(gFreeList)中。
// 格式化该块。
// 初始化链表,每个监视器指向其下一个,形成单链表的空闲列表,第一个监视器将指向下一个块,形成块列表。
// 使用块中的第一个元素作为gBlockList链接的技巧应该重新考虑。
// 更好的实现方式可以是:class Block { Block * next; int N; ObjectMonitor Body [N]; }
for (int i = 1; i < _BLOCKSIZE ; i++) {
temp[i].FreeNext = &temp[i+1];
}
// 将最后一个监视器作为列表的结束终止
temp[_BLOCKSIZE - 1].FreeNext = NULL ;
// Element元素[0]保留用于全局列表链接
temp[0].set_object(CHAINMARKER);
// 考虑从当前块中切除该线程的当前请求。这样可以避免一些锁的流量和冗余的列表操作。
// 获取ListLock以操作BlockList和FreeList。
Thread::muxAcquire (&ListLock, "omAlloc [2]") ; //使用了Thread类的muxAcquire函数来获取一个名为ListLock的互斥锁
MonitorPopulation += _BLOCKSIZE-1;
MonitorFreeCount += _BLOCKSIZE-1;
// 将新的块添加到现有块的列表(gBlockList)中。块中的第一个objectMonitor被保留并专用。它用作块列表的"next"链接。
temp[0].FreeNext = gBlockList;
// gBlockList存在无锁使用,因此确保在更新gBlockList之前先进行先前的存储操作。
// 释放(release)gBlockList指针,将其指向temp所指向的内存地址
OrderAccess::release_store_ptr(&gBlockList, temp);
// 分配一块监视器的内存块,并将其加入到全局的空闲监视器列表中。
temp[_BLOCKSIZE - 1].FreeNext = gFreeList ;
gFreeList = temp + 1;
Thread::muxRelease (&ListLock) ;
TEVENT (Allocate block of monitors) ;
}
}
openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp
enter
方法则为重量级锁的入口源码如下
enter
monitor 竞争过程
- 通过
CAS
尝试把monitor
的owner
字段设置为当前线程。 - 如果设置之前的
owner
指向当前线程,说明当前线程再次进入monitor
,即重入锁执行recursions ++
, 记录重入的次数
- 如果当前线程是
第一次
进入该monitor
, 设置recursions 为 1
,_owner 为当前线程,该线程成功获得锁并返回。 - 如果获取锁失败,则等待锁的释放。
void ATTR ObjectMonitor::enter(TRAPS) {
// 下面的代码是为了首先检查最常见的情况,并减少在SPARC和IA32处理器上的RTS->RTO缓存线升级。
Thread * const Self = THREAD ;
void * cur ;
// 通过 CAS 操作尝试把 monitor 的 _owner 字段设置为当前线程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
// 如果cur为NULL,表示原来的_owner为NULL,即未被其他线程拥有
if (cur == NULL) {
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
// 考虑:设置或断言OwnerIsThread == 1
return ;
}
// 如果cur等于Self,表示原来的_owner为当前线程Self。代码执行以下操作
// 线程重入,recursions++
if (cur == Self) {
_recursions ++ ;
return ;
}
// 如果当前线程已经持有cur锁,说明是重入,设置_recursions为1,_owner为当前线程
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ;
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
// 真正的锁争用情况
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
// 在排队Self和经历尴尬和昂贵的状态转换之前,尝试一轮旋转。下面的旋转是严格可选的…
// 注意,如果我们从初始旋转中获取监视器,我们将放弃发布JVMTI事件并触发DTRACE探测。
// 尝试自旋获取锁
if (Knob_SpinEarly && TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_recursions == 0 , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
Self->_Stalled = 0 ;
return ;
}
assert (_owner != Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (Self->is_Java_thread() , "invariant") ;
JavaThread * jt = (JavaThread *) Self ;
assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ;
assert (jt->thread_state() != _thread_blocked , "invariant") ;
assert (this->object() != NULL , "invariant") ;
assert (_count >= 0, "invariant") ;
// 在STW-time 防止锁膨胀。参见deflate_idle_monitors()和is_busy()。
// 确保对象-监视器关系在存在争用时保持稳定。
// 开始阻塞等待获取锁
Atomic::inc_ptr(&_count);
EventJavaMonitorEnter event;
{ // 更改 Java 线程的状态为阻塞状态,并在进入监视器时触发 DTrace 监视器事件和 Jvmti 监视器争用进入通知
// 创建一个 JavaThreadBlockedOnMonitorEnterState 对象 jtbmes,并将当前的 Java 线程 jt 和监视器对象 this 作为参数传入。这将更改线程的状态为阻塞状态,并将线程与监视器绑定。
JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);
// 如果启用了 DTrace 监视器探测,并且定义了 DTRACE_MONITOR_PROBE 宏,将会触发一个名为 contended__enter 的监视器事件,并传入当前监视器对象、关联的对象以及 Java 线程。
DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
// 如果满足 JvmtiExport 导出类的条件,即需要在监视器争用进入时进行通知,那么将调用 JvmtiExport::post_monitor_contended_enter(jt, this) 方法。这将向 Jvmti 代理导出的工具发送一个监视器争用进入的通知
if (JvmtiExport::should_post_monitor_contended_enter()) {
JvmtiExport::post_monitor_contended_enter(jt, this);
}
OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);
Self->set_current_pending_monitor(this);
// 无限循环,直到获取到锁
for (;;) {
jt->set_suspend_equivalent();
// 由 handle_special_suspend_equivalent_condition() or java_suspend_self() 清除
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
// 我们已经获得了争用的监视器,但是当我们等待的时候,另一个线程挂起了我们。我们不希望在挂起时进入监视器,因为这会让挂起我们的线程感到惊讶。
_recursions = 0 ;
_succ = NULL ;
exit (false, Self) ;
jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}
// 释放对锁的争用
Atomic::dec_ptr(&_count);
assert (_count >= 0, "invariant") ;
Self->_Stalled = 0 ;
// Must either set _recursions = 0 or ASSERT _recursions == 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
//触发 DTrace 监视器事件和 Jvmti 监视器争用进入完成通知,并进行相关的事件处理和计数。
// 线程(现在是所有者)返回vm模式。
// 通过TI,DTrace和jvmstat报告光荣的消息。
// 探测效果是非常重要的。所有的报道都发生在我们拿着监视器的时候,增加了临界区域的长度。Amdahl的平行加速定律开始生动地发挥作用。
// 另一种选择可能是聚合事件(线程本地聚合或每个监视器聚合),并将报告推迟到更合适的时间——例如下一次某个线程遇到争用但尚未获得锁的时候。当线程可以旋转时,我们可以增加JVMStat计数器,等等。
// 发送监视器争用进入完成事件
DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_entered()) {
JvmtiExport::post_monitor_contended_entered(jt, this);
}
if (event.should_commit()) {
event.set_klass(((oop)this->object())->klass());
event.set_previousOwner((TYPE_JAVALANGTHREAD)_previous_owner_tid);
event.set_address((TYPE_ADDRESS)(uintptr_t)(this->object_addr()));
event.commit();
}
if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) {
ObjectMonitor::_sync_ContendedLockAttempts->inc() ;
}
}
首先尝试通过CAS操作获取锁,如果成功则直接返回。如果锁已经被当前线程持有,则增加重入计数并返回。如果锁被其他线程持有,则尝试自旋获取锁,如果失败则阻塞等待获取锁。
EnterI
monitor 等待
- 当前线程被封装成
ObjectWaiter
对象node
,状态设置成ObjectWaiter::TS_CXQ
。 - for 循环通过
CAS
把node
节点push
到_cxq
列表中,同一时刻可能有多个线程把自己的node
节点push
到_cxq
列表中。 node
节点push
到_cxq
列表之后,通过自旋尝试获取锁,如果还是没有获取到锁则通过park
将当前线程挂起等待被唤醒。- 当该线程被唤醒时会从挂起的点继续执行,通过
ObjectMonitor::TryLock
尝试获取锁。
void ATTR ObjectMonitor::EnterI (TRAPS) {
// 获取当前线程
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;
// 尝试获取锁 - TATAS (Test And Test And Set)
if (TryLock (Self) > 0) {
// // 如果成功获取锁,进行一系列的断言检查
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
如果无法获取锁,进行初始化
DeferredInitialize () ;
// 我们在将 Self 入队之前尝试一轮旋转。
// 如果 _owner 已准备好但 OFFPROC,我们可以使用 YieldTo() 操作将该线程的剩余量捐赠给所有者。这具有微妙但有益的亲和力效应。
在将线程放入队列前,尝试一次自旋锁
if (TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
如果自旋失败,将线程放入队列并挂起
assert (_succ != Self , "invariant") ;
assert (_owner != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
// 将“Self”放入 ObjectMonitor 的 _cxq 中。
// Node 充当 Self 的代理。
// 顺便说一句,如果主要用 Java 重写同步代码,WaitNodes、ObjectMonitors 和 Events 将成为第一类 Java 对象。这将避免尴尬的生命周期和活性问题,
// 以及消除 ABA 问题的子集。
// TODO:消除 ObjectWaiter 并将线程或事件放入队列。
创建一个ObjectWaiter节点,将当前线程封装在此节点中
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
// 将“Self”推到 _cxq 的前面。
// 一旦进入 cxq/EntryList,Self 就会保留在队列中,直到获得锁。
// 请注意,旋转往往会降低线程在 EntryList|cxq 上入队和出队的速率。
/ 创建一个ObjectWaiter指针nxt用于保存队列的头节点
ObjectWaiter * nxt ;
// // 接下来的循环用于将当前线程放入等待队列:
/*将_cxq赋值给nxt,并将nxt赋值给node._next。
使用Atomic::cmpxchg_ptr原子操作尝试将node放入等待队列。
如果CAS操作成功(返回值等于nxt),则跳出循环。
如果CAS操作失败,说明等待队列的头节点已经改变,需要再次尝试。作为一种优化,尝试获取锁。
如果成功获取锁,则断言确保不变量成立,并返回。
*/
for (;;) {
// // 获取当前等待队列的头节点
node._next = nxt = _cxq ;
使用CAS操作将当前线程放入等待队列
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// 如果CAS操作失败,说明等待队列的头节点已经改变,需要再次尝试;作为一种优化,我们再次尝试获取锁
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
// 检查 cxq|EntryList 边缘是否转换为非空。这表明争用的开始。当争用持续存在时,退出线程将使用 ST:MEMBAR:LD 1-1 退出协议。当争用减少时,退出操作恢复到更快的 1-0 模式。此进入操作可能会交错(竞争)并发的 1-0 退出操作,从而导致搁浅,因此我们安排其中一个竞争线程使用定时 Park() 操作来检测竞争并从竞争中恢复。 (搁浅是一种进度失败的形式,其中监视器已解锁,但所有竞争线程仍处于停放状态)。
// 也就是说,至少有一个竞争线程会定期轮询 _owner。
// 竞争线程之一将成为指定的“负责”线程。
// Responsible 线程使用定时停放而不是正常的无限期停放操作 - 它定期唤醒并检查 1-0 退出操作承认的潜在搁浅并从中恢复。在任何给定时刻,每个监视器最多需要一个负责线程。只有 cxq|EntryList 上的线程可以负责监视器。
// 当前,竞争线程之一承担了“负责”的附加角色。
// 一个可行的替代方案是使用专用的“搁浅检查器”线程,该线程定期迭代所有线程(或活动监视器)以及存在搁浅风险的未停放的后继线程。这将有助于消除我们在某些平台上看到的计时器可扩展性问题,因为我们只有一个线程(检查器)停放在计时器上。
如果等待队列为空,且当前线程是唯一一个等待获取锁的线程,那么尝试将当前线程设为负责线程
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
// Try to assume the role of responsible thread for the monitor.
// CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
// 尝试使用CAS操作将当前线程设为负责线程
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
// 当该线程占用将自身排队到 _cxq 时,锁已被释放。为了结束比赛并避免“搁浅”和进度活跃失败,我们必须在停车前重新采样重试_owner。注意 Dekker/Lamport 对偶性:ST cxq;会员; LD 所有者。在这种情况下,ST-MEMBAR 通过 CAS() 完成。
// TODO:将所有线程状态转换推迟到停放时间。
// 由于状态转换繁重且效率低下,我们希望将状态转换推迟到绝对必要时,并在这样做时避免一些转换...
// 记录锁的争用情况
TEVENT (Inflated enter - Contention) ;
初始化唤醒次数和重新检查间隔
int nWakeups = 0 ;
int RecheckInterval = 1 ;
/ 循环尝试获取锁,直到成功为止
for (;;) {
// // 尝试获取锁
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
// // 如果当前线程不是负责线程,那么尝试将当前线程设为负责线程
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
如果当前线程是负责线程,那么阻塞一段时间后再次尝试获取锁
// park self
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
// // 增加重新检查间隔,但是限制其最大值为1000
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
如果当前线程不是负责线程,那么直接阻塞,等待被唤醒
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}
// // 如果阻塞唤醒后能够获取到锁,那么直接退出循环
if (TryLock(Self) > 0) break ;
// 锁定仍然存在争议。
// 记录无效唤醒的数量。
// 请注意,计数器不受锁保护或由原子更新。
// 这是设计使然 - 我们交易“有损”计数器,这些计数器在更新期间暴露于竞争以获得较低的探测效果。
记录无效唤醒事件
TEVENT (Inflated enter - Futile wakeup) ;
如果存在无效唤醒计数器,增加计数
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
/ 增加唤醒次数
++ nWakeups ;
// 假设这不是虚假唤醒,我们通常会发现 _succ == Self。
//我们可以推迟清除 _succ 直到旋转完成后 TrySpin() 必须容忍以 _succ == Self 的方式调用。
//尝试另一轮自适应旋转。
if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
//我们可以发现我们在旋转时被 unpark()ed 并重新指定为 _succ。那是无害的。如果我们迭代并调用park(),park()将消耗事件并立即返回,我们将再次旋转。此模式可以重复,让 _succ 在 CPU 上简单地旋转。启用 Knob_ResetEvent 以清除挂起的 unparks()。或者,我们可以在这里对fired()进行采样,如果设置了,则在下一次迭代中放弃旋转。
// 如果Knob_ResetEvent标志位为1且Self的park事件已经触发
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
// 重置Self的park事件
Self->_ParkEvent->reset() ;
// 创建内存屏障,保证之后的操作不会被重排序到屏障之前
OrderAccess::fence() ;
}
// 如果当前线程是下一个应该被唤醒的线程,清除_succ标志
if (_succ == Self) _succ = NULL ;
// 创建内存屏障,保证之后的操作不会被重排序到屏障之前
OrderAccess::fence() ;
}
/* 出口:
Self 已获取锁 - 将 Self 与 cxq 或 EntryList 取消链接。
通常我们会在 EntryList 中找到 Self 。
从锁拥有者(本线程)的角度来看,
EntryList 是稳定的,cxq 是仅前置的。
cxq的头部波动较大,但内部却很稳定。
另外,Self.TState 是稳定的。
*/
// 断言当前线程是锁的拥有者
assert (_owner == Self , "invariant") ;
// 断言锁关联的对象不为空
assert (object() != NULL , "invariant") ;
// 从等待队列中移除当前线程
UnlinkAfterAcquire (Self, &node) ;
// 如果当前线程是下一个应该被唤醒的线程,清除_succ标志
if (_succ == Self) _succ = NULL ;
// 断言当前线程不是下一个应该被唤醒的线程
assert (_succ != Self, "invariant") ;
// 如果当前线程是负责线程,清除_Responsible标志
if (_Responsible == Self) {
_Responsible = NULL ;
// 创建内存屏障,保证之后的操作不会被重排序到屏障之前
OrderAccess::fence(); // Dekker pivot-point
}
/*
我们可以在 cxq|EntryList 上留下没有指定“负责”线程的线程。这是良性的。当该线程随后退出监视器时,它可以通过 LDing cxq|EntryList“看到”此类预先存在的“旧”线程——在上面的栅栏之前到达 cxq|EntryList 的线程。新到达的线程——即上面 ST:MEMBAR 之后到达 cxq 的线程——将设置 Responsible 非空并选择一个新的“Responsible”计时器线程。
该线程执行:
ST负责=null; MEMBAR(在此处输入结语)
LD cxq|EntryList(在随后的退出中)
在慢速/竞争路径中输入线程执行:
ST cxq=非空;会员; LD 负责人(在输入序言中)
(ST cxq; MEMBAR) 通过 CAS() 完成。
上面的 MEMBAR 防止后续退出操作中 cxq|EntryList 的 LD 浮动到 ST Responsible=null 之上。
*/
}
// 我们已经获得了 CAS() 的所有权。
// CAS 正在序列化——它具有与 MEMBAR/FENCE 等效的语义。但由于 CAS(),该线程可能也存储到 _succ、EntryList、cxq 或 Responsible 中。这些元数据更新必须在该线程随后释放锁之前可见。考虑一下如果我们不强制执行此约束会发生什么——监视元数据和用户数据的 ST 可能会随着退出中的 ST 重新排序(在退出后变得可见),从而放弃锁的所有权。然后,其他一些线程可以获取锁,但观察到不一致或旧的监视器元数据和堆数据。这违反了 JMM。
// 为此,1-0 exit() 操作必须至少具有 STST|LDST“释放”屏障语义。具体来说,在将 null 放入 _owner 并释放锁的 ST 之前,exit() 中必须至少有一个 STST|LDST 屏障。屏障确保在我们释放锁之前,因此在其他线程 (CPU) 有机会获取锁之前,监控元数据和受锁保护的数据的更改将是可见的。
// 另请参阅:http://gee.cs.oswego.edu/dl/jmm/cookbook.html。
//至关重要的是,任何先前的 _succ 或 EntryList 的 ST 必须在 *后续*(后续)相应的 MonitorExit 中的 _owner 中的 null ST 之前可见。还记得,在 1-0 模式下,monitorexit 不一定执行序列化指令。
// 如果SyncFlags的第8位为1,创建内存屏障
if (SyncFlags & 8) {
OrderAccess::fence() ;
}
// 退出方法
return ;
}
exit
monitor 释放
当某个持有锁的线程执行完同步代码块时,会释放锁并 unpark
后续线程。
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
获取当前线程
Thread * Self = THREAD ;
如果当前线程不是锁的拥有者
if (THREAD != _owner) {
// 如果当前线程拥有锁的所有权
if (THREAD->is_lock_owned((address) _owner)) {
// 将 _owner 从 BasicLock 指针转换为线程地址。
// 我们不需要为此转换保留 _mutex。
// 只要所有读者都能容忍其中任何一种风格,非空到非空都是安全的
assert (_recursions == 0, "invariant") ;
// 将锁的拥有者设置为当前线程
_owner = THREAD ;
// 将递归锁的层数设置为0
_recursions = 0 ;
// 将OwnerIsThread标志设置为1
OwnerIsThread = 1 ;
} else {
//注意:我们需要通过抛出异常来处理本机代码中不平衡的监视器进入/退出。
// TODO: Throw an IllegalMonitorStateException ?
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
/ 如果上述断言失败,抛出IllegalMonitorStateException异常
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
如果当前线程是锁的拥有者,并且递归锁的层数不为0
if (_recursions != 0) {
// 递归锁的层数减1
_recursions--; // this is simple recursive enter
// 记录递归退出事件
TEVENT (Inflated exit - recursive) ;
return ;
}
// 不变:设置 Responsible=null 后,线程必须在获取 EntryList|cxq 之前执行 MEMBAR 或其他序列化指令。
如果SyncFlags的第4位为0(表示没有其他线程等待获取锁),将负责线程设置为null
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
#if INCLUDE_TRACE
// 如果启用了 MonitorEnter 事件且线程未挂起,则获取该事件的所有者的线程 ID
if (not_suspended && Tracing::is_event_enabled(TraceJavaMonitorEnterEvent)) {
_previous_owner_tid = SharedRuntime::get_java_tid(Self);
}
#endif
无限循环,直到成功释放锁并唤醒后续线程
for (;;) {
// 当前线程是持有该对象监视器的所有者
assert (THREAD == _owner, "invariant") ;
// 根据ExitPolicy的值选择不同的退出策略
if (Knob_ExitPolicy == 0) {
/* 释放语义:关键部分内的先前加载和存储不得浮动(重新排序)超过删除锁的后续存储。
在 SPARC 上,需要 MEMBAR #loadstore|#storestore。
但当然在 TSO 中 #loadstore|#storestore 不是必需的。
我想写以下内容之一:
A. OrderAccess::release() ; _所有者=空
B.OrderAccess::loadstore(); OrderAccess::storestore(); _所有者=空;
不幸的是 OrderAccess::release() 和 OrderAccess::loadstore() 两者
存储到 _dummy 变量中。该存储不是必需的,但可能会导致经典 SMP 系统上的大量浪费的一致性流量。
相反,我使用release_store(),它在 x64、x86 和 SPARC 上仅作为简单 ST 实现。
*/
释放对象监视器。然后使用 OrderAccess::storeload() 操作来保证前面的加载和存储操作不会被重排序。最后,根据条件判断是否需要唤醒后继线程
// 释放锁
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
/ 查看是否需要唤醒后继线程
OrderAccess::storeload() ; // See if we need to wake a successor
// 如果EntryList和cxq都为空,或者已经有后继线程,那么退出方法
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
/ 记录复杂的退出事件
TEVENT (Inflated exit - complex egress) ;
/*
通常,退出线程负责确保继承,但如果其他后继者已准备好或其他进入线程正在旋转,则该线程可以简单地将 NULL 存储到 _owner 中并退出,而无需唤醒后继者。旋转者或准备好的后继者的存在保证了适当的继承(活跃性)。责任转移至
准备好或正在运行的继任者。退出线程委托该职责。更准确地说,如果后继者已经存在,则该线程就免除了唤醒(取消停放)后继者的责任。 _succ 变量对于减少无效唤醒频率至关重要。 _succ 标识已准备好(未停放)但尚未运行的“假定继承人”线程。我们只需要一个这样的后继线程来保证进度。请参阅http://www.usenix.org/events/jvm01/full_papers/dice/dice.pdf
有关详细信息,请参阅第 3.3 节“无效唤醒限制”。请注意,Enter() 中的微调器还将 _succ 设置为非空。在当前的实现中,旋转器会机会性地设置 _succ,以便退出线程可以避免唤醒后继线程。另一个不太吸引人的替代方案是退出线程放弃锁,然后短暂旋转以查看旋转器是否设法获取锁。如果是这样,退出线程可以立即退出而不唤醒后继者,否则退出线程将需要出队并唤醒后继者。(请注意,我们需要使后删除旋转较短,但不短于最坏的情况-这种情况是往返缓存行迁移时间。被删除的锁需要对旋转器可见,然后旋转器获取锁必须对退出线程可见)。看来必须准备好一位推定继承人(继任者)了。只有当前锁的所有者才能操作EntryList或drain _cxq,因此我们需要重新获取锁。如果我们无法重新获得锁,那么确保继承的责任就落到了新的所有者身上。
*/
尝试将_owner设置为当前线程,如果成功,说明当前线程重新获取了锁,直接退出方法
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
记录重新获取锁的事件
TEVENT (Exit - Reacquired) ;
} else {
如果EntryList和cxq都为空,或者已经有后继线程
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
// // 释放锁
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
/// // 查看是否需要唤醒后继线程
OrderAccess::storeload() ;
// Ratify the previously observed values.
如果cxq为空或者已经有后继线程,那么退出方法
if (_cxq == NULL || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
/*不合时宜的交错——退出线程(本线程)快速退出路径中的线程与慢速进入路径中的进入线程竞争。我们有两个选择:
A. 尝试重新获取锁。如果 CAS() 失败,则立即返回,否则我们要么重新启动/重新运行退出操作,要么简单地进入下面的代码,唤醒后继者。
B. 如果形成 EntryList|cxq 的元素是 TSM,我们可以简单地 unpark() 引导线程并返回,而无需设置 _succ。*/
/// // 尝试将_owner设置为当前线程,如果成功,说明当前线程重新获取了锁,直接退出方法/
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
TEVENT (Inflated exit - reacquired succeeded) ;
return ;
}
// 记录重新获取锁失败的事件
TEVENT (Inflated exit - reacquired failed) ;
} else {
记录复杂的退出事件
TEVENT (Inflated exit - complex egress) ;
}
}
/ 确保当前线程是锁的拥有者
guarantee (_owner == THREAD, "invariant") ;
根据不同的队列模式(QMode)进行操作
定义一个ObjectWaiter对象w
ObjectWaiter * w = NULL ;
/ 获取队列模式
int QMode = Knob_QMode ;
/ 如果队列模式为2且cxq队列不为空
if (QMode == 2 && _cxq != NULL) {
// QMode == 2 :cxq 优先于 EntryList。尝试直接从 cxq 唤醒后继者。如果成功,继任者将需要取消与 cxq 的链接。
w = _cxq ;
// 断言w不为空
assert (w != NULL, "invariant") ;
// 断言w的状态是TS_CXQ
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
// 唤醒后继线程
ExitEpilog (Self, w) ;
return ;
}
/ 如果队列模式为3且cxq队列不为空
if (QMode == 3 && _cxq != NULL) {
// 一有机会就积极地将 cxq 引入 EntryList。此策略确保最近运行的线程位于 EntryList 的头部。将 _cxq 排入 EntryList - 批量传输。首先,分离_cxq。以下循环相当于: w = swap (&cxq, NULL)
//如果 QMode 为 3 或 4,并且 _cxq 不为空,则将 _cxq 中的线程转移到 _EntryList 中,并按照一定的顺序进行排列。
/ 首先,分离_cxq
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
/// // 将_cxq设置为NULL,成功后将返回旧的_cxq
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
// 如果cmpxchg成功,退出循环
if (u == w) break ;
/// // 否则,更新w为旧的_cxq,并继续循环
w = u ;
}
assert (w != NULL , "invariant") ;
/// 定义ObjectWaiter对象q和p,用于遍历cxq队列
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
/ // 遍历cxq队列,将所有线程的状态设为TS_ENTER,并链接到q
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// 将 RAT 添加到 EntryList
// TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.\
// 将cxq队列添加到EntryList队列
ObjectWaiter * Tail ;
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
Tail->_next = w ;
w->_prev = Tail ;
}
// 陷入试图从 EntryList 中唤醒后继者的代码中
}
如果QMode等于4并且_cxq不为NULL,则进入下面的代码块。
if (QMode == 4 && _cxq != NULL) {
// 一有机会就积极地将 cxq 引入 EntryList。
// 此策略确保最近运行的线程位于 EntryList 的头部。
// 将 _cxq 排入 EntryList - 批量传输。
// 首先,分离 _cxq。
// 下面的循环相当于:w = swap (&cxq, NULL)
w = _cxq ;将_cxq赋值给变量w。
/循环地将_cxq从原来的位置分离出来,并将其赋值给变量w。cmpxchg_ptr是一个原子操作,将_cxq与NULL进行比较并交换,将结果赋值给u。如果u等于w,则跳出循环,否则将u赋值给w继续循环。
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
确保w不为NULL。
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
/将w中的元素逐个加入EntryList中。将每个元素的TState状态设置为ObjectWaiter::TS_ENTER,并维护前驱和后继关系
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
//将w作为新的头部加入EntryList,并维护相应的前驱和后继关系
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
// Fall thru into code that tries to wake a successor from EntryList
}
//将_EntryList赋值给变量w,如果w不为NULL,则断言w->TState等于ObjectWaiter::TS_ENTER,然后调用ExitEpilog(Self, w)函数并返回。
w = _EntryList ;
if (w != NULL) {
// 我想写:保证(w->_thread!= Self)。
// 但实际上,退出线程可能会发现自己位于 EntryList 上。
// 假设线程 T1 调用 O.wait()。 Wait() 将 T1 放入 O 的等待集中,然后调用 exit()。 Exit 通过将 O._owner 设置为 NULL 来释放锁。
// 假设 T1 然后停止。 T2 获取 O 并调用 O.notify()。 notification() 操作将 T1 从 O 的 waitset 移动到 O 的 EntryList。然后T2释放锁“O”。 T2 在 null 进入 _owner 的 ST 之后立即恢复,如上所述。 T2 注意到 EntryList 已被填充,因此它重新获取锁,然后发现自己位于 EntryList 上。
// 鉴于这一切,我们必须容忍“w”与 Self 相关联的情况。
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
// 如果我们发现 _cxq 和 EntryList 都为 null,那么只需从顶部重新运行退出协议即可。
/将_cxq赋值给变量w,如果w为NULL,则继续循环。循环地将_cxq从原来的位置分离出来,并将其赋值给变量w。cmpxchg_ptr是一个原子操作,将_cxq与NULL进行比较并交换,将结果赋值给u。如果u等于w,则跳出循环,否则将u赋值给w继续循环。
w = _cxq ;
if (w == NULL) continue ;
// 将 _cxq 排入 EntryList - 批量传输。 无限循环,用于将等待队列 _cxq 中的线程移除
// 首先,分离 _cxq。
// 下面的循环相当于:w = swap (&cxq, NULL)
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
TEVENT (Inflated exit - drain cxq into EntryList) ;
assert (w != NULL , "invariant") ;
assert (_EntryList == NULL , "invariant") ;
// 将 _cxq 锚定的 LIFO SLL 转换为 DLL。
// 列表重组步骤的运行时间为 O(LENGTH(w))。
// 这一步必须快速运行,因为“Self”仍然持有外锁,限制并行性并有效地延长临界区。
// 不变式:s 追 t 追 u。
// TODO-FIXME:考虑将 EntryList 从 DLL 更改为 CDLL,以便我们可以更快地访问尾部。
//如果QMode等于1,则将_cxq中的元素逐个加入EntryList中,并且反转顺序。将每个元素的TState状态设置为ObjectWaiter::TS_ENTER,并维护前驱和后继关系。
//如果QMode不等于1,则将_cxq中的元素逐个加入EntryList中。将每个元素的TState状态设置为ObjectWaiter::TS_ENTER,并维护前驱和后继关系
if (QMode == 1) {
// QMode == 1 : drain cxq to EntryList, reversing order
// We also reverse the order of the list.
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) {
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
// QMode == 0 or QMode == 2
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
// 在1-0模式下我们需要:ST EntryList;MEMBAR #storestore; ST _owner = NULL MEMBAR 由 ExitEpilog() 中的release_store() 操作满足。
// 看看我们是否可以让位给微调器而不是唤醒线程。实现的主要目标是降低上下文切换率。
// 如果_succ不为NULL,则继续循环。
if (_succ != NULL) continue;
w = _EntryList ;
// w 不为 NULL,则表示存在等待队列中的后继线程。
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
//调用 ExitEpilog(Self, w) 函数来执行后续操作,包括唤醒后继线程。
ExitEpilog (Self, w) ;
return ;
}
}
}
CASE(_monitorexit)
简化版本
CASE(_monitorexit): {
oop lockee = STACK_OBJECT(-1); // 取栈顶对象作为锁对象
CHECK_NULL(lockee); // 空指针检查
// derefing's lockee ought to provoke implicit null check
// find our monitor slot
BasicObjectLock* limit = istate->monitor_base(); // 返回 istate 对象的监视器基地址
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base(); //返回 istate 对象的堆栈基地址
while (most_recent != limit ) {
// 它在当前线程的监视器栈中查找与锁对象相匹配的监视器槽,如果找到了匹配的监视器槽
if ((most_recent)->obj() == lockee) {
// 获取该对象的锁lock和被替换的头部header
BasicLock* lock = most_recent->lock();
markOop header = lock->displaced_header();
most_recent->set_obj(NULL);
// lockee对象的标记是否具有偏向模式 , 如果没有偏向模式,说明该对象的锁没有被偏向其他线程,需要进行释放操作。
if (!lockee->mark()->has_bias_pattern()) {
bool call_vm = UseHeavyMonitors; // call_vm 值取决于是否使用了重量级监视器(UseHeavyMonitors)。
// If it isn't recursive we either must swap old header or call the runtime
if (header != NULL || call_vm) {
// 调用: monitorexit
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
}
// //更新程序计数器(PC)和栈顶指针(TOS),并继续执行
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
most_recent++;
}
// Need to throw illegal monitor state exception
//调用: throw_illegal_monitor_state_exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere(); // 触发一个异常或停止程序的执行
}
mointerexit
/*
退出监视器(释放锁)。
流程:
1. 验证监视器:在执行释放锁操作之前,会验证监视器的状态,确保操作的正确性。
2. 获取对象:从BasicObjectLock中获取被锁定的对象。
3. 确保对象是在堆中或为null:校验被锁定的对象是否在堆中,或者是否为null。如果不满足条件,会抛出`IllegalMonitorStateException`异常。
4. 慢路径释放锁:调用ObjectSynchronizer::slow_exit方法,在慢速路径下释放对象的锁。
5. 清除BasicObjectLock中的对象引用:在释放锁后,将BasicObjectLock中的对象引用设置为NULL。这样做是为了避免在异常处理代码中再次尝试解锁监视器。
6. 验证监视器:在释放锁操作完成后,再次验证监视器的状态,确保操作的正确性。
InterpreterRuntime::monitorexit方法的作用是**退出监视器(释放锁)**。它会验证监视器的状态,获取被锁定的对象,并在慢速路径下释放对象的锁。同时,还会清除BasicObjectLock中的对象引用,以避免在异常处理代码中再次尝试解锁监视器。
*/
// 退出监视器(释放锁)
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
// 验证监视器
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
// 从BasicObjectLock中获取对象
Handle h_obj(thread, elem->obj());
// 确保对象是在堆中或者为null
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),"must be NULL or an object");
// 如果BasicObjectLock为null或对象未被锁定,抛出IllegalMonitorStateException异常
if (elem == NULL || h_obj()->is_unlocked()) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
// 慢路径释放锁
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
// 清除BasicObjectLock中的对象引用。这必须在这里完成,因为在退出时可能会安装一个待处理的异常。
// 如果不清除,异常处理代码将再次尝试解锁监视器。
elem->set_obj(NULL);
#ifdef ASSERT
// 验证监视器
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
openjdk
根路径/hotspot/src/share/vm/runtime/synchronizer.cpp
路径下的synchronized.cpp
slow_exit
/*
流程:
调用fast_exit方法:在慢速路径下,直接调用fast_exit方法来释放对象的锁。fast_exit方法是一个快速路径,用于释放轻量级锁或中立状态的锁。
slow_exit方法的作用是在慢速路径下释放对象的锁,通过调用fast_exit方法来实现。这样可以确保在锁的释放过程中尽量使用快速路径,提高性能。
在slow_enter方法中,如果对象的标记字段是中性的,那么会尝试使用CAS操作设置锁。如果CAS操作失败,那么就进入锁膨胀。如果当前线程已经拥有锁,那么就直接返回。在slow_exit方法中,直接调用fast_exit方法即可释放锁。
*/
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
// 慢速路径释放锁,直接调用fast_exit即可
fast_exit (object, lock, THREAD) ;
}
fast_exit
/*
流程:
1. 检查偏向锁标记:首先,会检查对象的标记字段,判断对象是否有偏向锁的标记。
2. 获取displaced header:如果对象有偏向锁标记,并且存在displaced header(用于存储锁对象Mark Word的拷贝),则获取displaced header。
3. 处理递归加锁的情况:如果displaced header为空,说明之前的加锁是递归加锁,不需要进行任何操作,直接返回。
4. 进行快速释放锁:如果displaced header不为空,说明之前的加锁是轻量级锁。此时,会尝试使用CAS操作将displaced header和对象的标记字段替换回来。
- 如果CAS操作成功,表示成功释放了轻量级锁,直接返回。
- 如果CAS操作失败,表示有其他线程正在操作该对象,无法成功释放轻量级锁,此时会使用慢路径释放锁,调用ObjectSynchronizer::inflate方法来进行锁的膨胀处理。
fast_exit方法的作用是在轻量级锁状态下**快速释放对象的锁**。通过CAS操作将displaced header和对象的标记字段替换回来,如果CAS操作失败,则使用慢路径进行锁的释放处理。这样可以尽量使用快速路径来释放锁,提高性能。
在这个函数中,首先会检查对象是否有偏向锁的标记,然后获取displaced header。如果displaced header为空,那么之前的加锁是递归加锁,不需要做任何操作。否则,会尝试使用CAS操作将displaced header替换回对象的mark word。如果这个操作失败,那么就使用慢路径释放锁。
*/
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
// 从下面这个断言遍可得知:偏向锁不会进入快锁解锁方法。
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// displaced header是升级轻量级锁过程中,用于存储锁对象MarkWord的拷贝,官方为这份拷贝加了一个Displaced前缀。可参考:《深入理解Java虚拟机》第三版482页的介绍。
// 如果displaced header是空,先前的加锁便是重量级锁
// 获取displaced header,这是在升级为重量级锁过程中用于存储对象mark word的拷贝
markOop dhw = lock->displaced_header();
markOop mark ;
// 如果displaced header为空,那么之前的加锁是递归加锁,不需要做任何操作
if (dhw == NULL) {
// Recursive stack-lock. 递归堆栈锁
// Diagnostics -- Could be: stack-locked, inflating, inflated. 断定应该是:堆栈锁、膨胀中、已膨胀(重量级锁)
mark = object->mark();
// 确保mark不是中立的
assert (!mark->is_neutral(), "invariant");
// 如果mark有锁定者并且不是正在膨胀,那么确保当前线程拥有这个锁
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant");
}
// 如果mark有监视器,那么确保这个监视器是由当前线程进入的
if (mark->has_monitor()) {
ObjectMonitor * m = mark->monitor();
assert(((oop)(m->object()))->mark() == mark, "invariant");
assert(m->is_entered(THREAD), "invariant");
}
return;
}
// 获取对象的mark word
mark = object->mark() ; // 锁对象头的MarkWord
// 此处为轻量级锁的释放过程,使用CAS方式解锁(下述方法中的cmpxchg_ptr即CAS操作)。
// 如果对象被当前线程堆栈锁定,请尝试将displaced header和锁对象中的MarkWord替换回来。
if (mark == (markOop) lock) {
assert (dhw->is_neutral(), "invariant") ;
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}
// 如果快速释放锁失败,使用慢路径释放锁
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
inflate exit
方法见上面
其他方法
wait
wait等待
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
assert(Self->is_Java_thread(), "Must be Java thread!");
JavaThread *jt = (JavaThread *)THREAD;
DeferredInitialize () ;//用于延迟初始化
// Throw IMSX or IEX.
CHECK_OWNER();
EventJavaMonitorWait event;
/*
用于检查中断状态,并在需要的情况下触发中断异常,并发布相应的监视器等待事件。
*/
// check for a pending interrupt
如果 interruptible 为 true,并且当前线程 Self 被中断且没有未处理的异常,则进入条件判断。
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
// post monitor waited event. Note that this is past-tense, we are done waiting.
如果需要在 JvmtiExport 中发布监视器等待事件,则调用 JvmtiExport::post_monitor_waited(jt, this, false),将事件标记为等待完成(过去时)。
if (JvmtiExport::should_post_monitor_waited()) {
// Note: 'false' parameter is passed here because the
// wait was not timed out due to thread interrupt.
JvmtiExport::post_monitor_waited(jt, this, false);
}
如果需要提交事件,则调用 post_monitor_wait_event(&event, 0, millis, false),将事件提交。
if (event.should_commit()) {
post_monitor_wait_event(&event, 0, millis, false);
}
记录事件 TEVENT (Wait - Throw IEX)。
TEVENT (Wait - Throw IEX) ;
抛出 java.lang.InterruptedException 异常。
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
记录事件
TEVENT (Wait) ;
使用断言确保 _Stalled 值为0,以确保在设置新的 _Stalled 值之前没有其他线程在等待。
assert (Self->_Stalled == 0, "invariant") ;
将当前对象监视器的地址(this)转换为 intptr_t 类型,并将其赋值给 _Stalled。
Self->_Stalled = intptr_t(this) ;
使用 jt->set_current_waiting_monitor(this) 将当前对象监视器设置为当前线程的等待监视器。
jt->set_current_waiting_monitor(this);
// 创建一个节点放入队列至关重要的是,在我们重置()事件之后,但在park()之前,我们必须检查是否有挂起的中断。
创建一个 ObjectWaiter 对象 node,并传入当前线程 Self。
ObjectWaiter node(Self);
将 node 的状态设置为等待状态(ObjectWaiter::TS_WAIT)。
node.TState = ObjectWaiter::TS_WAIT ;
调用 _ParkEvent 事件对象的 reset() 方法,将其重置为初始状态。
Self->_ParkEvent->reset() ;
使用 OrderAccess::fence() 进行内存屏障操作,确保前面的写入操作对其他线程可见。
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag
// 输入等待队列,在这种情况下,它是一个循环的双链表,但它可以是一个优先级队列或任何数据结构。
//_WaitSetLock保护等待队列。通常情况下,等待队列只由监视器的所有者访问*,但在park()由于中断超时而返回的情况下除外。争用是非常罕见的,所以我们使用一个简单的旋转锁,而不是一个更重的重量阻塞锁。
/*
将等待节点添加到等待队列中,并更新了一些状态和计数,最后退出对象监视器。
*/
使用自旋锁 Thread::SpinAcquire 保护对 _WaitSetLock 的访问,以添加等待节点之前获取锁。
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
调用 AddWaiter(&node) 将等待节点 node 添加到等待队列中。
AddWaiter (&node) ;
使用 Thread::SpinRelease 释放 _WaitSetLock 锁,完成添加等待节点的操作。
Thread::SpinRelease (&_WaitSetLock) ;
如果 SyncFlags 的第4位为0,将 _Responsible 设置为 NULL。
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
将 _recursions 的值保存到 save 中,记录旧的递归计数。
intptr_t save = _recursions; // record the old recursion count
将 _waiters 的值增加1,表示有一个线程在等待。
_waiters++; // increment the number of waiters
将 _recursions 的值设置为0,表示递归层级为1。
_recursions = 0; // set the recursion level to be 1
调用 exit(true, Self) 退出对象监视器。
exit (true, Self) ; // exit the monitor
使用断言确保当前线程 Self 不是对象监视器的所有者。
guarantee (_owner != Self, "invariant") ;
//一旦在上面的exit()调用中删除了ObjectMonitor的所有权,另一个线程就可以进入()ObjectMonitor,执行notify(),并退出()ObjectMonitor。如果另一个线程的exit()调用选择该线程作为后续线程,而unpark()调用恰好发生在该线程发布MONITOR_CONTENDED_exit事件时,则我们将使用RawMonitors运行事件处理程序并使用unpark的风险。为了避免这个问题,我们重新发布该事件。即使原始的unpark()没有被使用,这也没有害处,因为我们是该监视器的选定继任者。
//封装了一些状态转换操作,包括线程等待和检查外部挂起状态等
判断等待节点 node 的 _notified 是否不为0,即等待节点是否已被通知。 判断当前线程 Self 是否是等待节点的后继节点 _succ。
if (node._notified != 0 && _succ == Self) {
如果上述两个条件都满足,则调用 _event 的 unpark() 方法,对线程进行唤醒操作。
node._event->unpark();
}
// 线程在WaitSet列表上,现在将其park()。
//在多处理器系统上,在park()之前进行一次短暂的自旋可能是有益的。
//需要注意的是,以下逻辑应该改为以下形式的循环: while (!timeout && !interrupted && _notified == 0) park()
定义变量 ret 并初始化为 OS_OK。
int ret = OS_OK ;
定义变量 WasNotified 并初始化为0。
int WasNotified = 0 ;
进入一个代码块,用于封装状态转换。
{ // State transition wrappers
获取当前线程的 osthread 对象。
OSThread* osthread = Self->osthread();
创建一个 OSThreadWaitState 对象 osts,将 osthread 和 true 传入构造函数,用于设置线程的等待状态。
OSThreadWaitState osts(osthread, true);
进入一个代码块,用于封装在虚拟机中阻塞线程的操作。
{
使用 ThreadBlockInVM 对象 tbivm,将 jt 传入构造函数,用于在虚拟机中阻塞线程,并确保访问对象时的安全性。
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
调用 jt->set_suspend_equivalent() 将线程设置为等效挂起状态
jt->set_suspend_equivalent();
如果可中断且线程被中断(Thread::is_interrupted(THREAD, false))或存在待处理的异常(HAS_PENDING_EXCEPTION),则不执行任何操作。
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else
如果等待节点 node 的 _notified 为0,则执行以下操作:
if (node._notified == 0) {
如果 millis 小于等于0,则调用 Self->_ParkEvent->park() 进行线程等待。
if (millis <= 0) {
Self->_ParkEvent->park () ;
} else {
否则,调用 Self->_ParkEvent->park(millis) 进行线程等待,并将返回值赋给 ret。
ret = Self->_ParkEvent->park (millis) ;
}
}
检查线程在等待期间是否被外部挂起,如果是,则调用 ExitSuspendEquivalent(jt) 进行退出挂起等效状态的操作。
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.添加相应的处理逻辑。
调用 jt->java_suspend_self() 将线程自身挂起。
jt->java_suspend_self();
}
} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
/// 节点可能在WaitSet中,EntryList(或cxq)中,或者从WaitSet过渡到EntryList中。 // 检查是否需要从WaitSet中移除节点。 // 我们使用双重检查锁定来避免在线程不在等待队列上时获取_WaitSetLock。 // // 注意,在获取TState之前,我们不需要进行内存屏障。 // 在最坏的情况下,我们将获取到以前由该线程写入的旧的、过时的TS_WAIT值。 // (也许这个获取甚至可能通过处理器自己的存储缓冲区实现,尽管考虑到在前一个存储和这个加载之间的代码路径的长度,这是非常不可能的)。 // 如果以下加载操作获取到了过时的TS_WAIT值,那么我们将获取锁并重新获取一个新的TState值。 // 也就是说,我们在失败时向安全性靠拢。
//用于检查等待节点的状态是否为等待状态,并在满足条件时解除链接操作,将等待节点从等待队列中移除,并将其状态设置为运行状态。
检查等待节点 node 的状态是否为等待状态(ObjectWaiter::TS_WAIT)
if (node.TState == ObjectWaiter::TS_WAIT) {
使用自旋锁 Thread::SpinAcquire 保护对 _WaitSetLock 的访问,以进行解除链接操作之前获取锁。
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
再次检查等待节点 node 的状态是否为等待状态(ObjectWaiter::TS_WAIT),以防止在获取锁之前节点状态已被修改。
if (node.TState == ObjectWaiter::TS_WAIT) {
调用 DequeueSpecificWaiter(&node) 将等待节点从等待队列中解除链接,将其从等待队列中移除。
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
使用断言确保等待节点的 _notified 为0,即在解除链接操作之前没有被通知过。
assert(node._notified == 0, "invariant");
将等待节点的状态设置为运行状态(ObjectWaiter::TS_RUN)。
node.TState = ObjectWaiter::TS_RUN ;
}
使用自旋锁 Thread::SpinRelease 释放 _WaitSetLock 锁,完成解除链接操作。
Thread::SpinRelease (&_WaitSetLock) ;
}
// 现在,线程可能处于以下状态之一:离开列表(TS_RUN)、进入列表(TS_ENTER)或者在cxq(TS_CXQ)中。
//对于当前线程来说,节点的TState变量是稳定的。
//没有其他线程会异步地修改TState变量。
使用断言 guarantee 确保等待节点 node 的状态不是等待状态(ObjectWaiter::TS_WAIT),如果是等待状态,则会触发断言错误。
guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
使用 OrderAccess::loadload() 进行内存屏障操作,确保前面的读操作对后面的读操作可见。
OrderAccess::loadload() ;
如果当前线程 _succ 等于自身,则将 _succ 设置为 NULL。
if (_succ == Self) _succ = NULL ;
将等待节点的 _notified 赋值给变量 WasNotified,用于记录等待节点是否被通知过。
WasNotified = node._notified ;
// 重新进入监视器。
//在重新进入竞争的监视器之前,需要重新获取监视器。 在调用object.wait()后重新进入竞争的监视器。 保留OBJECT_WAIT状态,直到成功完成重新进入操作。 线程状态是thread_in_vm,并且再次可以安全地访问oop对象,尽管对象的原始地址可能已经发生了变化。 (当然,在安全点之间不要缓存裸指针)。
//发布监视器等待事件。请注意,这是过去时态,表示我们已经完成了等待
如果满足条件 JvmtiExport::should_post_monitor_waited(),则调用 JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT),向 JVMTI 导出接口发送监视器等待事件。
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
}
// 根据条件执行一些监视器等待相关的事件和状态更新操作,然后根据等待节点的状态进行相应的处理操作。
如果满足条件 event.should_commit(),则调用 post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT),发送监视器等待事件。
if (event.should_commit()) {
post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT);
}
使用 OrderAccess::fence() 进行内存屏障操作,确保前面的写操作对后面的读操作可见。
OrderAccess::fence() ;
使用断言 assert 确保当前线程的 _Stalled 不为0,即在前面的代码中应该已经设置为非0的值。然后将 _Stalled 设置为0。
assert (Self->_Stalled != 0, "invariant") ;
Self->_Stalled = 0 ;
使用断言 assert 确保 _owner 不是当前线程 Self,即当前线程不应该是监视器的持有者。
assert (_owner != Self, "invariant") ;
将等待节点的状态赋值给变量 v。
ObjectWaiter::TStates v = node.TState ;
如果 v 等于 ObjectWaiter::TS_RUN,则调用 enter(Self),进入监视器
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
否则,使用断言 guarantee 确保 v 是 ObjectWaiter::TS_ENTER 或 ObjectWaiter::TS_CXQ,然后调用 ReenterI(Self, &node),重新进入监视器,并调用 node.wait_reenter_end(this) 完成等待重新进入的结束操作。
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
//检查通知是否发生以及是否需要抛出中断异常。根据条件进行相应的处理操作。
// 自身已经重新获取了锁。
//生命周期 - 代表自身的节点不应该出现在任何队列上。 节点即将超出作用域,即使它是不可销毁的,我们也不希望与该线程相关联的任何残余元素留在任何列表上。
guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
assert (_owner == Self, "invariant") ;
assert (_succ != Self , "invariant") ;
} // OSThreadWaitState()
jt->set_current_waiting_monitor(NULL);
使用断言 guarantee 确保 _recursions 等于0,即在前面的代码中应该已经被设置为0。
guarantee (_recursions == 0, "invariant") ;
将保存的旧递归计数值 save 赋值给 _recursions,恢复旧的递归计数。
_recursions = save; // restore the old recursion count
将等待线程的数量 _waiters 减1,表示有一个等待线程结束等待。
_waiters--; // decrement the number of waiters
// Verify a few postconditions
使用断言 assert 确保 _owner 等于当前线程 Self,即当前线程应该是监视器的持有者。
使用断言 assert 确保 _succ 不等于当前线程 Self,即当前线程不应该是监视器的后继者。
使用断言 assert 确保监视器对象的标记与当前监视器对象相匹配。
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
如果 SyncFlags 的第5位为1(即SyncFlags & 32为真),则调用 OrderAccess::fence() 进行内存屏障操作。
if (SyncFlags & 32) {
OrderAccess::fence() ;
}
检查是否发生了通知,即变量 WasNotified 是否为假。
// check if the notification happened
如果通知没有发生,那么可能是超时或线程中断或两者都有。
if (!WasNotified) {
// no, it could be timeout or Thread.interrupt() or both
// check for interrupt event, otherwise it is timeout
// 检查是否发生了线程中断事件,并且当前线程允许被中断,并且没有未处理的异常。
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
如果满足这些条件,则抛出 java.lang.InterruptedException 异常。
TEVENT (Wait - throw IEX from epilog) ;
THROW(vmSymbols::java_lang_InterruptedException());
}
}
// 注意:虚假唤醒将被视为超时。
// 监视器的通知优先于线程中断。
}
notify
notify 唤醒
notify
或者 notifyAll
方法可以唤醒同一个锁监视器下调用 wait
挂起的线程
void ObjectMonitor::notify(TRAPS) {
检查当前线程是否是锁的拥有者。
CHECK_OWNER();
如果_WaitSet为空,则记录一个事件并返回。
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
return ;
}
进行一些跟踪和监控的操作。
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
获取_WaitSetLock锁。
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
/从_WaitSet中取出一个等待线程,并将其赋值给变量iterator。
ObjectWaiter * iterator = DequeueWaiter() ;
如果iterator不为NULL,则记录一个事件,并对iterator的状态和通知标志进行一些断言和设置操作。如果Policy不等于4,则将iterator的状态设置为ObjectWaiter::TS_ENTER。将通知标志设置为已通知(_notified = 1)。获取当前线程的ID,并将其设置为iterator的通知者线程ID。
if (iterator != NULL) {
TEVENT (Notify1 - Transfer) ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
iterator->_notified = 1 ;
Thread * Self = THREAD;
iterator->_notifier_tid = Self->osthread()->thread_id();
将_EntryList赋值给变量List。如果List不为NULL,则进行一些断言操作,确保_EntryList的状态和关系正确。
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
根据Policy的不同执行相应的操作:
如果Policy为0,则将iterator插入_EntryList的头部。
如果Policy为1,则将iterator插入_EntryList的尾部。
如果Policy为2,则将iterator插入_cxq的头部。
如果Policy为3,则将iterator插入_cxq的尾部。
如果Policy不是以上情况,则将iterator的状态设置为ObjectWaiter::TS_RUN,并使用ParkEvent的unpark()方法唤醒等待线程。
这段代码的目的是在调用notify方法时,从_WaitSet中取出一个等待线程,并根据Policy的不同将其插入_EntryList或_cxq中,或直接唤醒等待线程。这样可以实现线程的通知和唤醒机制。
// 头插 EntryList
if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else // 尾插 EntryList
if (Policy == 1) { // append to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
// CONSIDER: finding the tail currently requires a linear-time walk of
// the EntryList. We can make tail access constant-time by converting to
// a CDLL instead of using our current DLL.
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else // 头插 cxq
if (Policy == 2) { // prepend to cxq
// prepend to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else // 尾插 cxq
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
/如果Policy小于4,则调用iterator的wait_reenter_begin方法,并将当前的ObjectMonitor对象作为参数传递给它。这个方法的目的是在等待期间重新进入对象监视器。
if (Policy < 4) {
iterator->wait_reenter_begin(this);
}
// _WaitSetLock锁用于保护等待队列(_WaitSet),而不是EntryList。可以将将元素添加到EntryList的操作移到_WaitSetLock保护的关键部分之外。但在实践中,这并没有什么用处。除了wait()的超时和中断之外,只有监视器所有者线程会获取_WaitSetLock。_WaitSetLock几乎没有争用,因此减少关键部分的长度并没有多大好处。
换句话说,由于_WaitSetLock的争用很少,只有监视器所有者线程获取该锁,因此将元素添加到EntryList的操作放在_WaitSetLock保护的关键部分内外并没有太大区别。因此,将其保留在关键部分内部是更简单和更直观的做法。
}
// 自旋释放,自旋释放_WaitSetLock锁,并在需要时对通知计数进行递增操作。这段代码的作用是在释放锁之前对等待线程进行一些准备工作,并进行一些计数操作。
Thread::SpinRelease (&_WaitSetLock) ;
///如果iterator不为NULL,并且ObjectMonitor::_sync_Notifications不为NULL,则对ObjectMonitor::_sync_Notifications进行递增操作。
if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc() ;
}
}
notifyAll
整个代码块的目的是在调用notifyAll方法时,从_WaitSet中取出所有等待线程,并根据Policy的不同将它们插入_EntryList或_cxq中,或直接唤醒等待线程。这样可以实现对所有等待线程的通知和唤醒机制。
void ObjectMonitor::notifyAll(TRAPS) {
CHECK_OWNER();
ObjectWaiter* iterator;
// 如果_WaitSet为空,则记录一个事件并返回。
if (_WaitSet == NULL) {
TEVENT (Empty-NotifyAll) ;
return ;
}
DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
//声明并初始化变量Policy和Tally,并获取_WaitSetLock锁。该锁用于保护等待队列。
int Policy = Knob_MoveNotifyee ;
int Tally = 0 ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;
//使用一个无限循环遍历_WaitSet,从中取出一个等待线程(iterator)。如果iterator为空,则跳出循环。在每次循环中,记录一个事件,并将Tally递增1。
for (;;) {
iterator = DequeueWaiter () ;
if (iterator == NULL) break ;
TEVENT (NotifyAll - Transfer1) ;
++Tally ;
// Disposition - what might we do with iterator ?
// a. add it directly to the EntryList - either tail or head.
// b. push it onto the front of the _cxq.
// For now we use (a).
对iterator的状态和通知标志进行一些断言和设置操作。将通知标志设置为已通知(_notified = 1)。获取当前线程的ID,并将其设置为iterator的通知者线程ID。如果Policy不等于4,则将iterator的状态设置为ObjectWaiter::TS_ENTER。
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
iterator->_notified = 1 ;
Thread * Self = THREAD;
iterator->_notifier_tid = Self->osthread()->thread_id();
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
ObjectWaiter * List = _EntryList ;
/将_EntryList赋值给变量List。如果List不为NULL,则进行一些断言操作,确保_EntryList的状态和关系正确。
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
根据Policy的不同执行相应的操作:
如果Policy为0,则将iterator插入_EntryList的头部。
如果Policy为1,则将iterator插入_EntryList的尾部。
如果Policy为2,则将iterator插入_cxq的头部。
如果Policy为3,则将iterator插入_cxq的尾部。
如果Policy不是以上情况,则将iterator的状态设置为ObjectWaiter::TS_RUN,并使用ParkEvent的unpark()方法唤醒等待线程。
if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
// CONSIDER: finding the tail currently requires a linear-time walk of
// the EntryList. We can make tail access constant-time by converting to
// a CDLL instead of using our current DLL.
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { // prepend to cxq
// prepend to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
} else
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
/// 如果Policy小于4,则调用iterator的wait_reenter_begin方法,并将当前的ObjectMonitor对象作为参数传递给它。这个方法的目的是在等待期间重新进入对象监视器。
if (Policy < 4) {
iterator->wait_reenter_begin(this);
}
/*
_WaitSetLock保护的是等待队列(wait queue),而不是EntryList。我们可以将将元素添加到EntryList的操作移到_WaitSetLock保护的关键部分之外。但在实践中,这并没有什么用处。除了wait()的超时和中断之外,只有监视器所有者线程会获取_WaitSetLock。_WaitSetLock几乎没有争用,因此减少关键部分的长度并没有多大好处。
换句话说,由于_WaitSetLock的争用很少,只有监视器所有者线程获取该锁,因此将元素添加到EntryList的操作放在_WaitSetLock保护的关键部分内外并没有太大区别。因此,将其保留在关键部分内部是更简单和更直观的做法。
*/
}
自旋释放_WaitSetLock锁。
Thread::SpinRelease (&_WaitSetLock) ;
//如果Tally不为0,并且ObjectMonitor::_sync_Notifications不为NULL,则对ObjectMonitor::_sync_Notifications进行递增操作,递增值为Tally。
if (Tally != 0 && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc(Tally) ;
}
}
浅尝辄止,如有疑惑,欢迎一起交流。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/180223.html