Bootstrap

AQS介绍和原理分析(下)

概述

本文首先用导图总结一下上一篇文档写的内容,然后通过Mutex,ReentrantLock说明如何使用AQS,同时关注公平和非公平这个条件,最后关注一下可中断和条件这两个特性。上一篇的总结:

Mutex&ReentrantLock&公平/非公平

在jdk8的current包中并没有找到Doug Lea的Mutex类,不过这个不影响,就当我们自己写的也一样。

class Mutex implements Lock, java.io.Serializable {
    // 自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判断是否锁定状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 尝试获取资源,立即返回。成功则返回true,否则false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 这里限定只能为1个量
            if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
                return true;
            }
            return false;
        }

        // 尝试释放资源,立即返回。成功则为true,否则false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定为1个量
            if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//释放资源,放弃占有状态
            return true;
        }
    }

    // 真正同步类的实现都依赖继承于AQS的自定义同步器!
    private final Sync sync = new Sync();

    //lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    //unlock<-->release。两者语义一样:释放资源。
    public void unlock() {
        sync.release(1);
    }

    //锁是否占有状态
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

读过AQS代码前面的注释,就会了解这种实现就是注释里面建议的AbstractQueuedSynchronizer使用方式。

  • 通过内部类Sync继承AbstractQueuedSynchronizer,通过实现tryAcquire()/tryRelease()或者ryAcquireShared()/tryReleaseShared()方法实现独占/共享方式获取锁。

  • 定义私有变量sync,提供外部接口lock()/unlock()。

接下来再来看一下ReentrantLock(主要关注它关于公平和非公平的特性)

ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer:

static abstract class Sync extends AbstractQueuedSynchronizer

Sync又有两个子类:

final static class NonfairSync extends Sync
final static class FairSync extends Sync  

显然是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁

非公平锁

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

公平锁

static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

共同的Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
      
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        final boolean isLocked() {
            return getState() != 0;
        }
    }

这里只贴出了部分代码。小总结:

  • 这里和Mutex的实现方式相似,都是通过内部的Sync类来实现。

  • 公平锁和非公平锁的区别只是在获取锁的时候(在入AQSCLH队列之前)代码有点区别,其它的都是一样的。一旦进入了队列,所有线程都是按照队列中先来后到的顺序请求锁。

  • 非公平锁的代码中总是优先尝试当前是否有线程持有锁,一旦没有任何线程持有锁,那么非公平锁就霸道的尝试将锁“占为己有”。

ps:小插曲。这个时候如果有人再问Lock和Synchronized的区别,可以这么回答了:

AbstractQueuedSynchronizer通过构造一个基于阻塞的CLH队列容纳所有的阻塞线程,而对该队列的操作均通过Lock-Free(CAS)操作,但对已经获得锁的线程而言,ReentrantLock实现了偏向锁的功能。synchronized的底层也是一个基于CAS操作的等待队列,但JVM实现的更精细,把等待队列分为ContentionList和EntryList,目的是为了降低线程的出列速度;当然也实现了偏向锁,从数据结构来说二者设计没有本质区别。但synchronized还实现了自旋锁,并针对不同的系统和硬件体系进行了优化,而Lock则完全依靠系统阻塞挂起等待线程。当然Lock比synchronized更适合在应用层扩展,可以继承AbstractQueuedSynchronizer定义各种实现,比如实现读写锁(ReadWriteLock),公平或不公平锁;同时,Lock对应的Condition也比wait/notify要方便的多、灵活的多。

但是后续的问题,只能祝你好运了……(学海无涯啊)

可中断

前提是AQS的acquire是不响应中断的(关于Intereput的知识点,自行百度一下即可,需要知道线程在运行状态下是不响应中断的)。

可中断和超时的实现都是在AbstractQueuedSynchronizer,没有交给子类来实现。如果在获取一个通过网络交互实现的锁时,这个锁资源突然进行了销毁,那么使用acquireInterruptibly的获取方式就能够让该时刻尝试获取锁的线程提前返回。而同步器的这个特性被实现Lock接口中的lockInterruptibly方法。根据Lock的语义,在被中断时,lockInterruptibly将会抛出InterruptedException来告知使用者

public final void acquireInterruptibly(int arg)

    throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    if (!tryAcquire(arg))

        doAcquireInterruptibly(arg);

}

private void doAcquireInterruptibly(int arg)

    throws InterruptedException {

    final Node node = addWaiter(Node.EXCLUSIVE);

    boolean failed = true;

    try {

        for (;;) {

            final Node p = node.predecessor();

            if (p == head && tryAcquire(arg)) {

                setHead(node);

                p.next = null; // help GC

                failed = false;

                return;

            }

            // 检测中断标志位

            if (shouldParkAfterFailedAcquire(p, node) &&

            parkAndCheckInterrupt())

                throw new InterruptedException();

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

上述逻辑主要包括:

1. 检测当前线程是否被中断;

判断当前线程的中断标志位,如果已经被中断了,那么直接抛出异常并将中断标志位设置为false。

2. 尝试获取状态;

调用tryAcquire获取状态,如果顺利会获取成功并返回。

3. 构造节点并加入sync队列;

获取状态失败后,将当前线程引用构造为节点并加入到sync队列中。退出队列的方式在没有中断的场景下和acquireQueued类似,当头结点是自己的前驱节点并且能够获取到状态时,即可以运行,当然要将本节点设置为头结点,表示正在运行。

4. 中断检测。

在每次被唤醒时,进行中断检测,如果发现当前线程被中断,那么抛出InterruptedException并退出循环。

# 超时

针对超时控制这部分的实现,主要需要计算出睡眠的delta,也就是间隔值。间隔可以表示为nanosTimeout = 原有nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)。如果nanosTimeout大于0,那么还需要使当前线程睡眠,反之则返回false。

private boolean doAcquireNanos(int arg, long nanosTimeout)

throws InterruptedException {

    long lastTime = System.nanoTime();

    final Node node = addWaiter(Node.EXCLUSIVE);

    boolean failed = true;

    try {

        for (;;) {

            final Node p = node.predecessor();

            if (p == head && tryAcquire(arg)) {

                setHead(node);

                p.next = null; // help GC

                failed = false;

                return true;

            }

            if (nanosTimeout <= 0)                 return false;             if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)

            LockSupport.parkNanos(this, nanosTimeout);

            long now = System.nanoTime();

            //计算时间,当前时间减去睡眠之前的时间得到睡眠的时间,然后被

            //原有超时时间减去,得到了还应该睡眠的时间

            nanosTimeout -= now - lastTime;

            lastTime = now;

            if (Thread.interrupted())

                throw new InterruptedException();

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

1. 加入sync队列;

将当前线程构造成为节点Node加入到sync队列中。

2. 条件满足直接返回;

退出条件判断,如果前驱节点是头结点并且成功获取到状态,那么设置自己为头结点并退出,返回true,也就是在指定的nanosTimeout之前获取了锁。

3. 获取状态失败休眠一段时间;

通过LockSupport.unpark来指定当前线程休眠一段时间。

4. 计算再次休眠的时间;

唤醒后的线程,计算仍需要休眠的时间,该时间表示为nanosTimeout = 原有nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)。其中now – lastTime表示这次睡眠所持续的时间。

5. 休眠时间的判定。

唤醒后的线程,计算仍需要休眠的时间,并无阻塞的尝试再获取状态,如果失败后查看其nanosTimeout是否大于0,如果小于0,那么返回完全超时,没有获取到锁。 如果nanosTimeout小于等于1000L纳秒,则进入快速的自旋过程。那么快速自旋会造成处理器资源紧张吗?结果是不会,经过测算,开销看起来很小,几乎微乎其微。Doug Lea应该测算了在线程调度器上的切换造成的额外开销,因此在短时1000纳秒内就让当前线程进入快速自旋状态,如果这时再休眠相反会让nanosTimeout的获取时间变得更加不精确。

条件中断

参考

https://www.cnblogs.com/onlywujun/articles/3531568.html

http://ifeve.com/introduce-abstractqueuedsynchronizer/