并发编程-线程池篇(必看👍)
线程池概述汇总
线程池好处:降低资源消耗,提高响应速度,方便统一管理。
七大核心参数:核心线程数、最大线程数、keepAlive Time(工作线程空闲后,存活时间)、TimeUnit、workQueue、ThreadFactory、拒绝策略
线程池工作原理:向线程池添加一个任务,首先看线程池中线程数是否小于核心线程数,若小于则创建一个线程执行任务,反之再判断任务队列满没,若没有满则将任务放到任务队列并等待执行,若满了再判断现在线程池中线程数是否大于最大线程数,若没有大于则创建一个线程执行任务,若大于则使用饱和策略进行处理任务。
任务队列:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue(存一个取一个,阻塞等待)、DelayedWorkQueue
拒绝策略:
1.CallerRunsPolicy 由调用线程处理该任务。
2.AbortPolicy 默认拒绝策略 直接丢弃,抛出异常RejectedExecutionException
3.DiscardPolicy 直接丢弃,不抛出异常
4.DiscardOldestPolicy 丢弃队列最早的任务,然后重新尝试执行任务。
Executors的4种功能线程池:定长、定时、可缓存、单线程化
线程池5大状态:Running(可接收,可处理)、Shutdown(不接收,可处理)、Stop(不接收,不处理,中断正在执行)、Tidying(所有任务终止)、Terminated(线程池彻底终止)。
创建线程池:
《阿⾥巴巴 Java 开发⼿册》中强制线程池不允许使⽤ Executors 去创建,⽽是通过ThreadPoolExecutor 的⽅式,避免使用Executors创建线程池,主要是避免使用其中的默认实现(比如定长缓存池使用链表任务队列,默认长度为Integer.MAX_VALUE,可能堆积大量请求,导致OOM)那么我们可以自己直接调用ThreadPoolExecutor的构造函数自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了。规避资源耗尽的⻛险
Executors 返回线程池对象的弊端如下:
- newFixedThreadPool 和 newSingleThreadExecutor : 允许请求的队列⻓度为Integer.MAX_VALUE ,可能堆积⼤量的请求,从⽽导致 OOM。
- newCachedThreadPool和 newScheduledThreadPool : 允许创建的线程数量为Integer.MAX_VALUE ,可能会创建⼤量线程,从⽽导致 OOM。注意二者产生OOM的原因不一样
通过 Executor 框架的⼯具类 Executors 来实现
- 定长线程池(newFixedThreadPool)
- 特点:只有核心线程(corePoolSize=maximumPoolSize),线程数量固定,执行完立即回收,任务队列为链表结构的无界队列(Integer.MAX_VALUE)。
- 应用场景:控制线程最大并发数。
- 单线程化线程池(newSingleThreadExecutor)
- 特点:只有1个核心线程,最大线程数为1,无非核心线程,(corePoolSize=maximumPoolSize=1)执行完立即回收,任务队列为链表结构的无界队列(Integer.MAX_VALUE)。
- 应用场景:不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作、文件操作等。
- 定时线程池(newScheduledThreadPool )
- 特点:核心线程数量固定,非核心线程数量无限,执行完闲置10ms后回收,任务队列为延时阻塞队列。(corePoolSize为传进来参数,最大线程数=Integer.MAX_VALUE,使用DelayedWorkQueue())
- 应用场景:执行定时或周期性的任务。
- 可缓存线程池(newCachedThreadPool)
- 特点:无核心线程,非核心线程数量无限,执行完闲置60s后回收,任务队列为不存储元素的阻塞队列(SynchronousQueue)。(corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAlive Time=60L)
- 应用场景:执行大量、耗时少的任务。
通过ThreadPoolExecutor 构造方法
详细说说线程池的使用场景?结合具体业务场景谈谈?
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
为了最大程度利用CPU的多核性能,利用线程池统一管理各个线程,提高适合的并行运算能力是不可或缺
- 直播场景:在直播应用中,线程池可以用于处理大量并发的用户请求,如发送弹幕、点赞、送礼等操作。线程池可以有效地管理这些并发请求,提高系统的响应速度。同时,线程池也可以用于处理视频流的编解码和传输,通过并行处理提高视频处理的效率。
- 社交场景:在社交应用中,线程池可以用于处理用户的发帖、评论、点赞等请求。这些请求通常涉及到数据库操作,通过线程池,可以将数据库操作和网络请求并行处理,提高系统的吞吐量。此外,线程池还可以用于处理消息推送,提高消息推送的效率。
- 购物场景:在电商应用中,线程池可以用于处理用户的下单、支付、退款等请求。这些请求通常涉及到数据库操作和第三方支付接口的调用,通过线程池,可以将这些操作并行处理,提高系统的响应速度。同时,线程池还可以用于处理库存的更新和价格的计算,提高系统的处理效率。
- 消费场景:在消费应用中,线程池可以用于处理用户的消费请求,如预订餐厅、购买电影票等。这些请求通常涉及到数据库操作和第三方系统的接口调用,通过线程池,可以将这些操作并行处理,提高系统的响应速度。同时,线程池还可以用于处理优惠券的发放和积分的计算,提高系统的处理效率。
- 批量统计处理:我们需要统计一个大型电商网站的所有商品的销售情况。如果我们采用单线程的方式,那么需要按顺序逐一统计每个商品的销售情况,这样的处理效率会非常低。而如果我们采用线程池,那么就可以同时处理多个商品的统计任务,大大提高了处理效率
总的来说,线程池在处理高并发、I/O密集型的操作时,可以有效地提高系统的性能和响应速度,根据实际开发业务场景,可以归纳两大类:
场景1:快速响应用户请求
描述:用户发起的实时请求,服务追求响应时间。比如说用户要查看一个商品的信息,那么我们需要将商品维度的一系列信息如商品的价格、优惠、库存、图片等等聚合起来,展示给用户。
场景2:快速处理批量任务
描述:离线的大量计算任务,需要快速执行。比如说,统计某个报表,需要计算出全国各个门店中有哪些商品有某种属性,用于后续营销策略的分析,那么我们需要查询全国所有门店中的所有商品,并且记录具有某属性的商品,然后快速生成报表。
平时遇到过哪些常见的线程池异常?
- RejectedExecutionException:这是一个常见的异常,当任务提交到线程池被拒绝时会抛出这个异常。这通常发生在线程池已经关闭,或者当前任务队列已满且线程数量已达到最大值的情况下。解决这个问题的方法是确保线程池在接收任务时处于开启状态,以及合理设置线程池的大小和任务队列的长度,确保线程池有足够的资源处理任务。
- NullPointerException:如果试图将null作为任务提交给线程池,会抛出这个异常。解决这个问题的方法是检查任务是否为null。
- OutOfMemoryError:如果线程池的大小设置得过大,可能会消耗过多的系统资源,导致内存溢出。解决这个问题的方法是合理设置线程池的大小,避免过多的线程消耗系统资源。
- 线程异常:如果线程在执行任务时发生异常,这个异常会被线程池捕获并记录在Future对象中(如果使用了Future来接收任务的结果)。如果没有检查Future的结果,这个异常可能会被忽略。解决这个问题的方法是在任务执行完毕后,检查Future的结果,处理可能的异常。
- 线程死锁:如果线程池中的线程相互等待对方释放资源,可能会发生死锁。解决这个问题的方法是避免在任务中使用同步代码块,或者在使用同步代码块时,确保获取锁的顺序一致,避免循环等待。
线程池解决的问题是什么?
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
- 系统无法合理管理内部的资源分布,会降低系统的稳定性。
实际开发线程池使用不当会有哪些影响,结合具体案例讲讲?
实际问题及方案思考
线程池使用面临的核心的问题在于:线程池的参数并不好配置,使用不当,轻则影响服务性能,浪费资源。重者可能导致整个链路系统崩溃
Case1:XX页面展示接口大量调用降级
事故描述:XX页面展示接口产生大量调用降级,数量级在几十到上百。
事故原因:该服务展示接口内部逻辑使用线程池做并行计算,由于没有预估好调用的流量,导致最大核心数设置偏小,大量抛出RejectedExecutionException,触发接口降级条件,示意图如下:
图14 线程数核心设置过小引发RejectExecutionException
Case2:2XX业务服务不可用S2级故障
事故描述:XX业务提供的服务执行时间过长,作为上游服务整体超时,大量下游服务调用失败。
事故原因:该服务处理请求内部逻辑使用线程池做资源隔离,由于队列设置过长,最大线程数设置失效,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致下游服务的大量调用超时失败。示意图如下:
图15 线程池队列长度设置过长、corePoolSize设置过小导致任务执行速度低
线程池任务执行机制?核心运行机制是怎样的?
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
其执行流程如下图所示:
图4 任务调度流程
线程池如何维护自身状态,生命周期有哪些?
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。
在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:
privatefinal AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:
- 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
- 高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。
- 用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
privatestaticint****runStateOf(int c) { return c & ~CAPACITY; } //计算当前运行状态
privatestaticint****workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
privatestaticint****ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl
ThreadPoolExecutor的运行状态有5种,分别为:
其生命周期转换如下入所示:
图3 线程池生命周期
线程数的设置和哪些因素有关?
若参数设置不当,引发的后果也可能不堪设想,具体案例参照上述。
线程池中线程数的设置是一个非常重要的问题,它直接影响到系统的性能和稳定性。线程数的设置主要与以下几个因素有关:
- 任务的性质:如果任务是CPU密集型的,那么线程数应该设置得较小,一般建议设置为CPU核数+1,这样可以避免CPU过度切换线程导致的性能损失。如果任务是I/O密集型的,那么线程数应该设置得较大,因为I/O操作通常会阻塞线程,设置更多的线程可以使CPU得到更充分的利用。
- 系统的硬件资源:线程数的设置也需要考虑系统的硬件资源,包括CPU、内存等。如果系统的硬件资源有限,那么线程数就不能设置得太大,否则可能会导致系统过载。
- 任务的并发度:如果系统需要处理大量的并发任务,那么线程数应该设置得较大,以便能够及时处理这些任务。但是,线程数也不能设置得过大,否则可能会导致线程切换的开销过大,反而降低系统的性能。
- 系统的稳定性要求:如果系统的稳定性要求较高,那么线程数应该设置得较小,以避免过多的线程导致系统过载。如果系统对性能的要求较高,那么线程数可以设置得较大,以提高系统的吞吐量。
总的来说,线程数的设置需要根据任务的性质、系统的硬件资源、任务的并发度和系统的稳定性要求等因素进行综合考虑,以达到最佳的性能和稳定性。
是不是业务QPS越大,线程数就越大越好?
不完全是的。虽然在一定程度上增加线程数可以提高处理并发请求的能力,从而提高系统的QPS(每秒查询率),但是线程数并不能无限制地增加。因为每个线程都会占用一定的系统资源,如CPU、内存等。当线程数增加到一定程度后,线程切换的开销和资源竞争的压力可能会导致系统性能下降,甚至可能导致系统过载和崩溃。
此外,如果任务主要是CPU密集型的,那么增加线程数可能并不能提高系统的QPS,因为CPU的计算能力是有限的,增加线程数只会增加CPU切换线程的开销,反而可能降低系统的性能。
所以,线程数的设置需要根据系统的实际情况进行调整,包括任务的性质(CPU密集型还是I/O密集型)、系统的硬件资源、系统的并发度等因素。一般来说,线程数的设置应该保持在一个合理的范围内,既不能太小,导致无法处理高并发请求,也不能太大,导致资源竞争和线程切换的开销过大。
什么是 io密集型和cpu密集型?参数怎样设置?
CPU密集型和I/O密集型是描述计算机程序运行特性的两个术语,主要是指程序运行过程中对CPU和I/O资源的依赖程度。
- CPU密集型:CPU密集型任务是指程序运行过程中,大部分时间都在进行计算,对CPU的使用率非常高,而对I/O设备的依赖较小。这类任务通常涉及到大量的数学运算、逻辑运算等,例如图像处理、科学计算等。
- I/O密集型:I/O密集型任务是指程序运行过程中,大部分时间都在进行I/O操作,如读写文件、网络通信等,而CPU计算的时间较少。这类任务通常涉及到大量的数据读写操作,例如数据库查询、文件传输等。
在并发编程中,理解CPU密集型和I/O密集型的区别非常重要,因为它们对线程数的设置有着直接的影响。对于CPU密集型任务,由于CPU的计算能力是有限的,所以线程数通常设置为CPU核数+1,以避免过多的线程切换导致的性能损失。而对于I/O密集型任务,由于大部分时间都在等待I/O操作,所以可以设置更多的线程,以提高CPU的利用率。
如何去监控线程池的这些核心参数?
监控线程池的核心参数是保证系统稳定运行的重要手段。Java的java.util.concurrent.ThreadPoolExecutor类提供了一些方法,可以用来监控线程池的状态和性能:
- getPoolSize():返回线程池中的线程数,包括空闲线程和工作线程。
- getActiveCount():返回线程池中当前正在执行任务的线程数。
- getLargestPoolSize():返回线程池曾经达到的最大线程数。
- getTaskCount():返回线程池已经接收的任务数。
- getCompletedTaskCount():返回线程池已经完成的任务数。
- getQueue():返回线程池的任务队列,可以用来查看待处理的任务数。
通过定期收集和分析这些数据,我们可以了解线程池的运行状态,如是否有任务积压、线程是否充分利用等。如果发现线程池的性能不佳或者有异常情况,我们可以根据这些数据调整线程池的配置,如增加或减少线程数、调整任务队列的大小等,以优化线程池的性能和稳定性。
另外,还可以使用一些监控工具,如JMX、Prometheus等,来自动收集和展示这些数据,方便我们进行监控和分析
有哪几种线程池?那你了解 forkjoinpool 吗?
Java中的java.util.concurrent.ExecutorService接口提供了四种线程池:
- FixedThreadPool:固定大小的线程池。创建后,线程池的大小不变。如果所有线程都在工作,新任务会被放在队列中等待。
- CachedThreadPool:缓存线程池。线程池的大小不固定,可以根据需求自动的更改数量。
- SingleThreadExecutor:单线程的线程池。只有一个线程在工作,所有任务按照指定顺序(FIFO,LIFO,优先级)执行。
- ScheduledThreadPool:调度线程池。可以安排命令在给定的延迟后运行,或者定期执行。
另外,还有一种特殊的线程池,叫做ForkJoinPool。
ForkJoinPool 是Java 7提供的一个用于并行执行任务的线程池,它的主要思想是将一个大任务分割(Fork)成若干个小任务(子任务),然后并行执行这些子任务,最后合并(Join)这些子任务的结果得到大任务的结果。
ForkJoinPool的主要优点是利用了多核CPU的优势,可以充分利用CPU资源来提高程序的执行效率。它使用了一种称为工作窃取(work-stealing)的算法,可以把暂时无法执行的任务线程分配给其他的线程,使得所有的线程都能尽可能地保持忙碌,从而提高了线程的利用率。
ForkJoinPool适用于那些可以并行处理的任务,以及那些任务量大但可以分解成许多独立小任务的情况。但是,如果任务之间有很强的依赖性,或者任务的执行时间很短,使用ForkJoinPool可能并不会带来太大的性能提升。
谈谈各种线程池的具体使用场景?
- FixedThreadPool:固定大小的线程池,适用于处理CPU密集型任务,或者需要限制线程数量的场景。比如在电商网站,如果有一个需要大量计算的服务,如推荐算法,可以使用FixedThreadPool来限制线程数量,防止因为线程过多导致CPU过载。
- CachedThreadPool:缓存线程池,适用于处理大量短期异步小任务,或者负载较轻的服务器。比如在社交网站,用户上传图片后,服务器需要生成缩略图,这种任务通常耗时较短,可以使用CachedThreadPool来处理。
- SingleThreadExecutor:单线程的线程池,适用于需要保证顺序执行的场景。比如在电商网站,处理用户下单的服务,需要保证订单的生成、库存的减少、支付的处理等操作按照顺序执行,可以使用SingleThreadExecutor。
- ScheduledThreadPool:调度线程池,适用于需要定时执行任务的场景。比如在消费类网站,需要定时推送优惠信息给用户,或者定时清理过期的优惠券,可以使用ScheduledThreadPool。
- ForkJoinPool:适用于大任务可以拆分成小任务并行处理的场景,且每个小任务处理时间较长,需要较高的处理器利用率。比如在电商网站,需要对大量商品进行价格比较,可以将这个任务拆分成多个小任务,每个小任务处理一部分商品,然后使用ForkJoinPool并行处理。
这些线程池的选择和使用,需要根据实际的业务需求和系统的资源状况来决定。在选择线程池时,需要考虑任务的性质(CPU密集型还是I/O密集型,是否需要定时执行,是否可以并行处理等)、任务的数量和大小、系统的资源状况(如CPU的核数、系统的负载等)等因素。
假如用 Executors 的静态方法创建线程池,有哪几种?
Java的java.util.concurrent.Executors类提供了几种静态方法,用于创建不同类型的线程池:
- newFixedThreadPool(int nThreads):创建一个固定大小的线程池。这个线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中等待的任务。
- newSingleThreadExecutor():创建一个只有一个线程的线程池。它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。
- newCachedThreadPool():创建一个可缓存的线程池。如果线程池的当前规模超过处理需求时,将自动回收空闲线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
- newScheduledThreadPool(int corePoolSize):创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
- newWorkStealingPool(int parallelism):创建持有足够线程的线程池来支持给定的并行级别,并通过使用多数处理器,以减少竞争,它在概念上等同于"足够的线程运行,而又不会有过多的线程运行"。此方法是在JDK1.8版本添加的。
这些方法都返回java.util.concurrent.ExecutorService对象,这是一个更高级的线程管理接口,包含了更多的线程控制方法。
CachedThreadPool 和 FixedThreadPool 提交一个任务处理流程有啥区别?
CachedThreadPool 和 FixedThreadPool 都是 java.util.concurrent.Executors 类提供的用于创建线程池的静态方法,它们在提交任务处理流程上有一些区别:
- CachedThreadPool:当提交一个任务时,CachedThreadPool会先检查线程池中是否有空闲的线程,如果有,就直接使用这个线程来执行新提交的任务。如果没有,CachedThreadPool会创建一个新的线程来处理这个任务。CachedThreadPool的线程数量并不固定,可以根据需要自动的更改数量。此外,CachedThreadPool会在一定时间(默认60秒)内自动回收空闲(没有执行任务)的线程。
- FixedThreadPool:当提交一个任务时,FixedThreadPool会先检查线程池中是否有空闲的线程,如果有,就直接使用这个线程来执行新提交的任务。如果没有,新提交的任务会被放入一个任务队列中,等待有线程空闲时再由线程池来执行。FixedThreadPool的线程数量是固定的,不会因为需求的变化而改变。
总的来说,CachedThreadPool更适合处理大量短期异步的小任务或者是负载较轻的服务器,而FixedThreadPool更适合处理CPU密集型的任务,或者是需要限制当前线程数量的应用场景。
手动创建线程池的参数?
在Java中,我们可以通过java.util.concurrent.ThreadPoolExecutor类来手动创建线程池。创建线程池时,我们需要提供以下参数:
- corePoolSize:核心线程数,即线程池中始终存活的线程数。即使线程空闲,也不会回收。
- maximumPoolSize:线程池最大线程数,表示线程池能够容纳同时执行的最大线程数。
- keepAliveTime:非核心线程的闲置超时时间,超过这个时间就会被回收。当线程数大于corePoolSize的时候,这个参数才会起作用。
- unit:keepAliveTime的时间单位,有TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)等。
- workQueue:任务队列,用于存放提交的等待执行任务。常用的有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等。
- threadFactory:线程工厂,用于创建新线程并设置线程的一些参数,如线程名字等。
- handler:饱和策略,当任务太多来不及处理时,如何拒绝任务。常见的有AbortPolicy(直接抛出异常)、CallerRunsPolicy(用调用者所在的线程来运行任务)、DiscardOldestPolicy(丢弃队列里最老的未处理任务,然后重新尝试执行任务(重复此过程))、DiscardPolicy(直接丢弃任务,不做任何处理)。
以下是一个创建线程池的示例:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // unit
new ArrayBlockingQueue<>(100), // workQueue
new ThreadFactoryBuilder().setNameFormat("my-thread-pool-%d").build(), // threadFactory
new ThreadPoolExecutor.AbortPolicy() // handler
);
在这个示例中,我们创建了一个线程池,它有5个核心线程,最多可以有10个线程,非核心线程的闲置超时时间是60秒,任务队列的大小是100,线程的名字格式是"my-thread-pool-%d",当任务太多来不及处理时,会直接抛出异常