在并发程序中,基于状态的条件可能会由于其他线程的操作而改变:一个资源池可能在几条指令之前还是空的,但现在却变为非空的,因为另一个线程可能会返回一个元素到资源池。依赖状态的操作可以一直阻塞直到可以继续执行,这比使它们先失败再实现起来要更为方便且更不易出错。
有界缓存提供的put和take操作中都包含有一个前提条件:不能从空缓存中获取元素,也不能将元素放入已满的缓存中。当前提条件未满足时,依赖状态的操作可以抛出一个异常或返回一个错误状态(使其成为调用者的一个问题),也可以保持阻塞直到对象进入正确的状态。
有界缓存的几种实现:
每种实现中都扩展了BaseBoundedBuffer,在这个类中实现了一个基于数组的循环缓存,其中各个缓存状态变量(buf,head,tail和count)均由缓存的内置锁来保护。还提供了同步的doPut和doTake方法,并在子类中通过这些方法来实现put和take操作,底层的状态对子类隐藏。
@ThreadSafe
public abstract class BaseBoundedBuffer{
@GuardedBy("this") private final V[] but;
@GuardedBy("this") private int tail;
@GuardedBy("this") private int head;
@GuardedBy("this") private int count;
protected BaseBoundedBuffer(int capacity){
this.buf=(V[])new Object(capacity);
}
protected synchronized final void doPut(V v){
buf[tail] = v;
if(++tail==buf.length)
tail=0;
++count;
}
protected synchronized final V doTake(){
V v=buf[head];
buf[head]=null;
if(++head==buf.length)
head=0;
--count;
return v;
}
public synchronized final boolean ifFull(){
return count == buf.length;
}
public synchronized fianl boolean isEmpty(){
return count==0;
}
}
异常应该用于发生异常条件的情况中。调用者必须做好捕获异常的准备,并且在每次缓存操作时都需要重试:如果将状态依赖性交给调用者管理,那么将导致一些功能无法实现,例如维持FIFO顺序,由于迫使调用者重试,因此失去了”谁先到达”的信息。
要选择合适的休眠时间间隔,就需要在响应性与CPU使用率之间进行权衡。休眠的间隔越小,响应性就越高,但消耗的CPU资源也越高。
条件队列使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。每个Java对象可以作为一个条件队列,并且Object中的wait、notify和notifyAll方法构成了内部条件队列的API。
Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。
//使用条件队列实现的有界缓存
@ThreadSafe
public class boundedBuffer extends BaseBoundedBuffer{
//条件谓词:not-full(!isFull())
//条件谓词:not-empty(!isEmpty())
public BoundedBuffer(int size){super(size);}
//阻塞并直到:not-full
public synchronized void put(V v) throws InterruptedException{
while(ifFull())
wait();
doPut(V);
notifyAll();
}
//阻塞并直到:not-empty
public synchronized V take() throws InterruptedException{
while(isEmpty())
wait();
V v=doTake();
notifyAll();
return v;
}
}
要想正确地使用条件队列,关键是找出对象在哪个条件谓词上等待。条件谓词是使某个操作成为状态依赖操作的前提条件。
将与条件队列相关的条件谓词以及在这些条件谓词上等待的操作都写入文档。
在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须闲持有这个锁。锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。
每一次wait调用者都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的wait时,调用者必须已经持有与条件相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。
//状态依赖方法的标准形式
void stateDependentMethod()throws interruptedException{
//必须通过一个锁来保护条件谓词
synchronized(lock){
while(!conditionPredicate())
lock.wait();
//现在对象处于合适的状态
}
}
当使用条件等待时(例如Object.wait或Condition.await):
活跃性故障:死锁、活锁、丢失信号。
丢失信号:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。
每当在等待一个条件时,第一要确保在条件谓词变为真时通过某种方式发出通知。
在调用notify时,JVM会从这个条件队列上等待的多个线程中选择一个来唤醒,而调用notifyAll则会唤醒所有在这个条件队列上等待的线程。由于多个线程可以基于不同的条件谓词在同一个条件队列上等待,因此如果使用notify而不是notifyAll,那么将是一种危险的操作,因为单一的通知很容易导致类似于信号丢失的问题。
只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:
//优化
//条件通知:当put或take影响到这些状态转换时,才发出通知。
public synchronized void put(V v) throws InterruptedException{
while(isFull())
wait();
boolean wasEmpty=isEmpty();
doPut(V);
if(wasEmpty)
notifyAll();
}
要想支持子类化,在设计时需要保证:如果实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。
通常,我们应该吧条件队列封装起来,因而除了使用条件队列的类,就不能在其他地方访问它。重新设计为使用私有的锁对象和条件队列代替缓存对象自身既是锁,又是条件队列,新设计不再支持任何形式的客户端加锁。
入口协议就是该操作的条件谓词,出口协议包括,检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列。
在AbstractQueuedSynchronizer中使用出口协议。这个类并不是由同步器执行自己的通知,而是要求同步器方法返回一个值来表示该类的操作是否已经解除了一个或多个等待线程的阻塞。这种明确的API调用需求使得更难以”忘记”在某些状态转换发生时进行通知。
public interface Codition{
void wait() throws InterruptedException;
boolean await(long time,TimeUnit unit) throws InterruptedException;
void awaitNanos(long nanosTimeout)throws InterruptedException;
void awaitUniterruptibly();
boolean awaitUntil(Date deadlins) throws InterruptedException;
void signal();
void signalAll();
}
一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁相关联一样。要创建一个Condition,可以在相关联的Lock上调用Lock.newCondition方法。Condition比内置条件队列提供了更丰富的功能:在每个锁上可以存在多个等待、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平或非公平的队列操作。对于每个Lock,可以有任意数量的Condition对象。
特别注意:在Condition对象中,与wait、notify和notifyAll方法对应的分别是await、signal和singalAll。但是,Condition对Object进行了扩展,因而它也包含wait和notify方法。一定要确保使用正确的版本——await和signal。
signal比signalAll更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁请求的次数。
ReentrantLock和Semaphore这两个接口存在许多共同点,都可以用做一个”阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lock或acquire时成功返回),也可以等待(在调用lock或qcquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回”假”,表示在指定的时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。它们在实现时都使用了一个共同的基类,即AbstractQueredSynchronizer(AQS).
AQS是同步类的基类,是一个用于构建锁和同步器的框架,基于AQS构建的包括:RenntrantLock、Semaphore、CountDownLatch、RenntrantReadWriteLock、SynchronousQueue和FutureTask。在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。最基本的操作包括获取操作和释放操作。
//AQS中获取操作和释放操作的标准形式
boolean acquire() throws InterruptedException{
while(当前状态不允许获取操作){
if(需要阻塞获取请求)
如果当前线程不在队列中,则将其插入队列
阻塞当前线程
}else
返回失败
}
可能更新同步器的状态
如果线程位于队列中,则将其移除队列
返回成功
}
void release(){
更新同步器的状态
if(新的状态允许某个被阻塞的线程获取成功)
解除队列中一个或多个线程的阻塞状态
}
AQS中的获取操作可以是一种独占操作(例如:RenntrantLock)也可以是一个非独占操作(例如:Semaphore和CountDownLatch)。一个获取操作包括两部分:
如果某个同步器支持独占的获取操作,那么需要实现一些保护方法,包括:tryAcquire、tryRelease和isHeldExclusively等,而对于支持共享获取的同步器,则应该实现tryAcquireShared和tryReleaseShared等方法。AQS中的accuire、acquireShared、release和releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行。在同步器的子类中,可以根据其获取操作和是否操作的语义,使用getState、setState以及compareAndSetState来检查和更新状态,并通过返回的状态值来告知基类”获取”或”释放”同步器的操作是否成功。
@ThreadSafe
public class OneShotLatch{
private final Sync sync=new Sync();
prublic void signal(){ sync.releaseShared(0); }
public void await() throws InterruptedException{
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer{
protected int tryAcquireShared(int ignored){
//如果闭锁是开的(state == 1),那么这个操作将成功,否则将失败
return (getState() == 1) ? 1 : -1;
}
}
protected boolean tryReleaseShared(int ignored){
setState(1); // 现在打开闭锁
return true; // 现在其他的线程可以获取该闭锁
}
}
只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively。
//基于非公平的RenntrantLock实现tryAcquire
protected boolean tryAcquire(int ignored){
final Thread current = Thread.currentThread();
int c = getState();
if(c == 0){
if(compareAndSetState(0,1){
owner = current;
return true;
}
}else if(current == owner){
setState(c+1);
return true;
}
return false;
}
当一个线程尝试获取锁时,tryAcquire将首先检查锁的状态。如果锁未被持有,那么它将尝试更新锁的状态以表示锁已经被持有。由于状态可能在检查后被立即修改,因此tryAcquire使用compareAndSetState来原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后就没有被修改过。如果锁状态表明它已经被持有,并且如果当前线程是锁的拥有者,那么获取计数会递增,如果当前线程不是锁的拥有者,那么获取操作将失败。
Semaphore将AQS的同步状态用于保存当前可用许可的数量。tryAcquireShared方法首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败。如果还有剩余的许可,那么tryAcquireShared会通过compareAndSetState以原子方式来降低许可的数量。如果这个操作成功(这意味着许可的计数自从上一次读取后就没可以被修改过),那么将返回一个值表示获取操作成功。在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那么其他等待的线程同样会解除阻塞。
//Semaphore中的tryAcquireShared与tryReleaseShared
protected int tryAcquireShared(int acquires){
while(true){
int available = getState();
int remaining = available - acquires;
if(remaining<0||compareandsetstate(available,remaining)) return="" remaining;="" }="" protected="" boolean="" tryreleaseshared(int="" releases){="" while(true){="" int="" p="getState();" if(compareandsetstate(p,p+releases))="" true;="" <="" pre="">
CountDownlatch使用AQS的方式与Semaphore很相似:在同步状态中保存的是当前的计数值。countDown方法调用release,从而导致计数值递减,并且当计数值为零时,解除所有等待线程的阻塞。await调用acquire,当计数器为零时,acquire将立即返回,如果将阻塞。
FutureTask
Future.get的语义非常类似于闭锁的语义——如果发生了某个事件(由FutureTask表示的任务执行完成或被取消),那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生。
在FutureTask中,AQS同步状态用来保存任务的状态。FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出的异常。此外,它还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运动状态),因而如果任务取消,该线程就会中断。
ReentrantReadWriteLock
ReadWriteLock接口表示存在两个锁:一个读取锁和一个写入锁,但在基于AQS实现的ReentrantReadWriteLock中,单个AQS子类将同时管理读取加锁和写入加锁。ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。
AQS在内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。
0||compareandsetstate(available,remaining))>