AQS源码解析
AQS 是并发包的基石。AQS 是一个抽象类,必须要继承才能使用。ReentrantLock
、ReentrantReadWriteLock
,Semaphore
这些并发工具都是依赖 AQS 实现(内部类继承 AQS )的。
AQS内部通过一个FIFO队列管理线程的同步,队列里的节点即是被包装的线程。
本文暂时只讲独占模式。
继承情况
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AQS 继承 AbstractOwnableSynchronizer
,是用于互斥同步的同步器,仅有exclusiveOwnerThread
一个成员变量,表明当前拥有独占访问的线程。
可以看到get 和 set 方法都是 final 的,无法被重写。而且该成员变量没有使用volitile
,其方法也没有做额外的同步,继承类使用时要自己注意。
内部类
Node
:Node即是上图等待队列中的节点。
// AQS 的等待队列是 CLH 队列的变种,CLH 锁通常用于自旋锁。
// 虽然 AQS 是用来做阻塞同步器的,但使用了相同的策略:关于控制该节点状态的值都是在前一个节点持有(waitStatus)。
// 这里可以先记住,Node 点里 waitStatus并不是代表自己的,而是其后继节点的。
// 当一个节点的前置节点释放时,该节点会被signal。等待队列的每个节点都持有一个等待线程,作为一种
// 特殊通知形式的监视器。waitStatus并不控制线程是否被授予锁。
// 队列里第一个节点的线程,可能尝试获取锁。但是第一个节点并不保证成功获取锁。只是有尝试获取的权利。
// 当前被释放的竞争线程可能需要重新等待。
// CLH的入队,只需要原子性地把节点拼接到 tail。出队只需要 set head。
// * +------+ prev +-----+ +-----+
// * head | | <---- | | <---- | | tail
// * +------+ +-----+ +-----+
// CLH 入队只需要在 tail 做一次原子操作,因此从没入队到入队有一个简单的原子分界点。
// 相似的,CLH 出队只需要更新 head。然而,决定其后继几点可能更花时间,部分原因是处理由于和中断和超时导致的取消。
// “prev",指Node的prev引用,在原始的CLH锁里是没有的,在 AQS 里主要用于取消操作。如果一个Node被取消了,它的后继节点通常会重连到一个没有被取消的节点上。
// "next", Node 的 next 引用,用于实现阻塞机制。前置节点唤醒后继节点,就是通过遍历 next。
// 决定后继节点,必须避免新入队的节点 set 前置节点的 next 所发生的竞争。这是通过当一个节点的后继节点为空时,必要时从原子更新的tail向后检查实现的。
// 取消引入了一些保守的算法。因为我们必须轮询其他节点的取消,可能会忽略被取消的节点是在前在后。我们通过取消时总是 unparking 后继节点解决,使被取消的后继节点稳定在一个新的前置节点上。
// CLH 需要一个虚拟的 head 启动。但我们不会在构造时创造 head,因为如果永远没有竞争的话,是浪费。AQS 是在第一次竞争时才会 构造Node,并设置 head和tail的指针。
// 在 Conditions 上等待的线程,使用的是相同的nodes,但使用了一个额外的link(nextWaiter)。Conditions 只需要在简单的无并发队列里 link 节点,因为只在独占时被访问。
// 等待时,Node 被插入到一个 Condition 队列。Signal 时,节点被转移到主队列。一个特殊的 status field 的值标记 node 在哪个队列()。
static final class Node {
// 节点是共享模式的标识 (共享模式的队列)
static final Node SHARED = new Node();
// 节点是独占模式的标识
static final Node EXCLUSIVE = null;
// waitStatus的值 此线程取消了获取锁
static final int CANCELLED = 1;
// waitStatus的值 标识后继节点的线程需要 unparking(唤醒)
static final int SIGNAL = -1;
// waitStatus的值 标识当前节点在 Condition 上 wait
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
// waitStatus的值 标识下一个 acquireShared 应该无条件传播
static final int PROPAGATE = -3;
/**
* waitStatus 的值,除上面4个外,还有0。
* SIGNAL: 当前节点的后继节点被阻塞了(通过park),所以当前节点释放或取消时必须 unpark 后继节点。
* 为避免竞争,acquire 方法必须首先表明他们需要一个 signal,然后重试原子的 acquire,失败就阻塞住。
* CANCELLED: 当前节点因为超时或者中断被取消了抢锁。一个 CANCELLED 状态的线程永远不会再次阻塞。
* CONDITION: 当前节点当前处于一个 Condition 队列(处于共享模式)。
* PROPAGATE: 一个 releaseShared 应该被传播到其他 nodes。在 doReleaseShared 时 head节点被设置为这个状态确保传播继续,即使其他操作干预了。
* 0: 普通的同步 node(非 Condition )的初始状态
*/
volatile int waitStatus;
// 前置节点的引用。
// 当前的节点,也就是线程,需要 check 其 prev 的 waitStatus 。
// 在当前节点入队时被赋值,出队时置null(for GC)。
// 在前置节点取消时,我们通过一个简短的 while 循环找到一个没有被取消的节点。这个节点是总是存在的,
// 因为 head 永远不会被取消:一个 node 能成为 head 肯定是 acquire 成功了。
// 一个被取消的线程永远不会在 acquire 时成功,并且线程只能取消自己,不能取消别的 node。
volatile Node prev;
// 后继节点的引用。当前节点 release 时,要 unpark next节点。
// 当前节点入队时赋值,由于取消,该值会被改变,出队时置null(for GC)
// 入队操作在连接完成前不给前置节点的 next 赋值,所以没必要认为 next 为null的就是在队尾。
// 然而,如果出现了 next 为空,我们可以从 tail 开始检查 prev 来 double-check
// 取消节点的 next 指向 node 自身而不是 null, 使 isOnSyncQueue 方法更简单。
volatile Node next;
volatile Node next;
// 当前节点的线程。
volatile Thread thread;
// 在 condition 上等待的 next node 或者 特殊值 SHARED 的引用。
// 因为 condition 队列仅在独占模式被访问,只需要一个简单的队列即可。
// 在 condition 等待队列的 nodes 随后被转移到主队列重新 acquire。
// 因为 condition 只能是排它的,我们用 SHARED 这个特殊值标识是在共享模式。
Node nextWaiter;
// 当前 Node 是不是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回 prev 节点,如果是null,抛出 NullPointerException
// 在 prev 不为 null 时使用。 null-check 可以省略,但有利于GC
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 不同的地方使用的3个构造器
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS 是双向等待队列,是 CLH 队列的变种,Node
类是队列中的节点。类上的注释大多是 AQS 队列的设计思想,可以暂时先对 AQS 有了大致了解后再来理解。
ConditionObject
,用于共享模式,暂时先不讲了。
Field
接来下看 AQS 的 Field
// 等待队列的 head,懒加载。除了初始化之外,只能通过 setHead 方法更改。
// 要注意如果 head 存在,它的 waitStatus 保证不能为 CANCELLED 。
private transient volatile Node head;
// 等待队列的tail,懒加载。只能通过 enq 添加新的等待 node 节点时更改。
private transient volatile Node tail;
// AQS 本身的锁定状态。
private volatile int state;
//继承自AbstractOwnableSynchronizer
private transient Thread exclusiveOwnerThread;
方法
Previous Jan 1, 2019
« Redis 设计与实现
« Redis 设计与实现
Sep 1, 2019 Next
Zero-Copy 简析 »
Zero-Copy 简析 »