我们常说的“并发包”指的是java.util.concurrent
这个包,后面简称 J.U.C,里面包含大量多线程和并发编程的工具。J.U.C 包是 JDK 1.5 版本引入,由 Doug Lea 和众多其他大神合力完成,包含大量对并发编程的思考精华,仔细观摩思考他们的设计思路,对于我们学习多线程和并发编程有非常大的帮助。本文所有内容基于 JDK11。
概述
先从两个角度看下并发包的层次结构,一个是继承关系图:
一个是功能结构:
并发包非常庞大,结构复杂,个人能力有限也不可能总结非常清楚,我们先从常见、常用的开始看起吧。
工具类
工具包内主要有这几个工具:计数器CountDownLatch
、回环栅栏(光看名字估计一头雾水)CyclicBarrier
、信号量Semaphore
、创建线程池的Executors
,最后一个交换数据的Exchanger
。本章几个工具的原理都是基于 AQS,关于 AQS 和ReentrantLock
,都有单独的文章解读,就不专门介绍了。
CountDownLatch
用法
一个计数器,常见这样一种场景:多个任务分发个不同的线程去执行,全部执行完毕后回到主线程。当然有不同的实现方式,用CountDownLatch
实现起来就很简单:
1 | int taskNum = 20; |
2 | CountDownLatch countLatch = new CountDownLatch(taskNum); |
3 | |
4 | for (int i = 0; i < taskNum; i++) { |
5 | new Thread(() -> { |
6 | try { |
7 | Thread.sleep(200); //假装做了点什么 |
8 | System.out.println(Thread.currentThread().getName() + "执行完毕"); |
9 | } catch (InterruptedException e) { |
10 | e.printStackTrace(); |
11 | } |
12 | countLatch.countDown(); |
13 | }).start(); |
14 | } |
15 | |
16 | try { |
17 | countLatch.await(); |
18 | System.out.println("全部任务执行完毕"); |
19 | } catch (InterruptedException e) { |
20 | e.printStackTrace(); |
21 | } |
Seamphore
使用
信号量,类似于控制并发的时候用到的“令牌桶”算法,通过控制信号总数,不断释放和回收信号来控制并发数量。假设有这么一个场景,假设我们有五个通道可以执行任务,任务总数是 40,所以同一时刻只能有最多五个线程执行,其余的要等待,因此我们使用Seamphore
来不断方法许可和收回许可:
1 | Semaphore semaphore = new Semaphore(5); |
2 | |
3 | for (int i = 0; i < 40; i++) { |
4 | new Thread(() -> { |
5 | try { |
6 | semaphore.acquire(); |
7 | System.out.println(Thread.currentThread().getName() + "取得许可,开始执行任务"); |
8 | Thread.sleep(new Random().nextInt(2000)); |
9 | System.out.println(Thread.currentThread().getName() + "任务完成,释放许可"); |
10 | semaphore.release(); |
11 | } catch (InterruptedException e) { |
12 | e.printStackTrace(); |
13 | } |
14 | }).start(); |
15 | } |
CyclicBarrier
使用
回环栅栏光看名字不好理解,其实作用很上面的计数器类似,有点类似于 Java 虚拟机中的“安全点”。执行一个任务的时候,所有线程到达栅栏之后停止,等待所有其他线程都到达这个点,然后一起进入下一阶段。与计数器不同的是,CyclicBarrier
可以重复使用,举个栗子:
1 | CyclicBarrier cyclicBarrier = new CyclicBarrier(20); |
2 | |
3 | for (int i = 0; i < 20; i++) { |
4 | new Thread(() -> { |
5 | long timeStamp = System.currentTimeMillis(); |
6 | try { |
7 | Thread.sleep(new Random().nextInt(2000)); |
8 | System.out.println(Thread.currentThread().getName() + ":一阶段任务完成,花费了" + (System.currentTimeMillis() - timeStamp) + "毫秒,开始等待其他线程"); |
9 | cyclicBarrier.await(); |
10 | System.out.println(Thread.currentThread().getName() + ":所有线程执行完成,开始下一阶段"); |
11 | timeStamp = System.currentTimeMillis(); |
12 | Thread.sleep(new Random().nextInt(2000)); |
13 | System.out.println(Thread.currentThread().getName() + ":二阶段任务完成,花费了" + (System.currentTimeMillis() - timeStamp) + "毫秒,开始等待其他线程"); |
14 | cyclicBarrier.await(); |
15 | System.out.println(Thread.currentThread().getName() + ":所有线程任务完成"); |
16 | } catch (InterruptedException | BrokenBarrierException e) { |
17 | e.printStackTrace(); |
18 | } |
19 | }).start(); |
20 | } |
Exchanger
使用
交换器顾名思义,就是用来交换数据,理解和使用起来是最简单的,但是内部实现很精巧复杂。使用很简单,只有两个方法,作用就是两个线程在一个安全点交换数据,产生数据慢的那个会阻塞等待。
1 | Exchanger<Integer> exchanger = new Exchanger<>(); |
2 | |
3 | new Thread(() -> { |
4 | int num = new Random().nextInt(1000); |
5 | System.out.println("交换之前:Thread1:" + num); |
6 | try { |
7 | num = exchanger.exchange(num); |
8 | System.out.println("交换完毕:Thread1:" + num); |
9 | } catch (InterruptedException e) { |
10 | e.printStackTrace(); |
11 | } |
12 | }).start(); |
13 | new Thread(() -> { |
14 | int num = new Random().nextInt(1000); |
15 | System.out.println("交换之前:Thread2:" + num); |
16 | try { |
17 | Thread.sleep(2000); |
18 | num = exchanger.exchange(num); |
19 | System.out.println("交换完毕:Thread2:" + num); |
20 | } catch (InterruptedException e) { |
21 | e.printStackTrace(); |
22 | } |
23 | }).start(); |
线程池
在 J.U.C 包中,创建线程池有两种方式,一种是手动创建,一种是通过Executors
工厂类创建预设的几种线程池。
基础的线程有如下几个:
而Executors
默认提供了六种线程池,但是不推荐在生产环境中直接使用,因为默认的设置对数据量没有进行限制,有可能出现问题。
ThreadPoolExecutor
ThreadPoolExecutor
提供了四个构造方法:
1 | public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) |
2 | public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) |
3 | public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) |
4 | public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) |
可以看出参数都类似,首先要确定初始化线程数量和最大数量,然后是存活时间和时间单位。如果线程池已满,就会把多余的任务放到一个阻塞队列中,你需要定义这么一个队列。如果队列也满了,还需要指定拒绝策略。同时还可以传入一个线程工厂来确定线程生成策略。
一个最简单的线程池就可以这么创建了:
1 | ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 20, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); |
这里要注意线程池的扩容策略,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()
或者prestartCoreThread()
方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize
个线程或者一个线程。当线程池中的线程数目达到corePoolSize
后,就会把到达的任务放到缓存队列当中,只有当缓存队列也满了才会出发扩容,创建新的线程,然后当线程池容量扩建到设定的最大值之后,如果还有新的任务,就会触发拒绝策略,返回相应的结果或者默认抛出异常。
keepAliveTime
是线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
。
关于拒绝策略,除了自行实现以外,提供了四种预设的策略:
1 | ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 |
2 | ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 |
3 | ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) |
4 | ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 |
线程中的任务通过execute()
或者submi()
方法来提交,二者的区别是有没有返回值。
execute()
方法实际上是Executor
中声明的方法,在ThreadPoolExecutor
进行了具体的实现,这个方法是ThreadPoolExecutor
的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。submit()
方法是在ExecutorService
中声明的方法,在AbstractExecutorService
就已经有了具体的实现,在ThreadPoolExecutor
中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()
方法不同,它能够返回任务执行的结果,去看submit()
方法的实现,会发现它实际上还是调用的execute()
方法,只不过它利用了Future
来获取任务执行结果。
执行任务还可以通过invokeAll
方法,需要预先创建Callable
的集合,然后放到线程池中执行,invokeAll
会创建线程依次执行任务,主线程阻塞等待结果。但是,需要注意一种特殊情况:
当批量执行的任务数量大于线程池数量+队列数量,这时根据拒绝策略不同,会产生不同的结果,DiscardPolicy,DiscardOldestPolicy 这两种策略会导致线程池锁住。所以要是使用这两种拒绝策略的时候,就要控制任务数量,或者准备足够大的线程池。
ForkJoinPool
ForkJoinPool
主要思想就是分而治之,在能够用分治算法的场景下,ForkJoinPool
有很高的效率。
ForkJoinPool的本质就是两点:
- 如果任务很小:直接计算得出结果
- 如果任务很大
- 拆分成N个子任务
- 调用子任务的
fork()
进行计算 - 调用子任务的
join()
合并结果
我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,ForkJoin 框架提供了以下两个子类:
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask :用于有返回结果的任务。
比如这样一个场景:我们想要计算从 1 到 2000 的数字累加和,假设这个任务很消耗资源,我们打算交给多个线程分开去计算,最后把每个线程的结果加到一起。拆分任务的规则是每个线程计算不超过 20 个数字的累加和,我们就可以创建这么一个对象:
1 | class MyTask extends RecursiveTask<Integer> { |
2 | int start; |
3 | int end; |
4 | |
5 | public MyTask() { |
6 | } |
7 | |
8 | public MyTask(int start, int end) { |
9 | this.start = start; |
10 | this.end = end; |
11 | } |
12 | |
13 | |
14 | protected Integer compute() { |
15 | if (end - start < 20) { |
16 | int count = 0; |
17 | for (int i = start; i <= end; i++) { |
18 | count += i; |
19 | } |
20 | return count; |
21 | } |
22 | int middle = (start + end) / 2; |
23 | MyTask leftTask = new MyTask(start, middle); |
24 | MyTask rightTask = new MyTask(middle + 1, end); |
25 | invokeAll(leftTask, rightTask); |
26 | leftTask.fork(); |
27 | rightTask.fork(); |
28 | return leftTask.join() + rightTask.join(); |
29 | } |
30 | } |
可以看出,当计算量小于 20 的时候,开始计算,然后返回结果。如果计算量大于 20,就拆分任务,然后再创建两个子任务,等待子任务返回。这里的例子只是一个简单类比,实际上ForkJoinPool
更适合计算密集型的任务,像这种小规模的简单累加线程调度的开销比计算本身大多了。
使用ForkJoinPool
有几点需要注意:
- 活跃线程数被控制在 CPU 核心数以内,所以不需要创建过多的线程,线程池内部调度的时候就会做限制。
- 最大线程数被限制在
0x7fff
,即 32767。 - 可以选择队列模式是 FOFO 或者 LIFO。
- 任务队列的初始化容量是 8192,最大容量限制是 67108864 即 64M,超过的话会抛异常。\
相比ThreadPoolExecutor
,ForkJoinPool
的优势是什么?
使用ForkJoinPool
能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用 4 个线程来完成超过 200 万个任务。但是,使用ThreadPoolExecutor
时,是不可能完成的,因为ThreadPoolExecutor
中的Thread
无法选择优先执行子任务,需要完成 200 万个具有父子关系的任务时,也需要 200 万个线程,显然这是不可行的。
ScheduledPoolExecutor
自 JDK1.5 开始,JDK 提供了ScheduledThreadPoolExecutor
类来支持周期性任务的调度。在这之前的实现需要依靠Timer
和TimerTask
或者其它第三方工具来完成。
ScheduledThreadPoolExecutor
继承ThreadPoolExecutor
来重用线程池的功能,它的实现方式如下:
将任务封装成
ScheduledFutureTask
对象,ScheduledFutureTask
基于相对时间,不受系统时间的改变所影响;ScheduledFutureTask
实现了java.lang.Comparable
接口和java.util.concurrent.Delayed
接口,所以有两个重要的方法:compareTo
和getDelay
。compareTo
方法用于比较任务之间的优先级关系,如果距离下次执行的时间间隔较短,则优先级高;getDelay
方法用于返回距离下次任务执行时间的时间间隔;ScheduledThreadPoolExecutor
定义了一个DelayedWorkQueue
,它是一个有序队列,会通过每个任务按照距离下次执行时间间隔的大小来排序;ScheduledFutureTask
继承自FutureTask
,可以通过返回Future对象来获取执行的结果。ScheduledThreadPoolExecutor
的构造函数有以下几个:
1 | // 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。 |
2 | ScheduledThreadPoolExecutor(int corePoolSize) |
3 | // 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。 |
4 | ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) |
5 | // 使用给定的初始参数创建一个新 ScheduledThreadPoolExecutor。 |
6 | ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) |
7 | // 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。 |
8 | ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) |
ScheduledThreadPoolExecutor
最多支持 3 个参数:核心线程数量,线程工厂,拒绝策略。
为什么没有最大线程数量?由于 ScheduledThreadPoolExecutor
内部是个无界队列,maximumPoolSize
也就没有意思了。
常用的方法有以下几个:
1 | // 创建并执行在给定延迟后启用的一次性操作。 |
2 | ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) |
3 | |
4 | // 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。 |
5 | ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) |
6 | |
7 | // 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。 |
8 | ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) |
schedule
方法很好理解,是一次性操作,只不过加了个延迟。后面两个方法的区别就是:
cheduleAtFixedRate
是两次执行开始时间的间隔固定,不管单次执行的时长,有可能上次还没执行完毕下次就开始了。scheduleWithFixedDelay
是上次结束和下次开始之间的间隔固定,永远不会出现上次还没执行完毕下次就开始的情况。
需要注意的是,任务的排序是通过 ScheduledFutureTask
的 compareTo
方法排序的,规则是先比较执行时间,如果时间相同,再比较加入时间。
还要注意一点就是:如果任务执行过程中异常了,那么将不会再次重复执行。因为 ScheduledFutureTask
的 run
方法没有做catch
处理。所以程序员需要手动处理,相对于Timer
异常就直接费了调度系统来说,要好很多。
Executors
除了以上三种创建方式,J.U.C 包中还提供了Executors
工厂方法直接创建几种预设好的线程池,包括以下几类:
newCachedThreadPool()
- 缓存型线程池,先查看有没有以前建立的线程,如果有,就 reuse 如果没有,就建一个新的线程加入池中。
- 缓存型线程池通常用于执行一些生存期很短的异步型任务。
- 能 reuse 的线程,必须是
timeout IDLE
内的池中线程,缺省timeout
是 60s,超过这个时长,线程实例将被终止及移出池。
newFixedThreadPool(int)
- 固定大小的线程池。
- 每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。
- 如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
newScheduledThreadPool(int)
- 创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
newSingleThreadScheduledExecutor
也是类似的延迟任务线程池,只不过只是单线程执行。
SingleThreadExecutor()
- 一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。
- 如果唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
newWorkStealingPool()
- 一个基于
Fork/Join
模型的线程池,内部通过ForkJoinPool
创建。 - 用法跟
ForkJoinPool
一样,体现“分而治之” 的思想。
- 一个基于
但是,虽然鼓励使用线程池而不是直接新建线程,但是在生产系统中不建议直接使用Executors
创建的线程池,阿里巴巴Java手册中是这么解释的:
并发集合
Map 系列
最知名也最常用ConcurrentHashMap
本文就先不讲了,已经单独分析过,请参考那篇文章。
ConcurrentSkipListMap
Skip List(跳表)是一种可以代替平衡树的数据结构,默认是按照 Key 值升序的。Skip List 让已排序的数据分布在多层链表中,以 0-1 随机数决定一个数据的向上攀升与否,通过“空间来换取时间”的一个算法,在每个节点中增加了向前的指针,在插入、删除、查找时可以忽略一些不可能涉及到的结点,从而提高了效率。
简单介绍跳表的原理,这里盗用了博客J.U.C 之 ConcurrentSkipListMap上的解释:
我们先看一个简单的链表,如下:
如果我们需要查询9、21、30,则需要比较次数为3 + 6 + 8 = 17 次,那么有没有优化方案呢?有!我们将该链表中的某些元素提炼出来作为一个比较“索引”,如下:
我们先与这些索引进行比较来决定下一个元素是往右还是下走,由于存在“索引”的缘故,导致在检索的时候会大大减少比较的次数。当然元素不是很多,很难体现出优势,当元素足够多的时候,这种索引结构就会大显身手。
当然,实际上 Skip List 的原理要更复杂,就不在这详细叙述了。
总之,跟ConcurrentHashMap
相比,ConcurrentSkipListMap
的key是有序的。有的文章提到ConcurrentSkipListMap
支持更高的并发,线程越多性能越强,但是经过我实际测试,从 10 个线程到 100 个线程,无论put
还是get
,都没有快于ConcurrentHashMap
,只有超过三百个线程之后,put
操作会略微快一点点。
CopyOnWrite 系列
CopyOnWrite,或者叫写入时复制,其实是一种策略,以下是维基百科的说明:
其核心思想是,如果有多个调用者(callers)同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的(transparently)。
通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。
J.U.C 包中提供了两个 CopyOnWrite 容器,分别是CopyOnWriteArrayList
和CopyOnWriteArraySet
。这两个的性质很相似,只不过在原本的 List 和 Set 上使用了 COW 的思想,而且CopyOnWriteArraySet
的实现原理就是在内部维护了一个CopyOnWriteArrayList
。
CopyOnWrite 并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景。
CopyOnWriteArrayList
容器的原理也不复杂,只是需要在add
方法中加锁,添加完成之后用新的数组替代原有的数组,而get
操作不需要加锁
1 | public boolean add(E e) { |
2 | synchronized (lock) { |
3 | Object[] es = getArray(); |
4 | int len = es.length; |
5 | es = Arrays.copyOf(es, len + 1); |
6 | es[len] = e; |
7 | setArray(es); |
8 | return true; |
9 | } |
10 | } |
代码很简单,但是使用CopyOnWriteMap需要注意两件事情:
减少扩容开销。根据实际需要,初始化
CopyOnWriteMap
的大小,避免写时CopyOnWriteMap
扩容的开销。使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。
CopyOnWrite 容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。
Queue 系列
从 J.U.C 的导图就可以看出,提供给我们最多的集合类就是队列,主要分为三个方面:ConcurrentLinkedQueue
、BlockingQueue
和Deque
ConcurrentLinkedQueue
ConcurrentLinkedQueue
底层使用单链表存储数据,增加了空的头尾节点,是非阻塞、无界的线程安全队列。它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。由于是无界队列,所以add()
和offer()
方法没有区别,都不会抛出异常。
跟线程不安全的队列比如PriorityQueue
相比,由于数据存储是链表而不是数组,因此没有并发扩容的问题,但是入队出队是通过 CAS 实现的,大并发下有可能有效率问题,而且遍历的时候数据不是准确的。
BlockingQueue
BlockingQueue
是一个接口,定义了阻塞队列的基本方法,J.U.C 包中有多个该接口的实现类。定义的常用方法如下:
BlockingQueue
的特点:
BlockingQueue
可以是限定容量的。它在任意给定时间都可以有一个remainingCapacity
,超出此容量,便无法无阻塞地put
附加元素。没有任何内部容量约束的BlockingQueue
总是报告Integer.MAX_VALUE
的剩余容量。BlockingQueue
实现主要用于生产者-使用者队列,但它另外还支持Collection
接口。因此,举例来说,使用remove(x)
从队列中移除任意一个元素是有可能的。然而,这种操作通常不会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。BlockingQueue
实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的Collection
操作(addAll
、containsAll
、retainAll
和removeAll
)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了c
中的一些元素后,addAll(c)
有可能失败(抛出一个异常)。BlockingQueue
实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 Poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。
BlockingQueue
的实现类:
- BlockingDeque:阻塞的双端队列,
LinkedBlockingDeque
是其基于链表的实现类,如果没有设置容量,那么容量将是Int的最大值。LinkedBlockingQueue
可以同时有两个线程在两端执行操作,这点与LinkedBlockingQeque
不同。 - TransferQueue:基于队列扩展的一种有趣的生产-消费者模型,其实现类是
BlockingTransferQueue
,能够实现元素在线程之间的传递。 - SynchronousQueue:跟
TransferQueue
很类似,线程 A 通过put
方法存入数据到队列中,如果没有别的线程通过take
方法去获取这个数据,那线程 A 进入阻塞状态;当有别的线程获取了这个值之后,线程 A 就恢复执行。这个特点跟TransferQueue很像,Doug Lea说从功能角度来讲,LinkedTransferQueue
实际上是ConcurrentLinkedQueue
、SynchronousQueue
(公平模式)和LinkedBlockingQueue
的超集。而且LinkedTransferQueue
更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。所以SynchronousQueue
的使用场景就很少了。 - PriorityBlokcingQueue:可以理解成
PriorityQueue
的线程安全版本,基于优先堆的一个无界队列。不允许 null 值,不允许不可比较的对象。 - DelayQueue:一个有趣的延迟队列,它的特殊之处在于队列的元素必须实现
Delayed
接口,该接口需要实现compareTo
和getDelay
方法。这个在下面用法里再详细介绍。 - ArrayBlockingQueue:基于数组的有界阻塞队列,必须指定大小。
- LinkedBlockingQueue:基于链表的有界阻塞队列,可以不指定队列大小,默认是
Integer.MAX_VALUE
ArrayBlockingQueue 和 LinkedBlockingQueue
由于这两个类的相关性,就放在一起说。同样是阻塞的有界队列,使用方法也基本一致,就着重说明两者间的区别:
队列中锁的实现不同
ArrayBlockingQueue
实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁,但是两个条件LinkedBlockingQueue
实现的队列中的锁是分离的,即生产用的是putLock
,消费是takeLock
在生产或消费时操作不同
ArrayBlockingQueue
实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的LinkedBlockingQueue
实现的队列中在生产和消费的时候,需要把枚举对象转换为Node<E>
进行插入或移除,会影响性能队列大小初始化方式不同
ArrayBlockingQueue
实现的队列中必须指定队列的大小LinkedBlockingQueue
实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE
为了对两个队列的效率有个更直观的认识,我进行了一个简单的测试,对一千万的数据量进行操作,首先是单线程存,然后单线程取:
1 | public void blockingQueueBench(BlockingQueue<Integer> queue) { |
2 | int count = 1000 * 10000; |
3 | long timestamp = System.currentTimeMillis(); |
4 | for (int i = 0; i < count; i++) { |
5 | try { |
6 | queue.put(i); |
7 | } catch (InterruptedException e) { |
8 | e.printStackTrace(); |
9 | } |
10 | } |
11 | System.out.println(System.currentTimeMillis() - timestamp); |
12 | timestamp = System.currentTimeMillis(); |
13 | for (int i = 0; i < count; i++) { |
14 | try { |
15 | queue.take(); |
16 | } catch (InterruptedException e) { |
17 | e.printStackTrace(); |
18 | } |
19 | } |
20 | System.out.println(System.currentTimeMillis() - timestamp); |
21 | } |
然后分别对比两个队列存和取的效率:
1 | new QueueTest().blockingQueueBench(new LinkedBlockingQueue<>(10000000)); |
2 | //结果存1909毫秒,取196毫秒 |
3 | new QueueTest().blockingQueueBench(new ArrayBlockingQueue<>(10000000)); |
4 | //结果存237毫秒,取135毫秒 |
然后是两个线程,一个存一个取:
1 | public void blockingQueueBench(BlockingQueue<Integer> queue) { |
2 | int count = 1000 * 10000; |
3 | long timestamp = System.currentTimeMillis(); |
4 | for (int i = 0; i < count; i++) { |
5 | try { |
6 | queue.put(i); |
7 | } catch (InterruptedException e) { |
8 | e.printStackTrace(); |
9 | } |
10 | } |
11 | Thread thread = new Thread(() -> { |
12 | for (int i = 0; i < count; i++) { |
13 | try { |
14 | queue.take(); |
15 | } catch (InterruptedException e) { |
16 | e.printStackTrace(); |
17 | } |
18 | } |
19 | }); |
20 | thread.start(); |
21 | try { |
22 | thread.join(); |
23 | } catch (InterruptedException e) { |
24 | e.printStackTrace(); |
25 | } |
26 | System.out.println(System.currentTimeMillis() - timestamp); |
27 | } |
然后看存取都完成的时间,LinkedBlockingQueue
是 2141 毫秒,ArrayBlockingQueue
是 370 毫秒。单线程的情况下,无论如何ArrayBlockingQueue
都是要快的。
同样的,我测试了 10 线程和 100 线程下的效率,总的数据量也是一千万。也是分为先存后取和同时存取。结果如下
LinkedBlockingQueue | ArrayBlockingQueue | |
---|---|---|
10 线程先存后取 | 存2147ms 取624ms | 存466ms 取339ms |
10 线程同时存取 | 1974ms | 724ms |
100 线程先存后取 | 存2347ms 取1040ms | 存464ms 取306ms |
100 线程同时存取 | 1957ms | 976ms |
代码基于下面这个,稍有改动:
1 | public void blockingQueueMultiBench(BlockingQueue<Integer> queue, int threadNum) { |
2 | int count = 1000 * 10000; |
3 | long timestamp = System.currentTimeMillis(); |
4 | CountDownLatch latch = new CountDownLatch(threadNum); |
5 | |
6 | for (int i = 0; i < threadNum; i++) { |
7 | new Thread(() -> { |
8 | for (int j = 0; j < count / threadNum; j++) { |
9 | try { |
10 | queue.put(j); |
11 | } catch (InterruptedException e) { |
12 | e.printStackTrace(); |
13 | } |
14 | } |
15 | latch.countDown(); |
16 | }).start(); |
17 | } |
18 | try { |
19 | latch.await(); |
20 | } catch (InterruptedException e) { |
21 | e.printStackTrace(); |
22 | } |
23 | System.out.println(System.currentTimeMillis() - timestamp); |
24 | CountDownLatch latch2 = new CountDownLatch(threadNum); |
25 | |
26 | timestamp = System.currentTimeMillis(); |
27 | for (int i = 0; i < threadNum; i++) { |
28 | new Thread(() -> { |
29 | for (int j = 0; j < count / threadNum; j++) { |
30 | try { |
31 | queue.take(); |
32 | } catch (InterruptedException e) { |
33 | e.printStackTrace(); |
34 | } |
35 | } |
36 | latch2.countDown(); |
37 | }).start(); |
38 | } |
39 | try { |
40 | |
41 | latch2.await(); |
42 | } catch (InterruptedException e) { |
43 | e.printStackTrace(); |
44 | } |
45 | System.out.println(System.currentTimeMillis() - timestamp); |
46 | } |
可以看出,对于效率来讲大多数时候用ArrayBlockingQueue
比较合适。
BlockingDeque
就跟 Queue 与 Deque 的区别一样,BlockingDeque
只是单端队列变成了双端队列,添加了在两端存取元素的方法。比如Quque
原本的offer、poll、put、take
等方法,都扩展了offerFirst、offerLast
等等在对首位元素分别进行操作的方法。内部使用ReentrantLock
保证了线程安全,其他没有什么特殊之处,就不详细讲了。
TransferQueue 和 SynchronousQueue
这两个也是性质非常相似的队列,用法也基本一样,主要用于不同线程一对一的传递数据。Doug Lea说从功能角度来讲,LinkedTransferQueue
实际上是ConcurrentLinkedQueue
、SynchronousQueue
(公平模式)和LinkedBlockingQueue
的超集。而且LinkedTransferQueue
更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。所以我们就尽量使用LinkedTransferQueue
吧。
TransferQueue
的transfer()
方法是这样,一次只能传递一个东西过去,如果上次穿的没有被消费掉,新的方法就就会阻塞。同样的,SynchronousQueue
的put()
方法也是一样。
1 | public void testTransferQueue() { |
2 | LinkedTransferQueue<Integer> transferQueue = new LinkedTransferQueue<>(); |
3 | new Thread(() -> { |
4 | try { |
5 | while (true) { |
6 | System.out.println(transferQueue.take()); |
7 | Thread.sleep(1000); |
8 | } |
9 | } catch (InterruptedException e) { |
10 | e.printStackTrace(); |
11 | } |
12 | }).start(); |
13 | try { |
14 | transferQueue.transfer(1); |
15 | transferQueue.transfer(2); |
16 | transferQueue.transfer(3); |
17 | } catch (InterruptedException e) { |
18 | e.printStackTrace(); |
19 | } |
20 | } |
从这个示例中可以看出,虽然三行transfer
方法是连续的,但是在take
方法取走之前的元素之前是阻塞的。另外还有个有趣的地方,由于内部没有维护容器,所以LinkedTransferQueue
的size()
方法和迭代器都是没有意义的。
PriorityBlockingQueue
是线程安全阻塞版本的PriorityQueue
,对比PriorityQueue
而言在存取删除元素和扩容的时候都有ReentrantLock
锁,所以实现了线程安全。元素为空的时候再取元素会阻塞。有内部结构是基于完全二叉树的小顶堆,所以不允许 null 值,不允许不可比较的对象。
DelayQueue
DelayQueue
是一个延迟队列,想要用它存取元素,必须实现Delayed
接口,可以看出其定义如下:
1 | public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> |
而Delayed
接口继承自Comparable
,所以其实现类需要实现getDelay
和compareTo
方法。
getDelay
方法是这样的:当返回值小于 0 的时候,该元素才能被取出。我们创建一个实现类:
1 | public class TestDelayed implements Delayed { |
2 | int num; |
3 | long time; |
4 | |
5 | public TestDelayed() { |
6 | } |
7 | |
8 | public TestDelayed(int num, int delay) { |
9 | this.num = num; |
10 | this.time = System.currentTimeMillis() + delay; |
11 | } |
12 | |
13 | |
14 | public long getDelay(TimeUnit unit) { |
15 | return time - System.currentTimeMillis(); |
16 | } |
17 | |
18 | |
19 | public int compareTo(Delayed o) { |
20 | return Long.compare(getDelay(TimeUnit.SECONDS), o.getDelay(TimeUnit.SECONDS)); |
21 | } |
22 | } |
这个类的作用就是在指定的时间之后取回元素,时间单位是毫秒,然后进行测试:
1 | public void testDelayQueue() { |
2 | DelayQueue<TestDelayed> delayQueue = new DelayQueue<>(); |
3 | new Thread(() -> { |
4 | try { |
5 | while (true) { |
6 | int num = delayQueue.take().num; |
7 | System.out.println(System.currentTimeMillis() + ",take:" + num); |
8 | } |
9 | } catch (InterruptedException e) { |
10 | e.printStackTrace(); |
11 | } |
12 | }).start(); |
13 | delayQueue.put(new TestDelayed(1, 1000)); |
14 | delayQueue.put(new TestDelayed(2, 2000)); |
15 | delayQueue.put(new TestDelayed(6, 6000)); |
16 | } |
可以看出,三个元素分别在第一秒、第二秒和第六秒被取出,然后线程阻塞等待取回下一个元素。
锁
并法包中的接口有Lock
、ReadWriteLock
、Condition
、LockSupport
等。
Lock 接口和其实现类ReentrantLock
已经专门讲过,就不赘述。我们先看看读写锁的用法。
ReadWriteLock
该接口只有两个方法,读锁和写锁。也就是说,我们在写文件的时候,可以将读和写分开,分成 2 个锁来分配给线程,从而可以做到读和读互不影响,读和写互斥,写和写互斥,提高读写文件的效率。该接口也有一个实现类ReentrantReadWriteLock
,下面我们就来学习下这个类。
读写锁的用法很简单,以ReentrantReadWriteLock
为例,使用lock.writeLock()
和lock.readLock()
分别创建锁对象。我们编写一个测试类来验证读写锁的互斥逻辑,先写两个方法,线程休眠来模拟耗时的读写操作:
1 | private void read() { |
2 | System.out.println(Thread.currentThread().getName() + "正在进行读操作"); |
3 | try { |
4 | Thread.sleep(1000); |
5 | } catch (InterruptedException e) { |
6 | e.printStackTrace(); |
7 | } |
8 | System.out.println(Thread.currentThread().getName() + "读操作完毕"); |
9 | } |
10 | |
11 | private void write() { |
12 | System.out.println(Thread.currentThread().getName() + "正在进行写操作"); |
13 | try { |
14 | Thread.sleep(1000); |
15 | } catch (InterruptedException e) { |
16 | e.printStackTrace(); |
17 | } |
18 | System.out.println(Thread.currentThread().getName() + "写操作完毕"); |
19 | } |
然后分别创建读写锁,然后进行 5 线程同时读、同时写、同时读写等等操作:
1 | ReadWriteLock lock = new ReentrantReadWriteLock(); |
2 | |
3 | Lock writeLock = lock.writeLock(); |
4 | Lock readLock = lock.readLock(); |
5 | for (int i = 0; i < 5; i++) { |
6 | new Thread(() -> { |
7 | readLock.lock(); |
8 | lockTest.read(); |
9 | readLock.unlock(); |
10 | }).start(); |
11 | new Thread(() -> { |
12 | writeLock.lock(); |
13 | lockTest.write(); |
14 | writeLock.unlock(); |
15 | }).start(); |
16 | } |
可以看出,读锁加锁之后,可以同时进行读操作,但是写锁加锁之后不能同时写。而读写互相之间也是互斥的,不能在读的同时写。
Condition
Condition
是在 Java 1.5 中才出现的,它用来替代传统的wait()
、notify()
实现线程间的协作,相比使用Object
的wait()
、notify()
,使用Condition
的await()
、signal()
这种方式实现线间协作更加安全和高效。因此通常来说比较推荐使用Condition
,阻塞队列实际上是使用了Condition
来模拟线程间协作。
Condition
是与Lock
绑定的,所以就有Lock
的公平性特性:如果是公平锁,线程为按照 FIFO 的顺序从Condition.await
中释放,如果是非公平锁,那么后续的锁竞争就不保证 FIFO 顺序了。
调用Condition
的await()
和signal()
方法,都必须在持有锁,就是说必须在lock.lock()
和lock.unlock
之间才可以使用。
一个简单的示例:
1 | Lock lock = new ReentrantLock(); |
2 | Condition condition1 = lock.newCondition(); |
3 | |
4 | new Thread(() -> { |
5 | try { |
6 | lock.lock(); |
7 | condition1.await(); |
8 | lock.unlock(); |
9 | System.out.println(Thread.currentThread().getName() + "被唤醒"); |
10 | } catch (InterruptedException e) { |
11 | e.printStackTrace(); |
12 | } |
13 | }).start(); |
14 | |
15 | new Thread(() -> { |
16 | try { |
17 | Thread.sleep(2000); |
18 | System.out.println(Thread.currentThread().getName() + "开始唤醒"); |
19 | } catch (InterruptedException e) { |
20 | e.printStackTrace(); |
21 | } |
22 | lock.lock(); |
23 | condition1.signalAll(); |
24 | lock.unlock(); |
25 | }).start(); |
LockSupport
在之前介绍 AQS 的底层实现,已经在介绍 Java 中的 Lock 时,比如ReentrantLock
,ReentReadWriteLocks
,已经在介绍线程间等待/通知机制使用的 Condition 时都会调用LockSupport.park()
方法和LockSupport.unpark()
方法。
LockSupport 主要有以下几个方法:
void park()
:阻塞当前线程,如果调用unpark
方法或者当前线程被中断,从能从park()
方法中返回void park(Object blocker)
:功能同方法 1,入参增加一个Object
对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查;void parkNanos(long nanos)
:阻塞当前线程,最长不超过nanos
纳秒,增加了超时返回的特性;void parkNanos(Object blocker, long nanos)
:功能同方法 3,入参增加一个 Object 对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查;void parkUntil(long deadline)
:阻塞当前线程, 直到 deadline;void parkUntil(Object blocker, long deadline
:功能同方法 5,入参增加一个Object
对象,用来记录导致线程阻塞的阻塞对象,方便进行问题排查;Object getBlocker(Thread t)
:获取线程park
入参的对象。
注意,LockSupport 是不可重入的,如果一个线程连续 2 次调用LockSupport.park()
,那么该线程一定会一直阻塞下去。
LockSupport 很类似于二元信号量(只有 1 个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继续执行;如果许可已经被占用,当前线程阻塞,等待获取许可。
比如我们写一个最简单的例子:
1 | LockSupport.park(); |
2 | System.out.println("xxx"); |
线程会被阻塞,因为许可默认是被占用的,相当于许可证为 0,线程阻塞等待直到许可证为 1。
而我们可以先unpark
,释放一个许可,相当于把许可证添加为 1:
1 | Thread thread = Thread.currentThread(); |
2 | LockSupport.unpark(thread); |
3 | LockSupport.park();// 获取许可 |
4 | System.out.println("xxx"); |
这次就不阻塞,正常执行了。
原子类
J.U.C 包中的原子类可以分为五类:
- 基本类型:AtomicBoolean、AtomicInteger、AtomicLong
- 引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference
- 数组:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
- 对象的属性:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
- JDK1.8新增:DoubleAccumulator、LongAccumulator、DoubleAdder、LongAdder
AtomicBoolean
、AtomicInteger
、AtomicLong
和AtomicReference
的实例各自提供对相应类型单个变量的原子方式访问和更新功能。例如AtomicBoolean
提供对 int 类型单个变量的原子方式访问和更新功能。每个类也为该类型提供适当的实用工具方法。例如,类AtomicLong
和AtomicInteger
提供了原子增量方法,可以用于生成序列号。
AtomicStampedRerence
维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。AtomicMarkableReference
维护带有标记位的对象引用,可以原子方式对其进行更新。
AtomicIntegerArray
、AtomicLongArray
和AtomicReferenceArray
类进一步扩展了原子操作,对这些类型的数组提供了支持。例如AtomicIntegerArray
是可以用原子方式更新其元素的 int 数组。
AtomicReferenceFieldUpdater
、AtomicIntegerFieldUpdater
和AtomicLongFieldUpdater
是基于反射的实用工具,可以提供对关联字段类型的访问。例如AtomicIntegerFieldUpdater
可以对指定类的指定volatile int
字段进行原子更新。
DoubleAccumulator
、LongAccumulator
、DoubleAdder
、LongAdder
是 JDK1.8 新增的部分,是对AtomicLong
等类的改进。比如LongAccumulator
与LongAdder
在高并发环境下比AtomicLong
更高效。
参考文章: