并发编程

想深入研究并发,请查看 Goetz 等人编著的 《Java 并发编程实战》。还有 Bill Venners 所著的 《深入 Java 虚拟机》 也值得一看,它深入阐述了 JVM 的内部机制,包括线程。

术语

并发(concurrent),并行(parallel),多任务(mulitasking),多进程(muliprocesing)多线程(mulithreading),分布式系统(distributed systems)等术语通常会产生混淆。Brian Goetz 在其 2016 年的演讲“From Concurrent to Parallel”首次指出该问题,并给出了一个较为合理的区分方法:

  • 并发是指如何正确,高效地控制共享资源。

  • 并行是指如何利用更多的资源来产生更快速的响应。

并发通常是指“多个任务在逬行”,而并行则更多是指“多个任务在同时执行”。会发现这样定义有问题:并行说的同样也是多个任务在“进行”。区别在于如何“进行”的。此外定义也确有重合:并行类的程序有时在单 CPU 上也可以运行,而有些并发编程系统也能利用多处理器的优势。

另一种定义,其重点在于“性能究竟是在何处慢下来的”:

  • 并发:同时处理多个任务,即不必等待一个任务完成就能开始处理其他任务。并发解决的是阻塞问题,即一个任务必须要等待非其可控的外部条件满足后才能继续执行,最常见的例子是 I/O,一个任务必须要等待输入才能执行(即被阻塞),类似的场景称为 I/O 密集型问题。

  • 并行:同时在多处执行多个任务。并行解决的是所谓的计算密集型问题,即通过把任务分成多个部分,并在多个处理器上执行,从而提升程序运行的速度。

从该定义可以看出,两者的关键都是“同时处理多个任务”,而并行则额外包括了多处理器分布式处理的概念。更重要的是,两者解决的是不同类型的问题:对于 I/O 密集型问题,并行可能起不到什么明显的作用,因为瓶颈不在于速度,而在于阻塞;而对于计算密集型问题,如果想用并发在单处理器上解决,则多半会徒劳无功。这两种思路都是想在更短的时间内做更多的事,但是它们实现加速的方法是不同的,这取决于不同问题所带来的不同约束的核心矛盾不同。

两个概念易混淆的另一个原因是,许多编程语言使用了相同的机制(线程)来实现并发和并行。

甚至可以更细分定义:

  • 纯并发(purely concurrent):多个任务在单 CPU 上运行。纯粹并发系统会比时序系统更快地生成结果,但是无法利用多处理器进一步提升性能。

  • 并发式并行(concurrent-parallel):应用并发技术,使程序能利用多处理器实现更高的性能。

  • 并行式并发(parallel-concurrent):使用并行编程技术编写的程序,而且即便只有一个处理器也能运行(如 Java 8的 Stream)。

  • 纯并行(purely parallel):只能在多处理器上运行。

支持并发的语言和库似平是解决“抽象泄然”(leaky abstraction)问题的完美可选方案。抽象的目的是把不影响核心思路的具体实现“抽象掉”,让你不受细枝末节的困扰。如果抽象泄露了(内部实现细节),这些细枝末节会不断制造麻烦,变得喧宾夺主,不管你多么努力地隐藏它们。

然而真正意义上的抽象真的存在吗?在编写这类程序时,永远无法忽略底层的系统和工具,甚至还要关注 CPU 缓存执行的细节。

你可能会说,纯函数式(pure functional)语言就不会有这些局限。确实,纯函数式语言能解决很多并发问题,所以如果你面临复杂的高并发场景,你可能会考虑用纯函数式语言来实现该部分功能。

本书给出的并发的新定义:并发是一系列聚焦于如何减少等待并提升性能的技术

该定义的关键意义在于等待,如果没有等待发生,也就没有提速的可能。如果有等待发生,则有很多方法可以优化,具体取决于很多因素,包括系统配置、要解决的问题类型,以及很多其他方面。

并发为速度而生

利用多处理器机器,可以将多任务分发到这些处理器上,这将显著提升吞吐量。不过,并发常常也能提升运行在单处理器上的程序性能。你可能会想,将并发程序运行在单处理器上,实际应该会比让程序全部按顺序执行带来更多开销,因为上下文切换(context switch,在任务之间切换)会导致额外的开销。

给情况带来变化的是阻塞。如果程序中的某个任务由于程序无法控制的某些外部条件(典型的如 I/O)导致无法继续执行,我们便认为该任务或者该线程被阻塞了。如果不使用并发,整个程序都会停下直到外部条件改变。而如果使用并发,即使一个任务阻塞了,程序中的其他任务仍可继续执行,由此程序持续向前运转。事实上,从性能的角度来看,除非有任务可能阻塞,否则没有理由在单处理器上使用并发。

有个提升单处理器系统性能的常见例子:事件驱动编程(event-driven programming)。在用户界面编程领域中的应用设想一个程序执行某项耗时较长的操作,最终导致无法处理用户的输人,从而进入无响应状态。如果没有并发,要想实现一个高响应的用户界面,唯一的方法是对所有的任务都定期检查用户输入。而通过创建一个独立的线程负责响应用户的输人,程序便可以保证一定程度的响应性。

有个简单的实现并发的方法是在操作系统层面使用多进程,这和多线程有所区别。进程是在自有地址空间中运行的自包含程序。多进程听起来很不错,因为操作系统通常会将各个进程相互隔离,因此不会相互干扰,这使得用多进程的方式编程会很容易。对比来看,多线程会共享诸如内存和 I/O 等资源,因此编写多线程程序的基本难点之一便是在各个线程驱动的任务间调度这些资源,以使其同一时刻只能被一个任务访问。

有些人认为多进程是唯一合理的并发方法,但不幸的是,多进程通常存在数量和开销限制,这影响了多进程在并发领域的适用性。

有些编程语言的设计是将各个并发任务隔离起来,这通常叫作函数式语言(functional language),其中的每个函数调用都不会产生副作用(所以也无法干预其他函数),因此可以视为独立的任务来驱动。

并发会带来额外开销,并引人更多复杂性,但也能在程序设计、资源协调、用户便利性等方面带来很大的提升。总体来说,并发可以帮你创造出更松耦合的设计;否则,部分代码会迫使你在一些本可以用并发来正常处理的操作上付出更多的精力。

Java 并发四定律

  1. 不要使用并发。

  2. —切都不可信,一切都很重要。

  3. 能运行不代表没问题。

  4. 你终究要理解并发。

并行流

Java 8 的流很容易并行化,特别是流使用的内部迭代(internal iteration)的方式,即流会控制它们自身的迭代器。而且流会使用一种称为分流器(spliterator)的特殊迭代器,其设计要求是要易于自动分割。这就使得可以通过简单地直接使用 .parallel(),流中的一切就突然都可以作为一组并行的任务来运行了。

例如,寻找素数是很耗时的过程,可以通过给程序增加计时来证明:

注意,这里是给整个程序计时,并不是做微基准测试。而且将数据保存到磁盘上是为了保护程序不受过度优化的影响。如果对结果什么都不做,高级的编译器可能会观察到程序没有意义而终止计算(不大可能发生,但也不绝对如此)。同时注意使用 nio2 库编写文件的简单性。

当注释掉 parallel() 那一行后,耗时大概是使用 parallel() 时的 3 倍。

parallel()并非灵丹妙药

为了探寻流和并行流的不确定性,考虑对一系列递增的数字求和的问题。有很多方法可以实现,这里会通过计时比较它们,但是在计时代码执行的时候,可能会掉入某个基本的并发陷阱。结果可能不完全精准(如 JVM 可能没有“预热”),但这段代码多少会给出一些有用的结论。

首先会实现一个 timeTest() 方法,它接受 LongSupplier 为参数,并测量调用 getAsLong() 的执行耗时,然后将结果和 checkValue 做比较,最后显示结果。

注意,这里必须全部严格使用 long 类型。

第一个版本用了最简单的方法,生成 Stream,并调用 sum()。可以看到流可以在保证内存不溢出的情况下,处理 10 亿大小的 SZ,用 parallel() 来进行基本的 range 操作则会显著提升程序的运行速度。

如果使用 iterate() 方法来生成序列,则速度会大为减慢,大概是因为每生成一个数字,都会调用一次 lambda 表达式。但是如果我们试着并行化该操作,结果不仅会比非并行的版本慢,而且当 SZ 达到 10 亿会出现内存溢出。当然有了 range() 方法,就肯定不会用 iterate(),但如果要生成简单序列之外的东西,肯定还得用 iterate()。使用 parallel() 是相当合理的想法,但会生成这些意外的结果。我们将在后面的部分中探讨内存限制的原因,但可以对流并行算法进行初步观察:

  • 流的并行化将输人的数据拆分成多个片段,这样就可以针对这些独立的数据片段应用各种算法。

  • 数组的切分非常轻量、均匀,并且可以完全掌握切分的大小。

  • 链表则完全没有这些属性,对链表“切分”仅仅意味着会将其拆分成“第一个元素”和“其余的部分”,并没有什么实际用处。

  • 无状态生成器的表现很像数组,以上对 range 的使用就是无状态的。

  • 迭代式生成器的表现很像链表,iterate() 就是一个迭代式生成器。

现在

可以看出,并行化可以提速,甚至比只是用 basicSum() 遍历还要快一点儿。有趣的是,Arrays.parallelPrefix() 似乎实际上减慢了速度。然而所有这些技术,在其他条件下可能会更适用,这就是为什么你无法预先确定该怎么做,而只能“试着来”。

最后,换成装箱后的 Long:

这时可用的内存大约减少一半,各处计算所需的时间都呈爆炸式增长,除了 basicSum(),它只是简单地遍历了一遍数组。意外的是,Arrays.parallelPrefix() 明显比其他所有方法都慢很多。

还有 parallel() 的实现版本,如果将其放在上面的程序中,则会导致漫长的垃圾收集过程,所以单独把它拿了出来:

该方式比未使用 parallel() 的版本要稍微快一点。

处理器的缓存机制是导致耗时增加的主要原因之一。由于 Summing2.java 中用的是基本类型 long,因此数组 la 是一段连续的内存,处理器会更容易预测到对这个数组的使用情况,从而将数组元素保存在缓存中以备后续所需,而访问缓存远远比跳出去访问主存要快。Long parallelPrefix 的计算看起来似乎受到了影响,因为它每次计算都要读取 2 个数组元素,还要将结果写回数组,每次这样的操作都会对 Long 生成一个缓存外的引用。

aL 是 Long 型数组,并不是一段连续的数值数组,而是一段连续的 Long 型对象引用的数组。尽管该数组很可能会保存在缓存中,但其指向的那些对象几乎永远在缓存之外。

得出结论:盲目地应用内建 parallel 操作有时反而能让程序变得特别慢。

parallel() 和 limit() 的作用

parallel() 还会带来进一步的影响。和其他语言一样,流是围绕无限流的模型设计的。如果要处理有限数量的元素,就需要使用集合,以及专门为有限大小的集合所设计的相关算法。如果使用无限流,则需要使用这些算法专门为流优化后的版本。

Java 8 合并了以上这两种情况。举例来说,Collection 没有内建的 map() 操作, Collection 和 Map 中唯一的流式批处理操作是 forEach()。如果你想要执行类似 map()reduce() 的操作,就需要首先将 Collection 转换为 Stream,这样才能有这些操作可用:

Collection 确实支持一些批处理操作,如 removeall(), removeIf()retainAll(),但这些都是破坏性的操作。ConcurrentHashMap 则对 forEach 和 reduce 操作有特别全面的支持。

在许多情况下,只在集合上调用 stream() 或者 parallelStream() 没有问题。但是,有时将 Stream 与 Collection 混合会产生意外。下面是一个有趣的现象:

如果注释掉 .parallel(),则每次会像预期那样,得到:

可以看到,使用了 .parallel(),程序变得不稳定了。

为什么如此简单的程序会出现这种情况呢,此时这里的目的是:并行生成。即一堆线程全都运行在一个生成器上,然后以某种方式选择一组有限的结果?代码看起来很简单,但最终造成了特别混乱的状况。

现在添加一些工具来研究该问题,由于处理的是多线程,因此必须捕获所有的跟踪信息,并保存到并发数据结构中。此处用到了 ConcurrentLinkedDeque:

current 由线程安全的 Atomiclnteger 类来定义,以避免竞态条件的发生(即 current.getAndIncrement(),保证了 AtomicInteger 的值在递增时不会出现竞态条件)。

查看 PSP2.txt,书上写的是 IntGenerator.get() 被调用了 1204 次,然而我在 JDK 21 下是被调用了 17 次:

这些分块的大小似乎是由内部实现决定的(向 limit() 传人不同的参数,可以看到不同的分块大小)。将 parallel()limit() 一起配合使用,可以告诉程序预先选取一组值,以作为流输出。

试着想象一下究竟发生了什么:流抽象了一个可按需生产的无限序列。当你让它以并行方式生成流时,实际是在让所有的线程都尽可能调用 get() 方法。加上 limit() 意味着你想要的是“只需要一些”。基本上,如果你同时使用 parallel()limit(),那就是在请求随机的输出。对当前要解决的需求来说,这可能没什么问题,但你这么做的时候,必须要清楚这一点。此功能只适合高手使用,并不能拿来作为证明“Java 运行有问题”的理由。

对于该问题,如果想要生成 int 流,可以使用 IntStream.range()

为了证明 .parallel() 确实有用,增加了对 peek()(流的一个函数,大部分情况下用来调试)的调用:它会从流中拉取出一个值(来进行想要的操作),但并不会影响在流中传递下去的元素。注意,这会干扰线程的行为,此处只是为了演示,并不是真的在调试。

boxed() 的调用,接收 int 流并将其转换为 Integer 流。

现在我们得到多个线程产生不同的值,但它只产生 10 个请求的值,而不是 16 个(书中为 1024 个)产生 10 个值。

这是否合理?这么小的数据量,上下文切换的开销很可能远远超过任何并行化所带来的速度提升。难以想象,什么时候才有必要用并行生成一个简单的数字序列,除非要使用的对象的创建开销非常大,可能会有用。记住:先运行起来,再优化,并且仅在必要时。parallel()limit() 的搭配使用仅限高手。

实际上,在很多情况下,并行流确实可以毫不费力地更快生成结果。但是如你所见,简单粗暴地在 Stream 操作上使用 parallel() 并不一定是安全的做法。在使用 parallel() 之前,你必须了解并行化可能会对你的操作带来的怎样的影响,究竟是有利还是有害。流意味着你无须完全重写代码,就可以使其并行化工作。但流永远无法取代你的思考,包括对并行化运行原理的理解,以及它是否能帮你达成目标。

创建和运行任务

如果无法通过并行流实现并发,则必须自行创建和运行任务。你将看到运行任务的理想 Java 8 方法是 CompletableFuture,但我们将使用更基本的工具介绍概念。

Java 并发的历史始于非常原始和有问题的机制,并且充满了各种尝试的改进。这些主要归入下一章内容。这里将展示一个规范形式,表示创建和运行任务的最简单,最好的方法。与并发中的所有内容一样,存在各种变体,要么存在于下一章,要么不在本书范围内。

Task 和 Executor

在 Java 的早期版本中,要使用多线程,需要直接创建自己的 Thread 对象,甚至是实现它们的子类以创建自定义的特殊“任务一线程”对象。你需要手动调用构造器,并且自行启动线程。

创建所有这些线程的开销都非常大,所以现在并不鼓励这些手动的方法。。Java 5 专门新增了一些类来处理线程池,不再需要为每个不同的任务类型都创建一个新的 Thread 子类,而只需将任务创建为一个单独的类型,然后传递给某个 ExecutorService 来运行该任务。该 ExecutorService 会为你管理多线程,并且在线程完成任务后不会丢弃,而是会回收它们。

先创建一个几乎什么都不做的任务,它会“睡眼”(挂起执行)100 毫秒,然后显示它的标识符和正在执行任务的 Thread 名称,最后结束:

这是个简单的 Runnable:一个包含 run() 方法的类。它的实现并没实际运行任何任务。通过 Nap 类来实现睡眠:

第二个构造器在时间结束后会显示出相关信息。

调用 Timelnit.MLLSECONDS.sleep(),会获得“当前线程",并让它按参数中传入的时长睡眠,即该线程将被挂起。但这并不意味着底层的处理器停止了。操作系统会切换到某些其他任务,比如运行计算机上的其他窗口。操作系统的任务管理器会定期检查 sleep() 是否到时间了。如果时间到了,线程会被“唤醒",并继续分配给处理器时间。

sleep() 会抛出 InterruptedException 异常,这是 Java 早期设计的产物,通过立刻跳出任务来终止它们。由于这容易产生不稳定的状态,后续便不再鼓励如此终止任务了。然而我们必须捕获各种情况下的该异常,以应对必要或不可控的任务终止。

要执行任务,从最简单的 SingleThreadExecutor 开始:

注意,并不存在 SingleThreadExecutor 类,newSingleThreadExecutor() 是 Executors 中的工厂方法,用于创建特定类型的 Executorservice。

创建了 10 个 NapTask 并将它们提交到了 Executorservice,它们会自行启动,同时,main() 方法会继续处理其他的事。当我调用 exec.shutdown() 时,会告诉 ExecutorService 完成所有已提交的任务,但不再接收任何新任务。不过此时那些任务仍在运行,所以我们在退出 main() 前必须等待这些任务完成。这是通过检查 exec.isTerminated() 的结果来实现的,所有的任务都完成后,该方法会返回 true。

main() 中的线程名字是 main,除此以外只有唯一的一个线程 pool-1-thread-1。同样,从交错的输出也可以看出,这两个线程确实在并发地运行着。

如果仅仅调用 exec.shutdown(),程序会在所有任务完成后立即结束。即后面的 while 循环不是必需的:

一旦调用了 exec.shutdown(),此后再提交新任务,就会抛出 RejectedExecutionException 异常:

exec.shutdown() 的兄弟方法是 exec.shutdownNow(),其作用是不再接受新任务,同时还会尝试通过中断来停止所有正在运行的任务。再次提醒,中断线程容易引发混乱和错误,并不鼓励这么做。

使用更多的线程

使用多线程的主要目的(几乎)总是使任务完成得更快一些,不用局限于单个线程,看看 Javadoc 中的 Executors 部分,你会发现更多的选择。比如,CachedThreadPool:

由于不再使用同一个线程按顺序运行所有任务,每个任务都有自己的线程,因此它更快了,这看起来没有缺点,为什么还要有 SingleThreadExecutor 呢?

看一个更复杂的任务:

每个任务都会使 val 自增 100 次,看起来很简单,使用 CachedThreadPool 试一下:

输出结果很意外,且每次运行结果都不一样。问题在于所有的任务都在试图对单例的 val 进行写操作,它们在相互打架。我们认为这样的类是非线程安全(no thread-safe)的。再来看看用 SingleThreadExecutor 会是怎样的情形:

现在我们每次都得到一致的结果,尽管 InterferingTask 缺乏线程安全性。这是 SingleThreadExecutor 最大的好处一一由于其同时只会运行一项任务,这些任务永远不会互相影响,因此保证了线程安全性。这样的现象称为线程封闭(thread confinement),因为将多个任务运行在单线程上可以限制它们之间的影响。线程封闭限制了提速,但也节省了很多困难的调试和重写工作。

生成结果

因为 InterferingTask 是一个 Runnable,它没有返回值,因此只能使用副作用产生结果,即操纵缓冲值而不是返回结果。副作用是并发编程的主要问题之一,原因正如我们在 CachedThreadPool2.java 中所见。InterferingTask 中的 val 称作可变共享状态(mutable shared state),正是它带来了问题:多个任务同时修改同一个变量会导致所谓的竞态条件。结果由哪个任务抢先得到终点并修改了变量(以及其他各种可能性)而决定。

避免竞态条件的最好方法是避免使用可变共享状态,这里将其称为自私儿童原则(selfish child principle),即什么都不共享。

对于 InterferingTask,如果能消除副作用,并且只返回任务结果就好了。要达到这个目的,我们需要创建一个 Callable,而不是 Runnable:

call() 完全独立地生成结果,并不会存在可变共享状态。

ExecutorService 允许在集合中通过 invokeAll() 来启动所有的 Callable:

只有当所有的任务都完成时,invokeAll() 才会返回由 Future 组成的 List,每个 Future 都对应一个任务。Future 是 Java 5 引入的机制,它允许你提交一个任务,并且不需要等待它完成。此处,我们使用了 ExecutorService.submit()

对尚未完成任务的 Future 调用 get() 方法时,调用会持续阻塞(等待),直到结果可用。

这意味着,在 CachedThreadPool3.java 中,由于在所有任务完成之前,invokeAll() 甚至不会返回,因此显得 Future 有些冗余。然而,此处 Future 并不是用于延迟得到结果,而是为了捕获任何可能发生的异常。

同时注意 CachedThreadPool.java 中提取结果部分的混乱状况。get() 会抛出异常,因此 extractResult() 是在 Stream 内部完成提取的。

由于在调用 get() 时 Future 会阻塞,因此它只是将等待任务完成的问题推迟了。最终,Future 被认为是一个无效的解决办法,现在更推荐 Java 8 的 CompletableFuture,稍后会介绍它。当然,在各种遗留的库中,你还是会遇到 Future。

我们可以用简洁且优雅的方式——并行 Stream 解决这个问题:

这样更易理解,而且只是将 parallel() 插入一个顺序操作中就可以并行运行了。

作为任务的 lambda 与方法引用

有了 lambda 和方法引用,就不再仅限于使用 Runnable 和 Callable。由于 Java 8 通过匹配签名的方式支持 lambda 和方法引用(即支持结构一致性),可以将非 Runnable 或 Callable 类型的参数传递给 ExecutorService:

此处,前两次 submit() 调用可以替代为 execute() 调用。所有的 submit() 调用都会返回 Future,你可以在第二次的两个调用中用它们来提取结果。

终止长时间运行的任务

并发程序通常会运行耗时较长的任务。虽然 Callable 的任务在完成时会返回值,生命周期是有限的,但还是可以运行很久。Runnable 任务有时会被设置为永久运行的后台进程。你可能想要在任务正常结束前提前终止它。

最初的 Java 提供了某种机制(为向后兼容,仍然存在)来中断正在进行的任务,中断机制在阻塞方面存在一些问题。中断任务乱且复杂,因为你必须了解可能发生中断的所有可能状态,以及可能导致的数据丢失。中断被认为是一种反模式,但由于向后兼容设计的残留,我们仍然不得不捕获 InterruptedException 异常。

终止任务最好的方法是设置一个任务会定期检查的标识,由此任务可以通过自己的关 闭流程来优雅地终止。你可以请求任务在合适的时候自行终止,而不是在某一时刻突然拨掉任务的插头。

这样终止任务听起来很简单:设置一个任务可见的 boolean 标识。修改任务,让其定期检查该标识,并优雅地终止。但还是有麻烦的地方——共享可变状态。如果该标识可以被其他任务操作,那么就有可能产生冲突。

解决该问题的方法很多,最常见的是通过 volatile 关键字。我们应该使用更简单的技术,以避免 volatile 所带来的所有不确定性。

Java 5 引入了 Atomic 类,它提供了一组类型,让你可以无须担心并发问题。下面我们引入 AtomicBoolean 标识来告诉任务自行清理并退出:

多个任务可以在同一实例中调用 quit(),但 AtomicBoolean 阻止多个任务同时修改 running,由此保证 quit() 方法是线程安全的。

这里 running 需要定义为 AtomicBoolean,体现了编写 Java 并发的难点之一,如果使用的是普通 boolean,可能无法在执行程序中看到问题,但这样是不安全的。通常,编写线程安全代码的唯一方法就是通过了解事情可能出错的所有细微之处。

下面是测试,启动了许多 QuittableTask 任务,然后关闭:

这里通过 peek() 将 QuittableTask 传给 ExecutorService,再将任务收录到 List 中。

main() 保证只要还有任务在运行,程序就不会退出。任务关闭的顺序和创建的顺序并不一致,即使每个任务是按顺序调用 quit() 方法的。这些独立运行的任务对信号的响应是不可控的。

CompletableFuture

先将 QuittingTasks.java 改为用 CompletableFuture 来实现:

在这里,并没有使用 peek() 来将 QuittableTask 逐个提交给 ExecutorService,而是在 cfutures 的创建过程中将任务传给了 Completablefuture::runAsync,这样就会执行 QuittableTask.run(),并返回 CompletableFuture<Void>。由于 run() 并不会返回任何东西,因此在这里只用了 CompletableFuture 来调用 join(),以等待任务完成。

注意,并不要求用 ExecutorService 来运行任务。这是由 CompletableFuture 管理的(虽然可以选择实现自定义的 ExecutorService)。你也无须调用 shutdown(),除非像这里一样显式调用 join(),否则程序会在第一时间退出,而不会等待任务完成。

基本用法

下面这个类通过静态的 work() 方法对该类对象执行了某些操作:

这是一个没有分支的有限状态机,只是从一条路径的头部移动到尾部。work() 方法使状态机从一个状态移动到下一个状态,并请求了 100 毫秒来执行该 "work"。

还可以利用 CompletableFuture,通过 completedFuture() 方法来包装一个对象:

completedfuture() 创建了一个“已完成”的 CompletableFuture。这种 future 唯一能做的事是 get() 内部对象,乍一看好像并没有什么用处。

注意,CompletableFuture 的类型为它所包含的对象。

一般来说,get() 会阻塞正在等待结果的被调用线程。该阻塞可以通过 InterruptedException 或 ExecutionException 来退出。在这里,由于 Completablefuture 已经完成,因此永远不会发生阻塞,当时就能得到结果。

一旦将 Machina 用 CompletableFuture 包装起来,就可以通过在 CompletableFuture 上增加操作来控制其包含的对象:

thenApply() 用到了一个接收输入并生成输出的 Function。在本例中,work() 这个 Function 返回和输人相同的类型,由此每个返回的 Completablefuture 都仍然是 Machina 类型,但是(类似于 Stream 中的 map())Function 也可以返回不同的类型,这可以从它的返回类型看出来。

可以从中看出 CompletableFuture 的一些本质:当执行某个操作时,它们会自动对其所携带的对象拆开包装,再重新包装。这样就不会陷入混乱的细节,从而可以大幅简化代码的编写和理解。

可以消除中间变量,将多个操作串联起来,就像使用 Stream 那样:

这里增加了一个 Timer,可以看到每一步都增加了 100 毫秒的等待时间,并且还有一些额外的开销。

使用 CompletableFuture 的好处是促使我们什么都不共享。默认情况下,通过 thenApply() 来应用函数并不会产生任何通信,它只是接收参数并返回结果,这就是函数式编程的基础之一,也是适合并发的一个原因。并行流和 CompletableFuture 便是基于这些原则而设计的。只要你决定怎样都不分享任何数据(分享极易发生,甚至是意外发生的),就可以编写出相对安全的并发程序。

操作是通过调用 thenApply() 开始的。本例中,CompletableFuture 的创建过程会等到所有任务都完成后才会完成。虽然这有时会有用,但更多的价值还是在于可以开启所有的任务,然后就可以在任务运行时继续做其他的事情。可以通过在操作最后增加 Async 来实现该效果:

同步调用(即平时使用的那种)意味着“完成工作后返回”,而异步调用则意味着“立即返回,同时在后台继续工作”。现在 cf 的创建过程变快了很多。对 thenApplyAsync() 的每次调用都会立刻返回,这样就可以立即执行下一个调用,整个链式调用序列就会比之前快得多了。

执行速度就是这么快,在没有调用 cf.join() 的情况下,程序在任务完成前就退出了(去掉该行代码,就没有任务完成的输出内容了)。而对 join() 的调用会一直阻塞 main() 线程的执行,直到 cf 操作完成。

这种可以“立刻返回”的异步能力依赖于 Completablefuture 库的某些背后操作。通常来说,该库需要将你请求的操作链保存为一组回调(callback)当第一个后台操作完成并返回后,第二个后台操作必须接收相应的 Machina并开始工作,然后当该操作完成后下一个操作继续,以此类推。但是由于这里并非由程序调用栈控制的普通函数调用序列,其调用顺序会丢失,因此改用回调来存储,即一个记录了函数地址的表格。

程序员将手动操作带来的混乱称为“回调地狱”。通过 Async 调用,CompletableFuture 会为你管理所有的回调。除非你知道系统里有特别的影响因素,否则你通常会使用 Async 调用。

其他操作

查看 Javadoc 中 CompletableFuture 的相关内容,可以看到它有很多方法,但大部分都是各种不同操作的变种。例如,有 thenApply() 和它的变种 thenApplyAsync(),以及 thenApplyAsync() 的另一种形式,它接收参数 Executor 来运行任务(这里不会涉及带 Executor 的版本)。

下面会演示所有的“基本”操作,这些操作既不会涉及两个 CompletableFuture 的合并,也不会涉及异常。首先,应该复用下面这两个工具,以简化代码并增加便利性:

showr() 调用了 CompletableFuture<Integer>get(),并显示了结果,同时对两个可能的异常进行了捕获。voidr()showr() 针对 CompletableFuture<Void> 的实现版本,即针对只在任务完成或失败时存在以用来展示的 CompletableFuture。

下面的 CompletableFuture 仅包装了 Integer 类型。cfi() 则是一个简化的方法,其在一个完整的 CompletableFuture<Integer> 内部包装了一个 int 类型。

mian() 中的测试由 int 值引用。cfi(1) 演示了 showr() 正常工作,cfi(2) 是调用 runAsync() 的例子。Runmable 不会返回任何值,因此结果是个 CompletableFuture<Void>,并且用到了 voidr()

cfi(3) 的 thenRunAsync()runAsync() 似乎完全一样。区别体现在之后的测试:runAsync() 是静态方法,所以你通常不会像在 cfi(2) 中一样调用它,而是会像在 QuittingCompletable.java 中一样调用它。再往后的测试演示了 supplyAsync() 同样也是静态的,但它依赖 Supplier 而不是 Runnable,并且会生成 CompletableFuture<Integer>,而不是 CompletableFuture<Void>

then 系列方法针对已有的 CompletableFuture<Integer> 进行操作。不同于 thenRunAsync(),用于 cfi(4)cfi(5)cfi(6) 的系列 then 方法接收未包装的 Integer 类型作为参数。正如在 voidr() 的用法中可以看到的,thenAcceptAsync() 接收 Consumer 作为参数,所以不会返回结果。thenApplyAsync() 接收 Function 作为参数,因此会返回结果(可以是和参数不同的类型)。thenComposeAsync()thenApplyAsync() 非常像,只是它的 Function 必须返回已在 CompletableFuture 中被包装后的结果。

cfi(7) 演示了 obtrudeValue() 方法,它强制输入一个值作为结果。cfi(8) 使用了 toCompletableFuture() 以从当前的 CompletionStage 生成 CompletableFuture。c.complete(9) 演示了可以通过传入结果来让一个 future 完成执行(而 obtrudeValue() 则可以强制用自己的结果来替换这个结果)。

如果 cancel() 掉 CompletableFuture,它同样会变成“已完成”(done),并且是特殊情况下的完成(completed exceptionally)。

getNow() 方法要么返回 CompletableFuture 的完整值,要么返回 getNow() 的替代参数(如果该 future 尚未完成)。

dependent,依赖项,即正在等待该 CompletableFuture 完成的 CompletableFuture 的预估数量。如果我们将两次对 CompletableFuture 的 thenAplyAsync() 调用连在一起,dependent 的数量仍然还是一个。但是如果我们直接将另一个 thenpplyAsync() 添加到 c,那么就有了两个 dependent:两个连续的调用和一个额外的调用。这说明了一个单独的 CompletionStage 可以在其完成后,基于它的结果 fork 出多个新任务。

合并多个 CompletableFuture

CompletableFuture 中的第二类方法接收两个 CompletableFuture 作为参数,并以多种方式将其合并。一般来说一个 CompletableFuture 会先于另一个执行完成,两者看起来就像在彼此竞争。这些方法使你可以用不同的方式处理结果。

为测试这些方法,先创建一个任务,该任务的参数之一是完成该任务所需的时长,由此可以控制首先完成哪个 CompletableFuture:

make() 中,work() 方法被用于 CompletableFuture。work() 花了 duration 的时间完成执行,然后将字母 W 附加到 id 的后面,以标识该“work”已完成。

现在可以创建多个相互竞争的 CompletableFuture,并通过 CompletableFuture 库中的方法将它们关联起来:

为了便于访问,cfA 和 cfB 都是静态的。init() 方法对这两者进行初始化,其中使用"B"初始化的 cfB 被赋子了更短的延时,因此 cfB 总是会“胜出"。join() 是另一个便利的方法,cfA 和 cfB 分别都调用了 join() 方法,并显示出分界线。

所有这些 "dual" 方法都用了一个 CompletableFuture 作为调用方法的对象(用它调用方法),以及另一个 CompletableFuture 作为第一个参数,然后加上要执行的操作。

通过 showr()voidr() 的用法,可以看到 "run" 和 "accept" 是终结操作,而 "apply" 和 "combine" 则生成新的承担负载(payload-bearing)的 CompletableFuture。

这些方法名都很好理解(self-explanatory),其中 combineAsync() 是个特别有趣的方法,它会等待两个 CompletableFuture 完成再将两者传给 BiFunction,然后 BiFunction 将结果合并(join)到最终 CompletableFuture 的荷载中。

模拟场景应用

下面模拟做蛋糕的过程来示范如何通过 CompletableFuture 将一系列的操作捆绑到一起。第一步是准备配料(ingredient),并将它们混入面糊(batter)中:

每种配料都需要一些时间来准备。allOf() 会等待所有的配料准备完毕后再用一些时间将它们混合后放入面糊中。

下一步是将一份面糊分摊到 4 个平底锅中,然后开始烘焙。成品会以 CompletableFuture 类型的 Stream 形式返回:

最后,创建了一份 Frosting(糖霜),并将其洒在蛋糕上:

异常

与 CompletableFuture 在处理链中包装对象的方式相同,它也会缓冲异常。在处理过程中调用者并不会对此有所感知,这种效果只会在尝试提取结果时体现出来。为演示其机制,先创建一个会在特定条件下抛出异常的类:

通过正整型的 failCount,每次向 work() 方法传递对象,failCount 都会递减。当它等于 0 的时候,work() 会抛出异常。如果直接传入值为 0 的 failCount,则永远不会抛出异常。

在随后的 test() 方法中,work() 被多次应用于 Breakable,如果 failCount 在范围内,则会抛出异常。不过,从 A 到 E 的测试中,你可以从输出看到异常被抛出,但并未显露出来:

可以看到,只有在测试中调用 get() 时,才会看到抛出的异常。

从测试 G 可以看出,可以先检查处理过程中是否有异常抛出,而不必真的抛出该异常。然而,测试 H 告诉我们该异常仍符合“完成”的条件,不论其是否真的成功。

最后一部分演示了可以如何向 CompletableFutuie 插入异常,不论是否出现任何失败。

相比于在合并或获取结果时粗暴地使用 try-catch,我们更倾向于利用 CompletableFuture 所带来的更先进的机制来自动地响应异常。你只需照搬在所有 CompletableFuture 中看到的方式即可:在调用链中插入 CompletableFuture 调用。一共有3个选项: exceptionally(), handle() 以及 whenComplete()

只有在出现异常时,exceptionally() 参数才会运行。它的限制在于 Function 返回值的类型必须和输入相同。将一个正确的对象插回到流,可使 exceptionally() 恢复到工作状态。

handle() 总是会被调用的,而且你必须检查 fail 是否为 true,以确定是否有异常发生。但是 handle() 可生成任意新类型,因此它允许你执行处理,而不是像 exceptionally() 那样只是恢复。

whenComplete()handle() 类似,都必须测试失败情况,但是它的参数是 Consumer,只会使用而不会修改传递中的 result 对象。

流异常

对 CompletableExceptions.java 做一些改动,看看 CompletableFuture 的异常和 Stream 的异常有什么区别:

使用 Completablefuture 时,我们看到了从测试 A 到测试 E 的执行过程。但是使用 Stream 时,甚至直到你应用到终结操作之前,什么都不会发生。 Completablefuture 会执行任务,并捕捉任何异常,以备后续的结果取回。因为 Stream 的机制是在终结操作前不做任何事,所以这两者并不好直接比较。不过 Stream 肯定不会保存异常。

检查型异常

CompletableFuture 和并行流都不支持包含检查型异常的操作。因此,必须在调用操作的时候处理检查型异常,而这会使代码不优雅:

如果像使用 nochecked() 的引用一样,使用 withchecked() 的方法引用,编译器就会在 1 和 2 处报错。所以,你必须写出 lambda 表达式(或者写一个不会抛出异常的包装方法)。

死锁

因为任务会被阻塞,所以一项任务 A 很可能因等待另一项任务 B 而卡住,而任务 B 又在等待另一项任务 C,以此类推,直到整个链条指回到一项正在等待任务 A 的任务。此时形成了一个互相等待的无限循环,谁都动不了。这称为死锁。当两个任务有能力改变它们的状态,也就是说不会被阻塞,但永远不会取得任何有效进展时,也会产生活锁。(死锁是进程彻底停滞,而活锁是进程虽然在运行但无实际进展。)

如果程序看起来运行正常,但是存在潜在的死锁风险。这时,你可能无法发现会导致死锁的迹象,这样隐患就潜伏在程序中,直到它突然发生,一般会暴露给使用者(以一种极难重现的方式)。因此,依靠在程序设计中的谨慎小心来防止死锁发生,是并发系统开发极为重要的一部分。

Edsger Dijkstra 提出的哲学家用餐问题(Dining Philosophers problem),是对死锁的经典诠释。问题设定为五个哲学家坐在圆桌旁,每个哲学家之间放置一根筷子。哲学家的生活由思考和吃饭两种状态交替组成。要吃饭时,哲学家需要同时拿起左边和右边的筷子。当一个哲学家成功拿到两根筷子后,就可以进餐,进餐完毕后放下筷子继续思考。

当其中一位哲学家想要吃东西时,他必须从左右手边各拿起一根筷子。如果坐在旁边的另一位哲学家正在使用他想要的筷子,那么这位哲学家就必须等待,直到那根筷子处于可用状态。

下面 StickHolder(筷子持有者)类将一个 Chopstick(筷子)类保存在一个长度为 1 的 BlockingQueue 中,以进行管理。BlockingQueue 是一种线程安全的集合,专门用于并发程序,如果调用 take() 且队列为空,它就会阻塞(等待)。一旦新的元素被放入队列,阻塞就会被解除,并会返回该元素值:

简单起见,Chopstick 不会由 StickHolder 实际生成,而仅在类中是私有的。如果你调用 pickUp() 且筷子当前不可用,pickUp() 会一直阻塞,直到筷子被其他 Philosopher 通过调用 putDown() 被返回。注意本类中所有的线程安全性都是由 BlockingQueue 来保证的。

每个 Philosopher 都是一个任务,它尝试从左右两边 pickUp() 筷子来吃饭,然后通过 putDown() 来释放这些筷子:

两个 Philosopher 不可能成功地同时 take() 同一根筷子。另外,如果一根筷子已经被一个 Philosopher 拿走,下一个尝试拿走同一根筷子的 Philosopher 会阻塞,等待该筷子被释放。

结果是一个看似没问题的程序死锁了。为了使语法更清晰,这里把集合改用数组来实现:

当发现输出停止时,程序便死锁了。死锁的发生似乎依赖于机器 CPU 核数,双核的机器并不会发生死锁,但在双核以上的机器上很快就会发生。此行为使该示例更好地说明了死锁,如果你在一个双核机器上编写程序,确信其运行正常,而直到安装到另一台设备上才出现死锁。注意,程序不易发生死锁,并不意味着不会发生,它仍然是有风险的,只是很难出现,同时这也是最糟糕的情况。

在 DiningPhilosophers 的构造器中,每个 Philosopher 都被分配了左右各一个 StickHolder 的引用。代码[1]演示了通过模数 n 选择右手边的筷子,并将最后一位 Philosopher 指回到第一位 Philosopher 旁边。

为了在[3]处逐个启动 Philosopher,调用了 runAsync(),这意味着 DiningPhilosopher 的构造器会在[4]处立刻返回。如果不想办法阻止 main() 完成,本程序就会很快退出,也做不了什么事。Nap 对象阻塞了 main() 的退出,然后经过3秒后,强制退出该已死锁的程序。

在当前配置下,Philosopher 实际上并没有时间思考,因此他们全都因吃饭而抢夺筷子,死锁很快就会发生。你可以做以下改变:1. 在[4]处增加更多的 Philosopher。2. 取消 Philosopher.java 中[1]处的注释。

上面的任何一项改变都会降低死锁的可能性,这演示了编写并发程序的危险性。本例之所以有趣,正因为它演示了一个程序可以在存在死锁风险的情况下依然看似运行正确。

可以观察到,出现死锁需要同时满足以下 4 个条件:

  1. 互斥条件。任务使用的资源中至少有一个不能共享的。这里,一根筷子同时只能被一个哲学家使用。

  2. 至少一个任务必须持有一项资源,并且等待正被另一个任务持有的资源。即如果要出现死锁,一位哲学家必须持有一根筷子,并且正在等待另一根。

  3. 不能从一个任务中抢走一项资源。任务只能以正常的事件释放资源。这里哲学家很有礼貌,他们并不会从其他人手里抢走筷子。

  4. 会发生循环等待,其中一个任务等待另一个任务持有的资源,另一个任务又在等待另一个任务持有的资源,以此类推,直到某个任务正在等待第一个任务持有的资源,由此一切都陷入了死循环。在 DiningPhilosophers.java 中,由于每一位哲学家都在试图先获取左边的筷子,再获取右边的,因此发生了循环等待。

由于这些条件必须同时发生才能导致死锁,因此可以阻止其中一个来避免发生。在本例中,可以破坏第四个条件,该条件会发生是因为每个人都是按指定顺序拿起筷子,因此就会发生都是拿着左边筷子,等待右边筷子的循环等待情况。然而,如果其中一位哲学家尝试先拿起左边的筷子,该哲学家就绝不会阻碍右边的哲学家拿起筷子,这样就阻止了循环等待。

可以通过取消[2]处注释确保第二位哲学家先拿起和放下右边的筷子,我们消除了死锁的可能性。这只是解决该问题的办法之一,你还可以通过阻止其他条件来解决。

语言层面上的支持无法帮助你避免死锁,你只能通过谨慎的设计来避免这个问题。

构造器并不是线程安全的

想象对象的构造过程,会很容易认为其是线程安全的。毕竟,在对象初始化完成前对外不可见,又怎么可能去竞争该对象呢?的确,Java 语言规范(Java Language Specification, JLS)明确说过:

将构造器设为同步并没有实际意义,因为这样做会阻塞正在构造的对象。在对象的所有构造器完成工作之前,其他线程通常无法使用该对象。

不幸的是,对象的构造过程如其他操作一样,也会受到共享内存并发问题的影响,只是作用机制可能更加微妙。

思考用一个静态字段为每个对象自动创建一个唯一标识的过程。为了测试不同的实现,我们从一个接口开始:

然后简单地实现该接口:

然后写一个测试工具,看一下生成多个创建对象的并发任务时会发生什么:

MakeObjects 类是个 Supplier,通过 get() 方法来生成 List<Integer>。该 List 是通过从每个 HasID 对象中提取 id 而生成的。test() 方法创建了两个并行的 CompletableFuture 来运行 MakeObjects 的 supplier,然后接收两者的结果,并通过 Guava 库 Sets.intersection() 来找出这两个 List<Integer> 中有多少个 id 是相同的(Guava 比 retainAll() 要快很多)。

测试 StaticIDField:

重复 ID 的数量很多,显然,单纯的 static int 对于构造过程并不安全。下面通过 AtomicInteger 来让该过程变为线程安全的:

构造器有种更巧妙的办法来共享状态,即通过构造器参数:

此处 SharedUser 的构造器共享了相同的参数。尽管 Shareduser 是通过完全无害且合理的方式来使用它的参数的,但构造器的调用方式导致了冲突。

虽然 Java 语言层面并不支持 synchronized 修饰的构造器,但是可以通过 synchronized 语句块,来创建自己的同步构造器(要了解 synchronized 关键字,看下一章)。虽然 JLS 声明“……这会阻塞正在创建的对象”,但这并不是真的一一构造器事实上是个静态方法,因此 synchronized 的构造器实际上会阻塞 Class 对象。我们可以通过创建自己的静态对象并对它上锁,来复现该过程:

这时对 Unsafe 类的共享是安全的了。

另一种方法是将构造器设为私有的(因此会阻止继承),并实现一个静态的工厂方法来生成新的对象:

通过将静态工厂方法设为同步的,在构造过程中对 Class 对象上锁。

以上这些示例强调了在并发 Java 程序中,发现和管理共享状态是多么难以捉摸。即使你采用了“什么都不共享”的策略,意外的共享还是格外容易发生。

工作量,复杂性,成本

假设要制作一个比萨,在制作过程中,从当前步骤到下一步骤所需的工作量在这里用枚举的一部分来表示:

这是一个简单的状态机,当比萨被放到盒子中时,状态机到达终点。

如果是一个人做一个比萨,那么所有步骤都是线性的:

时间以毫秒为单位,加总所有步骤的工作量,会得出与我们的期望值相符的数字。

如果像这样制作 5 个比萨,那么预期的耗时会变成 5 倍。如果想要更快,可以先从并行流的方式开始:

可以看到现在用一个比萨的时间可以做 5 个,如果移除 parallel(),就会发现不用并行又变成了 5 倍耗时。试着将 QUANTITY 改为 8,10,16,17,可以发现做 8 个的时间还是一个比萨的耗时,但是 10 和 16 个时就变成了两个比萨的耗时,而 17 个就变成了 3 个比萨的耗时。(这里机器只有 8 个逻辑处理器)

这个例子是在 forEach() 中完成所有工作的,如果将独立的步骤 map 起来,会有什么变化呢:

没有变化,因为每个比萨都需要所有的步骤按顺序执行,因此无法分离各个步骤来进一步提速。

可以用 CompletableFuture 来重写该示例:

并行流和 CompletableFuture 是 Java 并发工具中最发达的技术。不论何时你都应该优先选择其中之一。并行流方案最适合解决无脑并行类型问题,即那种很容易将数据拆分成相同的、易处理的片段来处理的问题(要自己实现这部分的话,你得先撸起袖子好好钻研 Spliterator 的文档)。 CompletableFuture 处理的工作片段最好是各不相同的,这样效果最好。CompletableFuture 看起来更像是面向任务的,而不是面向数据。

如果你确实需要使用并,并行 Stream 和 CompletableFuture 可能很容易带来显著的收益。但是当你想要进一步优化时,请一定要小心,因其耗费的成本和精力可能很快就会远远超过优化的好处。

总结

并发的主要缺点如下:

  1. 线程等待共享资源时会导致系统变慢。

  2. 管理线程需要额外的 CPU 开销。

  3. 不合理的设计决策会导致不必要的复杂性。

  4. 会带来饥饿、竞争、死锁、活锁(多个线程在各自独立的任务上工作,导致整体无法完成)等病态现象。

  5. 平台间不一致。有些情况下在某些机器上竞态条件很快就会出现,但在另一些机器上则不会。

另外,并发的应用是一门艺术。Java 的设计理念是,你需要多少对象,就可以创建多少对象一一至少理论上如此。然而,Thread 并不是典型的对象:每个线程都有自己的执行环境,包括一个栈和其他必要的元素,这使得线程比一个普通的对象要大很多。大部分环境下,系统内存只够你创建出几千个 Thread 对象(超出则会导致内存溢出)。但通常只会需要少量线程来解决问题,但是对于某些设计来说这会成为一种约束,可能会迫使你改用一个完全不同的方案。

并发伴随的一个主要难题是多个任务可能会共享一个资源,比如一个对象的内存,你必须保证这些任务不会同时读写该资源。总结:只要涉及共享内存并发,就永远不能对自己的编程能力太过自信。

本章聚焦于并行 Stream 和 Completablefuture 这两个相对安全和简单的工具上,并且只触及部分 Java 标准库中更细粒度的工具。而且并没有涉及一些在实际工作中可能会用到的库。我们用到了 Atomic 类、ConcurrentLinkedDeque,ExecutorService 和 ArrayBlockingQueue。下一章涵盖了一些其他的工具,而 Javadoc 中的java.util.concurrent 部分也值得你去探索。不过要注意,有些库组件已经被更好的新组件取代了。

《Java 并发编程实战》,Brian Goetz. Tim Peierl 等著。《Java并发编程》(第2版),Doug Lea著。这些书对于理解并发非常重要。