Mars‘s docMars‘s doc
🏠主页
  • 🍻Activiti

    • 01-核心API
    • 02-监听
    • 03-数据库表介绍
    • 04-常见问题
  • 📊JasperReport

    • 01-JaspersoftStudio工具使用
    • 02-JasperReport集成
    • 03-JasperReport常见问题
  • 🎬JavaScript

    • 01-Node
    • 02-VuePress
    • 03-Vue组件高度宽度
    • 04-显示器和屏幕高度
    • 05-页面地址
    • 06-归纳总结
  • 🍵Java

    • 01-Java8特性
    • 02-多线程
    • 03-Jar包
    • 04-Util
    • 05-validation注解
    • 06-反编译
    • 07-try-with-resource
    • 08-ThreadLocal内存泄漏
    • 09-Jvm
    • 10-Excel
    • 11-Lombook
    • 12-条件注解
    • 13-WebMvcConfigurationSupport
    • 14-WebMvcConfigurer
    • 15-分布式锁
    • 16-Caffeine
    • 17-DynamicDatasource
    • 18-MybatisPlus
    • 19-Swagger
    • 20-BeanPostProcessor
    • 21-Bean初始化
    • 22-ConfigurableApplicationContext
    • 23-常用注解
    • 24-ApplicationListener
    • 25-JavaDoc
    • 26-Spring-Cache
    • 27-StopWatch耗时统计
    • 28-Word
    • 29-Druid
    • 30-OpenFeign
    • 31-反射相关
    • 32-Fastjson
    • 33-Yaml
  • 💻Linux

    • 01-Linux常用命令
    • 02-Linux脚本汇总
    • 03-Yum源
    • 04-Debian
    • 05-Ubuntu
  • 🐋Docker

    • 01-Docker常用命令
    • 02-Dockerfile
    • 03-Swarm
    • 04-Stack
    • 05-Docker常见问题
    • 06-DockerCompose
    • 07-Docker应用用汇总
    • 08-Kasm
    • 09-Rustdesk
  • 🌐Nginx

    • 01-Nginx
  • 📈数据库

    • 01-Mysql
    • 02-Clickhouse
    • 03-Doris
    • 04-DRDS
  • 📉Kettle

    • 01-入门
    • 02-js脚本
    • 03-优化
    • 04-连接组件
    • 05-参数
    • 06-工具
    • 07-日志
    • 08-流程组件
    • 09-输入组件
    • 10-输出组件
    • 11-转换组件
    • 12-驱动
  • 🎨Git

    • 01-Git使用
  • 📝Maven

    • 01-Maven使用
    • 02-Maven配置
  • 🎯Jenkins

    • 01-Jenkins部署
    • 02-Jenkisn常见问题
  • 01-设计模式之禅
  • 02-领域驱动设计
  • 03-JavaScript高级程序设计
  • 🍓树莓派

    • 01-RaspBerry
  • 📘Markdown

    • 01-Markdown语法
    • 02-Markdown表情
    • 03-Markdown代码块语言对照
  • 📇其他

    • 01-HTML XML 转义
    • 02-GitHub
    • 03-Idea
    • 04-Nmon
    • 05-Windows
    • 06-WinSw
GitHub
🏠主页
  • 🍻Activiti

    • 01-核心API
    • 02-监听
    • 03-数据库表介绍
    • 04-常见问题
  • 📊JasperReport

    • 01-JaspersoftStudio工具使用
    • 02-JasperReport集成
    • 03-JasperReport常见问题
  • 🎬JavaScript

    • 01-Node
    • 02-VuePress
    • 03-Vue组件高度宽度
    • 04-显示器和屏幕高度
    • 05-页面地址
    • 06-归纳总结
  • 🍵Java

    • 01-Java8特性
    • 02-多线程
    • 03-Jar包
    • 04-Util
    • 05-validation注解
    • 06-反编译
    • 07-try-with-resource
    • 08-ThreadLocal内存泄漏
    • 09-Jvm
    • 10-Excel
    • 11-Lombook
    • 12-条件注解
    • 13-WebMvcConfigurationSupport
    • 14-WebMvcConfigurer
    • 15-分布式锁
    • 16-Caffeine
    • 17-DynamicDatasource
    • 18-MybatisPlus
    • 19-Swagger
    • 20-BeanPostProcessor
    • 21-Bean初始化
    • 22-ConfigurableApplicationContext
    • 23-常用注解
    • 24-ApplicationListener
    • 25-JavaDoc
    • 26-Spring-Cache
    • 27-StopWatch耗时统计
    • 28-Word
    • 29-Druid
    • 30-OpenFeign
    • 31-反射相关
    • 32-Fastjson
    • 33-Yaml
  • 💻Linux

    • 01-Linux常用命令
    • 02-Linux脚本汇总
    • 03-Yum源
    • 04-Debian
    • 05-Ubuntu
  • 🐋Docker

    • 01-Docker常用命令
    • 02-Dockerfile
    • 03-Swarm
    • 04-Stack
    • 05-Docker常见问题
    • 06-DockerCompose
    • 07-Docker应用用汇总
    • 08-Kasm
    • 09-Rustdesk
  • 🌐Nginx

    • 01-Nginx
  • 📈数据库

    • 01-Mysql
    • 02-Clickhouse
    • 03-Doris
    • 04-DRDS
  • 📉Kettle

    • 01-入门
    • 02-js脚本
    • 03-优化
    • 04-连接组件
    • 05-参数
    • 06-工具
    • 07-日志
    • 08-流程组件
    • 09-输入组件
    • 10-输出组件
    • 11-转换组件
    • 12-驱动
  • 🎨Git

    • 01-Git使用
  • 📝Maven

    • 01-Maven使用
    • 02-Maven配置
  • 🎯Jenkins

    • 01-Jenkins部署
    • 02-Jenkisn常见问题
  • 01-设计模式之禅
  • 02-领域驱动设计
  • 03-JavaScript高级程序设计
  • 🍓树莓派

    • 01-RaspBerry
  • 📘Markdown

    • 01-Markdown语法
    • 02-Markdown表情
    • 03-Markdown代码块语言对照
  • 📇其他

    • 01-HTML XML 转义
    • 02-GitHub
    • 03-Idea
    • 04-Nmon
    • 05-Windows
    • 06-WinSw
GitHub
  • 🏫技术相关

    • 🍻Activiti

      • 01-核心API
      • 02-监听
      • 03-数据库表介绍
      • 04-常见问题
    • 📊JasperReport

      • 01-JaspersoftStudio工具使用
      • 02-JasperReport集成
      • 03-JasperReport常见问题
    • 🎬JavaScript

      • 01-Node
      • 02-VuePress
      • 03-Vue组件高度宽度
      • 04-显示器和屏幕高度
      • 05-页面地址
      • 06-归纳总结
    • 🍵Java

      • 01-Java8特性
      • 02-多线程
      • 03-Jar包
      • 04-Util
      • 05-validation注解
      • 06-反编译
      • 07-try-with-resource
      • 08-ThreadLocal内存泄漏
      • 09-Jvm
      • 10-Excel
      • 11-Lombook
      • 12-条件注解
      • 13-WebMvcConfigurationSupport
      • 14-WebMvcConfigurer
      • 15-分布式锁
      • 16-Caffeine
      • 17-DynamicDatasource
      • 18-MybatisPlus
      • 19-Swagger
      • 20-BeanPostProcessor
      • 21-Bean初始化
      • 22-ConfigurableApplicationContext
      • 23-常用注解
      • 24-ApplicationListener
      • 25-JavaDoc
      • 26-Spring-Cache
      • 27-StopWatch耗时统计
      • 28-Word
      • 29-Druid
      • 30-OpenFeign
      • 31-反射相关
      • 32-Fastjson
      • 33-Yaml
  • 🏢服务器

    • 💻Linux

      • 01-Linux常用命令
      • 02-Linux脚本汇总
      • 03-Yum源
      • 04-Debian
      • 05-Ubuntu
    • 🐋Docker

      • 01-Docker常用命令
      • 02-Dockerfile
      • 03-Swarm
      • 04-Stack
      • 05-Docker常见问题
      • 06-DockerCompose
      • 07-Docker应用用汇总
      • 08-Kasm
      • 09-Rustdesk
    • 🌐Nginx

      • 01-Nginx
  • 🏩数据相关

    • 📈数据库

      • 01-Mysql
      • 02-Clickhouse
      • 03-Doris
      • 04-DRDS
    • 📉Kettle

      • 01-入门
      • 02-js脚本
      • 03-优化
      • 04-连接组件
      • 05-参数
      • 06-工具
      • 07-日志
      • 08-流程组件
      • 09-输入组件
      • 10-输出组件
      • 11-转换组件
      • 12-驱动
  • 🏬管理工具

    • 🎨Git

      • 01-Git使用
    • 📝Maven

      • 01-Maven使用
      • 02-Maven配置
    • 🎯Jenkins

      • 01-Jenkins部署
      • 02-Jenkisn常见问题
  • 🏯书籍笔记

    • 01-设计模式之禅
    • 02-领域驱动设计
    • 03-JavaScript高级程序设计
  • 🏦其他

    • 🍓树莓派

      • 01-RaspBerry
    • 📘Markdown

      • 01-Markdown语法
      • 02-Markdown表情
      • 03-Markdown代码块语言对照
    • 📇其他

      • 01-HTML XML 转义
      • 02-GitHub
      • 03-Idea
      • 04-Nmon
      • 05-Windows
      • 06-WinSw

多线程

可以理解为线程的并行执行,如下载多个文件,开启多条线程,多个文件同时进行下载,这里是严格意义上的,在同一时刻发生的,并行在时间上是重叠的。

目录

  • 线程池
    • 什么是池化
    • 什么是线程池
    • 使用场景
    • 注意事项
    • 全局线程池
  • Spring @Async 注解实现异步线程/多线程
    • 生效和失效场景
    • 无返回值方法
    • 有返回值方法
    • 多线程实现
  • Jdk 多线程
    • ExecutorService线程池
    • ScheduledExecutorService
    • Future
    • FutureTask
    • CompletionService
    • CompletableFuture
  • 线程安全
    • 线程安全原则
    • Synchronized 锁实现单机线程安全
    • Redis 分布式锁
    • 常见的线程安全集合
  • 总结&注意
  • 附录
    • 阿里规约对线程池的要求

线程池

什么是池化

池化技术 (Pool) 是一种很常见的编程技巧,在请求量大时能明显优化应用性能,降低系统频繁建连的资源开销。我们日常工作中常见的有数据库连接池、线程池、对象池等,它们的特点都是将 “昂贵的”、“费时的” 的资源维护在一个特定的 “池子” 中,规定其最小连接数、最大连接数、阻塞队列等配置,方便进行统一管理和复用,通常还会附带一些探活机制、强制回收、监控一类的配套功能。

什么是线程池

线程池的原理很简单,类似于操作系统中的缓冲区的概念,它的流程:先启动若干数量的线程,并让这些线程都处于睡眠状态,当客户端有一个新请求时,就会唤醒线程池中的某一个睡眠线程,让它来处理客户端的这个请求,当处理完这个请求后,线程又处于睡眠状态,而不是将线程销毁。

为什么要预先创建若干线程,而不是在需要的时候再创建?

一个系统中的线程相对于其所要处理的任务而言,总是一种非常有限的资源。线程不仅在其执行任务时需要消耗CPU时间和内存等资源,线程对象(Thread 实例)本身以及线程所需的调用栈(Call Stack)也占用内存,并Java中创建一个线程往往意味着JVM会创建相应的依赖于宿主机操作系统的本地线程(Native Thread)。因此,为每个或者每一批任务创建一个线程以对其进行执行,通常是一种奢侈而不现实的事情。比较常见的一种做法是复用一定数量的线程,由这些线程去执行不断产生的任务。绝大多数的 Web 服务器就是采用这种方法例如,Tomcat 服务器复用一定数量的线程用于处理其接收到的请求。

Thread Pool模式的核心思想是使用队列对待处理的任务进行缓存,并复用一定数量的工作者线程去取队列中的任务进行执行。

Thread Pool模式的本质是使用极其有限的资源去处理相对无限的任务。这好比一个生意兴降的饭店,虽然每天顾客不断,但饭店却不可能因为来一批客人就增加一个服务员。相反,服务员的人数还是那么多,只不过饭店生意好的时候,服务员们比较忙碌,生意不好时,服务员们比较空闲。

优点

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统

使用场景

  1. 异步业务,如后端业务执行耗时大于120s,前端会报响应超时的业务场景。
  2. 批量业务,数据量(业务量)庞大需要并行计算提升性能业务。

注意事项

  1. 批量业务(并行计算)需要注意业务切分,在开启线程前应该切分好各自线程的处理业务的粒度(简单理解为各自线程的职责或“入参”),保证不存在业务交叉问题(业务重复)。如线程间需要操作同一数据需要使用线程锁,多级多线程场景需要使用分布式锁。
  2. 合理使用集合,确保线程间不存在线程安全问题,非必要不加锁,避免性能瓶颈。

全局线程池

@EnableAsync
@Configuration
public class ThreadPoolConfig {

    public static final String LOCAL_EXECUTOR = "localExecutor";

    @Value("${local.thread.queueCapacity:100}")
    private Integer queueCapacity;
    @Value("${local.thread.queueCapacity:60}")
    private Integer keepAliveSeconds;

    @Bean(name = LOCAL_EXECUTOR)
    public Executor threadPoolTaskExecutor() {
        //获取当前机器的核数
        int cpuNum = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        threadPoolTaskExecutor.setCorePoolSize(cpuNum);
        // 设置最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(cpuNum * 2);
        // 设置工作队列大小
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        //线程存活时间
        threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
        // 设置线程名称前缀
        threadPoolTaskExecutor.setThreadNamePrefix("localThreadPoolTaskExecutor--");
        // 设置拒绝策略.当工作队列已满,线程数为最大线程数的时候,接收新任务抛出RejectedExecutionException异常
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 初始化线程池
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

线程池处理流程

  1. 当线程数小于核心线程数时,创建线程。

  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。

  3. 当线程数大于等于核心线程数,且任务队列已满

    3.1. 若线程数小于最大线程数,创建线程

    3.2. 若线程数等于最大线程数,拒绝任务

线程池调度流程

线程池拒绝策略

  • AbortPolicy:用于被拒绝任务的处理程序,它将抛出RejectedExecutionException
  • CallerRunsPolicy:用于被拒绝任务的处理程序,它直接在execute方法的调用线程中运行被拒绝的任务。
  • DiscardOldestPolicy:用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试execute。
  • DiscardPolicy:用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。

Spring @Async 注解实现异步线程/多线程

将方法标记为异步执行候选项的注解。也可以在类级别使用,在这种情况下,类的所有方法都被视为异步方法。但请注意,@Configuration类中声明的方法不支持@Async。

生效和失效场景

  • 它必须仅应用于公共方法;
  • 自调用(从同一类中调用异步方法)不起作用,即调用异步方法的方法不能与异步方法在同一个类中,因为它绕过代理并直接调用底层方法;
  • 异步方法使用static修饰;
  • 异步类没有使用@Component注解(或其他注解)导致spring无法扫描到异步类;
  • 类中需要使用@Autowired或@Resource等注解自动注入,不能自己手动new对象;
  • 如果使用SpringBoot框架必须在启动类中增加@EnableAsync注解;
  • 在Async 方法上标注@Transactional是没用的。 在Async 方法调用的方法上标注@Transactional 有效。

无返回值方法

@Async(ThreadPoolConfig.LOCAL_EXECUTOR)
public void asyncMethodWithVoidReturnType() {
    System.out.println("Execute method asynchronously. " 
      + Thread.currentThread().getName());
}

有返回值方法

@Async(ThreadPoolConfig.LOCAL_EXECUTOR)
public Future<String> asyncMethodWithReturnType() {
    System.out.println("Execute method asynchronously - " 
      + Thread.currentThread().getName());
    try {
        Thread.sleep(5000);
        return new AsyncResult<String>("hello world !!!!");
    } catch (InterruptedException e) {
        // TODO
    }
    return null;
}

Spring 还提供了一个实现 Future 的 AsyncResult 类。我们可以使用它来跟踪异步方法执行的结果。

public void testAsyncAnnotationForMethodsWithReturnType()
  throws InterruptedException, ExecutionException {
    System.out.println("Invoking an asynchronous method. " 
      + Thread.currentThread().getName());
    Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();

    while (true) {
        if (future.isDone()) {
            System.out.println("Result from asynchronous process - " + future.get());
            break;
        }
        System.out.println("Continue doing something else. ");
        Thread.sleep(1000);
    }
}

多线程实现

for(循环控制){
    @Async标记的方法
}

Jdk 多线程

Jdk 提供的几种多线程框架,主要用于简化异步模式下任务的执行。

ExecutorService线程池

实例化 ExecutorService 的方式有两种:一种是工厂方法,另一种是直接创建。

public class ExecutorServiceTest {
    List<Callable<String>> callableTasks = new ArrayList<>();
    // 工厂方法创建 ExecutorService 实例
    ExecutorService executor = Executors.newFixedThreadPool(10);
    // 直接创建 ExecutorService 的实例
    ExecutorService executorService =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()
            );

    Runnable runnableTask = () -> {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    Callable<String> callableTask = () -> {
        TimeUnit.MILLISECONDS.sleep(300);
        return "Task's execution ! this Thread:".concat(Thread.currentThread().getName());
    };

    private void buildCallable() {
        callableTasks.add(callableTask);
        callableTasks.add(callableTask);
        callableTasks.add(callableTask);
    }

    /**
     * 该方法返回值为空 ( void )。因此使用该方法没有任何可能获得任务执行结果或检查任务的状态 (是正在运行(running)还是执行完毕(executed))
     */
    @Test
    public void testExecute() {
        executorService.execute(runnableTask);
        executorService.shutdown();
    }

    /**
     * submit() 方法会将一个 Callable 或 Runnable 任务提交给 ExecutorService 并返回 Future 类型的结果。
     */
    @Test
    public void testSubmit() {
        try {
            Future<String> future = executorService.submit(callableTask);
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * invokeAny() 方法将一组任务分配给 ExecutorService,使每个任务执行,并返回任意一个成功执行的任务的结果 ( 如果成功执行 )
     */
    @Test
    public void testInvokeAny() {
        try {
            buildCallable();
            String result = executorService.invokeAny(callableTasks);
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * invokeAll() 方法将一组任务分配给 ExecutorService ,使每个任务执行,并以 Future 类型的对象列表的形式返回所有任务执行的结果。
     */
    @Test
    public void testInvokeAll() {
        try {
            buildCallable();
            List<Future<String>> futures = executorService.invokeAll(callableTasks);
            for (Future<String> future : futures) {
                System.out.println(future.get());
            }
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

ExecutorService关闭

一般情况下,ExecutorService 并不会自动关闭,即使所有任务都执行完毕,或者没有要处理的任务,也不会自动销毁 ExecutorService 。它会一直出于等待状态,等待我们给它分配新的工作。这种机制,在某些情况下是非常有用的,比如,,如果应用程序需要处理不定期出现的任务,或者在编译时不知道这些任务的数量。但另一方面,这也带来了副作用:即使应用程序可能已经到达它的终点,但并不会被停止,因为等待的 ExecutorService 将导致 JVM 继续运行。这样,我们就需要主动关闭 ExecutorService。要正确的关闭 ExecutorService,可以调用实例的 shutdown() 或 shutdownNow() 方法。

  • shutdown(),方法并不会立即销毁 ExecutorService 实例,而是首先让 ExecutorService 停止接受新任务,并在所有正在运行的线程完成当前工作后关闭。
  • sutdownNow,法会尝试立即销毁 ExecutorService 实例,所以并不能保证所有正在运行的线程将同时停止。该方法会返回等待处理的任务列表,由开发人员自行决定如何处理这些任务。

ScheduledExecutorService

用于在一些预定义的延迟之后运行任务和( 或 )定期运行任务。

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
// 执行callableTask之前,延迟1s
Future<String> resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
// 允许在固定延迟后定期执行任务:在 100 毫秒的初始延迟后执行任务,之后每 450 毫秒执行相同的任务
Future<String> resultFuture = executorService.scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);
// 任务迭代之间必须具有固定长度的延迟:保证当前执行结束与另一个执行结束之间的暂停时间为 150 毫秒
executorService.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);

Future

  • 使用线程池提交Callable接口任务,返回Future接口,添加进list,最后遍历FutureList且内部使用while轮询,并发获取结果
public class FutureDemo {

    @Test
    public void TestThredFuture() {
        long start = System.currentTimeMillis();
        //开启多线程
        ExecutorService exs = Executors.newFixedThreadPool(10);
        try {
            // 结果集
            List<String> result = new ArrayList<>();
            // 需要处理的数据集 10个线程,每个线程处理一个str
            List<String> lists = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
            List<Future<String>> futureList = new ArrayList<>();
            // 1.,每个任务返回一个Future入list
            for (String str : lists) {
                futureList.add(exs.submit(new CallableTask(str)));
            }
            long getResultStart = System.currentTimeMillis();
            // 2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除
            while (futureList.size() > 0) {
                Iterator<Future<String>> iterable = futureList.iterator();
                //遍历一遍
                while (iterable.hasNext()) {
                    Future<String> future = iterable.next();
                    // 如果任务完成取结果,否则判断下一个任务是否完成
                    if (future.isDone() && !future.isCancelled()) {
                        //获取结果
                        result.add(future.get());
                        // 任务完成移除任务
                        iterable.remove();
                    } else {
                        // 避免CPU高速运转,这里休息1毫秒,CPU纳秒级别
                        Thread.sleep(1);
                    }
                }
            }
            System.out.println("result=" + result);
            long end = System.currentTimeMillis();
            System.out.println("总耗时=" + (end - start) + "ms,取结果归集耗时=" + (end - getResultStart) + "ms");
            // CallableTask.call “a”等待3秒,“b”等待5秒,其他等待一秒,最终结果一定是 [x,x,x,x,x,x,x,x,a_pool-x-thread-x, b_pool-x-thread-x]
            Assert.assertTrue(result.get(8).startsWith("a") && result.get(9).startsWith("b"));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    public class CallableTask implements Callable<String> {
        String str;

        public CallableTask(String str) {
            this.str = str;
        }

        @Override
        public String call() throws Exception {
            if (str.equals("a")) {
                Thread.sleep(3000);
            } else if (str.equals("b")) {
                Thread.sleep(5000);
            } else {
                Thread.sleep(1000);
            }
            str = str.concat("_").concat(Thread.currentThread().getName());
            return str;
        }
    }
}

Future常用接口

  • isDone(),检查已分配的任务是否完成
  • cancel(),取消任务
  • isCancelled(),检查任务是否取消

FutureTask

FutureTask<V>是RunnableFuture<V>的唯一实现,RunnableFuture<V>继承自Runnable, Future<V>。

  • Runnable接口,可开启单个线程执行。
  • Future<v>接口,可接受Callable接口的返回值,futureTask.get()阻塞获取结果。个人感觉多套一层FutureTask比较鸡肋,不如直接返回Future
public class FutureTaskDemo {

    long st = System.currentTimeMillis();

    @Test
    public void futureTaskSingleTest() {
        String str = "a";
        FutureTask<String> futureTask = new FutureTask<String>(new CallableFutureTaskSinle(str));
        Thread futureTaskThread = new Thread(futureTask);
        futureTaskThread.start();
        try {
            // 假设处理业务
            Thread.sleep(5000);
            // 阻塞获取结果
            String strResult = futureTask.get();
            if (strResult.contains("end")) {
                System.out.println(strResult);
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("全部完成耗时:" + (System.currentTimeMillis() - st));
    }

    @Test
    public void futureTaskTest() {
        List<String> results = new ArrayList<>();
        List<String> lists = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        // 开启线程
        ExecutorService exs = Executors.newFixedThreadPool(10);
        try {
            // 任务
            List<FutureTask<String>> futureList = lists.stream().map(str -> {
                FutureTask<String> futureTask = new FutureTask<String>(new CallableFutureTask(str));
                exs.submit(futureTask);
                return futureTask;
            }).collect(Collectors.toList());

            while (futureList.size() > 0) {
                Iterator<FutureTask<String>> iterable = futureList.iterator();
                //遍历一遍
                while (iterable.hasNext()) {
                    Future<String> future = iterable.next();
                    // 如果任务完成取结果,否则判断下一个任务是否完成
                    if (future.isDone() && !future.isCancelled()) {
                        //获取结果
                        results.add(future.get());
                        // 任务完成移除任务
                        iterable.remove();
                    } else {
                        // 避免CPU高速运转,这里休息1毫秒,CPU纳秒级别
                        Thread.sleep(1);
                    }
                }
            }
            System.out.println("执行完毕,结果:");
            results.forEach(System.out::println);
            Assert.assertTrue(CollUtil.isNotEmpty(results));
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }finally {
            exs.shutdown();
        }
    }

    class CallableFutureTaskSinle implements Callable<String> {

        String str;

        public CallableFutureTaskSinle(String str) {
            this.str = str;
        }
      
        @Override
        public String call() throws Exception {
            Thread.sleep(10000);
            System.out.println("futureTask 耗时:" + (System.currentTimeMillis() - st));
            if ("a".equals(str)) {
                return "this a's thread is end!";
            }
            return str.concat(Thread.currentThread().getName());
        }
    }

    class CallableFutureTask implements Callable<String> {

        String str;

        public CallableFutureTask(String str) {
            this.str = str;
        }
      
        @Override
        public String call() throws Exception {
            if (str.equals("a")) {
                Thread.sleep(3000);
            } else if (str.equals("b")) {
                Thread.sleep(5000);
            } else {
                Thread.sleep(1000);
            }
            return str.concat("-").concat(Thread.currentThread().getName());
        }
    }
}

CompletionService

内部通过阻塞队列+FutureTask,实现任务先完成可优先获取到,即结果按照完成先后顺序排序。

public class CompletionServiceDemo {
    long st = System.currentTimeMillis();

    /**
     * 线程在这里阻塞等待该任务执行完毕
     */
    @Test
    public void CompletionServiceTest() {
        List<String> lists = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        ExecutorService exs = Executors.newFixedThreadPool(10);
        List<String> results = new ArrayList<>();
        try {
            CompletionService<String> completionService = new ExecutorCompletionService<>(exs);
            List<Future<String>> futureList;
            // 添加任务
            futureList = lists.stream().map(str -> completionService.submit(new CompletionServiceTask(str))).collect(Collectors.toList());
            // 获取结果
            for (Future<String> future : futureList) {
                //线程在这里阻塞等待该任务执行完毕
                results.add(future.get());
            }
            System.out.println("全部完成耗时:" + (System.currentTimeMillis() - st));
            System.out.println("结果:");
            results.forEach(System.out::println);
            Assert.assertTrue(CollUtil.isNotEmpty(results));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    /**
     * 采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
     */
    @Test
    public void CompletionServiceTest2() {
        List<String> lists = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        ExecutorService exs = Executors.newFixedThreadPool(10);
        List<String> results = new ArrayList<>();
        try {
            CompletionService<String> completionService = new ExecutorCompletionService<>(exs);
            List<Future<String>> futureList;
            // 添加任务
            futureList = lists.stream().map(str -> completionService.submit(new CompletionServiceTask(str))).collect(Collectors.toList());
            // 获取结果
            for (int i = 0, size = futureList.size(); i < size; i++) {
                //采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
                results.add(completionService.take().get());
            }
            System.out.println("全部完成耗时:" + (System.currentTimeMillis() - st));
            System.out.println("结果:");
            results.forEach(System.out::println);
            Assert.assertTrue(CollUtil.isNotEmpty(results));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    class CompletionServiceTask implements Callable<String> {
        String str;

        public CompletionServiceTask(String str) {
            this.str = str;
        }

        @Override
        public String call() throws Exception {
            if (str.equals("a")) {
                Thread.sleep(3000);
            } else if (str.equals("b")) {
                Thread.sleep(5000);
            } else {
                Thread.sleep(1000);
            }
            str = str.concat("_").concat(Thread.currentThread().getName());
            return str;
        }
    }
}

CompletableFuture

CompletableFuture满足并发执行,顺序完成先手顺序获取的目标。而且支持每个任务的异常返回<br />supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务

public class CompletableFutureDemo {
    long st = System.currentTimeMillis();

    /**
     * 全流式处理转换成CompletableFuture[]+allOf组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
     */
    @Test
    public void CompletableFutureTest() {
        // 结果集
        List<String> result = new ArrayList<>();
        // 需要处理的数据集 10个线程,每个线程处理一个str
        List<String> lists = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        //定长10线程池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        CompletableFuture[] cfs = lists.stream().map(str -> CompletableFuture.supplyAsync(() -> calc(str), exs)
                                                                             //thenAccept只接受不返回不影响结果
                                                                             .thenApply("final-"::concat)
                                                                             //获取任务完成先后顺序
                                                                             .whenComplete((v, e) -> {
                                                                                 System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date());
                                                                                 result.add(v);
                                                                             })).toArray(CompletableFuture[]::new);
        //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
        CompletableFuture.allOf(cfs).join();
        System.out.println("任务完成先后顺序,结果list2=" + result + ",耗时=" + (System.currentTimeMillis() - st));
        exs.shutdown();
        Assert.assertTrue(CollUtil.isNotEmpty(result));
    }

    /**
     * 循环创建CompletableFuture list,组装返回一个有返回值的CompletableFuture,返回结果get()获取
     */
    @Test
    public void CompletableFutureTest2() {
        // 结果集
        List<String> result;
        // 需要处理的数据集 10个线程,每个线程处理一个str
        List<String> lists = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        //定长10线程池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        try {
            for (String str : lists) {
                //异步执行
                CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> calc(str), exs)
                                                                    //thenAccept只接受不返回不影响结果
                                                                    .thenApply("final-"::concat)
                                                                    //如需获取任务完成先后顺序,此处代码即可
                                                                    .whenComplete((v, e) -> System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date()));
                futureList.add(future);
            }
            //1.构造一个空CompletableFuture,子任务数为入参任务list size
            CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
            //2.流式(总任务完成后,每个子任务join取结果,后转换为list)
            result = allDoneFuture.thenApply(v -> futureList.stream().map(CompletableFuture::join).collect(Collectors.toList())).get();
            System.out.println("任务完成先后顺序,结果list2=" + result + ",耗时=" + (System.currentTimeMillis() - st));
            Assert.assertTrue(CollUtil.isNotEmpty(result));
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }

    }

    private String calc(String str) {
        try {
            if (str.equals("a")) {
                Thread.sleep(3000);
            } else if (str.equals("b")) {
                Thread.sleep(5000);
            } else {
                Thread.sleep(1000);
            }
            str = str.concat("_").concat(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return str;
    }
}

线程安全

线程安全的本质是共享资源在不同线程中的竞争。比如日常生活中12306抢票环节,当A、B两人同时想要购买某一列次火车,但系统中余票只有一张时,我们需要解决车票的超卖环节,即A、B两人只能有一人购票成功。

线程安全原则

原子性:对数据的操作不会受其他线程打断,意味着一个线程操作数据过程中不会插入其他线程对数据的操作

可见性:当线程修改了数据的状态时,能够立即被其他线程知晓,即数据修改后会立即写入主内存,后续其他线程读取时就能得知数据的变化

Synchronized 锁实现单机线程安全

synchronized 修饰代码库实现线程安全

public class SynchronizedTest {
    public static void main(String[] args) {
        RunThread runThread = new RunThread();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            executorService.execute(runThread);
        }
        executorService.shutdown();
    }
}

class RunThread implements Runnable {
    private static int count;

    public RunThread() {
        count = 0;
    }

    public void run() {
        synchronized (this) {
            for (int i = 0; i < 5; i++) {
                try {
                    System.out.println("线程名:" + Thread.currentThread().getName() + ":" + (count++));
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

synchronized 也可以修饰方法

public class SynchronizedTest {
    public static void main(String[] args) {
        RunThread runThread = new RunThread();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            executorService.execute(runThread);
        }
        executorService.shutdown();
    }
}

class RunThread implements Runnable {
    private static int count;

    public RunThread() {
        count = 0;
    }

    public void run() {
       doSomething();
    }

    public synchronized void doSomething() {
        for (int i = 0; i < 5; i++) {
            try {
                System.out.println("线程名:" + Thread.currentThread().getName() + ":" + (count++));
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果

线程名:pool-1-thread-1:0
线程名:pool-1-thread-1:1
线程名:pool-1-thread-1:2
线程名:pool-1-thread-1:3
线程名:pool-1-thread-1:4
线程名:pool-1-thread-2:5
线程名:pool-1-thread-2:6
线程名:pool-1-thread-2:7
线程名:pool-1-thread-2:8
线程名:pool-1-thread-2:9

去掉 synchronized 代码块

    public void run() {
//        synchronized (this) {
            for (int i = 0; i < 5; i++) {
                try {
                    System.out.println("线程名:" + Thread.currentThread().getName() + ":" + (count++));
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
//        }
    }

再次执行结果

线程名:pool-1-thread-1:0
线程名:pool-1-thread-2:1
线程名:pool-1-thread-1:2
线程名:pool-1-thread-2:2
线程名:pool-1-thread-2:3
线程名:pool-1-thread-1:3
线程名:pool-1-thread-2:4
线程名:pool-1-thread-1:5
线程名:pool-1-thread-2:6
线程名:pool-1-thread-1:6

显而易见,如果没有 synchronized 代码块包裹,会出现线程不安全的情况,进而导致”超卖“现象的发生。

Redis 分布式锁

多机多线程时可以使用redis分布式锁方案,具体实现参考分布式锁方案

常见的线程安全集合

多线程使用场景下,推荐使用线程安全集合,常见线程安全集合如下:

  • Vector
  • HashTable
  • StringBuffer
  • ConcurrentHashMap
  • CopyOnWriteArrayList
  • Collections.synchronizedList(T o)

总结&注意

  1. 用 ThreadPoolTaskExecutor 自定义线程池,要看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止 OOM;
  2. 如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交,保证不抛弃一个任务;
  3. 最大线程数一般设为 2N+1/2N 最好,N 是 CPU 核数;IO密集型 = 2N(可以测试后自己控制大小,常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等);计算密集型 = N(常出现于线程中:复杂算法)
  4. 核心线程数,如果任务一天跑一次设置为0,因为线程跑完就停掉了,如果是常用线程池,看任务量决定保留一个核心还是几个核心线程数;
  5. 如果要获取任务执行结果,用 CompletionService ,但是获取任务的结果要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大会导致 OOM ,所以最好异步开个线程获取结果。
  6. 单机线程安全也可以通过 ReentrantLock 解决

附录

阿里规约对线程池的要求

  • 【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。

说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。

如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

  • 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这

样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

1) FixedThreadPool 和 SingleThreadPool:

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2) CachedThreadPool:

允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

根据阿里规约描述内容,我们分析下主要源码,可以看出来,Executors.newFixedThreadPool 和Executors.newSingleThreadExecutor 默认线程池任务队列大小为$$ 2^{31}-1 $$ (即2147483647),一个非常恐怖的数字。如果在业务编程中使用不当,后果就是灾难。

主要源码内容如下:

// Executors class
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

// LinkedBlockingQueue<E> class
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

// Integer class
/**
 * A constant holding the maximum value an {@code int} can
 * have, 2<sup>31</sup>-1.
 */
@Native public static final int   MAX_VALUE = 0x7fffffff;
Edit this page
Last Updated:
Contributors: wangxiaoquan
Prev
01-Java8特性
Next
03-Jar包