AbstractQueuedSynchronizer框架浅析

AbstractQueuedSynchronizer框架浅析

1.概述

AbstractQueuedSynchronizer(AQS)抽象类提供一个实现阻塞锁和依赖于FIFO等待队列同步器的框架。
AQS被设计用来作为众多同步器的基类,例如ReentrantLock、Semaphore、CountDownLatch、FutureTask以及ReentrantReadWriteLock。AQS依赖于一个整数值,用来代表状态。AQS的子类可以通过覆写protect方法来改变状态,并且定义状态代表具体什么含义。不同的同步器对状态的含义解释不同:

  • 在ReentrantLock中,AQS的同步状态用于保存锁获取操作的次数
  • 在FutureTask中,AQS同步状态被用来保存任务的状态
  • 在Semaphore和CountDownLatch中,AQS的同步状态用于保存当前可用许可的数量

针对AQS中的状态,只可以通过AQS的getState()、setState()和compareAndSetState()方法来改变状态。

同步器类应该使用私有的AQS子类来实现功能,而不应该直接继承AQS类。AQS类没有实现任何同步接口,它只是定义了一些可以被具体同步器和锁调用的方法,例如acquireInterruptibly方法。AQS子类需要自己实现以下方法:

  • tryAcquire()
  • tryRelease()
  • tryAcquireShared()
  • tryReleaseShared()
  • isHeldExclusively()

这些方法默认会抛出UnsupportedOperationException异常。在这些方法实现中,可以使用getState()、setState()和compareAndSetState()来获取或修改AQS的状态。这些方法的实现必须是线程安全的,并且应该实现简单且没有阻塞。只有这些方法可以被子类覆写,其他的AQS的公开方法都是final方法。

AQS同时支持独占操作模式(例如ReentrantLock)和非独占操作模式(例如Semaphore和CountDownLatch)。当以独占模式获取锁时,只有一个线程能访问成功,其他线程都访问失败;而以非独占模式获取锁时,多个线程可以同时访问成功。不同操作模式的线程都在同一个FIFO队列中等待。通常,AQS的子类只支持一种操作模式(独占或非独占),但也有同时支持两种操作模式的同步器,例如ReadWriteLock的子类,它的读取锁是非独占操作模式,而写入锁是独占操作模式。

由于在将线程放入FIFO等待队列之前,需要尝试一次acquire,因此有可能新的acquire线程可以获取成功,尽管等待队列中还有其他线程阻塞等待,这是一种非公平策略。然而,可以在tryAcquire()或者tryAcquireShared()方法中禁止线程抢占,具体是通过hasQueuedPredecessors()方法判断等待队列中是否有线程在阻塞等待,如果有线程阻塞等待,则让tryAcquire()或者tryAcquireShared()方法返回false,这样的话,acquire线程会被放入等待队列的尾部,然后唤醒阻塞等待的线程,这是一种公平的策略

AQS类为同步器的状态、参数的获取和释放,以及内部FIFO等待队列,提供了一个高效的和可扩展的基础。当这些不能满足你的要求时,你可以自定义java.util.concurrent.atomic原子类,自定义java.util.Queue类,以及LockSupport类来提供阻塞支持。

AQS框架的一个类图如下所示:

AQS框架类图

2.源码分析

2.1 AbstractQueuedSynchronizer类的继承关系

AQS类继承了AbstractOwnableSynchronizer类,实现了Serializable接口。AbstractOwnableSynchronizer类主要是用来保存同步器被哪个线程独占使用。

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
    ......
}

public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {

    protected AbstractOwnableSynchronizer() { }

    //The current owner of exclusive mode synchronization.
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

2.2 Node内部类

在AQS类中,定义了一个FIFO等待队列节点的内部类Node。

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;

    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /*
    * 非负值意味着该节点不需要signal。
    * 该值默认初始化为0,表示普通的同步节点,主要是通过CAS方法来修改该变量值。
    */
    volatile int waitStatus;

    /*
    * 指向前继节点,当前节点需要依赖于前继节点来检查waitStatus。
    * 在入队列的时候赋值,在出队列的时候为null。
    */
    volatile Node prev;

    /*
    * 指向后继节点,当前节点依赖后继节点来唤醒释放
    * 在入队列的时候赋值,在出队列的时候为null。
    */
    volatile Node next;

    /*
    * 将该节点入队列的线程,在构造节点的时候初始化,使用完之后变为null
    */
    volatile Thread thread;

    /*
    * 指向下一个在条件等待,或者是特定的SHARED值的节点。
    * 条件队列只有在获取独占锁时才可以被访问,我们需要一个单链表队列来保存节点,当他们在条件上等待时。
    */
    Node nextWaiter;

     /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

等待队列是CLH锁队列的变种,CLH锁通常被用来实现自旋锁。在节点中的waitStatus域保存了线程是否需要被阻塞的信息。当一个节点的前继节点被释放了,节点将会收到通知。在等待队列中的每一个节点,都充当着特定通知形式的监控器,并且持有一个等待线程。节点中的waitStatus域并不控制线程是否能获取到锁。如果一个线程是队列中的第一个线程,并不能保证它能竞争成功获取到锁,而只是给予了它参与竞争的权利。

为了将一个Node节点放入到LCH锁队列中,只需要将该Node节点拼接到队列尾部就行;如果为了出队列,则只需设置队列的头指针位置head。

+------+  prev +-----+       +-----+
  head |      | <---- |     | <---- |     |  tail
       +------+       +-----+       +-----+

插入节点到LCH队列中,只需一个在tail域上的原子操作。相似的,出队列仅仅需要更新head域,但是出队列还需要做一些额外的工作,来决定新的head节点的后继节点是哪个,其中需要考虑的因素有线程是否被取消了,操作是否超时了以及线程是否被中断了。

在Node节点中的prev域主要用来处理节点被取消的情况。如果一个Node节点被取消了,那边它的后继节点需要重新连接到一个未被取消的前继节点。

在Node节点中的next域主要被用来实现阻塞机制。在每个节点中都保存有线程ID,因此当需要唤醒下一个节点时,只需要通过遍历next域,找到后继节点,通过节点获取到线程ID,从而知道该唤醒哪个线程。

等待状态只能取以下几个值:

  • SIGNAL:该节点的后继节点当前是阻塞的,因此当前节点在释放和取消之后,必须唤醒它的后继节点。为了避免竞争,acquire方法必须首先进入SIGNAL等着状态,然后再尝试原子获取,这个获取过程可能会失败或者阻塞。
  • CANCELLED:该节点由于超时或者中断了被取消了。节点进入了CANCELLED状态之后,就不会再发生状态的变化了。特别地,处于CANCELLED状态节点的线程不会再被阻塞了。
  • CONDITION:该节点当前处于一个条件队列中。经过转移之后,该节点将会被作为同步队列的节点使用,此时节点的状态会被设置为0。
  • PROPAGATE:releaseShared方法应该传递给其他Node节点。在doReleaseShared方法中,确保传递会继续。
  • 0:非以上任何状态;

等待状态值使用数值来简化使用,非负值意味着节点不需要被通知唤醒,因此大多数代码只需检查数值的正负就可以知道是否需要唤醒了。

2.3 AQS类的重要成员变量

/*
* 等待队列的头节点,延迟初始化
* 除了初始化可以修改head值,还可以通过setHead方法设置
* 只要head节点存在,那么该节点的waitStatus状态将不会是CANCELLED
*/
private transient volatile Node head;

/*
* 等待队列的尾节点,延迟初始化
* 只能通过enq方法来添加一个新的等待节点到队列中,从而来修改tail值
*/
private transient volatile Node tail;

/*
* 同步状态
*/
private volatile int state;

可以看到,在AQS类中,有三个比较重要的成员变量,其中两个是表示等待队列的头指针和尾指针。另外一个表示同步的状态。

与state相关的方法有三个:

/*
* 获取当前同步状态
*/
protected final int getState() {
    return state;
}

/*
* 设置新的同步状态
*/
protected final void setState(int newState) {
    state = newState;
}

/*
* 采用CAS方法来更新同步状态
* expect 期望的值
* update 更新为新的值
*/
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);//采用了sun.misc.Unsafe类来实现CAS操作
}

在AQS中使用到了sun.misc.Unsafe类来实现CAS操作,Unsafe类的compareAndSwapInt()和compareAndSwapLong()等方法包装了CAS操作,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令。

与head和tail相关的方法有两个:

/*
* 将队列的头结点设置为node结点
* 该方法只被acquire系列方法调用
*/
private void setHead(Node node) {
    head = node;//让head指针指向node结点
    node.thread = null;
    node.prev = null;
}

/*
* 插入node节点到队列中,必要时初始化
* 返回node节点的前继节点,即原来的尾节点
*/
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 尾节点为空,则先进行初始化操作,创建一个新的node节点
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))//head节点被成功初始化后,将tail节点指向head节点
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {//将tail节点更新为node节点 
                t.next = node;
                return t;//返回node的前继节点
            }
        }
    }
}

可以看到,head和tail节点的初始化操作是在setHead()和enq()方法中进行的,同时更新操作也是在这两个方法中进行的。

2.4 AQS类暴露给子类实现的方法

/*
* 尝试以独占模式获取
* 该方法应该查询对象的状态是否允许以独占模式获取,如果允许,则获取。
* 该方法总是被执行acquire方法的线程执行,如果该方法失败了,则acquire方法将该线程放入队列中,直到有其他线程调用了release方法来发送通知信号。
* 如果获取成功了,则返回true。
*/
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

/*
* 尝试修改状态来反映以独占模式释放
* 该方法总是被执行释放的线程触发
* 如果该对象处于完全释放的状态则返回true
*/
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

/*
* 尝试以非独占模式来获取
* 该方法应该查询是否对象允许以非独占模式来获取,如果允许,则获取。
* 该方法总是被执行acquire方法的线程执行。如果该方法失败了,则acquire方法将该线程放入队列中,直到有其他线程调用了release方法来发送通知信号
* 返回值为负值,表示失败;
* 返回0,表示以非独占模式获取成功,但后续的以非独占模式获取将失败
* 返回正值,表示以非独占模式获取成功,后续的以非独占模式获取也会成功
*/
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

/*
* 尝试修改状态来反映以非独占模式释放
* 该方法总是被执行释放的线程触发
* 返回true,表示以非独占模式释放成功
*/
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

/*
* 如果同步器是被当前线程以独占模式访问,则返回true。
* 该方法在每次调用非等待ConditionObject方法时被触发。
*/
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

2.5 AQS类定义的acquire系列方法

2.5.1 acquire()方法

/*
* 以独占模式获取,屏蔽中断
* 至少触发一次tryAcquire()方法,如果获取成功,直接返回
* 如果获取失败,则线程入队列,然后通过tryAcquire()不断尝试,直至成功
* 该方法可以被用来实现Lock.lock()方法
*/
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire()方法是空实现,需要子类覆写该方法,实现具体的获取操作。addWaiter()方法主要是为当前线程创建一个新的Node节点,并把该Node节点以指定的模式存放入队列中。

/*
* 为当前线程创建一个Node节点,并且设置为指定mode,最后将该node节点放入等待队列中
*  @param mode mode有两种取值:Node.EXCLUSIVE和Node.SHARED,分别代表以独占模式和非独占模式
*  @return 返回新的Node节点
*/
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);//给当前线程创建一个新的Node节点,节点模式为mode
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;//将node节点的前继指针指向尾节点
        if (compareAndSetTail(pred, node)) {//更新tail指针指向node节点
            pred.next = node;//将原来tail节点的后继指针指向node节点
            return node;//这样node节点成功链接到tail节点后面,并更新tail节点指向node节点了
        }
    }
    // 如果前面将node节点入队列失败,则再通过enq()方法入队列,其实现思想和上面的过程一致
    enq(node);
    return node;
}

addWaiter()方法首先根据mode模式给当前线程创建一个node节点,然后将该node节点放入队列的尾部。acquireQueued()方法以独占不可中断方式获取。

/*
* 为队列中的线程,以独占不可中断模式获取
* 该方法被条件等待和acquire方法使用  
* @param node 节点
* @param arg 获取的参数
* @return 在等待的过程中被中断了,则返回true
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//无限循环
            final Node p = node.predecessor();//获取当前节点的前继节点
            if (p == head && tryAcquire(arg)) {//如果前继节点等于head节点,并且尝试获取成功
                setHead(node);//更新head节点为node节点
                p.next = null; // help GC
                failed = false;
                return interrupted;//返回是否中断了
            }
            // 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)//发生了异常,则取消获取操作
            cancelAcquire(node);//取消获取操作
    }
}

在acquireQueued()方法中,从尾节点开始循环前向遍历,如果当前节点的前继节点是头节点,并且tryAcquire()方法返回true了,则更新头结点,并返回。如果在向前遍历的过程中,遇到了节点获取失败需要挂起时,则会通过LockSupport的park()将当前线程挂起。

/*
* 检查和更新获取失败节点的状态
* 如果线程应该被阻塞,则返回true
* 该方法需要满足pred == node.prev
* @param pred node节点的前继节点,保存状态
* @param node node节点
* @return 如果线程应该被阻塞,则返回true
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//获取前继节点的等待状态
    if (ws == Node.SIGNAL)// SIGNAL状态
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {//CANCELLED状态
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;//向前遍历搜索,找出前继节点
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {// waitStatus为0或者为PROPAGATE
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//设置waitStatus为SIGNAL
    }
    return false;
}

在shouldParkAfterFailedAcquire()方法中,根据pred节点的等待状态做出相应的处理:

  • SIGNAL状态:说明该节点已经更新了状态,请求释放,因此可以返回true
  • CANCELLED状态:说明节点已经被取消了,忽略前继节点状态为CANCELLED的节点,同时更新node和pred节点值
  • PROPAGATE状态或0:说明节点需要更新状态为SIGNAL。

可以看到,shouldParkAfterFailedAcquire()方法只有节点等待状态为SIGNAL时,才会返回true。后续才会执行parkAndCheckInterrupt()方法。

/*
* 停止当前线程参与系统调度,即挂起当前线程,并返回当前线程是否被中断了
*/
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//挂断当前线程
    return Thread.interrupted();//返回线程是否被中断了
}

LockSupport.park()方法:

/*
* 挂起线程
* 当发生了以下几种情况,可以唤醒线程:
* 1.其他线程触发了LockSupport.unlock()方法,唤醒线程
* 2.其他线程触发了Thread.interrupt()方法,打断当前线程
* 3.该调用无条件返回了
* 该方法不会记录什么原因导致该方法返回了,因此方法调用者,需要自己重新检查导致线程挂起的条件
* @param blocker 导致线程挂起的同步器
*/
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

从前面的一些方法调用可以看到,acquire方法主要分为两种情况来处理:

  • 第一次通过tryAcquire()方法获取成功了,则直接返回;
  • 当第一次tryAcquire()失败时,接下来为当前线程创建一个以独占操作模式的Node节点,并把Node节点放入等待队列的尾部。然后在acquireQueued()方法中,从尾节点开始向前,循环从等待队列中取出节点,判断是否允许获取操作。如果不允许获取,即需要阻塞,则通过LockSupport.lock()方法挂起当前线程。当其他线程解除了当前线程的阻塞,或者是发生了中断,则返回继续下一个循环。最终只有遍历到头结点head,并且tryAcquire()方法返回true时,才会从acquireQueued()方法中退出,代表获取完成了。

2.5.2 acquireInterruptibly()方法

/*
* 以独占模式获取,如果发生了中断,则停止获取
* 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquire()方法
* 如果第一次tryAcquire()方法失败,则将线程放入等待队列中,然后循环调用tryAcquire()方法,直至返回成功或者线程被中断了
* 该方法可以被用来实现Lock.lockInterruptibly()
*/
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())//如果线程被中断了,则直接返回异常
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

当tryAcquire()失败时,则调用doAcquireInterruptibly()方法重复的获取。

/*
* 以独占可中断模式获取
*/
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);//给当前线程,创建一个独占模式的节点,并把节点放入到等待队列尾部
    boolean failed = true;
    try {
        for (;;) {//从尾节点开始循环处理
            final Node p = node.predecessor();//当前节点的前继节点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();// 如果线程被中断过,则抛出异常
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看到doAcquireInterruptibly()方法与acquireQueued()实现大致相似,唯一的不同之处是,doAcquireInterruptibly()检测到线程被中断之后,会抛出一个中断异常。

2.5.3 acquireShared()方法

/*
* 以非独占模式获取,屏蔽中断
* 至少会触发调用一次tryAcquireShared()
* 如果调用tryAcquireShared()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquireShared()方法,直到返回成功
*/
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

在tryAcquireShared()方法获取失败,会调用doAcquireShared()继续重复的获取。

/*
* 以非独占不可中断模式获取
*/
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//为当前线程创建一个非独占模式的Node节点,并把给Node节点放入到等待队列的尾部
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//从尾节点开始循环,不断向前遍历节点
            final Node p = node.predecessor();
            if (p == head) {//遍历到头节点
                int r = tryAcquireShared(arg);
                if (r >= 0) {//获取成功了,直接返回
                    setHeadAndPropagate(node, r);//更新头结点和同步状态
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared()方法与acquireQueued()方法实现类似,唯一不同之处是获取成功之后,会调用setHeadAndPropagate()方法来更head节点以及同步状态。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

在以下几种情况下,会对Node节点的后继节点进行判断是否需要释放:

  • propagate大于0;
  • 之前的头结点head为空;
  • 之前的头结点head的waitStatus状态小于0;
  • 当前的头结点head为空;
  • 当前的头结点head的waitStatus状态小于0;

在满足以上几种情况后,如果后继节点为空,或者后继节点是非独占模式的,则执行释放操作。

/*
* 非独占模式的释放
* 通知后继节点并且确保释放的传递
*/
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {// 从头结点开始循环
        Node h = head;
        // 如果头结点不为空,并且头结点不等于尾节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;//等待状态
            if (ws == Node.SIGNAL) {//等待状态为SIGNAL
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//1.将Node节点的状态更新为0
                    continue;            // loop to recheck cases
                unparkSuccessor(h);//唤醒h节点的后继节点
            }else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//2.将Node节点的状态更新为PROPAGATE
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

可以看到,在doReleaseShared()方法中,主要的工作有:

  • 先将头节点从SIGNAL状态更新为0,并且唤醒头结点的后继节点;
  • 将头节点的状态从0更新为PROPAGATE;
  • 如果头结点在更新状态的时候没有发生改变,则退出循环;

    /*

    • 唤醒Node的后继节点
      */
      private void unparkSuccessor(Node node) {
      /*

      • If status is negative (i.e., possibly needing signal) try
      • to clear in anticipation of signalling. It is OK if this
      • fails or if status is changed by waiting thread.
        */
        int ws = node.waitStatus;
        if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

      /*

      • Thread to unpark is held in successor, which is normally
      • just the next node. But if cancelled or apparently null,
      • traverse backwards from tail to find the actual
      • non-cancelled successor.
        */
        Node s = node.next;//Node的后继节点
        if (s == null || s.waitStatus > 0) {// 后继节点为空,或者后继节点的等待状态大于0,则尝试从尾部节点开始到Node节点为止,寻找最靠近Node节点的等待状态小于等于0的节点
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
        s = t;
        }
        // 存在后继节点,其等待状态小于等于0
        if (s != null)
        LockSupport.unpark(s.thread);//唤醒该节点关联的线程
        }

在unparkSuccessor()方法中,主要是唤醒Node的后继节点中等待状态小于等于0的节点。

2.5.4 acquireSharedInterruptibly()方法

/*
* 以非独占模式获取,如果发生了中断,则停止获取
* 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquireShared()方法
* 如果调用tryAcquireShared()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquireShared()方法,直到返回成功
*/
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())//线程被中断了
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

在tryAcquireShared()返回失败后,会调用doAcquireSharedInterruptibly()方法重复尝试获取。

/*
* 以非独占可中断模式获取
*/
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();//获取前继节点
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();//抛出中断异常
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看到doAcquireSharedInterruptibly()方法与doAcquireShared()方法实现类似,唯一的不同之处是,在线程等待的过程中,如果被中断了,则会抛出中断异常。

2.5.5 tryAcquireNanos()

/*
* 尝试以独占模式获取,如果发生了中断则停止,如果超时了,则返回失败
* 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquire()方法
* 如果调用tryAcquire()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquire()方法,直到返回成功,或者被中断,或者超时了。
* 该方法可以被用来实现Lock.tryLock(long,TimeUnit)方法
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())//检查线程是否被中断了
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

当tryAcquire()方法返回失败时,会去调用doAcquireNanos()方法,重复尝试获取。

/*
* 以独占超时模式获取
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)// 如果超时时间小于0,则直接返回
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;//计算截止时间
    final Node node = addWaiter(Node.EXCLUSIVE);//为当前线程创建一个独占模式的Node节点,并把Node放入到等待队列中
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();//前继节点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();//计算剩余时间
            if (nanosTimeout <= 0L)//超时了,直接返回
                return false;
            if (shouldParkAfterFailedAcquire(p, node) && 
                nanosTimeout > spinForTimeoutThreshold)//在获取失败后,如果需要让线程挂起,则通过LockSupport的parkNanos()方法,让线程挂起指定的时间
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())//如果线程被中断了,则抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireNanos()方法与acquireQueued()方法实现很类似,不同之处在于,doAcquireNanos()加了一个超时判断,如果超时了,则直接返回。另外,doAcquireNanos()使用带超时时间的LockSupport.parkNanos()方法来暂停线程。

2.5.6 tryAcquireSharedNanos()方法

/*
* 尝试以非独占模式获取,如果发生了中断,则停止,如果超时了,则返回失败
* 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquireShared()方法
* 如果调用tryAcquireShared()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquireShared()方法,直到返回成功,或者被中断,或者超时了。
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())//线程被中断了
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

当tryAcquireShared()方法返回失败时,会去调用doAcquireSharedNanos()不断重复尝试获取。

/*
* 以非独占超时模式获取
*/
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)// 如果超时时间小于0,则直接返回
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;//计算截止时间
    final Node node = addWaiter(Node.SHARED);//为当前线程创建一个非独占模式的Node节点,并把Node放入到等待队列中
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();//前继节点
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();//计算剩余时间
            if (nanosTimeout <= 0L)//超时了,直接返回
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold) //在获取失败后,如果需要让线程挂起,则通过LockSupport的parkNanos()方法,让线程挂起指定的时间
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireSharedNanos()方法与doAcquireNanos()方法实现类似,唯一的区别是doAcquireSharedNanos()方法中是以非独占模式去获取状态,调用的是tryAcquireShared()方法去获取状态。

至此,已经介绍了所有公开的与acquire相关的final方法。下面看看所有公开的与release相关的final方法。

2.6 AQS类里面定义的release方法

2.6.1 release()方法

/*
* 以独占模式释放
* 如果tryRelease()方法返回true,则至少有一个线程非阻塞
* 该方法可以用来实现Lock.unlock()方法
*/
public final boolean release(int arg) {
    if (tryRelease(arg)) {//tryRelease()返回true,则返回true
        Node h = head;//头结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒头结点的后继节点
        return true;
    }
    return false;
}

可以看到,在release()方法中,首先会调用tryRelease()方法尝试释放,如果释放成功,则返回true;如果释放失败,则直接返回false。在tryRelease()方法返回成功后,还会根据头结点的等待状态来判断是否需要唤醒头结点的后继节点。

2.6.2 releaseShared()方法

/*
* 以非独占模式释放
* 如果tryReleaseShared()方法返回true,则至少有一个线程非阻塞
*/
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//tryReleaseShared()返回true,则返回true
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared()方法在5.3中的acquireShared()方法中有介绍到,其主要是先将头节点从SIGNAL状态更新为0,并且唤醒头结点的后继节点,然后将头节点的状态从0更新为PROPAGATE。

3.AQS的使用

我们通常不是直接继承AQS类,而是将相应的功能委托为私有的AQS子类来实现。下面是AQS类源码中介绍的两个使用范例:

  • 使用范例1

下面是一个不可重入互斥锁Mutex,使用0代表非锁定状态,使用1代表锁定状态。虽然不可重入锁不需要严格记录持有锁的当前线程,但是在Mutex类中,实现了记录当前持有锁的线程,这样更容易监控。另外,Mutex类也支持Condition条件,并且暴露了一些方法给外部使用。

public class Mutex implements Lock ,Serializable {

    private static class Sync extends AbstractQueuedSynchronizer{
        // Reports whether in locked state
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Acquires the lock if state is zero
        @Override
        protected boolean tryAcquire(int acquires) {
            assert acquires == 1;
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1;
            if (getState() == 0){
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provides a Condition
        Condition newCondition(){
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }


    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked(){
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads(){
        return sync.hasQueuedThreads();
    }
}
  • 使用范例2

下面是一个实现类似闭锁CountDownLatch功能的类,它是以非独占模式获取和释放。

public class BooleanLatch {

    private static class Sync extends AbstractQueuedSynchronizer{
        boolean isSignalled(){
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();
    public boolean isSignalled(){
        return sync.isSignalled();
    }

    public void signal(){
        sync.releaseShared(1);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}
相关文章
相关标签/搜索