【JAVA】AbstractQueuedSynchronizer源码学习

AQS内部的同步队列

AQS内部的同步队列是一个双向队列,队列中的元素是AQS中的Node节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
 static final class Node {
// 共享锁
static final Node SHARED = new Node();
// 独占锁
static final Node EXCLUSIVE = null;

/** 取消状态,处于取消状态的线程不再竞争锁,等待被GC回收 */
static final int CANCELLED = 1;

static final int SIGNAL = -1;

static final int CONDITION = -2;

static final int PROPAGATE = -3;

/**
* 等待状态
*/
volatile int waitStatus;

/**
* 前置节点,当前节点的前一个节点
*/
volatile Node prev;

/**
* 后置节点,当前节点的下一个节点
*/
volatile Node next;

/**
* 节点绑定的线程
*/
volatile Thread thread;

Node nextWaiter;

......
}

waitStatus等待状态的五种情况

  • CANCELLED:对应的值为1,表示节点对应的线程被取消,处于取消状态的节点不会再竞争锁
  • SIGNAL:对应的值为-1,表示该节点后面的节点的线程处于等待获取锁的状态
  • CONDITION:对应的值为-2,表示节点在等待队列中
  • PROPAGATE:对应的值为-3,表示下一次共享式同步状态获取将被无条件的传播
  • 节点的初始状态:对应的值为0

源码

1. acquire()

(1)调用tryAcquire获取锁,如果获取失败调用addWaiter为当前的线程创建Node节点,并调用acquireQueued处理创建的节点:

1
2
3
4
5
6
7
8
9
10
/**
* 获取锁
* @param arg
*/
public final void acquire(int arg) {
// 调用tryAcquire获取锁,如果获取失败,调用addWaiter将任务封装成Node加入队列,调用acquireQueued处理创建的节点
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

AQS中并没有实现tryAcquire方法,只是在方法中抛出了一个异常,需要由AQS的子类来实现该方法:

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

2. addWaiter()

addWaiter方法主要为当前线程创建节点,然后返回创建的节点,在这个方法中有一个队列尾节点为空的判断:

(1)如果尾节点tail不为空,说明队列中已经存在节点,直接将新建的节点加到队列的尾部,然后通过CAS方式将当前节点置为尾节点即可。

(2)如果尾节点为空,说明同步队列中还没有节点,调用enq方法初始化队列的头结点和尾节点,并将当前节点加入队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 创建节点
*/
private Node addWaiter(Node mode) {
// 创建节点,绑定当前线程
Node node = new Node(Thread.currentThread(), mode);
// 尾节点
Node pred = tail;
// 如果尾节点不为空
if (pred != null) {
// 将新的节点加入到尾节点之后
node.prev = pred;
// CAS将新建的节点设置成尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果尾节点为空,在enq中初始化head和tail节点之后再把新节点插入到队列后边
enq(node);
return node;
}

enq()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 初始化队列的头结点和尾节点,并将传入的节点加入队列
* @param node
* @return
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 创建一个新节点,当做头结点,并设置尾节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将传入的节点插入到头结点之后并设为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

3. acquireQueued()

acquireQueued方法主要用来对节点进行处理,在方法中有一个for循环不断的在尝试获取锁以及判断当前任务是否应该被阻塞等待:

判断当前节点的前一个节点是否为头结点:

(1)当前节点的前一个节点是头结点,因为当前节点的前一个节点是头结点,此时重新调用tryAcquire方法尝试获取锁,如果获取成功将当前节点置为头结点,并将之前的头结点的next置为空,等待被回收,然后返回线程的中断状态,如果获取失败继续for循环的流程。

(2)当前节点的前一个节点不是头结点,调用shouldParkAfterFailedAcquire方法设置节点的等待状态,shouldParkAfterFailedAcquire方法如果返回true(返回true表示当前节点已处于等待状态可以被阻塞),执行parkAndCheckInterrupt方法,这个方法用来阻塞当前线程直到被唤醒,之后返回线程的中断状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果当前节点的前一个节点为头结点,重新调用tryAcquire方法尝试获取锁
if (p == head && tryAcquire(arg)) {
// 如果获取锁成功,将当前节点置为头结点
setHead(node);
// 将之前的头结点的next置为空
p.next = null; // help GC
failed = false;
// 返回中断状态
return interrupted;
}
// 如果当前节点的前一个节点不是头结点
// shouldParkAfterFailedAcquire中设置当前节点的前一个节点的等待状态,让前一个节点知道当前节点进入等待
// parkAndCheckInterrupt让当前线程进入阻塞状态,直到被唤醒之后,继续执行,并返回中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;// 记录中断状态
}
} finally {
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire()

在shouldParkAfterFailedAcquire方法中,==主要是为了让当前节点处于等待状态==,设置的方式是找到当前节点之前第一个waitStatus等于或小于0的节点,然后设置prev的waitStatus。

为什么要找到waitStatus等于或小于0的节点?

通过waitStatus的几种值可以知道大于0的时候表示这个节点处于取消状态,之后是要从队列中移除的,所以要不断向前找,直到找到waitStatus等于或小于0的节点。

因为节点的等待状态可能会发生变化,所以先判断当前节点的前一个节点的等待状态waitStatus:

(1)等待状态是-1,表示这个节点之后的节点处于等待锁的状态,此时直接返回true即可。

(2)等待状态大于0,表示pred节点可能已被取消,此时要跳过这个节点,一直往上一个节点找,直到找到一个等待状态小于或者等于0的节点,并将这个节点的next指向当前节点(此时shouldParkAfterFailedAcquire返回的结果为false)。

(3)如果当前节点的前一个节点perd的等待状态既不等于-1,也不大于0,此时将pred的等待状态设为-1,表示这个节点之后的节点处于等待锁的状态(此时shouldParkAfterFailedAcquire返回的结果为false)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 当前节点的前一个节点的等待状态
int ws = pred.waitStatus;
// 如果是signal状态(-1),返回true
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 如果当前节点的前一个节点的等待状态大于0,从前一个节点开始一直往前找,直到找到等待状态不大于0的节点,返回false
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 当前节点的前一个节点的状态既不等于-1,也不大于0,设置当前节点的前一个节点的状态为signal(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

由上可知shouldParkAfterFailedAcquire中当前节点的前一个节点的等待状态是-1时返回的是true,其他情况返回false,但是返回false之后下一次再执行shouldParkAfterFailedAcquire,就可以满足条件返回true(因为调用了compareAndSetWaitStatus方法设置前一个节点的等待状态是-1,所以第二次进入可以满足条件)。

parkAndCheckInterrupt()

parkAndCheckInterrupt方法用来阻塞当前线程,然后返回线程的中断状态。LockSupport.park(this)会让当前线程进入阻塞状态,直到被其他线程唤醒(或者被中断),然后会调用Thread.interrupted()返回线程的中断位:

  • 如果当前线程处于中断状态,Thread.interrupted()返回true,下次调用Thread.interrupted()将会返回false,因为中断位被重置了。
  • 如果当前线程处于非中断状态,Thread.interrupted()返回false。
1
2
3
4
5
6
7
8
9
10
/**
* 阻塞线程
* @return
*/
private final boolean parkAndCheckInterrupt() {
// 阻塞当前的线程
LockSupport.park(this);
// 返回当前线程的中断位
return Thread.interrupted();
}

回到acquiredQueue方法:

如果parkAndCheckInterrupt返回true,将 interrupted 设为 true,记录了线程的中断状态,当线程尝试获取锁成功之后,将interrupted中断中断状态返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;//返回中断状态
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//设置中断状态
}
} finally {
if (failed)
cancelAcquire(node);//取消竞争锁
}
}

为什么要记录中断状态

假设一个线程被中断唤醒,Thread.interrupted()返回线程的中断状态true,之后在acquireQueued方法中并没有中断线程的相关操作,被中断之后还在尝试获取锁,获取成功之后将中断状态返回(此时返回true),再回到acquire方法中,如果acquireQueued返回true代表线程被中断过,此时调用selfInterrupt方法,来中断当前线程,由于调用Thread.interrupted()的时候会重置中断位,此时不能靠Thread.interrupted()判断中断位,所以要记录一下中断状态表明线程被中断过,然后根据这个状态在acquire方法做中断处理。

1
2
3
4
5
6
public final void acquire(int arg) {
// 调用tryAcquire获取锁,如果获取失败,调用addWaiter将任务封装成Node,调用acquireQueued将node加入队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

4.cancelAcquire()

在acquireQueued方法的finally语句块中,如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态。
cancelAcquire()方法中,首先也要找到当前节点之前第一个waitStatus<=0的节点pred,此时当前节点有三种情况:

(1)当前节点如果是尾节点,当前节点直接从队尾删掉即可,先将pred的next置为空,然后pred设为尾节点。

(2)pred不是头结点,也就是说当前节点处于队列中间,并且它的前一个节点pred(指的是第一个waitStatus<=0的节点)不是头结点,此时判断pred的等待状态,如果是SIGNAL或者非SIGNAL,就调用compareAndSetWaitStatus方法将它的状态置为SIGNAL,然后将当前节点的下一个节点置为到pred的下一个节点。

(3)pred是头结点,也就是当前节点在头结点之后,此时直接唤醒后继节点即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void cancelAcquire(Node node) {
// 节点为直接返回
if (node == null)
return;
// 将节点绑定的线程置为空
node.thread = null;

// 获取当前节点的前一个节点
Node pred = node.prev;
// 从当前节点的前一个节点开始,直到找到waitStatus<=0的节点记为pred
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// pred的下一个节点
Node predNext = pred.next;
// 将当前节点的等待状态置为取消
node.waitStatus = Node.CANCELLED;
// (1)如果当前节点是尾节点,将pred置为尾节点
if (node == tail && compareAndSetTail(node, pred)) {
//将pred的next置为空
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//(2)当前节点位于队列中间的情况
// 当前节点不是head节点的下一个节点,或者pred的等待状态是SIGNAL 再或者不是SIGNAL,就将pred的等待状态置为SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {// pred绑定的线程也不能为空
//当前节点的下一个节点
Node next = node.next;
// 如果下一个节点不为空并且不是取消状态
if (next != null && next.waitStatus <= 0)
// 将pred的下一个节点指向当前节点的下一个节点
compareAndSetNext(pred, predNext, next);
} else {
//(3)当前节点是头结点的下一个节点,唤醒后继节点
unparkSuccessor(node);
}
// 当前节点的下一个节点指向自己等待回收
node.next = node;
}
}

5.release()

release方法用来释放锁,释放锁的实现在tryRelease方法,由AQS的子类来实现这个方法,释放锁成功之后,调用unparkSuccessor方法从头结点开始唤醒之后的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final boolean release(int arg) {
// tryRelease由AQS子类实现,用来释放锁,如果释放锁成功
if (tryRelease(arg)) {
// 头结点
Node h = head;
// 如果队列头节点不为空并且等待装不是0,头结点初始化时等待状态为0,不为0表示它之后的节点处于等待状态
if (h != null && h.waitStatus != 0)
// 唤醒头结点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}

// 唤醒后继节点
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
// 如果当前节点的等待状态小于0,将等待状态置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 当前节点的下一个节点
Node s = node.next;
// 下一个节点为空或者状态是取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从队列的尾部开始向前查找,直到找到第一个等待状态<=0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒节点绑定的线程
LockSupport.unpark(s.thread);
}

参考:

[Idea Buffer:深入理解AbstractQueuedSynchronizer(一)](http://www.ideabuffer.cn/2017/03/15/深入理解AbstractQueuedSynchronizer(一)/)

我们都是小青蛙:java并发性能(五)

jk’s Blog:【Java并发】详解 AbstractQueuedSynchronizer

waterystone:Java并发之AQS详解

go2sea:Java多线程之JUC包:AbstractQueuedSynchronizer(AQS)源码学习笔记

jdk版本:1.8