需要明确指定执行策咯的任务:
依赖性任务
大多数行为正确的任务都是独立的:它们不依赖于其他任务的执行时序、执行结果或其他效果.
使用线程封闭机制的任务
单线程的Executor能确保任务不会并发地执行,使你能够放宽代码对线程安全的要求.
对响应时间敏感的任务
如果将一个运行时间较长的任务提交到单线程的Executor中,或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池中,那么将降低由该Executor管理的服务的响应性.
使用ThreadLocal的任务
只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义,而在线程池的线程中不应该使用ThreadLocal在任务之间传递值.
在一些任务中,需要拥有或排除某种特定的执行策咯.如果某些任务依赖于其他的任务,那么会要求线程池足够大,从而去报它们依赖任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。通过将这些需求写入文档,将来的代码维护人员就不会由于使用了某种不合适的执行策咯而破坏安全性或活跃性。
在线程池中,如果任务依赖于其他任务,那么可能产生死锁。
每当提交了一个有依赖性的Executor任务时,要清除地知道可能会出现线程”饥饿”死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或配置限制。
如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。
`限定任务等待资源的时间可以缓解执行时间较长任务造成的影响。`
Ncpu=number of CPUs
Ucpu=target CPU utilization,0<=ucpu<=1 w="" c="ratio" of="" wait="" time="" to="" comput="" 要使处理器达到期望的使用率,线程池的最优大小等于:="" nthreads=Ncpu*Ucpu*(1+W/C)
通过Runtime来获得CPU的数目:
int N_CPUS=Runtime.getRuntime().availableProcessors();
=1>=u
public ThreadPoolExecutor(int corPoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){...}
线程池的基本大小(Cor Pool Size),最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上线。
通过一个Runnable和一个链表节点来表现一个等待中的任务,当然比使用线程来表示的开销低很多,但如果客户提交给服务器请求的速率超过了服务器的处理速率,那么仍可能会耗尽资源。
ThreadPoolExecutor允许提供一个BlockingQueue来保持等待执行的任务。基本的任务排队方法有3种:
无界队列
newFixedThreadPoole和newSingleThreadExecutor在默认情况下将使用一个无界的LinkedBlockingQueue,如果所有工作者线程都处于忙碌状态,那么任务将在队列种等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制的增加。
有界队列
ArrayBlocking、有界的LinkedBlocking、PriorityBlockingQueue(优先级排序)。有界队列有助于避免资源耗尽的情况发生。在使用有界队列时,队列的大小和线程池的大小必须一起调节。如果线程池小而队列大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但可能会限制吞吐量。只有当任务相互独立时,为线程池或工作队列设置界值才是合理的。
同步移交(Synchronous Handoff)
对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个正真的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue种,必须有另一个线程正在等待接手这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策咯,这个任务将被拒绝。使用直接移交将更高效。只有当线程池是无界或者可以拒绝任务时,SynchronousQueue才有实际价值。在newCachedThreadPool工厂方法中就使用了SynchronousQueue(Java6提供了一个新的非阻塞算法替代SynchronousQueue,吞吐量提高了3倍),是一种很好的默认选择。
有界队列带来的新问题:当队列填满后,新的任务该怎么办?
`采用饱和策咯。ThreadPoolExecutor的饱和策咯可以通过调用setRejectedExecutionHandler来修改`
JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含不同的饱和策咯:
“终止”策咯是默认饱和策咯。该策咯抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,根据需求编写自己的处理代码.”抛弃策咯”为抛弃最旧,因此不要将”抛弃最旧”的饱和策咯和优先级队列放在一起。可能抛弃优先级最高的任务。
“调用者运行”策咯实现了一种调节机制,既不会抛弃任务,也不会抛出异常。而是将某些任务回退到调用者,从而降低了新任务的流量。当服务器过载时,这种过载情况会逐渐想外蔓延开来——从线程池到工作队列到应用程序再到TCP层,最终达到客户端,导致服务器在高负载下实现一种平缓的性能降低。
//创建一个固定大小的线程池,并采用有界队列以及"调用者运行"饱和策咯
ThreadPoolExecutor executor
= new ThreadPoolExecutor(N_THREADS,N_THREADS,
OL,TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnbale>(CAPACITY));
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
//通过使用Semaphore(信号量)来控制任务的提交速率
@ThreadSafe
public class BoundedExecutor{
private final Executor exec;
private final Semaphore semaphore;
pubic BoundedExecutor(Executor exec,int bound){
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) throws InterruptedException{
semaphore.acquire();
try{
exec.execute(new Runnable(){
public void run(){
try{
command.run();
}finally{
semaphore.release();
}
}
});
}catch(RejectedExecutionException e){
semaphore.release();
}
}
}
* DiscardPolicy
* DiscardOldestPolicy
### 线程工厂ThreadFactory
每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的,默认的线程工厂方法将创建一个新的,非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法可以定制线程池的配置信息。
public interface ThreadFactory{
//每当线程池需要创建一个新线程时都会调用这个方法。
Thread newThread(Runnable r);
}
如果应用程序中需要利用安全策咯来控制对某些特殊代码库的访问权限,那么可以通过Executor中的privilegedThreadFactory工厂来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader。
ThreadPoolExecutor提供了几个可以在子类化中改写的方法:beforeExecute、afterExecute和terminated。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会倍调用。如果任务在完成后带有一个Error,那么就不会调用afterExecute。如果beforeExecute抛出一个RunnableException,那么任务将不被执行,并且afterExecute也不会被调用。
public<T> void sequentialRecursive(List> nodes,
Collection results){
for(Node n:nodes){
results.add(n.compute());
sequentialRecursive(n.getChildren(),results);
}
}
public T void parallelRecursive(final Executor exec,
List> nodes,
final Collection results){
for(final Node n:nodes){
exec.execute(new Runnable(){
results.add(n.compute());
});
}
parallelRecursive(exec,n.getChildren(),results);
}