学习笔记

Java并发编程实战-7结构化并发应用程序_取消与关闭
Publish: 2018/7/23   

Java没有提供任何机制来安全地终止线程。

任务取消

取消某个操作的原因?

一个可取消的任务必须拥有取消策略(Cancellaion Policy),在这个策略中将详细地定义取消操作的”How”、”When”以及”What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消时应该执行哪些(What)操作。

中断

    public class Thread{
        //设置中断状态为true
        public void interrupt(){ ... }
        //返回目标线程的中断状态
        public boolean isInterrupted(){ ... }
        //清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法
        public static boolean interrupted(){ ... }
        ...
    }

阻塞库方法(如wait、sleep、join)接收到中断请求或在开始执行前检查中断状态,响应中断时执行的操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。

调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

通常,中断是实现取消的最合理方式。

中断策略

为什么大多数阻塞库抛出InterruptedException作为中断响应?

尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。

如果将InterruptedException传递给调用者外还需要执行其他操作,那么应该在捕获InterruptedException之后恢复中断状态:

    Thread.currentThread().interrupt();

由于每个线程拥有各自的中断策咯,因此除非你直到中断对该线程的含义,否则不应该中断这个线程。

响应中断

处理InterruptedException的策咯?

只有实现了线程中断策咯的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

对于一些不支持取消但仍可以调用可中断阻塞的方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试,在这种情况下,它们应该在本地保存中断状态。

    public Task getNextTask(BlockingQueue<Task> queue){
        boolean interrupted=false;
        try{
            while(true){
                try{
                    return queue.take();
                }catch(InterruptedException e){
                    interrupted = true;
                    //重新尝试
                }
            }finally{
                if (interrupted){
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

通过Future来实现取消

ExcutorService.submit将返回一个Future来描述任务。Future拥有一个cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning,表示取消操作是否成功。(这只是表示任务是否能够接收中断,而不是表示任务是否能检测并处理中断。)mayInterruptIfRunning?

    public static void timedRun(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{
        Future<?> task=taskExec.submit(r);
        try {
            task.get(timeout,unit);
        }catch (TimeoutException e){
            //接下来任务将被取消
        }catch (ExecutionException e){
            //如果在任务中抛出异常,那么重新抛出该异常
            throw launderThrowable(e.getCause());
        }finally {
            //如果任务已经结束,那么执行取消操作也不会带来任何影响
            task.cancel(true);//如果任务正在运行,那么将被中断
        }
    }

当Future.get抛出InterruptedException或TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

处理不可中断任务

    //既能处理标准的中断,也能关闭底层的套接字。
    public void interrupt(){
        try{
            socket.cloase();
        }catch (IOException e){

        }finally {
            super.interrupt();
        }
    }

采用newTaskFor来封装非标准的取消

    public interface CancellableTask<T> extends Callable {
        void cancel();

        RunnableFuture<T> newTask();
    }

    @ThreadSafe
    public class CancellingExecutor extends ThreadPoolExecutor {
        ...
        //创建Future来代表任务,RunnableFuture扩展了Future和Runnable(并由FutureTask实现)     
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            if (callable instanceof CancellableTask) {
                return ((CancellableTask<T>) callable).newTask();
            } else {
                return super.newTaskFor(callable);
            }
        }
    }

    public abstract class SocketUsingTask<T> implements CancellableTask<T> {
        @GuardedBy("this")
        private Socket socket;

        protected synchronized void setSocket(Socket s) {
            socket = s;
        }

        public synchronized void cancel() {
            try {
                if (socket != null) socket.close();
            } catch (IOException e) {
            }
        }

        @Override
        public RunnableFuture<T> newTask() {
            return new FutureTask<T>(this) {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    try {
                        SocketUsingTask.this.call();
                    } finally {
                        return super.cancel(mayInterruptIfRunning);
                    }
                }
            };
        }
    }

SocketUsingTask实现了CancellableTask,并定义了Future。cancel来关闭套接字和调用super.cancel。如果SocketUsingTask通过其自己的Future来取消,那么底层的套接字将被关闭并且线程将被中断。因此它提高了任务对取消操作的响应性:不仅能够在调用可中断方法的同时确保响应取消操作,而且还能调用可阻调的套接字I/O方法。

停止基于线程的服务

对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

示例:日志服务

    public class LogService{
        private final BlockingQueue<String> queue;
        private final LoggerThread loggerThread;
        private final PrintWriter writer;
        @GuardedBy("this") private boolean isShutdown;
        @GuardedBy("this") private int reservations;

        public void start(){loggerThread.start();}
        public void stop(){
            synchronized (this) {isShutdown=true;}
            loggerThread.interrupt();
        }

        /*以原子方式来检查关闭请求,并且有条件地递增一个计数器来"保持"提交消息的权利。
        代替会产生竞态条件的标志位,因为需要锁log方法*/
        public void log(String msg)throws InterruptedException{
            synchronized (this){
                if (isShutdown){
                    throw new IllegalStateException(...);
                    ++reservations;
                }
            }
            queue.put(mag);
        }

        private class LoggerThread extends Thread{
            public void run(){
                try{
                    while (true){
                        try{
                            synchronized (LogService.this){
                                if (isShutdown&&reservations==0)
                                    break;
                            }
                            String msg=queue.take();
                            synchronized (LogService.this){--reservations;}
                            writer.println(msg);
                        }catch (InterruptedException e){/* retry */}
                    }
                }finally {
                    writer.close();
                }
            }
        }
    }

关闭ExecutorService

解决shutdownNow局限性?

    public class TrackingExecutor extends AbstractExecutorService {
        private final ExecutorService exec;
        private final Set<Runnable> tasksCancelledAtShutdown
                = Collections.synchronizedSet(new HashSet<Runnable>());
        ...
        public List<Runnable> getCancelledTask(){
            if (!exec.isTerminated())
                throw new IllegalStateException(...);
                return new ArrayList<>(tasksCancelledAtShutdown);
        }

        public void execute(final Runnable runnable){
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        runnable.run();
                    }finally {
                        /*记录哪些任务是在关闭后取消的。
                         这里存在竞态条件,如果执行2次和执行一次该任务的结果是一致的
                         ,不会存在问题,否则必须考虑这种风险,并对"误报"问题做好准备.*/
                        if (isShutdown()&&Thread.currentThread().isInterrupted())
                            tasksCancelledAtShutdown.add(runnable);
                    }
                }
            });
        }
    }

“毒丸”对象

指一个放在队列上的对象,其含义是”当得到这个对象时,立即停止”。只有在生产者和消费者的数量都已知的情况下,才可以使用。只有在无界队列中,”毒丸”对象才能可靠地工作。(界队列满了,线程池执行了拒绝操作)

处理非正常的线程终止

未捕获异常的处理

UncaughtExceptionHandler能检测出某个线程由于未捕获的异常而终止的情况

    public interface UncaughtExceptionHandler {

        //将异常写入日志
        void uncaughtException(Thread t, Throwable e);
    }

在运行时间较长的应用程序中,通常会为所有线程的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中.

只有通过execute提交的任务,才能被异常处理器捕获,submit提交的任务,无论是抛出未检查异常还是已检查异常,都将被认为是任务返回状态的一部分.

JVM关闭

关闭钩子

在正常关闭中,JVM首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是指通过Runnitme.addShoutdownHook注册的但尚未开始的线程。JVM并不能保证关闭钩子的调用顺序。在关闭应用程序时,如果有(守护或非守护)线程仍在运行,那么这些线程接下来将与关闭进程并发执行。当所有的关闭钩子都执行结束时,如果runFinalizersOnExit为ture,那么JVM将运行终结器,然后再停止。 JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程”挂起”并且JVM必须被强行关闭,当被强行关闭时,只是关闭JVM,而不会运行关闭钩子。

关闭钩子不应该依赖哪些可能被应用程序或其他关闭钩子关闭的服务。(对所有服务使用同一个关闭钩子,并且在该关闭钩子中执行一系列的关闭操作。避免死锁。)

守护线程

在JVM启动时创建的所有线程中,除了主线程,其他的线程都是守护线程。新线程将继承创建它的线程的守护状态,因此默认情况下,主线程创建的所有线程都是普通线程。

#####普通线程和守护线程的区别?

当一个线程退出时,JVM会检查其他正在运行的线程,如果是守护线程,JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃——即不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出。

守护线程通常不能用来代替应用程序管理程序中各个服务的生命周期。

终结器

垃圾回收器对那些定义了finalize方法的对象会进行特殊处理:在回收器释放它们后,调用它们的finalize方法,从而保证一些持久化的资源被释放。

避免使用finalize。唯一例外:当需要管理对象,并且该对象持有的资源是通过本地方法获得的。



← okhttp 源码分析 Java并发编程实战-6结构化并发应用程序_执行任务 →

Powered by Hexo, Theme designs by @hpcslag.
Style-Framework Tocas-UI designs by @yamioldmel