Bootstrap

AQS介绍和原理分析(下)-条件中断

接着上篇文章没说完的条件中断,我们先来看一个例子:

//首先创建一个可重入锁,它本质是独占锁
private final ReentrantLock takeLock = new ReentrantLock();
//创建该锁上的条件队列
private final Condition notEmpty = takeLock.newCondition();
//使用过程
public E take() throws InterruptedException {
    //首先进行加锁
    takeLock.lockInterruptibly();
    try {
        //如果队列是空的,则进行等待
        notEmpty.await();
        //取元素的操作...
        
        //如果有剩余,则唤醒等待元素的线程
        notEmpty.signal();
    } finally {
        //释放锁
        takeLock.unlock();
    }
    //取完元素以后唤醒等待放入元素的线程
}

上面的代码片段截取自LinkedBlockingQueue,是Java常用的阻塞队列之一。注意一点:条件队列是建立在锁基础上的,而且必须是独占锁

分析

等待条件的过程:

条件满足后的唤醒过程(以唤醒一个节点为例,也可以唤醒多个):

这里要注意,整个AQS分为两个队列,一个同步队列,一个条件队列,如图

源码分析

执行过程:

以我们上面提到的例子来分析

//条件队列入口,参考上面的代码片段
public final void await() throws InterruptedException {
    //如果当前线程被中断则直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //把当前节点加入条件队列
    Node node = addConditionWaiter();
    //释放掉已经获取的独占锁资源
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //如果不在同步队列中则不断挂起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //中断处理,另一种跳出循环的方式
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //走到这里说明节点已经条件满足被加入到了同步队列中或者中断了
    //这个方法很熟悉吧?就跟独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    //走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作
    //删除条件队列中被取消的节点
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    //根据不同模式处理中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

/1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter
//2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发
private Node addConditionWaiter() {
    Node t = lastWaiter;
    //如果最后一个节点被取消,则删除队列中被取消的节点
    if (t != null && t.waitStatus != Node.CONDITION) {

        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

//删除取消节点的逻辑虽然长,但比较简单就是链表删除
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

//入参就是新创建的节点,即当前节点
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁(可以考虑下这个逻辑放在共享锁下面会发生什么?)
        int savedState = getState();
        //跟独占锁释放锁资源一样,不赘述
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            //如果这里释放失败,则抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        //如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中只需要判断最后一个节点是否被取消就可以了
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

//如果不在同步队列就继续挂起(signal操作会把节点加入同步队列)
 while (!isOnSyncQueue(node)) {
       LockSupport.park(this);
       //中断处理后面再分析
       if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
 }
//判断节点是否在同步队列中
final boolean isOnSyncQueue(Node node) {
    //快速判断1:节点状态或者节点没有前置节点
    //注:同步队列是有头节点的,而条件队列没有
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
    if (node.next != null) 
        return true;
    //上面如果无法判断则进入复杂判断
    return findNodeFromTail(node);
}

//注意这里用的是tail,这是因为条件队列中的节点是被加入到同步队列尾部,这样查找更快
//从同步队列尾节点开始向前查找当前节点,如果找到则说明在,否则不在
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

 while (!isOnSyncQueue(node)) {
       LockSupport.park(this);
       //这里被唤醒可能是正常的signal操作也可能是中断
       if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
 }
 //这里的判断逻辑是:
 //1.如果现在不是中断的,即正常被signal唤醒则返回0
 //2.如果节点由中断加入同步队列则返回THROW_IE,由signal加入同步队列则返回REINTERRUPT
 private int checkInterruptWhileWaiting(Node node) {
       return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
 }

 //修改节点状态并加入同步队列
 //该方法返回true表示节点由中断加入同步队列,返回false表示由signal加入同步队列
 final boolean transferAfterCancelledWait(Node node) {
    //这里设置节点状态为0,如果成功则加入同步队列
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //与独占锁同样的加入队列逻辑,不赘述
        enq(node);
        return true;
    }
    //如果上面设置失败,说明节点已经被signal唤醒,由于signal操作会将节点加入同步队列,我们只需自旋等待即可
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
 }

//在处理中断之前首先要做的是从同步队列中成功获取锁资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      interruptMode = REINTERRUPT;
//由于当前节点可能是由于中断修改了节点状态,所以如果有后继节点则执行删除已取消节点的操作
//如果没有后继节点,根据上面的分析在后继节点加入的时候会进行删除
if (node.nextWaiter != null) 
      unlinkCancelledWaiters();
if (interruptMode != 0)
      reportInterruptAfterWait(interruptMode);

//根据中断时机选择抛出异常或者设置线程中断状态
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
      if (interruptMode == THROW_IE)
           throw new InterruptedException();
      else if (interruptMode == REINTERRUPT)
           //实现代码为:Thread.currentThread().interrupt();
           selfInterrupt();
}

//条件队列唤醒入口
public final void signal() {
   //如果不是独占锁则抛出异常,再次说明条件队列只适用于独占锁
   if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
   //如果条件队列不为空,则进行唤醒操作
   Node first = firstWaiter;
   if (first != null)
        doSignal(first);
}

//该方法就是把一个有效节点从条件队列中删除并加入同步队列
//如果失败则会查找条件队列上等待的下一个节点直到队列为空
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&(first = firstWaiter) != null);
}

//将节点加入同步队列
final boolean transferForSignal(Node node) {
    //修改节点状态,这里如果修改失败只有一种可能就是该节点被取消,具体看上面await过程分析
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //该方法很熟悉了,跟独占锁入队方法一样,不赘述
    Node p = enq(node);
    //注:这里的p节点是当前节点的前置节点
    int ws = p.waitStatus;
    //如果前置节点被取消或者修改状态失败则直接唤醒当前节点
    //此时当前节点已经处于同步队列中,唤醒会进行锁获取或者正确的挂起操作
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

小总结

至此,AQS的关键特性在这里都分析完毕了,这里做个总结。

  • 我们首先关注独占锁和共享锁的实现,其次关注它公平/非公平,超时,条件中断的特性。

  • 整个AQS依靠一个state状态,两个队列(CLH和ConditionObject)来实现,队列中都是Node节点,节点中的状态:0,SINAL,CONDITION,- - --- CANCEL都在分析中出现过了,相信以后知道这些都是在什么时候用的了。

  • AQS在许多设计思路都值得借鉴,比如典型的自旋等待、CLH队列。

参考

https://www.cnblogs.com/onlywujun/articles/3531568.htmlhttp://ifeve.com/introduce-abstractqueuedsynchronizer/https://www.cnblogs.com/lfls/p/7615982.html?utm_source=debugrun&utm_medium=referral