Bootstrap

Java锁总论

一. 概述

在SMP体系结构下,往往出现执行并发执行等情况的出现;在编程时,考虑并发性问题;基于并发性问题,java提供了锁机制来限制对竞争资源的操作。当抢到锁时,才能访问该资源。那么在java体系中,都有哪些锁呢,且是如何体现的呢?

二. java锁以及原理

在java开发中,对某项资源或者某一项代码块进行加锁的形式有两种。一种是通过synchronize关键字进行加锁;另外一种是声明Lock对象进行加锁。

2.1 synchronize

2.1.1 使用场景

该关键字可以修饰类方法,锁的对象就是对应的类。如下代码:该锁的对象就是Test类

public class Test{
  public static synchronized void compute(int i){
    //.....
  }   
}

也可以修饰类实例方法,锁的对象就是对应的类实例。如下代码:该锁的对象就是Test对象

public class Test{
  public synchronized void compute(int i){
    //.....
  }   
}

修饰类实例方法,粒度有点大,还可以缩小范围,修饰代码块。该锁的对象可以指定。

public class Test{
  public void compute(int i){
    synchronized (this){
      //.....
    }
  }   
}

2.1.2 原理

通过 可以得知:修饰方法时,是在方法修饰符上打上ACC_SYNCHRONIZED 标志,表明是原子操作;而修饰代码块,在对应代码开始前加上monitorenter和代码执行完后加上monitorexit指令。

指令的介绍如下:

为了理解虚拟机规范说讲述的,我将以例子的形式进行说明:

public class LockTest {
  public void com6() {
          int i = 9;
          synchronized (LockTest.class) {
              i = 12;
              System.out.println("hello word");
          }
  }
}

查看该字节码:

  Constant pool:
   ......
  #7 = Class              #33            // LockTest
   ......
  #33 = Utf8               LockTest
  
  public void com6();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=2, locals=4, args_size=1
         0: bipush        9
         2: istore_1
         3: ldc           #7      // class LockTest 从常量池中推送至栈顶,注意这个是指针
         5: dup   //复制栈顶的数值并压入栈顶,之所以复制,是为将该指针存放到本地变量内,给monitorexit使用
         6: astore_2
         7: monitorenter   //进入竞争资源区域
         8: bipush        12
        10: istore_1
        11: getstatic     #4          // Field java/lang/System.out:Ljava/io/PrintStream;
        14: ldc           #5         // String hello word
        16: invokevirtual #6         // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        19: aload_2   //将第6步存放的值,压入栈顶,供给下一个指令使用
        20: monitorexit   //推出竞争资源区域
        21: goto          29
        24: astore_3
        25: aload_2
        26: monitorexit  
        27: aload_3
        28: athrow
        29: return

结合字节码以及指令的介绍,monitor对象就是LockTest对象。这里可能会有一个疑惑:为什么26行会有monitorexit指令?是由于异常情况。因为20行的退出指令是正常情况释放锁的,而异常情况下是不会执行到的,而26行的退出指令就是异常情况执行的;

自从JDK1.6后,JDK团队对synchronized关键字锁对应的锁进行了优化;具体的细节,可以参考

PS. 一个概念Mark Word结构,说明一下;该结构存在对象头部。32位架构的结构如下:

其中无锁的“对象的HashCode”以及偏向锁的“线程ID”都可以理解;而轻量级锁中的“指向栈中锁记录的指针”是什么意思呢?通过《深入理解Java虚拟机》第2版中第13章 线程安全与锁优化介绍,在当前线程的栈帧中建立一个名为锁记录的空间,用于存储对象目前的Mark Work的拷贝,然后通过CAS操作将对象的Mark Word更新为指向Lock Record的指针。

2.2 Lock

Lock是JDK提供出来的一个接口,具体的锁的特性交给子类来实现。其暴露的接口如下:

基于Condition的使用,有如下例子。详细的原理会在2.3.3 ConditionObject里有讲解

import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionTest {
    private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();

    private Lock lock = new ReentrantLock();

    private int maxSize = 5;
    //生产者
    private Condition providerCondition = lock.newCondition();
    // 消费者
    private Condition consumerCondition = lock.newCondition();

    public void provide(String value) {
        try {
            lock.lock();
            while (queue.size() == maxSize) {
                //当队列满了以后,停止当前线程执行,并将该线程保存到providerCondition对象里面
                System.out.println(System.currentTimeMillis()+"-"+Thread.currentThread().getName()+"-provide queue = " + queue);
                providerCondition.await();
            }
            System.out.println("provide - value = " + value);
            queue.add(value);
            //激活消费者线程继续消费队列里面的消息,如果consumerCondition里没有阻塞线程,不用做任何处理
            consumerCondition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public String consume() {
        String result = null;
        try {
            lock.lock();
            while (queue.size() == 0) {
                //当队列空了,停止当前线程,并将该线程保存到consumerCondition对象里面
                System.out.println(System.currentTimeMillis()+"-"+Thread.currentThread().getName()+" -consume queue = " + queue);
                consumerCondition.await();
            }
            result = queue.poll();
            System.out.println("consume - result = " + result);
            //激活生产者线程继续生产消息
            providerCondition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        LockConditionTest t = new LockConditionTest();
        new Thread(() -> {
            int i = 0;
            while (true) {
                t.provide(i++ + "");
            }
        }).start();
        new Thread(() -> {
            while (true) {
                t.consume();
            }
        }).start();
    }
}

2.2.1 ReentrantLock

这个是可重入锁。具体的使用如下:

Lock lock = new ReentrantLock();

里面的方法可以参考上一章节介绍。

需要说明的是ReentrantLock内部调用了AbstractQueuedSynchronizer类,也就是我们常说的AQS,后面将重点解读该类。

2.2.2 **Lock与**LockView

WriteLock与ReadLock是ReentrantReadWriteLock类的内部类;

ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();

具体的介绍,有ReentrantReadWriteLock详细解读

WriteLockView与ReadLockView是StampedLock内部类:

StampedLock stampedLock = new StampedLock();
Lock readLock = stampedLock.asReadLock();
Lock writeLock = stampedLock.asWriteLock();

具体的介绍,有StampedLock详细解读

2.3 AbstractQueuedSynchronizer

从类名可以看出来,它是个抽象类,同时也包含队列特性,还有更加关键的是提供原子操作功能;

2.3.1 属性

A. 队列 从下面的代码,可以看出它是链表结构;当线程去获取锁时,发现锁已经被使用,那么就会自动将当前线程设置休眠状态,同时封装成Node对象,插入到AQS队列中去。等待锁的释放再去竞争。

private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
  volatile int waitStatus; //thread的状态
  volatile Node prev; // 上一个节点指针
  volatile Node next; //下一个节点指针
  volatile Thread thread; //线程
  Node nextWaiter; //当使用lock.newCondition时将记录下一个阻塞线程。而没有使用的情况下,将记录是排他模式还是共享模式
  //.......
}

B. 原子操作 JDK封装Unsafe类提供原子操作,其原理是通过CAS机制进行原子操作。

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
//.....
static {
  try {
    stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
   	//.....
  } catch (Exception ex) { throw new Error(ex); }
}

C state 是共享变量。当并发情况下,通过CAS操作达到线程间”通信“。这里的“通信”就是加锁释放锁等信息。

2.3.2 方法

下面将对主要的方法进行解读

1. acquire

public final void acquire(int arg) {  
  if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

tryAcquire是没有实现的,具体交给各个子类去实现。其大体的逻辑是,如果能获取该“资源”,则返回true, 否则返回false。未能获取“资源”。则将当前阻塞,并插入到队尾。

private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}

当当前队列没有阻塞线程节点时,将创建一个空节点保存到head, 将入参的节点保存到尾部tail.

private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize      
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      //
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

插入到队尾后,代码如下。该逻辑是:判断刚插入的节点的前驱节点(上一个节点)是否是头部节点。如果是,则尝试再次获取资源。意味者:当当前队列没有阻塞线程时,当前线程会有“两次”尝试获取资源的动作当尝试获取资源成功后,则将当前线程节点设置为头部节点。如果没有获取资源,则设置前驱节点的waitStatus为Node.SIGNAL状态。然而头部节点的状态是0,则会再一次循环,再次尝试获取资源。所以,当当前队列没有阻塞线程时,当前线程会有三次尝试获取资源的动作

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      //获取上一个节点的对象
      final Node p = node.predecessor(); 
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

为什么会将该节点设置为头部节点,而不是将该节点删除掉呢,毕竟该节点已经获取资源了?因为Head节点永远都是无用节点,也就是一个空节点;这里稍微提前说一下,释放资源release方法,该方法将head的后驱节点,也就是下一个节点的线程激活。

private void setHead(Node node) {
  head = node;
  node.thread = null;
  node.prev = null;
}

依然获取不到资源,则将当前节点的当前线程失效,放弃CPU资源。

private final boolean parkAndCheckInterrupt() {
  //释放CPU资源,等待其他线程来激活
  LockSupport.park(this);
  return Thread.interrupted();
}

当被其他线程激活时,则会重复之前的步骤,来进行尝试获取资源。

2. acquireInterruptibly

该方法支持中断,当尝试获取资源时,直接中断,会丢出一个中断异常。如果不是,则尝试获取资源。

public final void acquireInterruptibly(int arg)
  throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  if (!tryAcquire(arg))
    doAcquireInterruptibly(arg);
}

当获取不到资源,该逻辑跟acquire的逻辑差不多。只是稍微不同的是,当该线程被中断,则直接丢出异常,而不是再次尝试获取资源。

private void doAcquireInterruptibly(int arg)
  throws InterruptedException {
  final Node node = addWaiter(Node.EXCLUSIVE);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

3. tryAcquireNanos

这里跟acquireInterruptibly方法逻辑差不多,只是多出两个逻辑:

private boolean doAcquireNanos(int arg, long nanosTimeout)
  throws InterruptedException {
  if (nanosTimeout <= 0L)
    return false;
  //截至时间戳
  final long deadline = System.nanoTime() + nanosTimeout;
  final Node node = addWaiter(Node.EXCLUSIVE);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return true;
      }      
      nanosTimeout = deadline - System.nanoTime();
      //当过期,则说明已经超时,直接返回
      if (nanosTimeout <= 0L)
        return false;
      //spinForTimeoutThreshold = 1000纳秒
      //当间隔时间大于自旋时间,则直接休眠该间隔时间
      if (shouldParkAfterFailedAcquire(p, node) &&
          nanosTimeout > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanosTimeout);
      if (Thread.interrupted())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

4. acquireShared

tryAcquireShared方法交给子类来实现。只有当返回大于等于0时,则说明获取该资源成功

public final void acquireShared(int arg) {
  if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
}

将当前线程封装成一个节点,该节点的nextWaiter是共享模式,插入队尾里面。

private void doAcquireShared(int arg) {
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        //再次尝试获取资源
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          //说明获取到该资源,则会头节点以及设置“传递”机制
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          if (interrupted)
            selfInterrupt();
          failed = false;
          return;
        }
      }
      //请参考acquire章节的介绍
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

里面跟acquire章节的介绍逻辑差不多。我们重点看一下setHeadAndPropagate方法。只有当有资源且资源是大于0,或者是头节点的当前线程的状态是小于0时,才会获取node的下一个节点,去判断是否是共享模式的。如果是,则激活正在阻塞的共享模式下的线程。

private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; // Record old head for check below
  //请参考acquire章节的介绍
  setHead(node);
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;           
    if (s == null || s.isShared())
      doReleaseShared();
  }
}

当头部节点的状态为SIGNAL时,说明有阻塞线程存在,则可以直接激活该线程;该线程没有抢占到CPU资源时,说明16行的代码判断条件为真,则直接跳出去。如果该线程抢占到CPU资源了,但是依然获取不到资源,也不会更改head节点,条件也是为真,也会直接跳出去;如果获取到资源了,则更改head节点指针,条件为假,则再一次循环做判断;所以,该方法会激活所有正在阻塞的所有线程。

当状态为0时,则说明无阻塞线程存在,说明当前线程是的上一个前驱节点(上一个节点)就是头结点。当执行到16行时,条件为真,则直接跳出循环;

private void doReleaseShared() {
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;            // loop to recheck cases
        unparkSuccessor(h);
      }
      else if (ws == 0 &&
               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;                // loop on failed CAS
    }
    //当头结点没有发生变化,说明这个时间段没有线程释放
    if (h == head)                   // loop if head changed
      break;
  }
}

5. release

public final boolean release(int arg) {
  //释放该资源
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      //表明有阻塞线程在队列中,则去激活
      unparkSuccessor(h);
    return true;
  }
  return false;
}

之所以要将该节点的waitStatus置为0,是为了刚进来的线程有多一次的尝试获取资源机会;而不是直接让该线程直接释放其CPU资源;有利于提供性能;

private void unparkSuccessor(Node node) {
  int ws = node.waitStatus;
  if (ws < 0)
    //将该节点的等待状态置为0
    compareAndSetWaitStatus(node, ws, 0);
  
  Node s = node.next;
  if (s == null || s.waitStatus > 0) {
    //当下一个节点为空,说明没有阻塞线程;大于0说明,说该阻塞线程已经cancel了
    //会从尾部往前遍历,最后一个阻塞线程的。
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    LockSupport.unpark(s.thread);
}

6. releaseShared

doReleaseShared可以参考“acquireShared”里面介绍。

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}

2.3.3 ConditionObject

现在我们着重解读一下condition,其实现类为ConditionObject的原理;

 public class ConditionObject implements Condition, java.io.Serializable {
   private transient Node firstWaiter;//第一个节点
   private transient Node lastWaiter;//第二个节点
   //...
 }

按照2.2.1 ReentrantLock的介绍,要先获取锁;根据上文的对该方法的介绍,会获取该资源;

final void lock() {
  acquire(1);
}

获取成功后,条件满足会调用condition.await()方法,进行阻塞。

2.3.3.1 condition.await

public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  //将当前线程封装成Node,保存到队列里面
  Node node = addConditionWaiter();
  //释放所有资源
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  //是否是同步队列,这个地方很关键
  while (!isOnSyncQueue(node)) {
    //释放当前线程CPU资源,阻塞状态
    LockSupport.park(this);
    //检测是不是被中断了
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  //acquireQueued在上面有讲解过,这里就不用在一一说明了
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
  if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
  Node t = lastWaiter;
  // If lastWaiter is cancelled, clean out.
  if (t != null && t.waitStatus != Node.CONDITION) {
    //清理掉不是Condition的所有节点。
    unlinkCancelledWaiters();
    t = lastWaiter;
  }
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  if (t == null)
    firstWaiter = node;
  else
    t.nextWaiter = node;
  lastWaiter = node;
  return node;
}
final boolean isOnSyncQueue(Node node) {
  //当condition.signal()时,会将该状态置为0;详细的讲解稍微给出
  if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
  if (node.next != null) // If has successor, it must be on queue
    return true;
  return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
  Node t = tail;
  for (;;) {
    if (t == node)              
      return true;
    if (t == null)
      return false;
    t = t.prev;
  }
}

2.3.3.2 condition.signal

public final void signal() {
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  Node first = firstWaiter;
  if (first != null)
    //激活first的线程
    doSignal(first);
}
private void doSignal(Node first) {
  do {
    if ( (firstWaiter = first.nextWaiter) == null)
      lastWaiter = null;
    first.nextWaiter = null;
  } while (!transferForSignal(first) &&
           (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
  //这里将node的状态置为0
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
  //将node插入到队尾里
  Node p = enq(node);
  int ws = p.waitStatus;
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
  return true;
}

2.3.4 汇总

通过以上的几个关键方法的介绍,我们得知:

2.3.4.1 Node. waitStatus

static final int CANCELLED =  1; //表示当前的线程被取消
static final int SIGNAL    = -1;//表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作
static final int PROPAGATE = -3;//表示当前场景下后续的acquireShared能够得以执行。
//值为0,表示当前节点在sync queue中,等待着获取锁

2.3.4.2 Node.nextWaiter

//当没有使用lock.newCondition情况下,该属性的值只能为如下两中状态
static final Node SHARED = new Node();//共享模式
static final Node EXCLUSIVE = null;//排他模式
//当在使用lock.newCondition的情况,阻塞的节点都会存放该属性上,以此为链路

2.4 AbstractQueuedSynchronizer实现类

上文有说过,其有几个方法没有实现,交给子类来实现;

protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
  throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
  throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
  throw new UnsupportedOperationException();
}

为什么没有实现该方法呢,由于AbstractQueuedSynchronizer里有一个state属性,我们一直没有注重解释过。上文我们一直说获取该“资源”,那“资源”代表是什么,就是对state进行处理校验,判断符合指定条件下,才能获取资源成功。

private volatile int state;

2.4.1 Semaphore

这是是信号锁,其内部有一个属性sync,该对象实现了AbstractQueuedSynchronizer类。代码有点多,索性就不粘贴了,直接阐述就差不多了。

初始化,会将state设置指定的值。例如,state = 3,当第一次获取锁时,发现state >= 0。 将state修改为2. 并直接返回2. 上游方法会判断是否大于等于0.如果条件为真,则说明获取锁;否则获取锁失败。

所以,每次获取锁,state都会-1.每次释放锁,state都会+1。换句话说,state在代表是同时允许state个线程去执行;使用这个类,可以有效的控制线程数量同时执行;

2.4.2 ReentrantReadWriteLock

读写锁,意味者:允许可以多个线程并发读操作,当有一个线程在写操作时,其他线程都会被阻塞。

这里的state分为两部分,高16位代表多少个线程在做读操作,而低16位代表当前写操作的线程做了多少lock操作

例如,初始化读写锁后.

2.4.3 CountDownLatch

这是一个或多个线程等待其他线程完成后,才会执行它自身的操作。举个例子:E、F线程等待A、B、C、D线程完成后,E、F才会被激活去完成自己的操作;如下例子:

CountDownLatch countDownLatch = new CountDownLatch(4);
Thread A = new Thread(()->{
  countDownLatch.countDown();
  System.out.println("A thread run.... ");
});
Thread B = new Thread(()->{
  countDownLatch.countDown();
  System.out.println("B thread run.... ");
});
Thread C = new Thread(()->{
  countDownLatch.countDown();
  System.out.println("C thread run.... ");
});
Thread D = new Thread(()->{
  countDownLatch.countDown();
  System.out.println("D thread run.... ");
});
Thread E = new Thread(()->{
  try {
    countDownLatch.await();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  System.out.println("E thread run.... ");
});
Thread F = new Thread(()->{
  try {
    countDownLatch.await();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  System.out.println("F thread run.... ");
});
A.start();
B.start();
C.start();
D.start();
E.start();
F.start();

创建CountDownLatch对象,会初始化state的值,该值有构成函数传入;当调用await方法时,会判断state是否等于0,如果条件为真,则说明无需等待其他线程完成,因为state=0代表其他线程已经完成了。

如果条件为假,则将当前线程插入到队列中去;等待其他线程来激活;调用countDown方法,会将state-1,当state=0时,会去激活其他阻塞线程。

注意,CountDownLatch只能用一次,不能复用;

2.4.4 ThreadPoolExecutor

在线程池中有一个内部类Worker实现了。这里逻辑比较简单,就不多阐述了。里面有一行代码,需要重新留意的,

其实代码也说明了,获取锁是为了避免线程中暴露的方法去暴力中断该线程。

2.4.5 公平锁与非公平锁

在AbstractQueuedSynchronized的实现字类中,都会有公平锁和非公平锁,之所以单独抽离来讲解,而不是在各个子类进行阐述,是因为他们的业务逻辑都是一样的。

公平锁,当队列有阻塞的线程时,先有新的线程A尝试获取锁,则就会将线程A直接插入队尾中去。

非公平锁,当队列有阻塞的线程时,先有新的线程A尝试获取锁,则会尝试获取锁,如果获取锁,则直接执行线程A后续的操作,否则将线程A插入到队尾中,等待其他线程被释放。

2.5 CyclicBarrier

这个类跟CountDownLatch类有点类似;允许多个线程之间相互等待,当大家都达到那个“点"后,多个线程才被激活,继续执行后续的操作。

CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
Thread A = new Thread(()->{
  try {                
    cyclicBarrier.await();
  } catch (Exception e) {
    e.printStackTrace();
  }
  System.out.println("A thread run.... ");
});
Thread B = new Thread(()->{
  try {    
    cyclicBarrier.await();
  } catch (Exception e) {
    e.printStackTrace();
  }
  System.out.println("B thread run.... ");
});
Thread C = new Thread(()->{
  try {
    cyclicBarrier.await();
  } catch (Exception e) {                
    e.printStackTrace();
  }
  System.out.println("C thread run.... ");
});
Thread D = new Thread(()->{
  try {   
    cyclicBarrier.await();
  } catch (Exception e) {
    e.printStackTrace();
  }    
  System.out.println("D thread run.... ");
});
Thread E = new Thread(()->{
  try {
    cyclicBarrier.await();
  } catch (Exception e) {
    e.printStackTrace();
  }
  System.out.println("E thread run.... ");
});
Thread F = new Thread(()->{
  try {
    cyclicBarrier.await();
  } catch (Exception e) {
    e.printStackTrace();
  }
  System.out.println("F thread run.... ");        
});
A.start();
B.start();
C.start();
D.start();
E.start();
F.start();

创建CyclicBarrier对象,会初始化state的值,该值有构成函数传入。当调用await方法时,会将state-1,接着判断state是否等于0,如果条件为真,则说明其他线程都已经达到了这个”点“,激活其他线程线程,并且重置state为原始值(可以复用)。如果条件为假,则阻塞当前线程,将当前线程插入到队列中去。

2.5.1 CyclicBarrier与CountDownLatch的区别

2.6 StampedLock

ReentrantReadWriteLock是重量级的读写锁,而StampedLock是轻量级读写锁。为什么这么说呢?

有多个读操作线程在执行且没有释放锁,当有一个写操作的线程A进来,ReentanttReadWriteLock会直接阻塞线程A,并且插入到队尾中;而StampedLock会认为锁很快就会被释放,所以自旋一定的次数依然获取不到锁,才阻塞并插入到队尾中。

常规的用法跟ReentrantReadWriteLock差不多。

StampedLock stampedLock = new StampedLock();
long stamp = stampedLook.writeLock();//获取写锁
long stamp = stampedLock.readLock();//获取读锁
stampedLock.unLock(stamp);
Lock readLock = stampedLock.asReadLock();//获取读操作锁对象
readLock.lock();
readLock.unLock();
Lock writeLock = stampedLock.asWriteLock();//获取读操作锁对象
writeLock.lock();
writeLock.unLock();

StampedLock的结构跟ReentrantReadWriteLock对象的结构差不多;该实现原理跟其是差不多的。但并没有内部实现AbstractQueuedSynchronized抽象,而是自己实现一套新的方案,且新增自旋获取逻辑(乐观锁)。

StampedLock的state的组成结构:低7位代表的是读操作的线程数量,第8位代表的是写操作标志位,其余的位数代表写操作的版本数,即每次写操作,都会加1次

StampedLock还提供了不加锁的且更高性能的原子化操作。如下代码

StampedLock stampedLock = new StampedLock();
//如果第8没有标志,即没有写操作线程,则置state读操作数量为0后返回。
//如果有写操作的,则返回0
long stamp = sl.tryOptimisticRead();
//添加内存屏障,保障前面的代码被执行完。同时比对现有stampedLock的state值中的版本数是否与stamp一致。
//以及是否写标志位是否置为1.
//如果一致,则说明目前阶段没有写操作对该资源进行更新。数据是最新的。
//如果不一致,则说明已经有写操作正在更新过程中,则进行去获取读锁。
if (!stampedLock.validate(stamp)) {
  stamp = sl.readLock();
  try {
    //.....
  } finally {
    sl.unlockRead(stamp);
  }    
}

StampedLock的其他方法,根据该方法名可以猜测出具体做的功能,不是很难理解,就不一一介绍了。

另外,需要解读一下自旋获取锁的关键方法,acquireWrite方法解读。

private long acquireWrite(boolean interruptible, long deadline) {
  //p是parent的缩写,通过自旋依然获取不到锁,会将当前线程封装节点node插入到队尾。
  //p就是node的父节点,也就是前驱节点。
        WNode node = null, p;
 		 //通过自旋,尝试去获取锁。当自旋次数完后,依然没有获取锁,则初始化一个节点,插入到未节点中去
    	//,接着跳出该循环
        for (int spins = -1;;) { // spin while enqueuing
            long m, s, ns;
          // 当低8位都为0时,说明没有读写操作的线程,则尝试将state+128.
            if ((m = (s = state) & ABITS) == 0L) {
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                    return ns;
            }
            else if (spins < 0)
              //当state的写标志位为1,则说明有一个线程正在做写操作。那么设置spin=64,可以自旋至少64次。
              //尝试去获取锁。
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            else if (spins > 0) {
                if (LockSupport.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if ((p = wtail) == null) { // initialize queue
              //如英文注释,先初始化队列
              //注意,这里的模式为写模式。
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
              //初始化一个节点
                node = new WNode(WMODE, p);
            else if (node.prev != p)
              //将初始化的节点的前驱指针指向未节点。
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
              //插入到队尾,跳出循环
                p.next = node;
                break;
            }
        }

        for (int spins = -1;;) {
          // np是new Parent的缩写,
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {
              //当刚插入的节点的父节点为头部节点,则说明只有当前线程正在获取锁,并没有其他线程跟它竞争锁
              //则可以自旋尝试去获取锁。
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long s, ns;
                    if (((s = state) & ABITS) == 0L) {
                      //当低8位都为0时,说明没有任何线程,则尝试将state+128.即将第8为置为1.
                        if (U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + WBIT)) {
                          ///......重点....
                          //将刚才插入的节点设置为头结点。
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    }
                    else if (LockSupport.nextSecondarySeed() >= 0 &&
                             --k <= 0)
                        break;
                }
            }
            else if (h != null) { // help release stale waiters
              //头结点不为空时,则将节点中的阻塞的读操作线程激活。
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
          //当没有其他线程去修改头结点时,意味者没有其他线程已经获取锁资源。
            if (whead == h) {
                if ((np = node.prev) != p) {
                  //修复链表中的链路,当p节点为cancel状态,会更改其连续,具体可以看到下面的cancel逻辑
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                  //设置父节点为等待状态
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                  //如果是cancel状态,则更该其链路
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time; // 0 argument to park means no timeout
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                        whead == h && node.prev == p)
                      //阻塞当前节点
                        U.park(false, time);  // emulate LockSupport.park
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

private long acquireRead(boolean interruptible, long deadline) {
  //p 指向的尾节点
        WNode node = null, p;
        for (int spins = -1;;) {
            WNode h;
          //当当前队列没有任何阻塞的线程时,通过自旋去获取锁
            if ((h = whead) == (p = wtail)) {
                for (long m, s, ns;;) {
                    if ((m = (s = state) & ABITS) < RFULL ?
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                        return ns;
                    else if (m >= WBIT) {
                        if (spins > 0) {
                            if (LockSupport.nextSecondarySeed() >= 0)
                                --spins;
                        }
                        else {
                            if (spins == 0) {
                                WNode nh = whead, np = wtail;
                                if ((nh == h && np == p) || (h = nh) != (p = np))
                                    break;
                            }
                            spins = SPINS;
                        }
                    }
                }
            }
            if (p == null) { // initialize queue
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
              //注意。这里是读模式
                node = new WNode(RMODE, p);
            else if (h == p || p.mode != RMODE) {
              //当父节点的模式不是读模式,则将node节点插入到队尾中。并跳出这个死循环。
                if (node.prev != p)
                    node.prev = p;
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                  //重点代码
                    p.next = node;
                    break;
                }
            }
          //如果是读模式下的,则将node插入p节点中的cowait头部。
            else if (!U.compareAndSwapObject(p, WCOWAIT,
                                             node.cowait = p.cowait, node))
              
                node.cowait = null;
            else {
                for (;;) {
                    WNode pp, c; Thread w;
                  //激活头结点的cowait列表中线程
                    if ((h = whead) != null && (c = h.cowait) != null &&
                        U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null) // help release
                        U.unpark(w);
                    if (h == (pp = p.prev) || h == p || pp == null) {
                      //当父节点是头结点,或者父节点的父节点是头结点或空,则说明当前node是排到最前面的。
                      //意味者写操作刚刚释放锁,或者cancel掉了;
                      //又或者是另外的一种情景:当前没有写操作,然而循环自旋,
                      //一直做tryIncReaderOverflow返回0.这个方法,稍后讲解。
                        long m, s, ns;
                        do {
                            if ((m = (s = state) & ABITS) < RFULL ?
                                U.compareAndSwapLong(this, STATE, s,
                                                     ns = s + RUNIT) :
                                (m < WBIT &&
                                 (ns = tryIncReaderOverflow(s)) != 0L))
                                return ns;
                        } while (m < WBIT);
                    }
                    if (whead == h && p.prev == pp) {
                        long time;
                        if (pp == null || h == p || p.status > 0) {
                          //当其他线程释放锁时,或者父节点已经cancel状态,才会跳出这个死循环。
                          //换句话说,只有node的前驱节点都执行完,才会跳出该循环。
                            node = null; // throw away
                            break;
                        }
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, p, false);
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if ((h != pp || (state & ABITS) == WBIT) &&
                            whead == h && p.prev == pp)
                          //阻塞
                            U.park(false, time);
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, p, true);
                    }
                }
            }
        }

        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {              
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
              ////自旋获取锁
                for (int k = spins;;) { // spin at head
                    long m, s, ns;
                    if ((m = (s = state) & ABITS) < RFULL ?
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                        WNode c; Thread w;
                        whead = node;
                        node.prev = null;
                        while ((c = node.cowait) != null) {
                            if (U.compareAndSwapObject(node, WCOWAIT,
                                                       c, c.cowait) &&
                                (w = c.thread) != null)
                                U.unpark(w);
                        }
                        return ns;
                    }
                    else if (m >= WBIT &&
                             LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                        break;
                }
            }
            else if (h != null) {
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time;
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 &&
                        (p != h || (state & ABITS) == WBIT) &&
                        whead == h && node.prev == p)
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

private long tryIncReaderOverflow(long s) {
  //当读操作数量等于126时,且没有其他线程更改state,才会操作成功。
  //否则直接返回失败。
        if ((s & ABITS) == RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
                ++readerOverflow;
                state = s;
                return s;
            }
        }
        else if ((LockSupport.nextSecondarySeed() &
                  OVERFLOW_YIELD_RATE) == 0)
            Thread.yield();
        return 0L;
    }

三. 并发设计模式

我们在平常写代码,或多或少会涉及设计模式,装饰模式,组合模式,工厂模式,策略模式,观察者模式等;那么在编写并发代码,也会有对应的设计模式可以借鉴,从而是我们的代码更加美观,可拓展。

具体的内容,可以仔细阅读《Java 多线程编程》设计模式篇

详细的就不多说了

四. 锁的底层原理

在操作层面,锁是如何实现的;其主要是依赖于硬件层面上的逻辑处理。在其他文章也有涉及到。

《 Intel® 64 and IA-32 Architectures Software Developer’s Manual, Volume 3A》第8章,可以详细的去阅读该章节的知识点。然而我没花太多时间,只看到关键的介绍以及参考其他文章进行总结。

The Intel 64 and IA-32 architectures provide mechanisms for managing and improving the performance of multiple processors connected to the same system bus. These include:

Bus locking(总线锁) and/or cache coherency(缓存一致性) management for performing atomic operations on system memory.

• Serializing instructions.

.......

4.1 总线锁

重量级锁。当某个指令加上了# lock,意味该指令访问的地址被加上一个锁,阻塞其他CPU访问该地址。

4.2 缓存锁

轻量级锁。采用缓存一致性(MES)协议,保证缓存一致性问题。及保证了各个CPU访问的缓存都是一致性的。

在第2章节,我们所介绍的各种锁基本都是采用该机制。synchronized关键比较特殊,刚开始是采用缓存锁来处理,当一直获取不到锁时,会升级到重量级锁,也就是总线索。

有关总线索以及缓存锁更详细的介绍,可以参考 前半部分。

5. 汇总

在操作系统层面的锁机制,有两种实现方式:

在java体系中,基本上都是采用缓存一致性原理来实现锁;

java体系中,最核心的就是AQS抽象类,进行竞争锁,阻塞其他线程,激活其他线程等线程通信。

六. 参考资源