Tree's Blog

Home Projects AboutRSS

AQS源码解析

AQS 是并发包的基石。AQS 是一个抽象类,必须要继承才能使用。ReentrantLockReentrantReadWriteLockSemaphore这些并发工具都是依赖 AQS 实现(内部类继承 AQS )的。

AQS内部通过一个FIFO队列管理线程的同步,队列里的节点即是被包装的线程。

此处输入图片的描述

本文暂时只讲独占模式。

继承情况

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    private static final long serialVersionUID = 3737899427754241961L;
    protected AbstractOwnableSynchronizer() { }
    private transient Thread exclusiveOwnerThread;
    
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AQS 继承 AbstractOwnableSynchronizer,是用于互斥同步的同步器,仅有exclusiveOwnerThread一个成员变量,表明当前拥有独占访问的线程。

可以看到get 和 set 方法都是 final 的,无法被重写。而且该成员变量没有使用volitile,其方法也没有做额外的同步,继承类使用时要自己注意。

内部类

Node:Node即是上图等待队列中的节点。

// AQS 的等待队列是 CLH 队列的变种,CLH 锁通常用于自旋锁。
// 虽然 AQS 是用来做阻塞同步器的,但使用了相同的策略:关于控制该节点状态的值都是在前一个节点持有(waitStatus)。
// 这里可以先记住,Node 点里 waitStatus并不是代表自己的,而是其后继节点的。
// 当一个节点的前置节点释放时,该节点会被signal。等待队列的每个节点都持有一个等待线程,作为一种
// 特殊通知形式的监视器。waitStatus并不控制线程是否被授予锁。
// 队列里第一个节点的线程,可能尝试获取锁。但是第一个节点并不保证成功获取锁。只是有尝试获取的权利。
// 当前被释放的竞争线程可能需要重新等待。

// CLH的入队,只需要原子性地把节点拼接到 tail。出队只需要 set head。
//   *      +------+  prev +-----+       +-----+
//   * head |      | <---- |     | <---- |     |  tail
//   *      +------+       +-----+       +-----+

// CLH 入队只需要在 tail 做一次原子操作,因此从没入队到入队有一个简单的原子分界点。
// 相似的,CLH 出队只需要更新 head。然而,决定其后继几点可能更花时间,部分原因是处理由于和中断和超时导致的取消。
// “prev",指Node的prev引用,在原始的CLH锁里是没有的,在 AQS 里主要用于取消操作。如果一个Node被取消了,它的后继节点通常会重连到一个没有被取消的节点上。

// "next", Node 的 next 引用,用于实现阻塞机制。前置节点唤醒后继节点,就是通过遍历 next。
// 决定后继节点,必须避免新入队的节点 set 前置节点的 next 所发生的竞争。这是通过当一个节点的后继节点为空时,必要时从原子更新的tail向后检查实现的。

// 取消引入了一些保守的算法。因为我们必须轮询其他节点的取消,可能会忽略被取消的节点是在前在后。我们通过取消时总是 unparking 后继节点解决,使被取消的后继节点稳定在一个新的前置节点上。

// CLH 需要一个虚拟的 head 启动。但我们不会在构造时创造 head,因为如果永远没有竞争的话,是浪费。AQS 是在第一次竞争时才会 构造Node,并设置 head和tail的指针。

// 在 Conditions 上等待的线程,使用的是相同的nodes,但使用了一个额外的link(nextWaiter)。Conditions 只需要在简单的无并发队列里 link 节点,因为只在独占时被访问。
// 等待时,Node 被插入到一个 Condition 队列。Signal 时,节点被转移到主队列。一个特殊的 status field 的值标记 node 在哪个队列()。

static final class Node {
    	// 节点是共享模式的标识 (共享模式的队列)
        static final Node SHARED = new Node();
        // 节点是独占模式的标识
        static final Node EXCLUSIVE = null;

        // waitStatus的值  此线程取消了获取锁
        static final int CANCELLED =  1;
        // waitStatus的值 标识后继节点的线程需要 unparking(唤醒)
        static final int SIGNAL    = -1;
        // waitStatus的值 标识当前节点在 Condition 上 wait
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
    	// waitStatus的值 标识下一个 acquireShared 应该无条件传播
        static final int PROPAGATE = -3;

        /**
         * waitStatus 的值,除上面4个外,还有0。
         *   SIGNAL:    当前节点的后继节点被阻塞了(通过park),所以当前节点释放或取消时必须 unpark 后继节点。
         *              为避免竞争,acquire 方法必须首先表明他们需要一个 signal,然后重试原子的 acquire,失败就阻塞住。
         *   CANCELLED: 当前节点因为超时或者中断被取消了抢锁。一个 CANCELLED 状态的线程永远不会再次阻塞。
         *   CONDITION: 当前节点当前处于一个 Condition 队列(处于共享模式)。
         *   PROPAGATE: 一个 releaseShared 应该被传播到其他 nodes。在 doReleaseShared 时 head节点被设置为这个状态确保传播继续,即使其他操作干预了。
         *   0:         普通的同步 node(非 Condition )的初始状态
         */
        volatile int waitStatus;

        // 前置节点的引用。
        // 当前的节点,也就是线程,需要 check 其 prev 的 waitStatus 。
        // 在当前节点入队时被赋值,出队时置null(for GC)。
        // 在前置节点取消时,我们通过一个简短的 while 循环找到一个没有被取消的节点。这个节点是总是存在的,
        // 因为 head 永远不会被取消:一个 node 能成为 head 肯定是 acquire 成功了。 
        // 一个被取消的线程永远不会在 acquire 时成功,并且线程只能取消自己,不能取消别的 node。    
        volatile Node prev;

        // 后继节点的引用。当前节点 release 时,要 unpark next节点。
        // 当前节点入队时赋值,由于取消,该值会被改变,出队时置null(for GC)
        // 入队操作在连接完成前不给前置节点的 next 赋值,所以没必要认为 next 为null的就是在队尾。
        // 然而,如果出现了 next 为空,我们可以从 tail 开始检查 prev 来 double-check
        // 取消节点的 next 指向 node 自身而不是 null, 使 isOnSyncQueue 方法更简单。
        volatile Node next;
        volatile Node next;

        // 当前节点的线程。
        volatile Thread thread;

        // 在 condition 上等待的 next node 或者 特殊值 SHARED 的引用。
        // 因为 condition 队列仅在独占模式被访问,只需要一个简单的队列即可。
        // 在 condition 等待队列的 nodes 随后被转移到主队列重新 acquire。
        // 因为 condition 只能是排它的,我们用 SHARED 这个特殊值标识是在共享模式。
        Node nextWaiter;

        // 当前 Node 是不是共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        // 返回 prev 节点,如果是null,抛出 NullPointerException
        // 在 prev 不为 null 时使用。 null-check 可以省略,但有利于GC
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
	
    	// 不同的地方使用的3个构造器
        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

AQS 是双向等待队列,是 CLH 队列的变种,Node 类是队列中的节点。类上的注释大多是 AQS 队列的设计思想,可以暂时先对 AQS 有了大致了解后再来理解。

ConditionObject,用于共享模式,暂时先不讲了。

Field

接来下看 AQS 的 Field

	// 等待队列的 head,懒加载。除了初始化之外,只能通过 setHead 方法更改。
    // 要注意如果 head 存在,它的 waitStatus 保证不能为 CANCELLED 。
    private transient volatile Node head;

	// 等待队列的tail,懒加载。只能通过 enq 添加新的等待 node 节点时更改。
    private transient volatile Node tail;

	// AQS 本身的锁定状态。
    private volatile int state;

	//继承自AbstractOwnableSynchronizer
	private transient Thread exclusiveOwnerThread; 

方法