0%

AQS源码分析

AQS为很多的锁实现了底层的功能,就拿ReentrantLock来说,里面的lock就使用了AQS提供的方法

本文是参考的JDK 14的源码,JDK14和JDK8中的源码不太相同,例如去除了几种NODE状态,增加了一些节点的内部静态类

结构图

AQS的结构图

https://iotxing-1253163150.cos.ap-shanghai.myqcloud.com/AQS-%E7%BB%93%E6%9E%84%E5%9B%BE.jpg

Node的结构图

https://iotxing-1253163150.cos.ap-shanghai.myqcloud.com/NODE%E7%BB%93%E6%9E%84.jpg

基础变量

1
2
3
4
5
6
7
8
9
10
11
12
//表示NODE的状态   
static final int WAITING = 1; // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND = 2;

//用来维护队列的链表节点
private transient volatile Node head; //等待队列的头结点,初始化之后通过懒加载生成

private transient volatile Node tail; //等待队列的尾结点,初始化之后才会出现,并且使用casTail操作进行修改,保证线程安全

//状态
private volatile int state; //表示当前对象的同步状态

Node

Node是AQS中比较重要的一个对象,它用来形成一个双向链表,存放等待的线程信息。

定义了上一个节点和下一个节点,等待的线程以及当前节点的状态。

节点的状态总共有上面定义的三种状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
abstract static class Node {
volatile Node prev; // initially attached via casTail
volatile Node next; // visibly nonnull when signallable
Thread waiter; // visibly nonnull when enqueued
volatile int status; // written by owner, atomic bit ops by others

// methods for atomic operations
final boolean casPrev(Node c, Node v) { // for cleanQueue
return U.weakCompareAndSetReference(this, PREV, c, v);
}
final boolean casNext(Node c, Node v) { // for cleanQueue
return U.weakCompareAndSetReference(this, NEXT, c, v);
}
final int getAndUnsetStatus(int v) { // for signalling
return U.getAndBitwiseAndInt(this, STATUS, ~v);
}
final void setPrevRelaxed(Node p) { // for off-queue assignment
U.putReference(this, PREV, p);
}
final void setStatusRelaxed(int s) { // for off-queue assignment
U.putInt(this, STATUS, s);
}
final void clearStatus() { // for reducing unneeded signals
U.putIntOpaque(this, STATUS, 0);
}

private static final long STATUS
= U.objectFieldOffset(Node.class, "status");
private static final long NEXT
= U.objectFieldOffset(Node.class, "next");
private static final long PREV
= U.objectFieldOffset(Node.class, "prev");
}

通过上面的结构图能发现,有三个类对Node进行了扩展,其中只有ConditionNode进行了增强

节点的status有几种值

  • 当节点在acquire中新建出来的时候,默认是0,然后通过自旋,将状态改为WAITING,也就是1
  • 如果节点被取消,状态会被改为CANCELLED,然后再cleanQueue的时候被清理
  • 在挂起指定时间后,将节点的状态置位0

ConditionNode

因为是条件节点,所以存储了下一个等待的线程。在进行节点释放的时候,先判断当前节点的状态,如果小于等于1,说明当前节点处于wait或者cancelled的状态,可以释放,或者判断当前的线程是否被中断了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node

/**
* Allows Conditions to be used in ForkJoinPools without
* risking fixed pool exhaustion. This is usable only for
* untimed Condition waits, not timed versions.
*/
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}

public final boolean block() {
while (!isReleasable()) LockSupport.park(); //如果当前节点不能被释放,调用中断函数
return true;
}
}

casTail

使用CAS操作来修改尾结点,保证线程安全

1
2
3
private boolean casTail(Node c, Node v) {
return U.compareAndSetReference(this, TAIL, c, v);
}

tryInitializeHead

尝试初始化头结点,会使用cas操作进行添加

如果头结点添加成功,修改尾结点为头结点。

1
2
3
4
5
private void tryInitializeHead() {
Node h = new ExclusiveNode();
if (U.compareAndSetReference(this, HEAD, null, h))
tail = h;
}

enqueue

将节点添加到队列里面去。

使用自旋操作,获取到当前的尾结点,然后尝试使用cas将目标节点插入到尾结点后面。如果之前的尾结点状态小于0,也就是说处于cancelled状态,唤醒新加入节点的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
LockSupport.unpark(node.waiter);
break;
}
}
}
}

Acquire 获取锁

根据是否共享模式,调用不同的tryAcquire。

共享模式的获取锁只有CountDownLatch和ReentrantReadWriteLock

独享模式的主要在可重入锁中,还有线程池中也有

first表示的是当前节点未head后面的第一个节点,head一定是拿到锁的节点

https://iotxing-1253163150.cos.ap-shanghai.myqcloud.com/AQS.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;//是否中断,传入的node是否是第一个节点
Node pred = null; // predecessor of node when enqueued

/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/

for (;;) {
//如果传入的节点为null,则它的上一个节点也是null,否则取到它的上一个节点,然后判断是否为空
//
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
//执行到这里的条件是,一开始first是false(默认是false),传入节点不是空值,并且上一个节点也存在,并且上一个节点不是
//头结点,也就是说前面有节点在等待

if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) { //如果first为ture,说明head == pred,也就是它的上一个节点是head节点,或者是没有上一个节点
boolean acquired;
try {
if (shared) //如果设置了共享模式,则调用共享模式的获取锁,否则以独占方式进行锁的获取
acquired = (tryAcquireShared(arg) >= 0); //共享模式只有CountdownLatch和ReentrantReadWriteLock
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false); //获取锁的时候如果出现异常,取消锁的获取
throw ex;
}
if (acquired) {
if (first) { //如果获取到了锁,并且当前节点的前一个节点是头结点,则将当前节点设置为head
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node); //如果是共享锁,则通知后面的一个节点,将状态改为wait,并且唤醒线程
if (interrupted)
current.interrupt(); //如果出现了中断,则中断当前的线程
}
return 1; //在拿到了锁之后,退出循环
}
}
if (node == null) { // 如果传入的节点为null,生成一个新的Node
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // 如果pred为null,则说明节点是新生成的,尝试将新节点插入到等待队列中去
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node)) //如果插入尾结点失败,将新节点的prev还设置为null
node.setPrevRelaxed(null); // back out
else
t.next = node; //将新节点插入到最后面
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) { //默认情况下status是0
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus(); //在挂起指定时间后,将节点的状态置位0,开始下一次循环
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}

TryAcquire 独占方式获取锁

如果当前锁的状态为0,也就是未持有,并且前面没有线程在等待获取锁,则会尝试使用cas操作进行锁的写入,并且将当前线程设置为独占线程。

1
2
3
4
5
6
7
8
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

cleanQueue 从尾结点向前清理所有的处于取消状态的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void cleanQueue() {
for (;;) { // restart point
for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
if (q == null || (p = q.prev) == null)
return; // end of list
if (s == null ? tail != q : (s.prev != q || s.status < 0))
break; // inconsistent
if (q.status < 0) { // cancelled
if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
q.prev == p) {
p.casNext(q, s); // OK if fails
if (p.prev == null) //如果节点没有prev,则唤醒节点
signalNext(p);
}
break;
}
if ((n = p.next) != q) { // help finish
if (n != null && q.prev == p) {
p.casNext(n, q);
if (p.prev == null)
signalNext(p);
}
break;
}
s = q;
q = q.prev;
}
}
}

Release 释放锁

尝试调用实现类的tryRelease进行锁的释放,如果释放成功,通知下一个等待的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}

//ReentrantLock中的tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}

SignalNext 通知下一个节点

当节点不为空,并且下一个节点也不为空,并且状态不为0.

因为如果节点状态为0,说明是刚建立出来的,需要将状态该改为WAITING之后才能被唤醒

1
2
3
4
5
6
7
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}

Await() 程序挂起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode(); //新建一个有条件等待的节点
int savedState = enableWait(node); //如果没有抛出异常,说明已经正常加入到等待列表,并且释放了锁
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false;
while (!canReacquire(node)) { //如果当前节点还在等待队列中
if (interrupted |= Thread.interrupted()) { //检查是否出现了中断
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) //getAndUnsetStatus(COND)之后status为1,&COND也是11
break; // else interrupted after signal
} else if ((node.status & COND) != 0) { //如果节点状态为COND或者WAITING,与的结果不为0
try {
ForkJoinPool.managedBlock(node); //挂起节点,等待被释放
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
} //如果canRequire为true,说明当前线程已经进入了同步队列,开始正常的锁的抢占
LockSupport.setCurrentBlocker(null);
node.clearStatus(); //清空节点的状态,尝试获取锁
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}

EnableWait

将目标节点加入到等待列表的尾结点,并且释放锁

如果没有释放掉锁,说明当前不持有锁或者释放出错,直接抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private int enableWait(ConditionNode node) {
if (isHeldExclusively()) { //如果当前是持有锁的线程
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING); //CAS设置当前节点的状态为3
ConditionNode last = lastWaiter; //获取到最后一个等待的节点
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node; //将当前节点扔到最后
int savedState = getState();
if (release(savedState)) //释放当前持有的锁
return savedState;
}
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}

CanReAcquire

是否能够重新获取锁

如果节点从等待队列进入到了同步队列,则可以进行重新获取锁。

如果当前节点还在阿等待队列中,则会返回false

1
2
3
4
private boolean canReacquire(ConditionNode node) {
// check links, not status to avoid enqueue race
return node != null && node.prev != null && isEnqueued(node);
}

SignalAll

将所有等待队列中的节点,放入到同步队列中去,参与锁的竞争

1
2
3
4
5
6
7
public final void signalAll() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, true);
}

doSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
enqueue(first);
if (!all)
break;
}
first = next;
}
}

getAndUnsetStatus

这是个比较隐晦的一个点,用来对node的节点进行修改。传入一个值v,然后会获取到当前内存中的值,然后与传入的值的取反进行与运算。

例如原本的值是EnableWait里面设定的3,传入的v是COND,也就是2,取反之后是-3 (10取反得到101)。然后3&-3 = 1,再把1存入到内存中对应的位置,也就是修改status为1

1
2
3
4
5
6
7
8
9
10
11
12
final int getAndUnsetStatus(int v) {     // for signalling
return U.getAndBitwiseAndInt(this, STATUS, ~v);
}

public final int getAndBitwiseAndInt(Object o, long offset, int mask) {
int current;
do {
current = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset,
current, current & mask));
return current;
}