AbstractQueuedSynchroizer(AQS)同步器详解详解
一、什么是同步器
同步器是用来构建锁或者其他同步组件的基础框架,它使用一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,它能实现大部分的同步需求。
同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程的并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了状态管理、线程的排队、等待与唤醒等底层操作。锁和同步容器很好地隔离了使用者和实现者所需要关注的领域。
二、同步器的基本成员(介绍常用的类好方法)
Node 是AQS的内部类构成AQS队列的一种数据结构。
站在用户的角度思考问题,与客户深入沟通,找到黄州网站设计与黄州网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:网站建设、成都做网站、企业官网、英文网站、手机端网站、网站推广、域名申请、雅安服务器托管、企业邮箱。业务覆盖黄州地区。
成员变量 | 作用 |
---|---|
waitStatus | 记录节点的等待状态。包括如下状态:① CANCELLED,值为1,由于同步队列中等待线程超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化。② SIGNAL值为-1,后继节点的线程处于等待状态,而当前线程如果释放了同步状态或者取消,将会通知后继节点,使得后继节点得以运行。③ CONDITION值为-2,节点在等待队列中,节点等待在Condtion上,当其他线程对Condtion调用了signal方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。④ PROPAGATE值为-3,表示下一次共享式同步状态将会无条件地被传播下去。⑤ INITAL,值为0,初始状态 |
SHARED = new Node() | 表示共享式的node |
EXCLUSIVE = null | 独占式的node |
Node prev | Node的前节点 |
Node next | Node的后节点 |
nextWaitert | 等待队列的中node的下一个节点 |
ConditionObject是AQS的内部类构成类似Object的等待/通知机制。
成员/方法 | 作用 |
---|---|
Node firstWaiter | 等待队列的头节点 |
Node lastWaiter | 等待队列的尾节点 |
await() | 当前线程进入等待状态知道被通知或中断,当前线程进入运行状态且从await()返回的情况如下,包括:① 其它线程调用Interrupt()方法中断当前线程。② 如果当前线程从await()方法返回,那么表明该线程已经获取了Condtion对象锁对应的锁 |
awaitUninterruptibly() | 当前线程进入等待直到被通知,该方法对中断不敏感 |
awaitNanos(long nanosTimeout)) | 当前线程进入等待状态直到被通知、中断或者超时。返回值表示剩余的时间,如果在nanosTimeout纳秒之前被唤醒,那么返回就是(nanosTimeout-实际耗时);如果返回是0或者负数,那么可以认定已经超时了 |
awaitUntil(Date deadline) | 当前线程进入等待状态直到被通知、中断或者某个时间。如果没有到指定时间就被通知,方法返回true,否则,表示超时,方法返回false |
signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关的锁 |
signalAll | 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁 |
AQS主要成员
成员变量 | 作用 |
---|---|
state | 维护锁的一个变量(同步状态,很重要)① setState 。② getState。 ③ compareAndSetState。 |
Node head | FIFO同步队列的头结点 。 |
Node tail | FIFO同步队列的尾结点 。 |
AQS主要方法
方法名 | 作用 |
---|---|
acquire() | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法会调用重写的tryAcquire(arg)方法(需要锁自己实现) |
release(int arg) | 独占式释放状态,如果释放状态成功,则会去唤醒头结点;释放状态调用tryRelease(arg)方法(需要自己实现) |
acquireShared(int arg) | 共享式获取同步状态,也就是说可以几个线程同时获取同步状态,如果当前线程未获取同步状态,将会进入同步队列。 |
releaseShared() | 共享式释放状态,释放之后会唤醒头结点 |
acquireInterruptibly() | 响应中断的独占式获取同步状态,当前线程未获取同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出中断异常,并返回 |
tryAcquireNanos() | 在acquireInterruptibly()基础上增加超时限制,如果当前线程在超时时间内没有获取同步状态,那么将返回false,如果获取到了返回true |
acquireSharedInterruptibly() | 响应中断的共享式获取同步状态 |
tryAcquireSharedNanos() | 在acquireSharedInterruptibly()的基础上增加超时限制 |
以上就是AQS的一些基本成员和方法,下面主要从现实的角度分析这些方法,理解这些方法的实现,能刚好的帮助我们去理解锁。
三、AQS的方法实现分析
1)、独占系列的方法
①、acquire()独占式获取同步状态,表示只会有一个线程获取,其它线程进入同步队列。
源代码如下:
// 获取锁的方法(独占模式)
public final void acquire(int arg) {
// tryAcquire(arg) 这个方法需要我们自己去实现,如果获取失败,
// 调用addWaiter构造节点
// acquireQueued
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们可以看见acquire方法内部调用了tryAcquire(arg)方法,这个方法需要构造同步组件的类自己去实现,不过返回值已经被AQS定义好了,返回true代表获取同步状态成功,返回false代表失败,需要将线程构造节点加入同步队列,就是调用acquireQueued这个方法。
acquireQueued这个方法实际是先去调用了addWaiter方法。
addWaiter(),这个方法其实就是把当前节点加入到同步队列,加入成功才返回,其实队列初始化时会制造一个空的节点,然后在空的节点后面设置同步节点(可以理解为每次获取获取锁的那个线程其实就是头结点,它是一个空的节点,结合acquireQueued方法,每次获取锁之后,该节点就会升级为头节点,并且变成一个空节点。)
private Node addWaiter(Node mode) {
// 构造一个节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 获取尾节点
Node pred = tail;
// 判断尾节点是否为空
if (pred != null) {
// 不为空,设置node节点的上一个节点为pred(也就是尾节点)
node.prev = pred;
// cas设置尾节点
if (compareAndSetTail(pred, node)) {
// 成功后,设置pred节点的next节点为node,返回
pred.next = node;
return node;
}
}
enq(node);
return node;
}
acquireQueued()方法,通过上面的addWaiter方法我们已经把这个节点加入同步队列,接下来需要处理这个节点。首先判断自己的前节点是否是头结点,自己是否获取到同步状态,如果满足,把自己设置尾头结点,返回,如果不是,进入shouldParkAfterFailedAcquire(详情见后面方法分析)方法主要作用是判断自己的前置节点是否是SIGNAL状态,是的话自己就可以阻塞自己了,调用parkAndCheckInterrupt(详情见后面方法分析)方法,直到被唤醒或者中断。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前一个节点
final Node p = node.predecessor();
// p是头结点,自己获取锁成功
if (p == head && tryAcquire(arg)) {
// 设置自己为头节点,变成一个空节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 找到前节点为signal,然后阻塞自己
// 清理等待超时或者中断的节点
// 尝试设置线程的状态为signal
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 出现异常
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire方法,这个方法主要做三件事,判断自己的前置节点是否是SIGNAL,返回true,就可以阻塞了,不是如果状态大于0,证明前面的节点被中断或者超时了,需要从队列清理了,不是大于0,就利用cas设置前置节点为SIGNAL,返回false。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取node前节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 如果pred节点释放了状态,会通知自己
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 大于0证明,前面的线程等待超时或者已经被中断,需要从节点中移除
// 需要找到不大于0的那个节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 找到小于等于0的前节点,设置为SIGNAL
// 这个地方ws值只会为PROPAGATE或者0
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt方法,这个方法需要前面的方法返回true才会执行,它会阻塞node的这个线程,返回线程的中断中断状态并清理Thread.interrupted(),所以独占式获取同步状态对中断不响应的。
private final boolean parkAndCheckInterrupt() {
// 阻塞线程
LockSupport.park(this);
return Thread.interrupted();
}
cancelAcquire方法,在finally块里面,出现异常就会执行这个方法,做一些处理当前node的操作。
// 异常后,finally里面执行的方法
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// node是为节点,设置尾节点是pred
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 不是头结点和尾节点,前节点是SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
附上一张acquire独占式获取同步状态的流程图:
②、release()独占式释放同步状态,释放线程后唤醒节点。
源代码:
其中tryRelease也需要同步组件自己去实现,语义也被AQS所定义,true代表释放成功,false代表失败,如果为true,就需要决定是否去唤醒节点,首先获取同步队列的头节点,判断头结点不是空,证明有同步对别有节点才需要唤醒,判断头结点不是刚刚初始化,如果是刚刚初始化,就还没有阻塞,请参考acquire的acquireQueued处理节点的逻辑,都为true执行unparkSuccessor方法,false返回。
public final boolean release(int arg) {
// 释放锁
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 头结点不为空,证明初始化了
// 证明头结点不是刚刚创建
// 那就可以去唤醒头结点或者它的后继节点
// 为0就证明没有其他节点了,不需要唤醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor()方法,唤醒节点,唤醒的node的后置节点,因为在获取同步状态是我们阻塞的也是后置节点,唤醒后置节点后,会去找到前节点,也就是当前的结点去获取同步状态,然后再把自己变成头结点。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 没有这个节点或者超时或者被中断了,查找一个可以用的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 证明有这个节点
if (s != null)
LockSupport.unpark(s.thread);
}
独占式释放同步状态的流程图:
③、acquireInterruptibly()响应中断的独占式获取同步状态
可以看出如果线程中断立马返回异常,然后再去执行tryAcquire()获取同步状态,获取失败执行doAcquireInterruptibly方法。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly()方法,和acquire的acquireQueued的方法差不多,区别就是在parkAndCheckInterrupt这个方法如果返回true,就会抛异常InterruptedException,说明这个方法响应异常。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
④、tryAcquireNanos()带时间的获取同步状态,在时间内获取到,返回true,超时返回false,首先判断线程中断状态,为true就抛异常,为false就尝试获取同步状态tryAcquire,获取失败执行doAcquireNanos方法。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos()方法,其实这个都是在acquire方法上的改进,我们看看这个方法,首先算下时间也就是deadline,然后加入同步队列addWaiter方法,然后判断node的前节点是否为头结点,是就尝试获取同步状态,都为true就返回,为false就接着算下时间,判断node前节点是否为SIGNAL,也就是shouldParkAfterFailedAcquire这个方法,为true,线程阻塞计算的时间,然后true(等待阻塞时间到)和false都判断线程中断状态,中断就抛出异常,执行异常方法,不为true,继续循环,直到获取锁或者超时。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2)、共享系列的方法
①、acquireShared共享式获取同步状态,获取失败就加入同步队列
AQS也把语义指定好了,返货负数证明没有了,就执行doAcquireShared方法
public final void acquireShared(int arg) {
// 返回负数就证明没有锁了,加入同步队列
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared()方法,首先构造节点加入队列addWaiter,然后获取node的前节点,判断node的前节点是否为头结点,如果是,获取资源的个数,如果资源大于等于0,调用setHeadAndPropagate方法,然后返回,不满足,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法和独占式一样。
private void doAcquireShared(int arg) {
// 构建共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取node的前节点
final Node p = node.predecessor();
// 前节点是否是头节点
if (p == head) {
// 获取锁的个数
int r = tryAcquireShared(arg);
// 大于等于0,获取锁成功
if (r >= 0) {
setHeadAndPropagate(node, r); // 设置头结点,如果有多余资源接着唤醒
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate()方法,设置头结点,设置waitStatus为Propagate,为如果还有资源,唤醒后面的节点,调用doReleaseShared方法(这个方法会在共享式释放同步状态详解)
private void setHeadAndPropagate(Node node, int propagate) {
// 头结点
Node h = head; // Record old head for check below
// 设置头结点为node
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 还有资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 当前节点的next节点是共享或者没有next节点
if (s == null || s.isShared())
// 唤醒后置节点
doReleaseShared();
}
}
②、releaseShared()共享式释放状态
tryReleaseShared是需要同步组件自己去实现,释放成功调用doReleaseShared唤醒节点
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()方法,方法有些复杂,不好理解,我们主要来分析三个if的含义,第一个 if (ws == Node.SIGNAL) 表示当前node需要被唤醒,然后后面利用cas设置waitStatus为0,因为是共享模式可能有多个线程同时来释放同步状态,所以只能有一个释放成功,另外一个重试;第二个else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),其实也是用来处理并发的,当第一次并发失败的线程第二次进入时,可能会看到ws等于0(因为成功的线程设置的),所以利用cas设置为PROPAGATE,表示传递,这里补充一下不管是0或者PROPAGATE,都会被唤醒的线程利用cas设置为SIGNAL(参考shouldParkAfterFailedAcquire方法);第三个(h == head)
- head.waitStatus的初始值必然为SIGNAL,因此在并发时,必然只有一个线程A能将等待状态由 SIGNAL CAS更新为 0,
该线程A会唤醒其他线程B 被唤醒的线程B会首先执行setHead
因此如果最后h!=head,说明新一轮的唤醒竞争已经开始,当前线程c已经觉察到,因此继续参与竞争,加快唤醒
因此如果最后h==head,说明新一轮的唤醒竞争尚未开始,而被唤醒的线程B必然会开启新一轮的唤醒竞争,而当前线程c可以安心退出唤醒竞选private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { // 头结点 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 表示后面节点需要唤醒 if (ws == Node.SIGNAL) { // 多线程控制并发 ,可能存在多个线程同时来修改 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // 如果ws等于0,尝试把cas设置waitStatus为PROPAGATE,传递下去 // 请联系shouldParkAfterFailedAcquire方法一起看 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //如果头结点没有发生变化,表示设置完成,退出循环 // 如果发生变化,加入唤醒的过程(加速唤醒,可能存在多个线程在唤醒这些node,速度比一个接一个要快) if (h == head) // loop if head changed break; } }
③、acquireSharedInterruptibly()和tryAcquireSharedNanos()一个响应中断,一个响应中断支持添加获取的超时时间(参考独占模式的这些方法)
3)、ConditionObject系列方法
①、await()方法,类似Object的await方法,阻塞线程释放锁。
我们可以看见await的第一步是调用addConditionWaiter方法,它的作用是构建等待节点加入队列的尾部,使用的也是AQS的Node,队列里面顺便也会清理清除Node不为CONDITION的节点;第二步需要释放线程获取的同步状态fullyRelease方法;第三步:阻塞线程,找到线程中断时机,也就是调用signal方法的前后顺序;第四步:调用acquireQueued方法处理节点(阻塞还是其它);第五步:清理节点unlinkCancelledWaiters方法(清除Node不为CONDITION的节点);第六步:响应await语义,await阻塞线程时调用interrupt方法会抛异常reportInterruptAfterWait方法。public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 加入等待队列,清除节点 Node node = addConditionWaiter(); // 释放状态 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 阻塞线程 LockSupport.park(this); // 线程是被中断唤醒的 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 加入同步队列 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled // 清理节点 unlinkCancelledWaiters(); if (interruptMode != 0) // 响应中断 await语义 reportInterruptAfterWait(interruptMode); }
addConditionWaiter方法每次都是加入队列的尾部
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void unlinkCancelledWaiters() { // 获取第一个 Node t = firstWaiter; Node trail = null; while (t != null) { // 获取第一个的下一个 Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { // t需要断开连接 t.nextWaiter = null; // 第一次trail = null // firstWaiter = next if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
isOnSyncQueue方法判断node是否在同步队列中
final boolean isOnSyncQueue(Node node) { // 节点状态为CONDITION ,或者node.prev == null 等待节点没有前置节点 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 等待节点没有next节点 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ // 循环查找 return findNodeFromTail(node);
checkInterruptWhileWaiting 方法,判断是否是中断唤醒,这方法就是为了确认中断的时机是在signal的前面还是后面signal,因为需要响应中断
private int checkInterruptWhileWaiting(Node node) { // 判断是否是线程中断唤醒 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { // 设置成功表示在signal 执行之前 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ // 设置成功表示在signal 执行之后 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
acquireQueued和unlinkCancelledWaiters方法前面都介绍过了,一个是加入同步队列,一个是清理节点,介绍下reportInterruptAfterWait方法,它是我为了响应线程Interrupt方法,interruptMode == THROW_IE只在在signal方法后调用Interrupt方法才满足,线程阻塞时调用Interrupt方法会抛异常,这是Object.await里面满足的,请参考checkInterruptWhileWaiting方法里面的transferAfterCancelledWait方法理解其实现。
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
②、awaitNanos(long nanosTimeout)和awaitUntil(Date deadline)都是提供了超时时间,和await方法类似,只是加入了时间机制。
③、awaitUninterruptibly不响应中断方法,发现里面都没有判断是都发生中断的标记,只有调用signal唤醒node,循环才会结束,然后调用acquireQueued处理这个节点(阻塞还是其它)public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
④、signal方法,唤醒第一个等待队列的node。
public final void signal() { // 判断是否获取锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 初始化 Node first = firstWaiter; if (first != null) doSignal(first); }
doSignal方法,首先把这个first节点和等待队列断开连接,然后把调用transferForSignal方法把节点从等待队列加入同步队列,唤醒节点的线程,然后被唤醒的线程就会在await方法里面执行acquireQueued这个方法。
private void doSignal(Node first) { do { // 队列里面即将没有节点,所以首尾都要为null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 把first 断开连接 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
⑤、signalAll唤醒所有等待队列的节点加入同步队列,并且清空等待队列
public final void signalAll() { // 判断是否获取锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取第一个节点 Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { // 队列设置为null lastWaiter = firstWaiter = null; // 从首节点开始加入同步队列,知道队列为空 do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
四、总结
我们学习AQS其实我觉得主要从三个方面,也就是本文的第三部分,从独占式获取和释放同步状态、共享式获取和释放同步状态和ConditionObject里面的等待/通知机制;这里在说一下独占式释放锁和共享式释放锁,独占式因为只会有一个线程获取同步状态,所以释放时也只会有一个,但是在共享这一块,我们在释放同步同步状态时可能会有多个线程同时来释放,可能出现并发的情况,理解doReleaseShared是理解共享式释放的重点;学习获取和释放同步状态,理解同步队列节点的变化是重点;学习等待/通知理解等待队列和同步队列的关系和节点的转换;只有学习好了AQS才能更好的学习后面JUC的那些锁。
最后感慨下AQS里面的逻辑是真心有些绕,本人有些理解的可能有些不够。
参考《Java 并发编程的艺术》
当前文章:AbstractQueuedSynchroizer(AQS)同步器详解详解
文章链接:http://hbruida.cn/article/pdeepj.html