0%

深入理解AQS

前言

本文是介绍AbstractQueueSynchronizer。

目录

一、概念

队列同步器AbstractQueuedSynchronizer(以下简称同步器或AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过维护一个双向链表FIFO队列来完成资源获取线程的排队工作。它能够成为实现大部分同步需求的基础。

1
2
3
4
5
6
7
8
9
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

/**
* The synchronization state.
*/
private volatile int state;
}

二、使用

AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在AQS里由一个int型的state来代表这个状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法getState()setState(int newState)compareAndSetState(int expect,int update)来进行操作,因为它们能够保证状态的改变是安全的。

自定义一个同步组件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
*类说明:实现自己独占锁,不可重入
*/
public class SelfLock implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {

/*判断处于占用状态*/
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}

/*获得锁*/
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

/*释放锁*/
@Override
protected boolean tryRelease(int arg) {
if(getState()==0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}

/* 返回一个Condition,每个condition都包含了一个condition队列*/
Condition newCondition() {
return new ConditionObject();
}
}

/* 仅需要将操作代理到Sync上即可*/
private final Sync sync = new Sync();

public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}

public boolean tryLock() {
return sync.tryAcquire(1);
}

public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}

public Condition newCondition() {
return sync.newCondition();
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

使用自定义同步组件同步线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final Lock lock = new SelfLock();
class Worker extends Thread {
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName());
try {
...
} finally {
lock.unlock();
}
}
}

public void test() {
// 启动4个子线程
for (int i = 0; i < 4; i++) {
Worker w = new Worker();
w.start();
}
}

在实现上,子类推荐被定义为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch、ThreadPoolExecutor的Worker类等)。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:

  • 锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;
  • 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
  • 实现者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。

模板方法设计模式

同步器的设计基于模板方法模式。模板方法模式的意图是,定义一个操作中的算法的骨架,而将一些步骤的实现延迟到子类中。模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。

三、AQS中的方法

实现自定义同步组件时,将会调用同步器提供的模板方法。

模板方法

img

这些模板方法同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放、同步状态和查询同步队列中的等待线程情况。

可重写的方法

img img

访问或修改同步状态的方法

重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

  • getState():获取当前同步状态。
  • setState(int newState):设置当前同步状态。
  • compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。

四、实现原理

基于CLH队列锁,即Craig, Landin, and Hagersten (CLH) lock queue。是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。

当一个线程需要获取锁时:

  1. 创建一个的QNode,将其中的locked设置为true表示需要获取锁,myPred表示对其前驱结点的引用。
  2. 线程A对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前驱结点的引用myPred。
  3. 线程就在前驱结点的locked字段上旋转,直到前驱结点释放锁(前驱节点的锁值 locked == false)。
  4. 当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前驱结点。下一个结点就获取到锁。

CLH队列锁的优点是空间复杂度低(如果有n个线程,L个锁,每个线程每次只获取一个锁,那么需要的存储空间是O(L+n),n个线程有n个myNode,L个锁有L个tail)。CLH队列锁常用在SMP体系结构下。Java中的AQS是CLH队列锁的一种变体实现。

五、ReentrantLock的实现

锁的可重入

重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。

1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。

2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。

非公平锁的nonfairTryAcquire方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。同步状态表示锁被一个线程重复获取的次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
非公平锁和公平锁

ReentrantLock的构造函数中,默认的无参构造函数将会把Sync对象创建为NonfairSync对象,这是一个非公平锁;而另一个构造函数ReentrantLock(boolean fair)传入参数为true时将会把Sync对象创建为公平锁FairSync,传入false则创建非公平锁NonfairSync。构造函数如下:

1
2
3
4
5
6
7
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

区别:是否排队等待获取锁。

非公平锁:nonfairTryAcquire(int acquires)方法,对于非公平锁,只要CAS设置同步状态成功,则表示当前线程获取了锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

公平锁:tryAcquire方法,判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

总结

深入理解AQS(AbstractQueuedSynchronizer)

图文并茂,带你深入了解AQS的源码