学习笔记

Java并发编程实战-4基础知识_基础构建模块
Publish: 2018/7/23   

委托是创建线程安全类的一个最有效的策略:只需让现有的线程安全类管理所有的状态即可。

同步容器类

实现线程安全的方式:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

同步容器类的问题

在同步容器类中,这些复合操作(迭代,跳转,以及条件运算)在没有客户端加锁的情况下仍然是线程安全的,但其他线程并发地修改容器时,它们可能会表现出意外的行为。

    //在客户端加锁的Vector上的复合操作
    public static Object getLast(Vector list) {
        int lastIndex = list.size() - 1;
        return list.get(lastIndex);
    }

    public static void deleteLast(Vector list) {
        int lastIndex = list.size() - 1;
        list.remove(lastIndex);
    }

迭代器和ConcurrentModificationException

在容器的迭代过程中被修改时,会抛出ConcurrentModificationException,通过在所有对共享容器进行迭代地地方都需要加锁避免抛出异常。

实现方式:将计数器的变化与容器关联起来:如果在迭代期间计数器被修改,那么hasNext和next将抛出ConcurrentModificationException。

隐藏迭代器

迭代器会隐藏起来,如标准容器的toString方法。

正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略。

并发容器

通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。

Java5.0加入了两种新的容器类型:

ConcurrentHashMap

采用了分段锁机制。提供的迭代器不会抛出ConcurrentModificationException,不需要在迭代过程中对容器加锁。提高代码的可伸缩性。

CopyOnWriteArrayList、CopyOnWriteArraySet

替代同步List、Set,提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。

“写入时复制Copy-On-Write”容器的线程安全性在于,只要正确的发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。

仅当迭代操作远远多于修改操作时,才应该使用”写入时复制”容器。如事件通知系统。

阻塞队列和生产者-消费之模式(如Executor)

在基于阻塞队列构建的生产者-消费者设计中,当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

BlockingQueue简化了生产者-消费者设计模式的实现过程:

串行线程封闭

对于可变对象,生产者-消费者这种设计与阻塞队列一起,促进了串行线程封闭,从而将对象所有权从生产者交付给消费者。线程封闭对象至鞥你由单个线程拥有,但可用通过安全地发布该对象来”转移”所有权。在转移所有权后,也只有另一个线程能获得这个对象的访问权限,并且发布对象的线程不会再访问它。

双端队列与工作密取

Java6增加了两种容器类型:

双端队列适用于另一种相关模式:工作密取(每个消费者都有各自的双端队列)。如果一个消费者完成了自己的双端队列中的全部工作,它可以从其他消费者双端队列末尾秘密地获取工作。

阻塞方法与中断方法

当线程阻塞时,通常被挂起,并处于某种阻塞状态(BLOCKED,WAITING或TIMED_WAITING)。

BlockingQueue的put和take等方法会抛出受检查异常InterruptedException。

处理InterruptedException:

同步工具类

闭锁

延迟线程的进度直到其到达终止状态。例如:

CountDownLatch是一种灵活的闭锁实现。

    //在计时测试中使用CountDownLatch来启动和停止线程
    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }

FutureTask

FutureTask也可以用作闭锁。表示一种抽象的可生成结果的计算。通过Callable来实现,相当于一种可生成结果的Runnable,并且处于以下3种状态:等待运行,正在运行,运行完成。”执行完成”表示所有可能结束方式,包括正常结束、由于取消而结束、由于异常而结束。当FutureTask进入完成状态后,会永远停止在这个状态上。

FutureTask.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。FutureTask在Executor框架中表示异步任务。

信号量(Semaphore)

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个制定操作的数量。可以用来实现某种资源池(数据库连接池)或者对容器施加边界。

Semaphore在执行操作时,首先获得许可(如果有剩余),并在使用后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release将返回一个许可。计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用做互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

栅栏(CyclicBarrier)

阻塞一组线程直到某个事件发生。

栅栏与闭锁的区别:

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集。
Exchanger是一个两方栅栏,各方在栅栏位置上交换数据。

数据交换时机:

构建高效且可伸缩的结果缓冲

    @ThreadSafe
    public class Memoizer implements Computable{
        //使用ConcurrentHashMap代替HashMap,避免对compute方法进行同步时带来的串行性
        private final ConcurrentHashMap> cache=new ConcurrentHashMap<>();
        private final Computable c;

        public  Memoizer(Computable c){this.c=c;}

        public V compute(final A arg)throws InterruptedException{
            while (true){
                /*使用FutureTask避免可能会出现的重复计算。
                  如果有结果可用,FutureTask.get立刻返回结果,
                  否则会一直阻塞,直到结果计算出来再将其返回。
                 */
                Future f=cache.get(arg);
                if (f==null){
                    Callable eval=new Callable() {
                        @Override
                        public V call() throws Exception {
                            return c.compute(arg);
                        }
                    };
                    FutureTask ft=new FutureTask(eval);
                    //若没有则添加,确保原子性
                    f=cache.putIfAbsent(arg,ft);
                    if (f==null){f=ft;ft.run();}
                }
                try{
                    return f.get();
                } catch (CancellationException e) {
                    e.printStackTrace();
                }catch (ExecutionException e){
                    throw launderThrowable(e.getCause());
                }
            }
        }

依然存在的问题:



← Java并发编程实战-0线程简介 Java并发编程实战-10活跃性、性能与测试_性能与可伸缩性 →

Powered by Hexo, Theme designs by @hpcslag.
Style-Framework Tocas-UI designs by @yamioldmel