AQS的ReentrantLock的公平非公平源码解释
AQS
AbstractQueuedSynchronizer(AQS)
提供了一套可用于实现锁同步机制的框架,不夸张地说,AQS
是JUC
同步框架的基石。AQS
通过一个FIFO
队列维护线程同步状态,实现类只需要继承该类,并重写指定方法即可实现一套线程同步机制。
JUC中的同步锁,异步任务都实现了AQS
AQS使用了模仿方法设计模式
源码解释,以ReentrantLock为例
加锁操作
// ReentrantLock类内部包含了FairSync公平锁和NonFairSync非公平锁和Sync
public ReentrantLock() {
// 默认非公平锁,是一个Sync类型
sync = new NonfairSync();
}
// Sync继承了AQS
abstract static class Sync extends AbstractQueuedSynchronizer
public void lock() {
// 调用ReentrantLock的lock方法其实调用的就是公平锁或非公平锁的lock方法
sync.lock();
}
AQS的维护了一个同步状态和一个双向链表
// 双向链表头指针
private transient volatile Node head;
// 双向链表尾指针
private transient volatile Node tail;
// 同步状态
private volatile int state;
实际工作的是继承了AQS的Sync,先以非公平锁为例来讲
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
// 对同步状态state进行CAS操作,如果操作成功,抢到锁,不需要加入到阻塞队列
if (compareAndSetState(0, 1))
// 并设置当前的独占线程
setExclusiveOwnerThread(Thread.currentThread());
else // 否则加入到阻塞队列中,调用的是AQS的方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// 又调用了父类Sync的方法
return nonfairTryAcquire(acquires);
}
}
AQS的acquire方法,有重要的三步
public final void acquire(int arg) {
// 1. 先调用的是NonfairSync的tryAcquire方法,再尝试争夺锁资源
if (!tryAcquire(arg) &&
// 2. 如果尝试争夺锁失败,就进入到AQS的addWaiter方法,并传入一个表示独占模式的参数
// 3. 调用AQS的acquireQueued方法,传入新节点,和参数arg,如果返回false,表示线程没被中断,返回true表示被中断
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Sync的nonfairTryAcquire方法(包含了公平锁和非公平锁)
final boolean nonfairTryAcquire(int acquires) {
// 得到当前线程
final Thread current = Thread.currentThread();
// 得到同步状态
int c = getState();
// 如果锁没有被占用(状态为0表示没有被占用),则尝试CAS操作,如果操作成功,抢到锁返回true,不需要加入到阻塞队列
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果锁已被被占用了,则判断当前线程跟占用锁线程是否相等,相等就重入,然后返回true,不相等加入到阻塞队列
// 这里体现出了可重入锁
else if (current == getExclusiveOwnerThread()) {
// 状态 1,重入一次就加一次,后续出一次减一次
int nextc = c acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果以上两步都没有拿到锁,则返回false,加入到阻塞队列
return false;
}
// 公平锁源码
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// hasQueuedPredecessors就是公平锁原理,判断队列中是否还有在排队的节点
// 如果队列中没有排队的节点,则可以去抢锁,否则就去排队。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h!=t说明有
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
先介绍的AQS的内部类Node
static final class Node {
static final Node SHARED = new Node(); // 用于表示线程以共享模式等待锁
static final Node EXCLUSIVE = null; // 用于表示线程以独占模式等待锁
static final int CANCELLED = 1; // 用于表示线程获取锁的请求已经被取消了
static final int SIGNAL = -1; // 用于表示线程已经准备好了,等待资源释放
static final int CONDITION = -2; // 用于表示节点在阻塞队列中,节点线程等待唤醒
static final int PROPAGATE = -3; // 用于表示节点需要向后传播,当前线程处于共享模式才会使用
volatile int waitStatus; // 用于表示节点的等待状态
volatile Node prev; // 用于表示当前节点的前驱节点
volatile Node next; // 用于表示当前节点的后继节点
volatile Thread thread; // 用于表示当前节点所属的线程
Node nextWaiter; // 用于表示下一个等待被唤醒的节点
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回当前节点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS的addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 把尾节点赋给pred,第一次进来tail肯定为null
Node pred = tail;
if (pred != null) {
// 将新节点的前驱指向pred,也就是当前的尾节点
node.prev = pred;
// 对尾节点进行CAS操作,成绩就换成新节点,失败就进入enq方法
if (compareAndSetTail(pred, node)) {
// 再将前一个节点(也就是之前的尾节点)的后继节点指向新节点
pred.next = node;
return node;
}
}
// 如果pred==null,或者替换尾节点CAS操作失败就进入enq方法,传入新建节点
// 后续的节点如果上面的CAS失败了,那就都会进入该方法,一直循环CAS操作修改尾节点,直到成功
enq(node);
return node;
}
AQS的enq方法
private Node enq(final Node node) {
// 死循环,设置成死循环是因为并发操作CAS有线程会失败,那就重新CAS,还有初始化头节点时,也需要再次循环,添加真实的节点
for (;;) {
// 将尾节点赋给t,第一次进来肯定为null
Node t = tail;
if (t == null) { // CAS操作初始化头节点,该头节点只是占个位,虚节点
if (compareAndSetHead(new Node()))
// 此时头尾都是同一个节点
tail = head;
} else { // 如果有尾节点了
// 那就将新节点的前驱节点指向当前的尾节点
node.prev = t;
// 然后对尾节点进行CAS操作替换成新节点
if (compareAndSetTail(t, node)) {
// 在将前一个节点(也就是之前的尾节点)指向新节点
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 死循环,直到当前节点顺利得到锁资源,得不到就会一直等待锁的释放
for (;;) {
// 拿到当前节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点就是头节点,则再尝试获取锁资源,获取成功,则进入if,失败进入else
if (p == head && tryAcquire(arg)) {
// 设置新头节点为当前节点
setHead(node);
// 清除旧头节点的尾指针,当前节点能抢到锁,说明旧头节点已经执行完毕
p.next = null; // help GC
failed = false;
// 返回线程是否被中断
return interrupted;
}
// shouldParkAfterFailedAcquire如果返回true,那么就让当前线程调用parkAndCheckInterrupt,
// parkAndCheckInterrupt里面会阻塞该线程,并且返回当前线程是否被中断
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 异常情况
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前一个节点是否已经准备好了,只等锁释放
if (ws == Node.SIGNAL)
return true;
// 前一个节点线程获取锁的请求是否已被取消,如果取消,则清除已取消的所有节点,返回false
if (ws > 0) {
do {
// 当前节点的前驱指针前移,pred指针也前移
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);//循环判断pred节点是否已经取消对锁的请求,如果取消就一直前移当前节点的前驱指针
// 最后把当前的pred指向的未取消请求的节点的后继节点指向当前节点。相当于把两者中间取消对锁请求的节点清除
pred.next = node;
} else {
// 如果没有被取消,则对前一个节点的等待状态进行CAS操作,修改成已就绪状态,然后会调用LockSupport.park()阻塞线程,在阻塞线程前还有一次获取锁资源的机会,获取到了就不会阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);// 阻塞当前线程,等待唤醒,
// 唤醒后返回当前线程是否被中断
return Thread.interrupted();
}
到此加锁所有线程都是阻塞中,lock方法已经做完,等待unlock释放,这边阻塞的线程才能获取锁
解锁操作
public void unlock() {
sync.release(1);
}
AQS的release方法
public final boolean release(int arg) {
// 释放锁是否成功
if (tryRelease(arg)) {
// 头节点赋给h
Node h = head;
// 判断h是否非空,并且h节点的等待状态为非0
if (h != null && h.waitStatus != 0)
// 唤醒下一个节点
unparkSuccessor(h);
// 返回释放锁成功
return true;
}
// 返回释放锁失败
return false;
}
Sync的tryRelease方法
protected final boolean tryRelease(int releases) {
// 同步状态-1
int c = getState() - releases;
// 判断当前线程是否不同于持有独占锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 判断同步状态是否为0,为0才表示锁被释放
if (c == 0) {
free = true;
// 设置当前独占锁没有任何线程被占用
setExclusiveOwnerThread(null);
}
// 重新设置同步状态
setState(c);
// 返回锁释放结果
return free;
}
AQS的unparkSuccessor方法
private void unparkSuccessor(Node node) {
// 准备唤醒下一个节点
// 当前头节点的等待状态赋给ws
int ws = node.waitStatus;
// 当前头节点等待状态是否小于0
if (ws < 0)
// 对当前头节点的等待状态进行CAS操作修改为0,因为始终认为头节点是已经执行完毕的节点,或正在执行的节点,是一个占位的作用
compareAndSetWaitStatus(node, ws, 0);
// 将当前头节点的下一个节点赋给s
Node s = node.next;
// s如果等于null 或者 s的等待状态大于0,说明下一个节点已经没了。或者已经取消锁的获取请求了
if (s == null || s.waitStatus > 0) {
// 则将s重置为null
s = null;
// 并且从队列尾巴开始遍历,找到连续的没有取消锁获取请求的节点的最前的一个
// 然后该节点线程会唤醒,该线程就会把取消锁请求的线程给清除掉
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果找到了,唤醒该s节点中的线程
if (s != null)
LockSupport.unpark(s.thread);
}
异常情况
private void cancelAcquire(Node node) {
// 当前节点为null,不处理
if (node == null)
return;
// 清除节点线程
node.thread = null;
// 保存当前节点的前驱节点
Node pred = node.prev;
// 循环判断前驱节点的等待状态是否是取消锁的获取请求状态
while (pred.waitStatus > 0)
// 如果是,则把该前驱节点的前驱节点赋给pred再赋给当前节点的前驱节点
node.prev = pred = pred.prev;
// 保存当前节点的前驱节点的后继节点
Node predNext = pred.next;
// 设置当前节点的状态为取消锁的获取请求
node.waitStatus = Node.CANCELLED;
// 如果当前节点就是尾巴节点,则就清除自己
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果不是尾巴
int ws;
// 则判断是否满足pred不是头节点
if (pred != head &&
// 把pred的等待状态赋给ws,并比较该状态是否为就绪状态
((ws = pred.waitStatus) == Node.SIGNAL ||
// 如果不是以就绪状态,那也不能是取消锁请求状态,然后改成就绪状态
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// pred的线程不能为null
pred.thread != null) {
// 将当前节点的后继节点保存起来
Node next = node.next;
// 判断该后继节点是否不为null并且当前节点等待状态<=0
if (next != null && next.waitStatus <= 0)
// 对前驱节点的后一个节点进行CAS操作替换成当前节点的后继节点
compareAndSetNext(pred, predNext, next);
} else {
//
unparkSuccessor(node);
}
//自己的后继指向自己,等待gc回收
node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
// 拿到当前节点的等待状态
int ws = node.waitStatus;
// 该状态是否小于0
if (ws < 0)
// CAS修改等待状态为0
compareAndSetWaitStatus(node, ws, 0);
// 保存当前节点的后继节点
Node s = node.next;
// 后继节点是否为null 或者 后继节点的等待状态是否为取消锁请求
if (s == null || s.waitStatus > 0) {
// 将后继节点设置为null
s = null;
// 并且后队列尾巴开始向前遍历,找到连续非取消状态的最前一个节点(不能是当前节点)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果找到了该节点s
if (s != null)
// 就唤醒该节点
LockSupport.unpark(s.thread);
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhfhbfjh
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01