参考文档:深入理解Java并发框架AQS系列(二):AQS框架简介及锁概念

1. AQS 框架和概念

1.1 思考

我们去学习一个知识点或开启一个新课题时,最好是带着问题去学习,这样针对性比较强,且印象比较深刻,主动思考带给我们带来了无穷的好处

抛开AQS,设想以下问题:

  • Q:如果我们遇到 thread 无法获取所需资源时,该如何操作?

  • A:不断重试呗,一旦资源释放可快速尝试获取

  • Q:那如果资源持有时长较长,不断循环获取,是否比较浪费CPU ?

  • A:的确,那就让线程休息1秒钟,再尝试获取,这样就不会导致CPU空转了

  • Q:那如果资源在第0.1秒时被释放,那线程岂不是要白白等待0.9秒了 ?

  • A:实在不行就让当前线程挂起,等释放资源的线程去通知当前线程,这样就不存在等待时间长短的问题了

  • Q:但如果资源持有时间很短,每次都挂起、唤醒线程成为了一个很大的开销

  • A:那就依情况而定,lock时间短的,就不断循环重试,时间长的就挂起

  • Q:如何界定lock的时间长短?还有就是如果lock的时间不固定,也无法预期呢?

  • A:唔。。。这是个问题

  • Q:如果线程等待期间,我想放弃呢?

  • A:。。。。。。

  • Q:还有很多问题

    • 如果我想动态增加资源呢?
    • 如何我不想产生饥饿,而保证加锁的有序性呢?
    • 或者我要支持/不支持可重入特性呢?
    • 我要查看所有等待资源的线程状态呢?
    • 。。。。。。

我们发现,一个简单的等待资源的问题,牵扯出后续诸多庞杂且无头绪的问题;加锁不仅依赖一套完善的框架体系,还要具体根据使用场景而定,才能接近最优解;那我们即将要引出的AQS能完美解决上述这些问题吗?

答案是肯定的:不能

其实 Doug Lea 也意识到问题的复杂性,不可能出一个超级工具来解决所有问题,所以他把 AQS 设计为一个 abstract 类,并提供一系列子类去解决不同场景的问题,例如 ReentrantLockSemaphore 等;当我们发现这些子类也不能满足我们加锁需求时,我们可以定义自己的子类,通过重写两三个方法,寥寥几行代码,实现强大的功能,这一切都得益于 AQS 作者敏锐的前瞻性

指的一提的是,虽然我们可以用某个子类去实现另一个子类所提供的功能(例如使用 Semaphore 替代 CountDownLatch),但其易用、简洁、高效性等能否达到理想效果,都值得商榷;就好比在陆地上穿着雪橇走路,虽能前进,却低效易摔跤

1.2 并发框架

AQS 是什么,AbstractQueuedSynchronizer 类如其名,抽象的队列式的同步器,AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的 ReentrantLock/Semaphore/CountDownLatch。

它维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)。state 的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

本小节仅带大家对AQS架构有个初步了解,在后文的独占锁、共享锁等中会详细阐述。下图为AQS框架的主体结构

2109301-20210312100329988-1019373521

从上图中我们看到了 AQS 中非常关键的一个概念:“阻塞队列”。即 AQS 的理念是当线程无法获取资源时,提供一个 FIFO 类型的有序队列,用来维护所有处于“等待中”的线程。看似无解可击的框架设计,同时也牵出另外的一个问题:阻塞队列一定高效吗?

当“同步块逻辑”执行很快时,我们列出两种场景

  • 场景1:直接使用 AQS 框架,例如试用其子类 ReentrantLock,遇到资源争抢,放阻塞队列
  • 场景2:因为锁占用时间短,无限重试

针对这2种场景,我们写测试用例比较一下

package com.example.springboottest;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author likangning
 * @since 2021/3/9 上午8:58
 */
@SpringBootTest
public class CompareTest {

  // 自己实现 AQS 框架,使用无限重试的方式,
  private class MyReentrantLock extends AbstractQueuedSynchronizer {
    // 参数是个状态标记,可以自定义,可参考可重入锁
    protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      while (true) {
        int c = getState();
        if (c == 0) {
          // 原子操作
          if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
          }
        }
      }
    }

    // 参数要与锁定的时候的 acquires 相同
    protected final boolean tryRelease(int releases) {
      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
      }
      // 原子操作
      setState(c);
      return free;
    }
  }

  /**
   * 使用实现 AQS 框架的 ReentrantLock
   */
  @Test
  public void test1() throws InterruptedException {
    ReentrantLock reentrantLock = new ReentrantLock();
    long begin = System.currentTimeMillis();
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 2; i++) {
      executorService.submit(() -> {
        for (int j = 0; j < 50000000; j++) {
          reentrantLock.lock();
          doBusiness();
          reentrantLock.unlock();
        }
      });
    }
    executorService.shutdown();
    // 堵塞,直到线程池运行完毕
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    System.out.println("ReentrantLock cost : " + (System.currentTimeMillis() - begin));
  }

  /**
   * 无限重试
   */
  @Test
  public void test2() throws InterruptedException {
    MyReentrantLock myReentrantLock = new MyReentrantLock();
    long begin = System.currentTimeMillis();
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 2; i++) {
      executorService.submit(() -> {
        for (int j = 0; j < 50000000; j++) {
          myReentrantLock.tryAcquire(1);
          doBusiness();
          myReentrantLock.tryRelease(1);
        }
      });
    }
    executorService.shutdown();
    // 堵塞,直到线程池运行完毕
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    System.out.println("MyReentrantLock cost : " + (System.currentTimeMillis() - begin));
  }

  private void doBusiness() {
    // 空实现,模拟程序快速运行
  }
}

上例,虽然MyReentrantLock继承了AbstractQueuedSynchronizer,但没有使用其阻塞队列。我们每种情况跑5次,看下两者在耗时层面的表现

耗时1耗时2耗时3耗时4耗时5平均耗时(ms)
ReentrantLock114251230112289102621146111548
MyReentrantLock8717895710283844589289066

上例只是拿独占锁举例,共享锁也同理。可以简单概括为:线程挂起、唤醒的时间占整个加锁周期比重较大,导致每次挂起、唤醒已经成为一种负担。当然此处并不是说 AQS 设计有什么缺陷,只是想表达并没有一种万能的框架能应对所有情况,一切都要靠使用者灵活理解、应用。

1.3 类结构及如何使用

AQS 类内部结构

2109301-20210315110701067-122196209

因后文还会反复涉及,此处仅罗列2点

  • private volatile int state 重要属性,一般不论是实现独占锁还是共享锁,都要进行 CAS 操作的字段。独占锁时,如果通过 cas 将其从 0 改变为 1 的话,那么标记加锁成功;而在共享锁时,则表示支持并发数的最大值。
  • isHeldExclusively() 标记是否持有线程:AQS虽然为抽象类,但其继承了类AbstractOwnableSynchronizer,用来标记加锁线程,但AQS本身不依赖这个属性,也不会设置这个属性,实现类如果需要可以直接重新此方法。一般实现可重入特性需要重写该方法

而我们常用的锁并发类,基本上都是AQS的子类或通过组合方式实现,可见AQS在Java并发体系的重要性

2109301-20210312100356854-1934745395

至于如何使用,是需要区分子类是想实现独占锁还是共享锁

  • 独占锁
    • tryAcquire()
    • tryRelease()
    • isHeldExclusively() -- 可不实现
  • 共享锁
    • tryAcquireShared()
    • tryReleaseShared()

AQS 本身是一个 abstract 类,将主要并发逻辑进行了封装,我们定义自己的并发控制类,仅需要实现其中的两三个方法即可。而在对外( public 方法)表现形式上,可依据自己的业务特性来定义;例如 Semaphore 定义为 acquire、 release,而 ReentrantLock 定义为 lock 、unlock

2. 独占锁

2.1 简介

相信我们平时写独占锁的程序大抵是这样的:

ReentrantLock lock = new ReentrantLock();
try {
  lock.lock();
  doBusiness();
} finally {
  lock.unlock();
}

上述代码分为三部分:

  • 加锁 lock.lock()
  • 执行同步代码 doBusiness()
  • 解锁 lock.unlock()

加锁部分,一定是众矢之的,兵家争抢的要地,对于高并发的程序来说,同一时刻,大量的线程争相涌入,而lock()则保证只能有一个线程进入doBusiness()逻辑,且在其执行完毕unlock()方法之前,不能有其他线程进入。所以相对而言,unlock()方法相对轻松,不用处理多线程的场景.

本章中,我们引入节点中一个关键的字段 waitStatus (后文简写为 ws),在独占锁模式中,可能会使用到的等待状态如下:

  • 0: 初始状态,当一个节点新建时,其默认 ws 为0
  • SIGNAL (-1): 如果某个节点的状态为 SIGNAL,即表明其后续节点处于(或即将处于)阻塞状态。所以当前节点在执行完同步代码或被取消后,一定要记得唤醒其后续节点
  • CANCELLED (1): 顾名思义,即取消操作的含义。当一个节点等待超时、或者被打断、或者执行 tryAcquire 发生异常,都会导致当前节点取消。而当节点一旦取消,便永远不会再变为 0 或者 SIGNAL 状态了

2.2 加锁(核心)

我们先上一张 ReentrantLock 加锁功能(非公平)的整体流程图,在并发或关键部分有注释

2109301-20210325204703110-854718401

第一眼看上去,确实有点复杂,不过不用怕,我们逐一分析解读后,它其实就是只纸老虎

大体上可以分为三大部分

  1. 加入阻塞队列
  2. 阻塞队列调度
  3. 异常处理

按照正常的理解,可能只会有前两部分就够了,为什么会有第三部分呢?什么时候会发生异常?

2.2.1 加入阻塞队列

当一个线程尝试加锁失败后,便会放入阻塞队列的队尾;这节我们来讨论一下这个动作的细节

在加入阻塞队列之前,首先会查看头节点是否为 null,如果是 null 的话,需要新建 ws 为 0 的头结点,(为什么在 AQS 初始化的时候,不直接新建头结点呢?其实由此可见作者细节处理的严谨,因为如果当我们的独占锁并发度不大,在尝试加锁的过程中,总能获取到锁,这时便不会向阻塞队列添加内容,假如初始化便新建头结点,会导致其白白占用内存空间而得不到有效利用)

然后将当前节点添加至阻塞队列的尾部,当然头结点初始化、向尾部节点追加新节点都是通过 CAS 操作的。

而阻塞队列呢,正如我们前文提及的是一个 FIFO 的队列,且带有 next、prev 两个引用来标记前、后节点;我们在阻塞队列中加入第一个节点后,阻塞队列的样子:

2109301-20210325204735010-1042358658

2.2.2 阻塞队列调度

这一节属于独占锁很核心的部分,里面涉及 ws 更改、线程挂起与唤醒、更换头结点等

我们接着上节继续,在节点进入调度后,首先检查下当前节点的前节点是否为 head 节点,如果是的话,那么有一次尝试加锁的机会,加锁成功或失败将导致 2 个分支。

我们首先看加锁加锁成功的情况,一旦加锁成功,当前节点便从阻塞队列中“消失”(其实是当前节点变为了头结点,而原头结点内存不可达,等待垃圾回收),当所有节点都加锁成功,阻塞队列便为空了,但并不代表阻塞队列的长度为 0,因为有头结点的存在,所以空阻塞队列的长度是 1。

2109301-20210325204756683-429471471

而加锁失败或者当前节点的前节点不是 head 节点呢?是马上将线程挂起吗?答案是不确定的,要看前节点的 ws 状态而定。而此步骤还有个隐藏任务:将当前节点之前的所有已取消节点从阻塞队列中剔除。

2109301-20210325204819274-1649271927

从上图中我们看到,一个节点如果想正常进入挂起状态,那么一定要将前节点的 ws 改为SIGNAL (-1)状态,但如果前节点已经变为CANCELLED (1)状态后,就要递归向前寻找第一个非 CANCELLED 的节点。

针对“线程挂起并等待其他线程唤醒”,我们提出2个问题

问题1

  • 如果是普通节点,直接挂在队尾,且将其线程挂起,这个没啥问题;但如果是头节点被唤醒,尝试加锁却失败了,又被再次挂起,会不会导致头结点永远处于挂起状态?
  • 答:不会,因为头结点之所以抢锁失败,一定是因为另外一个A线程抢锁成功。虽然头节点暂时处于挂起状态,但当A线程执行完加锁代码后,还会再次唤醒头结点

问题2

  • 假定当前节点判定需要被挂起,在执行挂起操作前,拥有锁的线程执行完毕,并唤醒了当前线程,而当前线程又马上要进行挂起操作,岂不是会导致无法成功将当前节点唤醒,从而永远 hang 死?
  • 答:能考虑到这个问题,说明你已经带着分身去思考问题了,不错。不过此处是不会存在这个问题的,因为线程挂起、唤醒使用的api为 park/unpark,即便是 unpark 发生在 park 之前,在执行 park 操作时,也会成功唤醒。这个特质区别于 wait/notify

而针对阻塞队列的调度,还有一些没有解释的问题:

  • 为什么阻塞队列内有这么多CANCELLED状态的节点?
  • 当前节点在挂起前,前节点为SIGNAL状态,但经过一段时间运行,前节点变为了CANCELLED状态,岂不是导致当前节点永远无法被唤醒?

要回答这两个问题,就要引出异常处理了

2.2.3 异常处理

我们首先讨论如果 AQS 不做异常处理可以吗? 不可以,例如第一个节点被唤醒后,在加锁阶段发生了异常,如果没有异常处理,这个异常节点将永远处于阻塞队列,成为“僵尸节点”,且后续节点也不会被唤起

官方标明可能会出现异常的部分,诸如“等待超时”、“打断”等,那如果我们调用 acquire() 方法,而非acquireInterruptibly()tryAcquireNanos(time)是不是就不会出现异常?

不是的,因为还有 AQS 下放给我们自己实现的 tryRelease() 等方法。我们实现一个自己的 AQS,并模拟 tryRelease() 报错,看 AQS 能否正常应对

public class FindBugAQS {

    public volatile static int FLAG = 0;
  
    private static ThreadLocal<Integer> FLAG_STORE = new ThreadLocal<>();
  
    private static ThreadLocal<Integer> TIMES = ThreadLocal.withInitial(() -> 0);
  
    private Sync sync = new Sync();
  
    private static class Sync extends AbstractQueuedSynchronizer {
  
      private Sync() {
        setState(1);
      }
  
      public void lock() {
        FLAG_STORE.set(++FLAG);
        int state = getState();
        if (state == 1 && compareAndSetState(state, 0)) {
          return;
        }
        acquire(1);
      }
  
      @Override
      protected boolean tryAcquire(int acquires) {
        if (FLAG_STORE.get() == 2) {
          Integer time = TIMES.get();
          if (time == 0) {
            TIMES.set(1);
          } else {
            // 模拟发生异常,第二个节点在第二次访问tryAcquire方法时,将会扔出运行期异常
            System.out.println("发生异常");
            throw new RuntimeException("lkn aqs bug");
          }
        }
        int state = getState();
        if (state == 1 && compareAndSetState(state, 0)) {
          return true;
        }
        return false;
      }
  
      @Override
      protected final boolean tryRelease(int releases) {
        setState(1);
        return true;
      }
  
      public void unlock() {
        release(1);
      }
    }
  
    public void lock() {
      sync.lock();
    }
  
    public void unlock() {
      sync.unlock();
    }
  
  }
  
  // 测试用例如下:
  
  public class BugTest {
    private static volatile int number = 0;
  
    @Test
    public void test2() throws InterruptedException {
      List<Thread> list = Lists.newArrayList();
      FindBugAQS aqs = new FindBugAQS();
      Thread thread1 = new Thread(() -> {
        aqs.lock();
        PubTools.sleep(5000);
        number++;
        aqs.unlock();
      });
      thread1.start();
      list.add(thread1);
  
      PubTools.sleep(500);
  
      for (int i = 0; i < 4; i++) {
        Thread thread2 = new Thread(() -> {
          aqs.lock();
          PubTools.sleep(500);
          number++;
          aqs.unlock();
        });
        thread2.start();
        list.add(thread2);
      }
  
      for (Thread thread : list) {
        thread.join();
      }
      System.out.println("number is " + number);
    }
  }
}

运行结果:

发生异常
Exception in thread "Thread-1" java.lang.RuntimeException: lkn aqs bug
	at org.xijiu.share.aqs.bug.FindBugAQS$Sync.tryAcquire(FindBugAQS.java:42)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:863)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
	at org.xijiu.share.aqs.bug.FindBugAQS$Sync.lock(FindBugAQS.java:31)
	at org.xijiu.share.aqs.bug.FindBugAQS.lock(FindBugAQS.java:64)
	at org.xijiu.share.aqs.bug.BugTest.lambda$test2$2(BugTest.java:61)
	at java.lang.Thread.run(Thread.java:748)
number is 4

我们自定义了 AQS 实现类 FindBugAQS.java,模拟第二个节点在第二次访问 tryAcquire 会扔出异常;然后启动5个线程,对 number 进行累加。可见,最后的结果符合预期,AQS 处理的很完美。那程序发生异常后,阻塞队列究竟如何应对?

举例说明吧,假定现在除去头结点外,阻塞队列中还有3个节点,当第1个节点被唤醒执行时,发生了异常,那么第1个节点会将ws置为CANCELLED,且将向后的链条打断(指向自己),但向前链条保持不变,并唤醒下一个节点

2109301-20210325205004061-1851650904

由上图可见,当某个节点响应中断/发生异常后,其会主动打断向后链条,但依旧保留向前的链条,这样做的目的是为了后续节点在寻找前节点时,可以找到标记为CANCELLED状态的节点,而不是找到null。至此便解答了3.2提出的两个问题

a、为什么阻塞队列内有这么多CANCELLED状态的节点?

  • 当被调度执行的节点发生了异常,状态便会更改为CANCELLED状态,但仍存在于阻塞队列中,直到正常执行的节点将其剔除

b、当前节点在挂起前,前节点为SIGNAL状态,但经过一段时间运行,前节点变为了CANCELLED状态,岂不是导致当前节点永远无法被唤醒?

  • 不会,节点发生异常后,会主动唤起后续节点,而后续节点负责将前节点从阻塞队列中删除

2.3 解锁

本来想针对“解锁逻辑”画一张流程图,但猛然发现解锁部分仅仅10行左右的代码,那就索性把源码贴上,逐一论述下

AQS 解锁源码

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentrantLock 解锁源码

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

我们发现当tryRelease()方法返回true时,AQS便会负责唤醒后续节点,因为ReentrantLock支持了可重入的特性,所以当前线程的每次加锁都会对state累加,而每次tryRelease()方法则会对state累减,直到state变为初始状态0时,tryRelease()方法才会返回true,即唤醒下一个节点

解锁逻辑相对简洁,且不存在并发,本文不再赘述

2.4 总结

再次强调本文是通过 ReentrantLock 的视角来分析独占锁,且主要分析的是ReentrantLock.lock()/unlock()方法,目的是让大家对AQS整体的数据结构有个全面认识,方便后续在实现自己的并发框架时,明白api背后发生的事情,做到游刃有余.

而像ReentrantLocklockInterruptibly()tryLock(TimeUnit)或者其他独占锁的实现类,读者可自行阅读源码,原理类似,核心代码也是一样的

3. 共享锁

3.1 简介

独占锁虽说在j.u.c中有现成的实现,但在JAVA的语言层面也同样提供了支持(synchronized);但共享锁却是只存在于AQS中,而它在实际生产中的使用频次丝毫不亚于独占锁,在整个AQS体系中占有举重若轻的地位。而在某种意义上,因为可能同时存在多个线程的并发,它的复杂度要高于独占锁。本章除了介绍共享锁数据结构等,还会重点对焦并发处理,看 doug lea 在并发部分是否有遗漏

j.u.c下支持的并发锁有SemaphoreCountDownLatch等,本章我们采用经典并发类Semaphore来阐述。

共享锁其实是相对独占锁而言的,涉及到共享锁就要聊到并发度,即同一时刻最多允许同时执行线程的数量。

但共享锁的并发度也可以设置为1,此时它可以看作是一个特殊的独占锁。

waitStatus

在独占锁章节中,我们介绍到了关键的状态标记字段waitStatus,它在独占锁的取值有

  • 0
  • SIGNAL (-1)
  • CANCELLED (1)

而这些取值在共享锁中也都存在,含义也保持一致,而除了上述这3个取值外,共享锁还额外引入了新的取值:

  • PROPAGATE (-3)

-3这个取值在整个AQS体系中,只存在于共享锁中,它的存在是为了更好的解决并发问题,我们将在后文中详细介绍

使用场景

本人参加的某性能挑战赛中,有这样一个场景:数据产生于CPU,且有12个线程在不断的制造数据,而这些数据需要持久化到磁盘中,由于数据产生的非常快,此时的瓶颈卡在 IO 上;磁盘的性能经过基准测试,发现每次写入 8K 数据,且开 4 个线程写入时,能将 IO 打满;但如何控制在同一时刻,最多有4个线程进行 IO 写入呢?

其实这是一个典型的使用共享锁的场景,我们用三四行代码即可解决

// 设置共享锁的并发度为4
Semaphore semaphore = new Semaphore(4);
// 加锁
semaphore.acquire();
// 执行数据存储
storeIO();
// 释放锁
semaphore.release();

3.2 并发

3.2.1 独占锁 vs 共享锁

共享锁的整体流程与独占锁相似,都是首先尝试去获取资源(子类逻辑,一般是CAS操作)

  • 如果能拿到资源,那么进入同步块执行业务代码;当同步块执行完毕后,唤醒阻塞队列的头结点
  • 如果资源已空,那么进入阻塞队列并挂起,等待被其他线程唤醒

两者的不同点在什么地方呢?

就在于“唤醒阻塞队列的头结点”的操作。在独占锁时,唤醒头结点的操作,只会有一个线程(加锁成功的线程调用release())去触发;而在共享锁时,可能会有多个线程同时去调用释放。

2109301-20210408180441391-1515154854

直观感觉这样设计不太合理:如果多个线程同时去唤醒头结点,而头结点只能被唤醒一次,假定阻塞队列中有20个节点,那这些节点只能等待上一个节点执行完毕后才会被唤醒,无形中共享锁的并发度变成了1。要解决这个疑问,我们先来看共享锁的释放逻辑。

3.2.2 锁释放

先来思考一下锁释放需要做的事儿

  1. 阻塞队列的第一个节点一定要被激活;这个问题看似不值一提,却相当重要,区别于独占锁,共享锁的锁释放是存在并发的,在高并发的流量下,一定要保证阻塞队列的第一个有效节点被激活,否则会导致阻塞队列永久性的挂死
  2. 保证激活阻塞队列时的并发度;这个问题同样也是独占锁不存在的,也就是我们在上面提出的问题;假定这样一种场景:“共享锁的并发度为10,阻塞队列中有100个待处理的节点,而此时又没有新的加锁请求,如何保证在激活阻塞队列时,保持10的并发度?”

共享锁如何解决这两个问题呢?我们接下来逐一阐述

3.2.2.1 调用点

与独占锁不同,共享锁调用“锁释放”有2个地方(注:AQS的一个阻塞队列是可以同时添加独占节点、共享节点的,为了简化模型,我们这里暂不讨论这种混合模型)

  • 某线程同步块执行完毕,正常调用解锁逻辑;此点与独占锁一致
  • 在每次更换头结点时,如果满足以下任一条件,同样会调用“锁释放”;更换头结点的操作,其实此时已经意味着当前线程已经加锁成功
    • 有额外的资源可用;拿信号量举例,当发现信号量数量>0时,表示有额外资源可用
    • 旧的头结点或当前头结点的ws < 0

那这两个点调用的时候,是否存在并发呢?有同学会说第一点并发,第二点是串行的”;其实此处第二点也是存在并发的,例如线程1更换了head节点后,准备执行“锁释放”逻辑,正在此时,线程2正常锁释放后,唤醒了新的head节点(线程3),线程3又会执行更换head节点,并准备执行“锁释放”逻辑;此时线程1跟线程3都准备执行“锁释放”逻辑

2109301-20210408180510153-195535666

既然“锁释放”存在这么多并发,那就一定要保证“锁释放”逻辑是幂等的,那它又是如何做到呢?

3.2.2.2 锁释放

直接贴一下它的源码吧,释放锁的代码寥寥几笔,却很难说它简单

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

对应的流程图如下:

2109301-20210408180544743-1180456736

我们简单描述一下锁释放做的事儿

  1. 首选获取头结点的快照,并将其赋予变量h,同时获取h.waitStatus,并标记位ws

  2. 判断 ws 的状态

    • ws == -1 表示下一个节点已经挂起,或即将挂起。如果只要发现是-1状态,就进行线程唤起的话,因为存在并发,可能导致目标线程被唤起多次,故此处需要通过CAS进行抢锁,保证只有一个线程去唤起
    • ws == 0 如果发现节点ws为0,此处会存在两种情况(情况1:节点刚新建完毕,还未进入阻塞队列;情况2:节点由-1修改为了0),不管哪种情况,都强制将其由-1改为-3,标记位强制传播,此处是否存在漏洞?
    • ws == -3 表示当前节点已经被标识为强制传播了,直接结束
  3. 如果此时 h == head,说明在上述逻辑发生时,头结点没有发生变化,那么结束当前操作,否则重复上述步骤。注:AQS中所有节点只有一次当头结点的机会,也就是某个节点当过一次头结点后,便会被抛弃,再无可能第二次成为头结点,这点至关重要

根据以上分析,我们发现,节点的状态流转是通过ws来控制的,即0、-1、-3,乍看上去,貌似不太严谨,那我们来做具体分析

3.2.2.3 ws 状态转换

仅有2个功能点会对ws进行修改,一是将节点加入阻塞队列时,二就是3.2.1中描述的调用锁释放逻辑时;

我们将加入阻塞队列时ws的状态流转再回忆下:

  • 状态为0(初始状态),加入阻塞队列前,需要将前节点修改为-1,然后进入线程挂起
  • 状态为-3(强制传播状态,被解锁线程标记),加入阻塞队列前,同样需要将前节点修改为-1,然后进入线程挂起

综述,我们出一张ws的整体状态流转图

2109301-20210408180608133-342046168

由上图可得知,只要解锁逻辑成功通过CAS将head节点由-1修改为0的话,那么就要负责唤醒阻塞队列中的第一个节点了

整个流转过程有bug吗?我们设想如下场景:共享锁的并发度设置为1,A、B两个线程同时进入加锁逻辑,B线程成功抢到锁,并开始进入同步块,A线程抢锁失败,准备挂到阻塞队列,正常流程是A线程将ws由0修改为-1后,进入挂起状态,但B线程执行较快,已经优先A线程并开始执行解锁逻辑,将ws由0修改为了-3,然后B线程正常结束;A线程发现ws为-3后,将其修改为-1,然后进入挂起。 如果这个场景真实发生的话,A线程将永久处于挂起状态,那岂不是存在漏洞?

然而事实并非如此,因为只要A线程将ws修改为-1后,都要再尝试进行一次获取锁的操作,正是这个操作避免了上述情况的发生,可见aqs是很严谨的

2109301-20210408180635712-863533682

3.2.3 保证并发度

阻塞队列中节点的激活顺序是什么样呢?其实上文已经描述的较为清楚,解锁的逻辑只负责激活头节点,那如何保证共享锁的并发度?

我们还是假定这样一个场景:共享锁的并发度为5,阻塞队列中有20个节点,只有head节点已被唤醒,且没有新的请求进入,我们希望在同一时刻,同时有5个节点处于激活状态。针对上述场景,aqs如何做到呢?

2109301-20210408180703912-1867945441

其实head节点被激活时,在第一时间会通知后续节点,并将其唤醒,然后才会执行同步块逻辑,保证了等待中的节点快速激活

4. 条件队列

4.1 介绍

AQS 中的条件队列相比较前文中的“独占锁”、“共享锁”等比较独立,即便没有条件队列也丝毫不影响诸如 ReentrantLockSemaphore 类的实现,那如此说来条件队列是否就是一个可有可无的产物?答案是否定的,我们来看下直接或间接用到条件队列的 JDK 并发类:

  • ReentrantLock 独占锁经典类
  • ReentrantReadWriteLock 读写锁
  • ArrayBlockingQueue 基于数组的阻塞队列
  • CyclicBarrier 循环栅栏,解决线程同步问题
  • DelayQueue 延时队列
  • LinkedBlockingDeque 双向阻塞队列
  • PriorityBlockingQueue 支持优先级的无界阻塞队列
  • ThreadPoolExecutor 线程池构造器
  • ScheduledThreadPoolExecutor 可基于时间调度的线程池构造器
  • StampedLock 邮戳锁,1.8后引入,更高效的读写锁

如此豪华的阵容,可见Condition的地位不可小觑

我们简单描述下条件队列实现的功能:有3个线程A、B、C,分别调用wait/await方法后,线程进入阻塞,在没有其他线程去唤醒的情况下,3个线程将永远处于阻塞状态。此时如果有另外线程调用notify/signal,那么A、B、C线程中的某一个将被激活(根据其进入条件队列的顺序而定),从而执行后续的逻辑;如果调用notifyAll/signalAll的话,那么3个线程都将被激活,这可能是我们对条件队列的简单认识。这样的描述是否准确呢?可能不太严谨,我们引入JDK的条件队列来做说明

统一话术:其实语法层面支持的wait/notify与AQS都属于JDK的范畴,但为了区分两者,我们定义如下:

  • JDK条件队列:语法层面提供支持的wait/notify,即Object类中的wait()/notify()方法
  • AQS条件队列:AQS提供的条件队列,即AQS内部的ConditionObject

4.2 JDK 中的条件队列 (wait/notify)

众所周知,在JDK中,wait/notify/notifyAll是根对象Object中内置的方法,且方法均被定义为native本地方法

// 等待
public final native void wait(long timeout) throws InterruptedException;
// 唤醒
public final native void notify();
// 唤醒所有等待线程
public final native void notifyAll();

4.2.1 wait

// 步骤1
synchronized (obj) {
  // 步骤2
  before();
  // 步骤3
  obj.wait();
  // 步骤4
  after();
}

相信大家对上述代码并不陌生,我们将JDK的条件队列抽象为4步,逐一阐述

  • 步骤1: synchronized (obj)

    • 在jdk中如果想调用Object.wait()方法,必须首先获取该对象的synchronized锁,当前步骤,如果成功获取到锁,那么将进入“步骤2”,如果存在并发,当前线程将会进入阻塞(线程状态为BLOCKED),知道获取到锁为止
  • 步骤2: before()

    • 我们知道synchronized是独占锁,所以在执行步骤2代码时,程序是不存在并发的,即同一时刻,只有一个线程正在执行,此处也相对好理解
  • 步骤3: obj.wait()

    • 此步骤是将当前线程放入条件队列,同时释放obj的同步锁。此处跟我们对synchronized的认知有悖,我们一般认为synchronized (obj) {......}在大括号中的代码会一直持有锁,而事实情况却是,当程序执行wait()方法时,会释放obj的同步锁
  • 步骤4: after()

    • 此步骤是并发执行还是串行执行?

      假设我们现在有3个线程A、B、C都已经执行完毕wait()方法,并进入了条件队列,等待其他线程唤醒;此时另外一个线程执行了notifyAll()时,后续的激活流程是怎么样的?

      • 错误观点:有很多同学直观感受是,线程A、B、C同时被激活,所以步骤4是并发执行的;就像是百米赛跑,所有同学都准备就绪(wait),一声枪响后(notifyAll),所有人开始赛跑,并跑到终点(步骤4)
      • 正确观点:其实“步骤4”是串行执行的,大家再检查下代码后便可发现,“步骤4”处于synchronized的大括号之间;还是拿上述赛跑举例,如果认为从听到枪响至跑到终点是“步骤4”的话,那真实的场景应该是这样的:一声枪响后,A起跑,B、C原地不动;A跑到终点后,B开始起跑,C原地不动;最后是C跑到终点

由此我们断定,obj.wait()虽然是native方法,但其内部经历了释放锁、重新抢锁的两个大环节

4.2.2 notify

synchronized (obj) {
  obj.notify();
  // obj.notifyAll();
}

所有因obj.wait()阻塞的线程,都要通过notify来唤醒

  • notify() 唤醒条件队列中,队首节点
  • notifyAll() 唤醒条件队列中所有节点

4.3 AQS 中的条件队列(await/signal)

我们初看AQS中的条件队列时,发现其提供了与JDK条件队列几乎一致的功能

JDKAQS
waitawait
notifysingal
notifyAllsingalAll

用法上也及其相似:

await

// 初始化
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
  lock.lock();
  condition.await();
} catch (InterruptedException e) {
  e.printStackTrace();
} finally {
  lock.unlock();
}

singal

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
  lock.lock();
  condition.signal();
} finally {
  lock.unlock();
}

4.3.1 条件队列

我们知道在AQS内部维护了一个阻塞队列,数据结构如下:

2109301-20210428094448063-798568915

上图描述的是一个长度为 3 的FIFO阻塞队列,因为头结点常驻内存,所以不算在内;我们可以发现阻塞队列中每个节点都包含了前、后引用

那AQS内部的另一个条件队列又是什么样的数据结构呢?

2109301-20210428094514179-1229024253

可见,条件队列为单向列表,只有指向下一个节点的引用;没有被唤醒的节点全部存储在条件队列上。

上图描述的是一个长度为 5 的条件队列,即有5个线程执行了await()方法;与阻塞队列不同,条件队列没有常驻内存的“head结点”,且一个处于正常状态节点的waitStatus为 -2 。

当有新节点加入时,将会追加至队列尾部。

4.3.2 唤醒

当我们调用signal()方法时,会发生什么?我们还是拿长度为 5 的条件队列举例说明,在AQS内部会经历队列转移,即由条件队列转移至阻塞队列

2109301-20210428094539039-1519593306

signalAll()执行时,具体执行流程与signal()类似,即会将条件队列中的所有节点全部转移至阻塞队列(并发度为1,按顺序依次激活)中,依靠阻塞队列自身依次唤醒的机制,达到激活所有线程的目的

4.4 JDK vs AQS

经过上文的介绍,似乎AQS做了与wait/notify相同的功能,相比较而言,甚至JDK的写法更简洁;那他们在性能上的表现如何呢?让我们来做个对比。

4.4.1 对比

我们模拟这样的一个场景:启动10个线程,分别调用wait()方法,当所有线程都进入阻塞后,调用notifyAll(),10个线程均被唤醒并执行完毕后,方法结束。 上述方法执行10000次,对比JDK与AQS耗时

JDK测试代码:

public class ConditionCompareTest {

  @Test
  public void runTest() throws InterruptedException {
    long begin = System.currentTimeMillis();
    for (int i = 0; i < 10000; i++) {
      if (i % 1000 == 0) {
        System.out.println(i);
      }
      jdkTest();
    }
    long cost = System.currentTimeMillis() - begin;
    System.out.println("耗时: " + cost);
  }
  
  public void jdkTest() throws InterruptedException {
    Object lock = new Object();
    List<Thread> list = Lists.newArrayList();
    // 步骤一:启动10个线程,并进入wait等待
    for (int i = 0; i < 10; i++) {
      Thread thread = new Thread(() -> {
        try {
          synchronized (lock) {
            lock.wait();
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.start();
      list.add(thread);
    }

    // 步骤二:等待10个线程全部进入wait方法
    while (true) {
      boolean allWaiting = true;
      for (Thread thread : list) {
        if (thread.getState() != Thread.State.WAITING) {
          allWaiting = false;
          break;
        }
      }
      if (allWaiting) {
        break;
      }
    }

    // 步骤三:唤醒10个线程
    synchronized (lock) {
      lock.notifyAll();
    }

    // 步骤四:等待10个线程全部执行完毕
    for (Thread thread : list) {
      thread.join();
    }
  }
}

AQS 测试代码

public class ConditionCompareTest {
  private ReentrantLock lock = new ReentrantLock();
  private Condition condition = lock.newCondition();

  @Test
  public void runTest() throws InterruptedException {
    long begin = System.currentTimeMillis();
    for (int i = 0; i < 10000; i++) {
      if (i % 1000 == 0) {
        System.out.println(i);
      }
      aqsTest();
    }
    long cost = System.currentTimeMillis() - begin;
    System.out.println("耗时: " + cost);
  }

  @Test
  public void aqsTest() throws InterruptedException {
    AtomicInteger lockedNum = new AtomicInteger();
    List<Thread> list = Lists.newArrayList();
    // 步骤一:启动10个线程,并进入wait等待
    for (int i = 0; i < 10; i++) {
      Thread thread = new Thread(() -> {
        try {
          lock.lock();
          lockedNum.incrementAndGet();
          condition.await();
          lock.unlock();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.start();
      list.add(thread);
    }

    // 步骤二:等待10个线程全部进入wait方法
    while (true) {
      if (lockedNum.get() != 10) {
        continue;
      }
      boolean allWaiting = true;
      for (Thread thread : list) {
        if (thread.getState() != Thread.State.WAITING) {
          allWaiting = false;
          break;
        }
      }
      if (allWaiting) {
        break;
      }
    }

    // 步骤三:唤醒10个线程
    lock.lock();
    condition.signalAll();
    lock.unlock();

    // 步骤四:等待10个线程全部执行完毕
    for (Thread thread : list) {
      thread.join();
    }
  }
}
条件队列耗时1耗时2耗时3耗时4耗时5平均耗时(ms)
JDK500050765054508949425032
AQS535854405444547354725437

4.4.2 基准测试Q&A

基于以上的测试我们还是有一些疑问的,不要小看这些疑问,通过这些疑问我们可以把之前的知识点全都串联起来

  • Q:AQS测试中的“步骤二”,为什么在判断“等待10个线程全部进入wait方法”时,要引入lockedNum.get() != 10的判断?直接通过判断所有线程是否均为waiting方法不可以吗?
  • A:如果真的删除lockedNum.get() != 10的判断,在多次并发测试时,会有较小的概率出现程序死锁的情况(作者电脑的环境是平均5万次调用会出现一次),为什么会出现死锁呢?我们追AQS源码就会发现,不管是调用lock()还是await,挂起线程使用的方法均为LockSupport.park()方法,此方法会将线程置为WAITING状态,也就是线程状态是WAITING状态时,有可能线程刚进入lock()方法,从而导致awaitthread.join()的死锁
  • Q:既然是这样,为什么JDK的测试没有出现死锁?
  • A:我们看到JDK的加锁是通过synchronized关键字完成的,而当线程因为等待synchronized资源而阻塞时,线程状态将变为BLOCKED,而进入wait()方法后,状态才会变为WAITING
  • Q:那看来只有通过引入AtomicInteger lockedNum变量才能解决死锁问题了
  • A:其实解决问题的方式有很多种,我们甚至可以简单将ReentrantLock lock置为公平锁,也能解决上述死锁问题;因为当前场景发生死锁的情况是,singalAll()先于await()发生,而当所有线程都变成WAITING状态后,公平锁则确保了singalAll()一定是在所有线程都调用了await()。但因为synchronized本身是非公平锁,故如果AQS使用公平锁的话,性能偏差较大
  • Q:那这样看来,AQS中的阻塞队列相对比JDK的没有优势可言啊,用法上没有JDK简洁,性能上还没人家快
  • A:的确,如果真是只是单纯的使用阻塞、唤醒功能的话,还是建议使用JDK内置的方式;但AQS的优势并不在此

4.5 再说 AQS 条件队列

AQS的优势在于,其提供了丰富的api可以查询条件队列的状态;例如当我们想看一下在条件队列中等待节点的个数时,使用JDK的wait/notify时,是无法做的;

AQS 提供的 api 如下:

  • boolean hasWaiters() 阻塞队列中是否有等待节点
  • int getWaitQueueLength() 获取阻塞队列长度
  • Collection<Thread> getWaitingThreads() 获取阻塞队列中线程对象

这些 api 为程序提供了更灵活的控制,条件队列对于 javaer 已不是黑盒;当然使用 AQS 的条件队列必然要引入独占锁,例如ReentrantLock,自然地我们还可以通过它查看条件队列外围的一些指标,例如:

  • Interrupted 响应中断,借助独占锁,提供响应中断能力; wait/notify不提供,因为虽然wait方法响应中断,但是synchronized关键字是会一直阻塞的
  • boolean tryLock() 尝试获取锁; wait/notify不提供
  • int getHoldCount() 获取阻塞线程的数量
  • boolean isLocked() 是否持有锁
  • fair/nonFair 提供公平/非公平锁
  • ...

可见整个AQS体系相比较Objectwait/notify方法是相当灵活的,提供了很多监控条件队列、阻塞队列的指标