
How AQS Works

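AbstractQueuedSynchronizer (AQS), the abstract queued synchronizer, is the foundation of most tools in the J.U.C (java.util.concurrent) package. Several of the package's utility classes, including ReentrantLock, which we analyzed earlier, are built on the framework this synchronizer provides. Let's see what makes its design so elegant.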

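AQS manages the synchronization state and handles the low-level work of queuing blocked threads, waiting and notification. Its core covers the sync queue, acquiring and releasing exclusive locks, acquiring and releasing shared locks, and support for interruptible and timed acquisition; these are provided as AQS's template methods. AQS is an abstract class. To build your own synchronizer on it, you only implement how threads acquire and release the resource (that is, how the synchronization state variable is modified) according to the semantics your synchronizer needs. Maintaining the wait queue (enqueuing a thread whose acquisition failed, dequeuing it when it is woken, and managing threads while they are queued) is already implemented by AQS at the top level. This design is an application of the Template Method pattern.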
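To make the template-method split concrete, here is a minimal sketch of a custom synchronizer, assuming a non-reentrant lock whose state is 0 when free and 1 when held. The class name SimpleMutex and its public methods are hypothetical; only tryAcquire(), tryRelease() and isHeldExclusively() are overridden, while acquire() and release() (queuing, parking, waking) are inherited unchanged from AQS. It follows the same shape as the Mutex example in the AQS Javadoc.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = unlocked, 1 = locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeed only if the lock is free: CAS the state from 0 to 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true; // tells AQS to wake a queued successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    public void lock()                    { sync.acquire(1); }  // template method: queues and parks on failure
    public boolean tryLock()              { return sync.tryAcquire(1); }
    public void unlock()                  { sync.release(1); }  // template method: wakes a successor
    public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
}
```

When two threads call lock() at once, the loser's tryAcquire() fails, so AQS enqueues and parks it until the winner's unlock() calls release().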

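AQS supports two modes:

  • Exclusive mode: only one thread may access the shared resource at a time, e.g. ReentrantLock
    • Fair mode: threads that fail to acquire the lock queue up in order, and earlier arrivals get the lock first
    • Non-fair mode: a thread that needs the lock first tries to grab it directly, regardless of the queue
  • Shared mode: multiple threads may access the shared resource at the same time, e.g. Semaphore and CountDownLatch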
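The difference between the fair and the non-fair mode lives entirely in the subclass's tryAcquire(). The sketch below assumes a hypothetical FairnessSync with a fairness flag and deliberately omits owner tracking and reentrancy; the fair branch uses AQS's real hasQueuedPredecessors() check, which is also what ReentrantLock's fair sync relies on.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical sketch: fair vs non-fair differ only in whether tryAcquire
// looks at the wait queue before trying to grab the state.
class FairnessSync extends AbstractQueuedSynchronizer {
    private final boolean fair;

    FairnessSync(boolean fair) { this.fair = fair; }

    @Override
    protected boolean tryAcquire(int arg) {
        // Fair: if another thread is queued ahead of us, don't barge in.
        if (fair && hasQueuedPredecessors())
            return false;
        // Non-fair (or fair with nobody ahead): try to take the lock directly.
        return compareAndSetState(0, 1);
    }

    @Override
    protected boolean tryRelease(int arg) {
        setState(0); // sketch only: no owner check
        return true;
    }

    public boolean attempt()  { return tryAcquire(1); }
    public void unlockNow()   { release(1); }
}
```

If other threads are already queued when the lock becomes free, the fair variant's attempt() returns false and the caller has to queue behind them, whereas the non-fair variant may succeed immediately and barge ahead.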

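AQS uses an internal CLH queue, also called a CLH lock. The name sounds impressive, but it is simply an abbreviation of its three inventors' names: Craig, Landin, and Hagersten. A CLH lock is a scalable, high-performance, fair spin lock based on a linked list: a requesting thread spins only on locally cached state, continually polling its predecessor's status, and stops spinning once it sees that the predecessor has released the lock. AQS uses a variant of this queue in which a waiting thread blocks (parks) instead of spinning indefinitely and is woken by its predecessor.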
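For comparison, a textbook CLH spin lock can be sketched as follows. This is the classic form, not AQS's actual code: each thread appends its node with a single getAndSet on tail and spins on its predecessor's locked flag. ClhSpinLock and QNode are illustrative names, and Thread.onSpinWait() assumes Java 9 or later.

```java
import java.util.concurrent.atomic.AtomicReference;

// Classic CLH spin lock (textbook form, not the AQS source).
class ClhSpinLock {
    private static final class QNode {
        volatile boolean locked; // true while the owner holds or is waiting for the lock
    }

    // Starts with a dummy node whose locked flag is false.
    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    public void lock() {
        QNode node = myNode.get();
        node.locked = true;
        QNode pred = tail.getAndSet(node); // atomically append ourselves to the queue
        myPred.set(pred);
        while (pred.locked) {              // spin only on the predecessor's flag
            Thread.onSpinWait();
        }
    }

    public void unlock() {
        QNode node = myNode.get();
        node.locked = false;               // lets our successor stop spinning
        myNode.set(myPred.get());          // reuse the predecessor's node for the next lock()
    }
}
```

Because each thread spins on a different node, a release only disturbs the one thread waiting behind it; AQS keeps this queue shape but parks waiters instead of spinning.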

AQS maintains an inner class, Node, with the following contents:

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;

    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    /** Constructor used by addWaiter. */
    Node(Node nextWaiter) {
        this.nextWaiter = nextWaiter;
        THREAD.set(this, Thread.currentThread());
    }

    /** Constructor used by addConditionWaiter. */
    Node(int waitStatus) {
        WAITSTATUS.set(this, waitStatus);
        THREAD.set(this, Thread.currentThread());
    }

    /** CASes waitStatus field. */
    final boolean compareAndSetWaitStatus(int expect, int update) {
        return WAITSTATUS.compareAndSet(this, expect, update);
    }

    /** CASes next field. */
    final boolean compareAndSetNext(Node expect, Node update) {
        return NEXT.compareAndSet(this, expect, update);
    }

    final void setPrevRelaxed(Node p) {
        PREV.set(this, p);
    }

    // VarHandle mechanics
    private static final VarHandle NEXT;
    private static final VarHandle PREV;
    private static final VarHandle THREAD;
    private static final VarHandle WAITSTATUS;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            NEXT = l.findVarHandle(Node.class, "next", Node.class);
            PREV = l.findVarHandle(Node.class, "prev", Node.class);
            THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
            WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}

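As you can see, this inner class is a node of a doubly linked list: it holds its predecessor and successor, and each node stores its current status waitStatus and its thread thread. A node is marked as shared or exclusive through the two fields SHARED and EXCLUSIVE, which are passed to the constructor and stored in nextWaiter (isShared() simply checks nextWaiter == SHARED):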

// marks a node waiting in shared mode
static final Node SHARED = new Node();
// marks a node waiting in exclusive mode
static final Node EXCLUSIVE = null;

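It then defines four constants:

  • CANCELLED, value 1: the node's thread has been cancelled;
  • SIGNAL, value -1: the thread in this node's successor needs to run, i.e. it must be unparked when this node releases or is cancelled;
  • CONDITION, value -2: the node is waiting on a condition, i.e. it sits in a condition queue;
  • PROPAGATE, value -3: subsequent acquireShared calls can proceed, i.e. a shared release should propagate to further nodes;
  • the default value 0: the node is in the sync queue, waiting to acquire the lock.

waitStatus holds the current node's status; its value is one of the four constants above or the default 0.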

Exclusive mode

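Let's start with exclusive (mutual-exclusion) mode. As the most commonly used mode it has a wide range of uses; ReentrantLock, for example, implements locking and unlocking with it.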

In exclusive mode, the core locking method is acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
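The template method calls the subclass hook first, and this is easy to observe: because AQS's default tryAcquire() throws UnsupportedOperationException, a subclass that forgets to override it fails on the very first call to acquire(), before any node is enqueued. IncompleteSync and AcquireTemplateDemo are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical subclass that does not override tryAcquire().
class IncompleteSync extends AbstractQueuedSynchronizer {
}

class AcquireTemplateDemo {
    // Returns true if acquire() failed fast because the inherited
    // tryAcquire() hook threw UnsupportedOperationException.
    static boolean acquireWithoutHookFails() {
        try {
            new IncompleteSync().acquire(1);
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }
}
```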

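AQS gives tryAcquire() no real implementation (the default simply throws UnsupportedOperationException), so subclasses must implement it. It returns success or failure; on failure, addWaiter() is called first to enqueue an exclusive node: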

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(mode); // create a node; here mode is Node.EXCLUSIVE

    for (;;) { // note: this is a retry loop
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail); // tail exists: point the new node's prev at the current tail
            if (compareAndSetTail(oldTail, node)) { // CAS tail from oldTail to the new node
                oldTail.next = node; // link the old tail's next to node: node is now appended and is the new tail
                return node;
            }
        } else {
            initializeSyncQueue(); // queue not initialized yet: create head and tail first
        }
    }
}

initializeSyncQueue() is just as simple: it creates a dummy node and makes it both the head and the tail:

/**
 * Initializes head and tail fields on first contention.
 */
private final void initializeSyncQueue() {
    Node h;
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}
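The enqueue protocol of addWaiter() and initializeSyncQueue() can be modeled outside AQS with AtomicReference. This is a simplified sketch with hypothetical names (EnqueueSketch, label): it keeps the lazily created dummy head, sets prev before the tail CAS, and links next only after the CAS succeeds, which is why AQS falls back on prev links (walking back from tail) when a next link is not yet visible.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of addWaiter() + initializeSyncQueue().
class EnqueueSketch {
    static final class Node {
        volatile Node prev, next;
        final String label;
        Node(String label) { this.label = label; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    Node addWaiter(String label) {
        Node node = new Node(label);
        for (;;) {
            Node oldTail = tail.get();
            if (oldTail != null) {
                node.prev = oldTail;                 // prev is set before the node is published
                if (tail.compareAndSet(oldTail, node)) {
                    oldTail.next = node;             // next is linked only after the CAS wins
                    return node;
                }
            } else {
                initializeQueue();                   // first contention: create the dummy head
            }
        }
    }

    private void initializeQueue() {
        Node h = new Node("dummy");
        if (head.compareAndSet(null, h))
            tail.set(h);                             // only the CAS winner sets tail
    }
}
```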

At this point a new node has been appended to the queue and made the new tail. Next, let's look at acquireQueued():

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

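Inside the loop, a node calls tryAcquire() only when its predecessor is head; on success it becomes the new head. Otherwise shouldParkAfterFailedAcquire() is called (note that it runs inside the loop). It returns true or false and, as its name suggests, decides whether the current thread should be parked after failing to acquire the resource:

  • true: the predecessor's waitStatus is SIGNAL (-1). This is the normal case: the current thread parks and waits to be woken, and its predecessor will notify it when it releases the lock.
  • false: the current thread should not be parked yet because the conditions for parking are not met; the loop retries tryAcquire() first.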
/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // ws is the predecessor's status
    if (ws == Node.SIGNAL) // predecessor is SIGNAL: the current thread can park and wait to be woken
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) { // predecessor is CANCELLED and can be ignored: unlink it and look further back
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { // predecessor is 0 or PROPAGATE: set it to SIGNAL, but don't park yet
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
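The three outcomes of shouldParkAfterFailedAcquire() can be traced on a plain, single-threaded model. ParkDecision and SimNode are hypothetical, and ordinary field writes stand in for the CAS of the real code; otherwise the logic mirrors the method above.

```java
// Single-threaded, simplified model of shouldParkAfterFailedAcquire().
// SimNode and the plain (non-CAS) writes are hypothetical simplifications.
class ParkDecision {
    static final int CANCELLED = 1, SIGNAL = -1;

    static final class SimNode {
        int waitStatus;
        SimNode prev, next;
        SimNode(int ws) { waitStatus = ws; }
    }

    static boolean shouldPark(SimNode pred, SimNode node) {
        int ws = pred.waitStatus;
        if (ws == SIGNAL)
            return true;                  // predecessor has promised to wake us
        if (ws > 0) {
            do {                          // unlink every consecutive cancelled predecessor
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.waitStatus = SIGNAL;     // ask the predecessor to signal us, but retry first
        }
        return false;
    }
}
```

Starting from head(0), then a CANCELLED node, then node, the first call unlinks the cancelled node and returns false, the second marks head as SIGNAL and returns false, and only the third returns true; in acquireQueued() each false result leads to another tryAcquire() attempt before parking.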

Continuing: if shouldParkAfterFailedAcquire(p, node) returns true, the following line runs next:

interrupted |= parkAndCheckInterrupt();

This line is equivalent to:

interrupted = interrupted | parkAndCheckInterrupt();
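The non-short-circuit operator | matters here. With ||, once interrupted became true, parkAndCheckInterrupt() would no longer be called, and the loop in acquireQueued() would spin instead of parking. The hypothetical counter below shows the difference, using a stand-in for parkAndCheckInterrupt().

```java
// Why "interrupted |= park()" rather than "interrupted = interrupted || park()":
// the boolean | operator always evaluates both operands.
class OrAssignDemo {
    static int calls = 0;

    // Hypothetical stand-in for parkAndCheckInterrupt(): just counts invocations.
    static boolean fakePark() {
        calls++;
        return false;
    }

    static int callsWithOrAssign() {
        calls = 0;
        boolean interrupted = true;
        interrupted |= fakePark();                // fakePark() still runs
        return calls;
    }

    static int callsWithShortCircuit() {
        calls = 0;
        boolean interrupted = true;
        interrupted = interrupted || fakePark();  // fakePark() is skipped
        return calls;
    }
}
```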

Next is parkAndCheckInterrupt(), which parks the current thread and returns its interrupt flag. The code is:

/**
 * Convenience method to park and then check if interrupted.
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

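Note that enqueuing and parking do not respond to interruption: an interrupt only wakes the thread, and the method merely returns the interrupt flag, as the code above shows. Because Thread.interrupted() also clears the flag, acquire() later calls selfInterrupt() to restore it.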
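Two JDK behaviors underpin parkAndCheckInterrupt(): LockSupport.park() returns (rather than throwing) when the thread is interrupted, and Thread.interrupted() both reports and clears the interrupt flag. The small hypothetical demo below interrupts the current thread first, so park() returns without blocking.

```java
import java.util.concurrent.locks.LockSupport;

// Shows the two facts parkAndCheckInterrupt() relies on.
class ParkInterruptDemo {
    static boolean[] parkThenCheckTwice() {
        Thread.currentThread().interrupt();          // set our own interrupt flag
        LockSupport.park(ParkInterruptDemo.class);   // does not block: the flag is already set
        boolean first = Thread.interrupted();        // true, and the flag is cleared
        boolean second = Thread.interrupted();       // false: it was cleared by the first call
        return new boolean[] { first, second };
    }
}
```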

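In acquireQueued(), the for loop sits inside a try block because the code in it can throw, hence the catch block that follows. In JDK 8 there is no catch block but a finally block instead; this is one of the differences between the two versions. The exception caught here is typically one thrown by tryAcquire(): since subclasses implement tryAcquire() themselves, it may throw. After catching the exception, the following runs: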

cancelAcquire(node); // cancel the acquire attempt and restore the queue state
if (interrupted)
    selfInterrupt();
throw t;

The source of cancelAcquire() is:

/**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null; // clear the node's thread

    // Skip cancelled predecessors
    Node pred = node.prev;    // pred is the predecessor
    while (pred.waitStatus > 0) // walk back to the nearest predecessor that is not CANCELLED
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary, although with
    // a possibility that a cancelled node may transiently remain
    // reachable.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED; // mark the current node CANCELLED

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) { // node is the tail: CAS the tail back to pred
        pred.compareAndSetNext(predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // if pred is not head,
        // and (pred's status is SIGNAL, or ws <= 0 and the CAS to SIGNAL succeeds),
        // and pred's thread is not null
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) // successor exists and is not cancelled (waitStatus <= 0)
                pred.compareAndSetNext(predNext, next); // point pred.next at node's successor, unlinking node
        } else {
            unparkSuccessor(node); // wake up node's successor
        }

        node.next = node; // help GC
    }
}

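After cancelAcquire() completes, the node has abandoned its acquire attempt, the queue's signal state has been restored, and the node has been unlinked from the queue (as the source comment notes, a cancelled node may transiently remain reachable).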

Finally, if interrupted is true, selfInterrupt() restores the current thread's interrupt flag, and then the exception is rethrown.
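Cancellation can be observed from outside AQS through a public lock. In this sketch (CancelDemo is hypothetical), a second thread's timed ReentrantLock.tryLock() enqueues a node, times out and goes through cancelAcquire(); afterwards getQueueLength() no longer counts it, because it only counts queued nodes that still have a thread.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Observes a cancelled node from the outside via ReentrantLock's public API.
class CancelDemo {
    static int queueLengthAfterTimeout() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                                   // this thread holds the lock
        boolean[] acquired = { true };
        Thread waiter = new Thread(() -> {
            try {
                // Fails tryAcquire, enqueues, times out, then cancelAcquire() runs.
                acquired[0] = lock.tryLock(50, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        waiter.join();
        lock.unlock();
        if (acquired[0]) throw new IllegalStateException("waiter should have timed out");
        return lock.getQueueLength();                  // cancelled node has no thread, so it is not counted
    }
}
```

queueLengthAfterTimeout() is expected to return 0.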

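In short, everything above boils down to the following steps:

  1. Try to acquire the lock.
  2. If that fails, join the queue and set the predecessor's status to SIGNAL.
  3. If an exception occurs, restore the state and rethrow.
  4. If nothing goes wrong, check whether the thread was interrupted while waiting and, if so, re-interrupt the current thread.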

Having studied how AQS works in exclusive mode, let's look at what shared mode does.

Shared mode

The methods that acquire and release in shared mode are just as easy to find:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
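A shared-mode synchronizer only needs tryAcquireShared() and tryReleaseShared(). The sketch below is modeled on the BooleanLatch example in the AQS Javadoc; the name OneShotLatch is ours, and await() here uses the uninterruptible acquireShared() to match the methods above. Once signalled, every waiting and future caller passes.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// One-shot latch in shared mode: state 0 = closed, 1 = open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        @Override
        protected int tryAcquireShared(int ignore) {
            // Open: succeed with a positive value so later waiters may pass too.
            return isSignalled() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);   // open the latch
            return true;   // tells releaseShared() to run doReleaseShared()
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() { return sync.isSignalled(); }
    public void signal()         { sync.releaseShared(1); }
    public void await()          { sync.acquireShared(1); }
}
```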

Note that besides the basic method there are two enhanced acquire variants, used by Semaphore, CountDownLatch and ReentrantReadWriteLock:

public final void acquireSharedInterruptibly(int arg) // like acquireShared, but responds to interruption
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) // like acquireSharedInterruptibly, plus a timeout
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

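Again, let's start with acquisition. Like tryAcquire, tryAcquireShared must be implemented by subclasses (the default throws UnsupportedOperationException). A negative return value means failure, and the thread will be queued; 0 means the current thread succeeded but subsequent shared acquires will not, so later threads go into the wait queue; a positive value means success, and subsequent threads may also succeed directly.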
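The return-value convention is easiest to see with a counting synchronizer. The hypothetical PermitSync below, similar in spirit to Semaphore's non-fair sync, returns the number of permits left after the attempt: starting with 2 permits, three successive attempts return 1, 0 and -1, and a failed attempt leaves the state untouched.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical counting-permit synchronizer illustrating tryAcquireShared's result:
// negative = failed, 0 = succeeded but nothing left, positive = others may succeed too.
class PermitSync extends AbstractQueuedSynchronizer {
    PermitSync(int permits) { setState(permits); }

    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Fail without touching the state, or CAS in the new count and report it.
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases))
                return true;
        }
    }

    public int attempt() { return tryAcquireShared(1); }
}
```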

As before, let's first look at doAcquireShared(), where the main logic lives:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); // create a SHARED node and append it via the CAS loop; node is the new tail
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) { // predecessor is head: this node is first in the queue
                int r = tryAcquireShared(arg);
                if (r >= 0) { // acquired successfully
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

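This closely mirrors the exclusive version. The only new method is setHeadAndPropagate(), which makes the current node the head and then, when appropriate, wakes the following nodes one after another (each woken shared node repeats the same step for its own successor).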

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

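The long condition works like this: propagate > 0 means the current thread acquired the resource and some remains, so blocked successors should be woken; h.waitStatus < 0 means the node after the old head may be woken; (h = head) == null || h.waitStatus < 0 means the node after the new head may be woken. In summary:

  1. propagate > 0 means the current thread has acquired the resource and blocked successors should be woken.
  2. If either the old or the new head has waitStatus < 0, the node after it may be woken.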
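Propagation is what lets a single release free many shared waiters. In the sketch below (PropagationDemo is hypothetical), several threads block in CountDownLatch.await(), which acquires in shared mode, and one countDown() releases all of them: the first woken node becomes head and, via setHeadAndPropagate(), wakes the next. Whether every thread has actually parked before countDown() runs depends on scheduling; either way all of them pass.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// One releaseShared (countDown) frees every thread waiting in shared mode.
class PropagationDemo {
    static int releasedWaiters(int waiters) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();             // shared, interruptible acquire
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        latch.countDown();                     // a single release for all waiters
        for (Thread t : threads) t.join();
        return passed.get();
    }
}
```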

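If the condition is true, the current node's successor is fetched and checked once more: if it is in shared mode, or its state cannot be seen yet (node.next is still null), doReleaseShared() is called to wake the threads in the following nodes. Its code is: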

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // head is SIGNAL: its successor is waiting to be woken
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) // if the CAS fails, loop and retry
                    continue;            // loop to recheck cases
                unparkSuccessor(h); // wake up the successor
            }
            // The successor has not yet set its predecessor's waitStatus to SIGNAL,
            // so no wake-up is needed right now (or there is no successor).
            // Set head's waitStatus to PROPAGATE so the next acquireShared propagates unconditionally.
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

Now back to doAcquireSharedInterruptibly() and doAcquireSharedNanos(), which provide interruptible and timed ways to acquire:

/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

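The interruptible version is very similar to the acquire logic above. The only difference is that when parkAndCheckInterrupt() returns true, meaning the thread was interrupted while parked, the method throws InterruptedException (and the catch block cancels the node) instead of merely recording the flag.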
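Interruptible acquisition can be seen with CountDownLatch.await(), which calls acquireSharedInterruptibly(). In this hypothetical demo the latch never opens, so the waiter can only leave through interruption: if it is already parked, parkAndCheckInterrupt() returns true and InterruptedException is thrown; if it has not reached await() yet, the Thread.interrupted() check at the top of acquireSharedInterruptibly() throws first.

```java
import java.util.concurrent.CountDownLatch;

// A waiter blocked in a shared, interruptible acquire leaves via InterruptedException.
class InterruptibleDemo {
    static boolean waiterWasInterrupted() throws InterruptedException {
        CountDownLatch neverOpened = new CountDownLatch(1);
        boolean[] interrupted = { false };
        Thread waiter = new Thread(() -> {
            try {
                neverOpened.await();          // would block forever: count never reaches 0
            } catch (InterruptedException e) {
                interrupted[0] = true;        // thrown instead of waiting forever
            }
        });
        waiter.start();
        waiter.interrupt();                   // works whether or not it has parked yet
        waiter.join();
        return interrupted[0];
    }
}
```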

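Calling lock.tryLock(timeout, TimeUnit) waits for the lock with a time limit (that call is the exclusive counterpart; shared calls such as Semaphore.tryAcquire(timeout, unit) reach doAcquireSharedNanos() below). Such a call returns in only three cases:

  1. The current thread acquires the lock within the timeout (returns true);
  2. The current thread is interrupted within the timeout (InterruptedException is thrown);
  3. The timeout expires without the lock being acquired (returns false).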
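Semaphore.tryAcquire(timeout, unit) is a shared, timed acquire that goes through tryAcquireSharedNanos(). The hypothetical helpers below exercise case 1 and case 3; case 2 (interruption) surfaces as InterruptedException, which both helpers simply declare.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Exercises the timed shared acquire through Semaphore's public API.
class TimedAcquireDemo {
    // Case 1: a permit is available, so the call succeeds without waiting.
    static boolean acquiresWhenAvailable() throws InterruptedException {
        return new Semaphore(1).tryAcquire(1, TimeUnit.SECONDS);
    }

    // Case 3: no permit ever arrives; returns how long the failed attempt took.
    static long nanosSpentBeforeGivingUp(long timeoutMillis) throws InterruptedException {
        Semaphore empty = new Semaphore(0);
        long start = System.nanoTime();
        boolean ok = empty.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (ok) throw new IllegalStateException("no permit should be available");
        return System.nanoTime() - start;
    }
}
```

A failed attempt with a 20 ms timeout should report at least 20 ms elapsed, because the timed acquire only gives up once System.nanoTime() has passed its deadline.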

The implementation is as follows:

/**
 * Acquires in shared timed mode.
 *
 * @param arg the acquire argument
 * @param nanosTimeout max wait time
 * @return {@code true} if acquired
 */
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
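The deadline handling in doAcquireSharedNanos() follows a reusable pattern: compute the deadline once, recompute the remaining time on every pass, give up when it reaches zero, and park only if more than SPIN_FOR_TIMEOUT_THRESHOLD nanoseconds remain (1000 ns in the JDK source), since for very short waits spinning is cheaper than a timed park. The sketch applies the pattern to a hypothetical AtomicBoolean flag; unlike AQS, nothing unparks the waiter when the flag is set, so it only notices after parkNanos() returns.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper mirroring the timed loop of doAcquireSharedNanos().
class DeadlineWait {
    static final long SPIN_THRESHOLD_NANOS = 1000L; // same value as the AQS threshold

    static boolean awaitFlag(AtomicBoolean flag, long nanosTimeout) throws InterruptedException {
        if (flag.get())
            return true;                              // fast path, like the first tryAcquireShared()
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        for (;;) {
            if (flag.get())
                return true;                          // "acquired"
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0L)
                return false;                         // timed out
            if (remaining > SPIN_THRESHOLD_NANOS)
                LockSupport.parkNanos(DeadlineWait.class, remaining); // may return early; loop rechecks
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    }
}
```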