AQS内部的同步队列
AQS内部的同步队列是一个双向队列,队列中的元素是AQS中的Node节点:
1 | static final class Node { |
waitStatus等待状态的五种情况
- CANCELLED:对应的值为1,表示节点对应的线程被取消,处于取消状态的节点不会再竞争锁
- SIGNAL:对应的值为-1,表示该节点后面的节点的线程处于等待获取锁的状态
- CONDITION:对应的值为-2,表示节点在等待队列中
- PROPAGATE:对应的值为-3,表示下一次共享式同步状态获取将被无条件的传播
- 节点的初始状态:对应的值为0
源码
1. acquire()
(1)调用tryAcquire获取锁,如果获取失败调用addWaiter为当前的线程创建Node节点,并调用acquireQueued处理创建的节点:
1 | /** |
AQS中并没有实现tryAcquire方法,只是在方法中抛出了一个异常,需要由AQS的子类来实现该方法:
1 | protected boolean tryAcquire(int arg) { |
2. addWaiter()
addWaiter方法主要为当前线程创建节点,然后返回创建的节点,在这个方法中有一个队列尾节点为空的判断:
(1)如果尾节点tail不为空,说明队列中已经存在节点,直接将新建的节点加到队列的尾部,然后通过CAS方式将当前节点置为尾节点即可。
(2)如果尾节点为空,说明同步队列中还没有节点,调用enq方法初始化队列的头结点和尾节点,并将当前节点加入队列。
1 | /** |
enq()
1 | /** |
3. acquireQueued()
acquireQueued方法主要用来对节点进行处理,在方法中有一个for循环不断的在尝试获取锁以及判断当前任务是否应该被阻塞等待:
判断当前节点的前一个节点是否为头结点:
(1)当前节点的前一个节点是头结点,因为当前节点的前一个节点是头结点,此时重新调用tryAcquire方法尝试获取锁,如果获取成功将当前节点置为头结点,并将之前的头结点的next置为空,等待被回收,然后返回线程的中断状态,如果获取失败继续for循环的流程。
(2)当前节点的前一个节点不是头结点,调用shouldParkAfterFailedAcquire方法设置节点的等待状态,shouldParkAfterFailedAcquire方法如果返回true(返回true表示当前节点已处于等待状态可以被阻塞),执行parkAndCheckInterrupt方法,这个方法用来阻塞当前线程直到被唤醒,之后返回线程的中断状态。
1 | final boolean acquireQueued(final Node node, int arg) { |
shouldParkAfterFailedAcquire()
在shouldParkAfterFailedAcquire方法中,==主要是为了让当前节点处于等待状态==,设置的方式是找到当前节点之前第一个waitStatus等于或小于0的节点,然后设置prev的waitStatus。
为什么要找到waitStatus等于或小于0的节点?
通过waitStatus的几种值可以知道大于0的时候表示这个节点处于取消状态,之后是要从队列中移除的,所以要不断向前找,直到找到waitStatus等于或小于0的节点。
因为节点的等待状态可能会发生变化,所以先判断当前节点的前一个节点的等待状态waitStatus:
(1)等待状态是-1,表示这个节点之后的节点处于等待锁的状态,此时直接返回true即可。
(2)等待状态大于0,表示pred节点可能已被取消,此时要跳过这个节点,一直往上一个节点找,直到找到一个等待状态小于或者等于0的节点,并将这个节点的next指向当前节点(此时shouldParkAfterFailedAcquire返回的结果为false)。
(3)如果当前节点的前一个节点perd的等待状态既不等于-1,也不大于0,此时将pred的等待状态设为-1,表示这个节点之后的节点处于等待锁的状态(此时shouldParkAfterFailedAcquire返回的结果为false)。
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
由上可知shouldParkAfterFailedAcquire中当前节点的前一个节点的等待状态是-1时返回的是true,其他情况返回false,但是返回false之后下一次再执行shouldParkAfterFailedAcquire,就可以满足条件返回true(因为调用了compareAndSetWaitStatus方法设置前一个节点的等待状态是-1,所以第二次进入可以满足条件)。
parkAndCheckInterrupt()
parkAndCheckInterrupt方法用来阻塞当前线程,然后返回线程的中断状态。LockSupport.park(this)会让当前线程进入阻塞状态,直到被其他线程唤醒(或者被中断),然后会调用Thread.interrupted()返回线程的中断位:
- 如果当前线程处于中断状态,Thread.interrupted()返回true,下次调用Thread.interrupted()将会返回false,因为中断位被重置了。
- 如果当前线程处于非中断状态,Thread.interrupted()返回false。
1 | /** |
回到acquiredQueue方法:
如果parkAndCheckInterrupt返回true,将 interrupted 设为 true,记录了线程的中断状态,当线程尝试获取锁成功之后,将interrupted中断中断状态返回:
1 | final boolean acquireQueued(final Node node, int arg) { |
为什么要记录中断状态?
假设一个线程被中断唤醒,Thread.interrupted()返回线程的中断状态true,之后在acquireQueued方法中并没有中断线程的相关操作,被中断之后还在尝试获取锁,获取成功之后将中断状态返回(此时返回true),再回到acquire方法中,如果acquireQueued返回true代表线程被中断过,此时调用selfInterrupt方法,来中断当前线程,由于调用Thread.interrupted()的时候会重置中断位,此时不能靠Thread.interrupted()判断中断位,所以要记录一下中断状态表明线程被中断过,然后根据这个状态在acquire方法做中断处理。
1 | public final void acquire(int arg) { |
4.cancelAcquire()
在acquireQueued方法的finally语句块中,如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态。
cancelAcquire()方法中,首先也要找到当前节点之前第一个waitStatus<=0的节点pred,此时当前节点有三种情况:
(1)当前节点如果是尾节点,当前节点直接从队尾删掉即可,先将pred的next置为空,然后pred设为尾节点。
(2)pred不是头结点,也就是说当前节点处于队列中间,并且它的前一个节点pred(指的是第一个waitStatus<=0的节点)不是头结点,此时判断pred的等待状态,如果是SIGNAL或者非SIGNAL,就调用compareAndSetWaitStatus方法将它的状态置为SIGNAL,然后将当前节点的下一个节点置为到pred的下一个节点。
(3)pred是头结点,也就是当前节点在头结点之后,此时直接唤醒后继节点即可。
1 | private void cancelAcquire(Node node) { |
5.release()
release方法用来释放锁,释放锁的实现在tryRelease方法,由AQS的子类来实现这个方法,释放锁成功之后,调用unparkSuccessor方法从头结点开始唤醒之后的节点。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public final boolean release(int arg) {
// tryRelease由AQS子类实现,用来释放锁,如果释放锁成功
if (tryRelease(arg)) {
// 头结点
Node h = head;
// 如果队列头节点不为空并且等待装不是0,头结点初始化时等待状态为0,不为0表示它之后的节点处于等待状态
if (h != null && h.waitStatus != 0)
// 唤醒头结点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}
// 唤醒后继节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果当前节点的等待状态小于0,将等待状态置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 当前节点的下一个节点
Node s = node.next;
// 下一个节点为空或者状态是取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从队列的尾部开始向前查找,直到找到第一个等待状态<=0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒节点绑定的线程
LockSupport.unpark(s.thread);
}
参考:
[Idea Buffer:深入理解AbstractQueuedSynchronizer(一)](http://www.ideabuffer.cn/2017/03/15/深入理解AbstractQueuedSynchronizer(一)/)
jk’s Blog:【Java并发】详解 AbstractQueuedSynchronizer
go2sea:Java多线程之JUC包:AbstractQueuedSynchronizer(AQS)源码学习笔记
jdk版本:1.8