• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

AQS的ReentrantLock的公平非公平源码解释

武飞扬头像
秋雨449
帮助3

AQS

AbstractQueuedSynchronizer(AQS)提供了一套可用于实现锁同步机制的框架,不夸张地说,AQSJUC同步框架的基石。AQS通过一个FIFO队列维护线程同步状态,实现类只需要继承该类,并重写指定方法即可实现一套线程同步机制。

JUC中的同步锁,异步任务都实现了AQS

AQS使用了模仿方法设计模式

源码解释,以ReentrantLock为例

加锁操作

// ReentrantLock类内部包含了FairSync公平锁和NonFairSync非公平锁和Sync
public ReentrantLock() {
    // 默认非公平锁,是一个Sync类型
    sync = new NonfairSync();
}
// Sync继承了AQS
abstract static class Sync extends AbstractQueuedSynchronizer
public void lock() {
    // 调用ReentrantLock的lock方法其实调用的就是公平锁或非公平锁的lock方法
    sync.lock();
}

AQS的维护了一个同步状态和一个双向链表

// 双向链表头指针
private transient volatile Node head;
// 双向链表尾指针
private transient volatile Node tail;
// 同步状态
private volatile int state;

实际工作的是继承了AQS的Sync,先以非公平锁为例来讲

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    final void lock() {
        // 对同步状态state进行CAS操作,如果操作成功,抢到锁,不需要加入到阻塞队列
        if (compareAndSetState(0, 1))
            // 并设置当前的独占线程
            setExclusiveOwnerThread(Thread.currentThread());
        else // 否则加入到阻塞队列中,调用的是AQS的方法
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        // 又调用了父类Sync的方法
        return nonfairTryAcquire(acquires);
    }
}

AQS的acquire方法,有重要的三步

public final void acquire(int arg) {
    // 1. 先调用的是NonfairSync的tryAcquire方法,再尝试争夺锁资源
    if (!tryAcquire(arg) && 
        // 2. 如果尝试争夺锁失败,就进入到AQS的addWaiter方法,并传入一个表示独占模式的参数
        // 3. 调用AQS的acquireQueued方法,传入新节点,和参数arg,如果返回false,表示线程没被中断,返回true表示被中断
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Sync的nonfairTryAcquire方法(包含了公平锁和非公平锁)

final boolean nonfairTryAcquire(int acquires) {
    // 得到当前线程
    final Thread current = Thread.currentThread();
    // 得到同步状态
    int c = getState();
    // 如果锁没有被占用(状态为0表示没有被占用),则尝试CAS操作,如果操作成功,抢到锁返回true,不需要加入到阻塞队列
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果锁已被被占用了,则判断当前线程跟占用锁线程是否相等,相等就重入,然后返回true,不相等加入到阻塞队列
    // 这里体现出了可重入锁
    else if (current == getExclusiveOwnerThread()) {
        // 状态 1,重入一次就加一次,后续出一次减一次
        int nextc = c   acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 如果以上两步都没有拿到锁,则返回false,加入到阻塞队列
    return false;
}
// 公平锁源码
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // hasQueuedPredecessors就是公平锁原理,判断队列中是否还有在排队的节点
                // 如果队列中没有排队的节点,则可以去抢锁,否则就去排队。
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c   acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
 }
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // h!=t说明有
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

先介绍的AQS的内部类Node

static final class Node {
    static final Node SHARED = new Node(); // 用于表示线程以共享模式等待锁
    static final Node EXCLUSIVE = null; // 用于表示线程以独占模式等待锁
    static final int CANCELLED =  1; // 用于表示线程获取锁的请求已经被取消了
    static final int SIGNAL    = -1; // 用于表示线程已经准备好了,等待资源释放
    static final int CONDITION = -2; // 用于表示节点在阻塞队列中,节点线程等待唤醒
    static final int PROPAGATE = -3; // 用于表示节点需要向后传播,当前线程处于共享模式才会使用
    volatile int waitStatus; // 用于表示节点的等待状态
    volatile Node prev; // 用于表示当前节点的前驱节点
    volatile Node next; // 用于表示当前节点的后继节点
    volatile Thread thread; // 用于表示当前节点所属的线程
    Node nextWaiter; // 用于表示下一个等待被唤醒的节点
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 返回当前节点的前驱节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {}
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { 
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

AQS的addWaiter方法

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 把尾节点赋给pred,第一次进来tail肯定为null
    Node pred = tail;
    if (pred != null) {
        // 将新节点的前驱指向pred,也就是当前的尾节点
        node.prev = pred;
        // 对尾节点进行CAS操作,成绩就换成新节点,失败就进入enq方法
        if (compareAndSetTail(pred, node)) {
            // 再将前一个节点(也就是之前的尾节点)的后继节点指向新节点
            pred.next = node;
            return node;
        }
    }
    // 如果pred==null,或者替换尾节点CAS操作失败就进入enq方法,传入新建节点
    // 后续的节点如果上面的CAS失败了,那就都会进入该方法,一直循环CAS操作修改尾节点,直到成功
    enq(node);
    return node;
}

AQS的enq方法

private Node enq(final Node node) {
    // 死循环,设置成死循环是因为并发操作CAS有线程会失败,那就重新CAS,还有初始化头节点时,也需要再次循环,添加真实的节点
    for (;;) {
        // 将尾节点赋给t,第一次进来肯定为null
        Node t = tail;
        if (t == null) { // CAS操作初始化头节点,该头节点只是占个位,虚节点
            if (compareAndSetHead(new Node()))
                // 此时头尾都是同一个节点
                tail = head;
        } else { // 如果有尾节点了
            // 那就将新节点的前驱节点指向当前的尾节点
            node.prev = t;
            // 然后对尾节点进行CAS操作替换成新节点
            if (compareAndSetTail(t, node)) {
                // 在将前一个节点(也就是之前的尾节点)指向新节点
                t.next = node;
                return t;
            }
        }
    }
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 死循环,直到当前节点顺利得到锁资源,得不到就会一直等待锁的释放
        for (;;) {
            // 拿到当前节点的前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点就是头节点,则再尝试获取锁资源,获取成功,则进入if,失败进入else
            if (p == head && tryAcquire(arg)) {
                // 设置新头节点为当前节点
                setHead(node);
                // 清除旧头节点的尾指针,当前节点能抢到锁,说明旧头节点已经执行完毕
                p.next = null; // help GC
                failed = false;
                // 返回线程是否被中断
                return interrupted;
            }
            // shouldParkAfterFailedAcquire如果返回true,那么就让当前线程调用parkAndCheckInterrupt,
            // parkAndCheckInterrupt里面会阻塞该线程,并且返回当前线程是否被中断
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 异常情况
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 前一个节点是否已经准备好了,只等锁释放
    if (ws == Node.SIGNAL)
        return true;
    // 前一个节点线程获取锁的请求是否已被取消,如果取消,则清除已取消的所有节点,返回false
    if (ws > 0) {
        do {
            // 当前节点的前驱指针前移,pred指针也前移
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);//循环判断pred节点是否已经取消对锁的请求,如果取消就一直前移当前节点的前驱指针
        // 最后把当前的pred指向的未取消请求的节点的后继节点指向当前节点。相当于把两者中间取消对锁请求的节点清除
        pred.next = node;
    } else {
        // 如果没有被取消,则对前一个节点的等待状态进行CAS操作,修改成已就绪状态,然后会调用LockSupport.park()阻塞线程,在阻塞线程前还有一次获取锁资源的机会,获取到了就不会阻塞
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);// 阻塞当前线程,等待唤醒,
    // 唤醒后返回当前线程是否被中断
    return Thread.interrupted();
}

到此加锁所有线程都是阻塞中,lock方法已经做完,等待unlock释放,这边阻塞的线程才能获取锁

解锁操作

public void unlock() {
    sync.release(1);
}

AQS的release方法

public final boolean release(int arg) {
    // 释放锁是否成功
    if (tryRelease(arg)) {
        // 头节点赋给h
        Node h = head;
        // 判断h是否非空,并且h节点的等待状态为非0
        if (h != null && h.waitStatus != 0)
            // 唤醒下一个节点
            unparkSuccessor(h);
        // 返回释放锁成功
        return true;
    }
    // 返回释放锁失败
    return false;
}

Sync的tryRelease方法

protected final boolean tryRelease(int releases) {
    // 同步状态-1
    int c = getState() - releases;
    // 判断当前线程是否不同于持有独占锁的线程
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 判断同步状态是否为0,为0才表示锁被释放
    if (c == 0) {
        free = true;
        // 设置当前独占锁没有任何线程被占用
        setExclusiveOwnerThread(null);
    }
    // 重新设置同步状态
    setState(c);
    // 返回锁释放结果
    return free;
}

AQS的unparkSuccessor方法

private void unparkSuccessor(Node node) {
    // 准备唤醒下一个节点
    // 当前头节点的等待状态赋给ws
    int ws = node.waitStatus;
    // 当前头节点等待状态是否小于0
    if (ws < 0)
        // 对当前头节点的等待状态进行CAS操作修改为0,因为始终认为头节点是已经执行完毕的节点,或正在执行的节点,是一个占位的作用
        compareAndSetWaitStatus(node, ws, 0);
    // 将当前头节点的下一个节点赋给s
    Node s = node.next;
    // s如果等于null 或者 s的等待状态大于0,说明下一个节点已经没了。或者已经取消锁的获取请求了
    if (s == null || s.waitStatus > 0) {
        // 则将s重置为null
        s = null;
        // 并且从队列尾巴开始遍历,找到连续的没有取消锁获取请求的节点的最前的一个
        // 然后该节点线程会唤醒,该线程就会把取消锁请求的线程给清除掉
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果找到了,唤醒该s节点中的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

异常情况

private void cancelAcquire(Node node) {
    // 当前节点为null,不处理
    if (node == null)
        return;
    // 清除节点线程
    node.thread = null;
    // 保存当前节点的前驱节点
    Node pred = node.prev;
    // 循环判断前驱节点的等待状态是否是取消锁的获取请求状态
    while (pred.waitStatus > 0)
        // 如果是,则把该前驱节点的前驱节点赋给pred再赋给当前节点的前驱节点
        node.prev = pred = pred.prev;
    // 保存当前节点的前驱节点的后继节点
    Node predNext = pred.next;
    // 设置当前节点的状态为取消锁的获取请求
    node.waitStatus = Node.CANCELLED;
    // 如果当前节点就是尾巴节点,则就清除自己
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // 如果不是尾巴
        int ws;
        // 则判断是否满足pred不是头节点
        if (pred != head &&
            // 把pred的等待状态赋给ws,并比较该状态是否为就绪状态
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             // 如果不是以就绪状态,那也不能是取消锁请求状态,然后改成就绪状态
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            // pred的线程不能为null
            pred.thread != null) {
            // 将当前节点的后继节点保存起来
            Node next = node.next;
            // 判断该后继节点是否不为null并且当前节点等待状态<=0
            if (next != null && next.waitStatus <= 0)
                // 对前驱节点的后一个节点进行CAS操作替换成当前节点的后继节点
                compareAndSetNext(pred, predNext, next);
        } else {
            // 
            unparkSuccessor(node);
        }
        //自己的后继指向自己,等待gc回收
        node.next = node; // help GC
    }
}
private void unparkSuccessor(Node node) {
    // 拿到当前节点的等待状态
    int ws = node.waitStatus;
    // 该状态是否小于0
    if (ws < 0)
        // CAS修改等待状态为0
        compareAndSetWaitStatus(node, ws, 0);
    // 保存当前节点的后继节点
    Node s = node.next;
    // 后继节点是否为null 或者 后继节点的等待状态是否为取消锁请求
    if (s == null || s.waitStatus > 0) {
        // 将后继节点设置为null
        s = null;
        // 并且后队列尾巴开始向前遍历,找到连续非取消状态的最前一个节点(不能是当前节点)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果找到了该节点s
    if (s != null)
        // 就唤醒该节点
        LockSupport.unpark(s.thread);
}

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhbfjh
系列文章
更多 icon
同类精品
更多 icon
继续加载