学习笔记

Java并发编程实战-8结构化并发应用程序_线程池的使用
Publish: 2018/7/23   

在任务与执行策咯之间的隐性耦合

需要明确指定执行策咯的任务:

在一些任务中,需要拥有或排除某种特定的执行策咯.如果某些任务依赖于其他的任务,那么会要求线程池足够大,从而去报它们依赖任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。通过将这些需求写入文档,将来的代码维护人员就不会由于使用了某种不合适的执行策咯而破坏安全性或活跃性。

线程饥饿死锁

在线程池中,如果任务依赖于其他任务,那么可能产生死锁。

每当提交了一个有依赖性的Executor任务时,要清除地知道可能会出现线程”饥饿”死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或配置限制。

运行时间较长的任务

如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。

`限定任务等待资源的时间可以缓解执行时间较长任务造成的影响。`

设置线程池的大小

    Ncpu=number of CPUs
    Ucpu=target CPU utilization,0<=ucpu<=1 w="" c="ratio" of="" wait="" time="" to="" comput="" 要使处理器达到期望的使用率,线程池的最优大小等于:="" nthreads=Ncpu*Ucpu*(1+W/C)

通过Runtime来获得CPU的数目:
    int N_CPUS=Runtime.getRuntime().availableProcessors();

配置ThreadPoolExecutor

public ThreadPoolExecutor(int corPoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler){...}

线程的创建与销毁

线程池的基本大小(Cor Pool Size),最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上线。

管理队列任务

通过一个Runnable和一个链表节点来表现一个等待中的任务,当然比使用线程来表示的开销低很多,但如果客户提交给服务器请求的速率超过了服务器的处理速率,那么仍可能会耗尽资源。

ThreadPoolExecutor允许提供一个BlockingQueue来保持等待执行的任务。基本的任务排队方法有3种:

饱和策咯

有界队列带来的新问题:当队列填满后,新的任务该怎么办?

`采用饱和策咯。ThreadPoolExecutor的饱和策咯可以通过调用setRejectedExecutionHandler来修改`

JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含不同的饱和策咯:

“终止”策咯是默认饱和策咯。该策咯抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,根据需求编写自己的处理代码.”抛弃策咯”为抛弃最旧,因此不要将”抛弃最旧”的饱和策咯和优先级队列放在一起。可能抛弃优先级最高的任务。

“调用者运行”策咯实现了一种调节机制,既不会抛弃任务,也不会抛出异常。而是将某些任务回退到调用者,从而降低了新任务的流量。当服务器过载时,这种过载情况会逐渐想外蔓延开来——从线程池到工作队列到应用程序再到TCP层,最终达到客户端,导致服务器在高负载下实现一种平缓的性能降低。

    //创建一个固定大小的线程池,并采用有界队列以及"调用者运行"饱和策咯
    ThreadPoolExecutor executor 
        = new ThreadPoolExecutor(N_THREADS,N_THREADS,
            OL,TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnbale>(CAPACITY));

    executor.setRejectedExecutionHandler(
        new ThreadPoolExecutor.CallerRunsPolicy());

    //通过使用Semaphore(信号量)来控制任务的提交速率
    @ThreadSafe
    public class BoundedExecutor{
        private final Executor exec;
        private final Semaphore semaphore;

        pubic BoundedExecutor(Executor exec,int bound){
            this.exec = exec;
            this.semaphore = new Semaphore(bound);
        }

        public void submitTask(final Runnable command) throws InterruptedException{
            semaphore.acquire();
            try{
                exec.execute(new Runnable(){
                    public void run(){
                        try{
                            command.run();
                        }finally{
                            semaphore.release();
                        }
                    }
                });
            }catch(RejectedExecutionException e){
                semaphore.release();
            }
        }
    }
* DiscardPolicy * DiscardOldestPolicy ### 线程工厂ThreadFactory 每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的,默认的线程工厂方法将创建一个新的,非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法可以定制线程池的配置信息。
    public interface ThreadFactory{
        //每当线程池需要创建一个新线程时都会调用这个方法。
        Thread newThread(Runnable r);
    }

如果应用程序中需要利用安全策咯来控制对某些特殊代码库的访问权限,那么可以通过Executor中的privilegedThreadFactory工厂来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader。

扩展ThreadPoolExecutor

ThreadPoolExecutor提供了几个可以在子类化中改写的方法:beforeExecute、afterExecute和terminated。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会倍调用。如果任务在完成后带有一个Error,那么就不会调用afterExecute。如果beforeExecute抛出一个RunnableException,那么任务将不被执行,并且afterExecute也不会被调用。

递归算法并行化

    public<T> void sequentialRecursive(List> nodes,
                                             Collection results){
        for(Node n:nodes){
            results.add(n.compute());
            sequentialRecursive(n.getChildren(),results);
        }
    }

    public T void parallelRecursive(final Executor exec,
                                    List> nodes,
                                    final Collection results){
        for(final Node n:nodes){
            exec.execute(new Runnable(){
                results.add(n.compute());
            });
        }
        parallelRecursive(exec,n.getChildren(),results);
    }


← Java并发编程实战-12高级主题_显示锁 Java并发编程实战-13高级主题_构建自定义的同步工具 →

Powered by Hexo, Theme designs by @hpcslag.
Style-Framework Tocas-UI designs by @yamioldmel