底层并发

在 Java 的早期版本中,底层并发的概念是井发编程中很重要的一部分。这里将探讨这些技术的复杂性,以及为什么应该避免使用它们。

什么是线程

并发将程序分割为多个各自独立运行的任务。每个任务都由执行线程(thread of execution)所驱动,简称为线程(thread)。线程是操作系统进程内部按单一顺序执行的控制流。由此一个进程可以包含多个并发执行的任务,而你编写程序时就好像每个任务都有一个自己的处理器一样。多线程模型提升了编程的便利性,可以简化在单个程序中调度多个任务的工作。操作系统会通过处理器为你的各个线程分配执行时间。

Java 并发的核心机制是 Thread 类。在最初的版本中,Thread 本来是打算让程序员直接创建和管理的。随着语言的演进,人们发现了更好的方法,中间机制一特别是 Executor 被加入进来,以消除自行管理多线程的精神负担(和出现的错误)。最后甚至发展出了更优于 Executor 的机制,如上一章中所示。

Thread 是一种将任务和处理器关联起来的软件结构。虽然 Thread 的创建和使用看起来和任何其他类没什么不同,但是它们的内在区别其实很大。在你创建 Thread 时,JVM 会在一块专为 Thread 保留的内存区域中分配一大块空间,从而为任务的运行提供所需的一切:

  • 一个程序计数器,指示要执行的下一条 JVM 字节码指令。

  • 一个支持 Java 代码执行的栈,包含该线程到达当前执行节点前所调用过的方法的相关信息。它同时还包含正在执行的方法的所有本地变量(包括基本类型和堆上对象的引用)。在每个线程中,该栈的大小通常在 64KB 和 1MB 之间。(在某些平台上,尤其是 Windows,可能非常难以找出栈空间的默认值。可以通过 -Xss 标志调整栈空间大小。)

  • 一个用于本地代码的栈。

  • 本地线程变量(thread-local variable)存储。

  • 用于控制线程的状态维护变量。

包括 main() 在内的所有代码都运行在某个线程之内。只要有方法被调用,当前的程序计数器便会跳向该线程的栈,然后栈指针会向下移动足够的距离,创建出一个栈帧(stack frame),其中包含该方法的所有本地变量、参数以及返回值的存储。所有的本地基本类型会直接放在该栈上。方法所创建的对象的引用全部保存在该栈帧内,而对象本身则放到堆上。堆只有一个,被程序中所有的线程所共享。

除此之外,Thread 必须由操作系统(OS)进行注册,这样它才可以在某个时机被真正关联到处理器上。这会作为 Thread 构造过程的一部分来管理。Java 通过底层操作系统的这种机制来管理多线程的执行。

最佳线程数

仔细看上一章使用的 CachedThreadPool 例子,可以发现 ExecutorService 为提交的每个任务都分配了一个线程。不过 CountingStream.java 中的并行 Stream 只分配了 8 个线程(worker 1-7,以及分配给 main() 的一个线程)。如果试着增加 range() 的上限,会发现不会创建额外的线程,这是为什么?我们可以检测出当前设备的处理器数量:

public class NumberOfProcessors {
    public static void main(String[] args) {
        System.out.println(Runtime.getRuntime().availableProcessors());
    }
}
/* output
8 */

这里机器是 4 核的,每个核心可作为 2 个超线程(hyperthread,一项硬件上的技术,可以在单个处理器上实现非常快速的上下文切换,某些情况下可以让该处理器看起来像是两条硬件级的线程)。

事实证明,线程“通常”的最佳数量就是可用处理器的数量(对某些特定问题来说可能不是这样)。这其中的缘由来自 Java 线程间的上下文切换开销:保存要挂起的线程的当前状态,读取要继续执行的线程在进入挂起状态时(所保存)的实时状态。对于 8 个处理器和 8 个(计算密集型)Java 线程的情况来说,JVM 永远不需要在运行这 8 个任务时进行上下文切换。对于任务数量少于处理器数量的情况,分配更多线程则并无益处。

Intel 的超线程定义了“逻辑处理器”的数量,它并没有增加实际的计算吞吐量一一该特性在硬件层维护了额外的线程上下文,可以提升上下文切换的速度,这有助于提升如用户界面的响应速度等方面的性能。对于计算密集型任务来说,可以考虑将线程数设置为和物理核心(而非超线程)的数量一致。虽然 Java 会将每个超线程都视作一个处理器,但这似乎是被 Intel 对超线程的过度营销所误导的结果。

可以创建多少个线程

Thread 对象中占据空间最多的部分是储存待执行方法的 Java 栈。注意,Thread 对象的大小会随不同的操作系统而不同。下面代码会不断创建 Thread 对象,直到 JVM 内存耗尽:

只要你一直向 CachedThreadPool 传人任务,它就会不断地创建 Thread。向 execute() 传入 Dummy 对象即可开启任务,分配一个新的 Thread(如果没有现成可用的 Thread)。Nap() 的参数必须足够大,这样任务就不会进人完成阶段,使得现有的 Thread 被释放,以执行新任务。只要不断传人任务并且使任务无法完成,CachedThreadPool 就最终会耗尽内存。

不一定总能在每台机器上都制造出内存溢出的错误(我这里就没有得到该错误)。

可以用 -Xss 标志减少每个 Thread 的栈大小。允许的最小栈空间是 64KB,如果将栈的大小增加到 2MB,可分配的线程数就会大大减少。

也可以用 -Xmx 标志增大 JVM 的最大可分配内存。

“工作窃取”线程池(WorkStealingPool)

这是一种能基于所有可用处理器(如 JVM 可检测出的)自动创建线程池的 ExecutorService。

工作窃取(Work Stealing)算法使得已完成自身输入队列中所有工作项的线程可以“窃取”其他队列中的工作项。该算法的目的是在执行计算密集型任务时,能够跨处理器分发工作项,由此最大化所有可用处理器的利用率,Java 的 fork/join 框架中同样也用到了该算法(通过双端队列实现工作窃取)。

捕获异常

看一个例子:

这段代码什么都没输出(不过如果将 submit() 替换为 execute(),便会看到异常)这说明在线程内部抛出异常很需要技巧,也需要特别仔细的操作。

无法捕获已逃离线程的异常。一旦异常逃逸到任务的 run() 方法之外,便会传播到控制台,除非采用专门的步骤来捕获这种不正确的异常。

下面任务抛出了传播到其 run() 方法之外的异常,而 main() 则显示了运行该任务时发生的情况:

即使在 main() 中用 try-catch 块包裹也不行:

这个代码生产了和前面示例一样的结果:一个未捕获的异常。要解决这个问题,就必须改变该 Executor 生成线程的方式。

Thread.UncaughtExceptionHandler 接口会向每个 Thread 对象添加异常处理程序。当线程由于未捕获的异常而即将消亡时,Thread.UncaughtExceptionHandler.uncaughtException()就会被自动调用。要使用该方法,可以创建一个新的 ThreadFactory 类型,它会为它所创建的每个新 Thread 对象添加一个新的 Thread.UncaughtExceptionHandler。将这个 ThreadFactory 工厂传入 Executors.newCachedThreadPool 方法,该方法会创建新 ExecutorService:

从附加的跟踪代码可以验证得知,工厂所创建的线程获得了新的 UncaughtExceptionHandler,现在未捕获的异常被 uncaughtException 捕获了。

上例基于具体问题具体分析的方式设置处理器。如果你确定要在所有地方都应用同一个异常处理程序,更简单的方法是设置默认未捕获异常处理程序,该处理器会在 Thread 类内部设置一个静态字段:

只有在为线程专门配置的未捕获异常处理程序不存在的时候,该处理器才会被调用。系统会检查是否存在为线程专门配置的处理器,如果不存在,则会检查该线程组是否专门实现了其 uncaughtException() 方法,如果没有,则会调用 defaultUncaughtExceptionHandler。

可以将该方法和之前介绍的 CompletableFuture 中改进后的方法逬行比较。

共享资源

单线程程序就像一个在问题空间中独自穿行的实体,一次只能做一件事,由于只有一个实体也就没有两个实体在同一时间使用同一个资源的问题。

有了并发,就不再是单项任务,而是变成了可能会互相影响的两个任务(甚至更多个)。如果你不去阻止这种冲突,便会出现两个任务试图同时访问同一个银行账户,同时调用同一个打印机进行打印,或者同时调整同一个阀门等情况。

资源竞争

当你启动一项任务来执行某些操作时,该操作的结果可以用两种不同的方法来捕获:通过副作用或者通过返回值。

副作用的方式看起来会更简单:只需用结果来操纵一下环境中的某个东西即可。例如,你的任务可能会执行某些计算,然后直接将结果写入一个集合。

这种方法存在的问题是,集合通常是共享资源。当正在运行的任务不止一个时,任何任务都可能同时对一个共享资源进行读或写操作。这便引出了多任务场景最主要的隐患之一:资源竞争(resource contention)问题。

解决该问题的一种途径是使用一个能够处理资源竞争问题的集合。如果同一时间有多个任务试图对这样的一个集合进行写操作,该集合能够妥善处理资源竞争问题。在 Java 并发库中能找到一些试图解决资源竞争问题的类,本章中会介绍其中的一小部分,但是并不全面。

下面的示例,一个任务会生成偶数,而其他任务则会消费这些偶数。此处,消费者任务唯一要做的事就是检查这些偶数的有效性。

这个示例中会定义 EvenChecker,即消费者任务,并使其在后续的示例中也能够复用。为了将 EvenChecker 从我们的各种实验用生成器中分离出来,首先创建一个名为 IntGenerator 的抽象类,它包含了 EvenChecker 需要用到的最小必要方法集一 next() 方法,而且它可以被取消。

cancel() 会改变 canceled 标志的状态,而 isCanceled() 则会告诉你该标志是否被设置了。由于 canceled 是个 AtomicBoolean,所以它是原子(atomic)的,这意味着诸如赋值和返回值这样的简单操作不会相互冲突,因此你不会看到该字段在进行这些简单操作时的中间状态。在本章稍后,你会学到更多关于原子性和 Atomic 类的知识。

任何 IntGenerator 都可以使用下面的 EvenChecker 类进行测试:

test() 方法启动了多个访问同一个 IntGenerator 的 EvenChecker。EvenChecker 任务持续从各自的关联 IntGenerator 中读取值并进行测试。如果 IntGenerator 导致失败,则 test() 会报错并返回。

所有依赖于 IntGenerator 对象的 EvenChecker 任务都会检查 IntGenerator,以确认任务是否已被取消。如果 generator.isCanceled() 的结果是 true,run() 方法就会返回。任何 EvenChecker 任务都可以调用 IntGenerator 上的 cancel() 方法,这会使其他所有使用该 IntGenerator 的 EvenChecker 任务正常地终止。

在这种设计中,多个任务共享了同一个公共资源(即 IntGenerator),监听该资源的终止信号。这样便消除了所谓的竞态条件,即两个以上的任务竞争响应某个条件,并因此发生冲突,或者生成了不一致的结果。

必须要谨慎地考虑并避免所有可能导致并发系统失败的因素。例如一个任务不能依赖于另一个任务,因为任务的终止顺序是无法保证的。此处,通过让任务依赖于非任务对象,可以消除竞态条件的隐患。

通常会假设 test() 方法终将失败,因为各个 EvenChecker 的任务在 IntGenerator 处于“不恰当”的状态时,仍能够访问其中的信息。不过,可能要等到 IntGenerator 完成很多次循环后,才会检测出该问题,这取决于操作系统的特性,以及实现方面的其他细节。要确保该书的自动化构建不会卡死,这里使用了 TimedAbort,定义如下:

这里使用 lambda 表达式创建一个 Runnable,该表达式使用 CompletableFuture 的 runAsync() 静态方法执行。runAsync() 方法的值会立即返回。 因此,TimedAbort 不会保持任何打开的任务,否则已完成任务,但如果它需要太长时间,它仍将终止该任务( TimedAbort 有时被称为守护进程(daemon))。

TimeAbort 同样允许 restart(),这是为了在需要持续运行某些必要的操作时,可以让程序一直保持运行。

测试一下 TimeAbort:

如果注释掉 Nap() 这一行,程序就会立刻退出,由此可以看出 TimedAbort 并未让程序保持运行。

下面是第一种 IntGenerator 的实现(EvenProducer),其中 next() 方法用来生成一系列偶数值:

[1] 一个任务能在另一个任务执行了 currentEvenValue 的第一个自增操作后调用 next(),但不能在第二个自增操作后调用。这会让该值处于“不恰当”的状态。

为证明这种情况是可能发生的,EvenChecker.test() 创建了一组 EvenChecker 对象,以持续读取 EvenProducer 的输出,并检测各个输出值是否是偶数。如果不是,便会报告错误,程序随即终止。

多线程程序的部分问题是,即使存在 bug ,如果失败的可能性很低,程序仍然可以正确显示。

注意,自增操作本身也包括多个执行步骤,而任务有可能在自增操作的中途被线程机制挂起。也就是说,Java 中的自增操作并不是原子操作。因此即使是简单的自增操作,如果不对任务进行必要的保护,也是不安全的。

该程序并不总是在首次生成非偶数值时就终止。所有的任务都不会立刻终止,这也是并发程序的典型特征。

解决资源竞争

前面的示例演示了使用线程时存在的重要问题:你永远无法知道一个线程可能在何时运行。想象你坐在餐桌前,手里拿着一根叉子,你正要叉起盘中的最后一块食物,当叉子正伸向食物的时候,食物突然消失了……这是因为你的线程被挂起了,然后另一位食客进人并吃掉了食物。这就是你在编写并发程序时面临的问题。为了让并发正确运行,需要用某种方式防止两个任务访问同一项资源,至少在关键时期要避免出现这种情况。

避免这种冲突的关键就是在一个任务使用一项资源的时候,对该资源上锁。第一个访问资源的任务必须锁上该资源,然后其他任务就无法访问该资源了,直到锁被解除,然后另一个任务再次上锁并使用该资源,以此类推。

要解决线程冲突的间题,可以使用基本的并发方案将共享资源的访问操作序列化。这意味着同一时刻只允许一个任务访问共享资源。这通常是通过用一个子句将一段代码包围起来,使得同一时刻只允许一个任务执行该段代码来实现的。由于该语句会产生互斥(mutual exclusion)的效果,因此这样的机制通常被称为互斥锁(mutex)。

考虑一下浴室的使用场景:多个人(即多个线程驱动的多个任务)都想单独使用浴室(即共享资源)。要进入浴室,一个人需要先敲门以确认浴室当前是否可用。如果可用,他就会进入浴室并锁上门。其他任何想要使用浴室的人就被“阻塞”而不能使用了,因此这些人需要在门口排队等候,直到浴室重新可用。

当浴室使用完毕,就是时候给其他任务进入,这时比喻就有点不准确了。事实上没有人排队,我们也不知道下一个使用浴室是谁,因为线程调度机制并不是确定性的。实际情况是,就好像有一组阻塞的任务在浴室门口徘徊,当锁住了浴室的任务开锁出门时,线程调度器会决定哪一个线程接下来逬入卫生间。

为了防止资源使用上的冲突,Java 实现了 synchronized 关键字这种形式上的内建支持。当一个任务想要执行一段由 synchronized 关键字所保护的代码段时,Java 编译器会生成代码以确认锁是否可用。如果可用,该任务便会获得锁,执行代码,然后释放锁。

共享资源一般只是以对象形式存在的一段内存,但它也可以是一个文件、一个 I/O 端口,或者诸如打印机这样的事物。要控制对共享资源的访问,首先要将资源放人一个对象中。然后任何使用该资源的方法都可以是 synchronized 的。如果一个任务位于对其中一个 synchronized 方法的调用中,则所有其他任务都会被阻塞,无法进入该对象的任何一个 synchronized 方法,直到第一个任务从其调用中返回。

通常需要将字段都设为 private 的,而且只通过方法来访问这些字段。可以通过使用 synchronized 关键字声明方法来防止冲突,就像这样:

所有对象都会自动包含一个锁(也称为监视器(monitor))。在调用任一 synchronized 方法的时候,该对象都会被锁上,其任何其他 synchronized 方法都无法被调用,直到第一个调用完成并释放锁。如果一个任务调用了某对象的 f() 方法,另一个任务便无法调用同一对象的 f()g(),直到 f() 的调用完成并释放锁。因此,存在一个被某特定对象的所有 synchronized 方法所共享的锁,这个锁可以防止多个任务在同一时刻对该对象的内存进行写操作。

尤其重要的是,在使用并发时要将字段都设为 private 的,否则 synchronized 关键字便无法阻止其他的任务直接访问字段,这样便产生了冲突。

一个线程可以多次获得一个对象的锁。如果一个方法调用同一对象上的第二个方法,而第二个方法又调用了同一对象上的另一个方法,便会发生这种情况,以此类推。JVM 会持续对对象被上锁的次数进行计数。如果该对象的锁被解除,该计数便为 0。一个线程第一次获得锁的时候,该计数便为 1。该线程每次获得该同一对象上的又一个锁时,该计数便会加 1。当然,只有首先获取锁的线程才被允许多次获得锁。线程每跳出一次 synchronized 方法,该计数便会减 1,直到减为 0,此时完全释放掉锁,让其他线程使用。

每个类还有一个锁(作为该类的 Class 对象的一部分),因此 synchronized static 的方法可以在类范围内相互上锁,防止同时访问 static 的数据。

应该在什么时候使用同步(synchronize)呢?可以使用 Brian 的同步法则:

如果你在对一个可能接下来会被另一个线程读取的变量进行写操作,或者读取一个可能刚被另一个线程完成写操作的变量,就必须使用同步,并且读操作和写操作都必须用同一个监控锁同步。

当类中的多个方法都要处理关键数据时,必须对所有相关方法都进行同步。如果只对其中一个方法进行同步,那么其他的方法就可以无视该对象锁,并被任意调用。这一点很重要:访问关键共享数据的所有操作都必须是 synchronized 的,否则便无法正确运行。

将 EvenProducer 同步化

下面的 SynchronizedEvenProducer 是一个将 EvenProducer.java 中的 next() 方法修改为 synchronized 版本的 IntGenerator,这样可以阻止不符合预期的线程访问:

此处在两次自增操作中间插入了 Nap() 方法,以提升 currentEvenValue 为奇数状态时,上下文切换发生的可能性。由于互斥锁在临界区一次阻止了多个任务,因此这不会导致失败发生。第一个进入 next() 的任务获取了锁,而任何试图获取锁的后续任务都因被阻塞而无法获取锁,直到第一个任务释放锁。同时,调度器机制会选择正在等待该锁的另一个任务。这样,一次只有一个任务可以调用该互斥锁所保护的代码。

volatile 关键字

volatile 可能是 Java 中最微妙、最难用的关键字了。幸运的是,在现在的 Java 中,你实际上可以永远避开使用这个关键字,并且如果你真的看到有代码用到了它,反而应该怀疑一一通常来说,这很可能意味着代码太旧了,或者这段代码的编写者并不理解 volatile 或并发可能产生的后果(或者两者兼而有之)。使用 volatile 主要有三个原因:

字分裂

字分裂(word tearing)出现在当你的数据类型足够大(如 Java 中的 long 和 double 这两者都是 64 位),对某个变量的写操作过程分为两个步骤的时候。JVM 允许将对 64 位数的读写操作分为两次对 32 位数的独立操作(注意,这在 64 位处理器上可能不会发生,也就不会有这个问题),这增加了在读写操作中途发生上下文切换的可能性,这样其他任务就会看到错误的结果。之所以称其为“字分裂”,是因为你可能会看到该数仅完成了部分变更时的值。总体来说,一个任务有时可能会在第一步之后、第二步之前读到该变量,从而读到一个垃圾值。(对于 boolean 或 int 这样的小变量来说,这不会成为问题,但 long 和 double 除外)

在没有任何其他保护的时候,将 long 或 double 定义为 volatile 变量,可以防止字分裂。不过,如果使用 synchronized 或者某个 java.util.concurrent.atomic 类保护了这些变量,volatile 便可以被取代。同样,volatile 无法改变自增操作并不是一个原子操作这个事实。

可见性

第二个问题属于“一切都不可信,一切都很重要”的一部分。必须假定每个任务都有自己的处理器,而且每个处理器都有自己的本地缓存。该缓存可以让处理器运行得更快,这是因为处理器不用每次都要从主存中获取数据了,从缓存中读取值所耗费的时间会少得多。

问题之所以出现是因为 Java 试图尽可能提升效率。缓存的全部意义就在于避免从主存读取数据。当并发时,有时不清楚 Java 什么时候应该将值从主内存刷新到本地缓存 —— 这种问题称为缓存一致性(cache coherence)问题。

每个线程都可以在处理器缓存中保存变量的本地副本。将字段定义为 volatile 的可以阻止这些编译器优化,从而直接从内存进行读取,而不会被缓存。一旦在该字段上发生了写操作,所有任务中的所有读操作都会看到(该写操作产生的)变更。如果 volatile 变量恰好存在于本地缓存中,它就会立刻被写人主存,对该字段的所有读操作都会一直发生在主存上。

在以下情况中,应该将变量定义为 volatile 的:

  1. 该变量会同时被多个任务访问

  2. 这些访问中至少有一个是写操作

  3. 试图避免使用同步(在现代 Java 中,可以使用高级工具,以取代对同步的使用)

举例来说,如果你将一个变量作为终止某个任务的标志,该变量必须至少被声明为 volatile 的(尽管这样的一个标志并不一定能保证线程安全)。否则,在一个任务对标志进行修改后,该修改会保存到本地处理器缓存中,而不会刷新到主存。当其他任务查看该标志时,便看不到这些修改。(作者更喜欢用 AtomicBoolean 来实现这样的标志,如上一章所述)

如果一个任务对自己的变量所做的任何修改都永远对该任务自身可见,即该变量只会在任务内部使用,就不需要 volatile 关键字。或者如果某个线程对一个变量进行了写操作,而其他线程只需要读取该变量,也无需设置为 volatile 的。

总体来说,如果多个线程都会对某个变量进行写操作,volatile 就无法解决你的问题,必须改用 synchronized 来防止竞态条件。但有个特殊的例外情况:可以让多个线程对该变量进行写操作,只要它们不用先读取该变量,再用读出来的值生成新的值写回该变量。如果这些线程使用结果中的旧值,便会出现竞态条件,因为某个别的线程可以在你的线程正在执行计算的时候修改该变量。

原子性和不确定性(volatility)并不是相同的概念,理解这一点很重要。对非 volatile 变量进行原子操作并不能保证该操作刷新到了主存中。

同步会触发刷新到主存的操作,所以如果某个变量受 synchronized 方法或语句块所保护(或者是 java.util.concurrent.atomic 类型),那就不需要将它设为 volatile 的。

(指令)重排序和先行发生

Java 可能会通过对指令进行重排序来优化性能,只要结果不会造成程序行为上的改变。不过,这里所说的重排序可能会影响到逻辑处理器缓存和主存的交互方式,造成不易察觉的程序 bug。直到 Java 5,这门语言的设计者们才理解并解决了该问题。现在 volatile 关键字可以防止对 volatile 变量的读写指令进行不正确的重排序。这样的重排序规则称为先行发生(happens before)保证。

在对 volatile 变量的读或写操作之前出现的指令,保证会在该读或写操作之前执行。同样,任何在 volatile 变量的读或写操作之后的指令,会保证在读或写操作之后执行。例如:

a、b 和 c 的赋值操作可能会被重排序,只要它们都发生在 volatile 写操作之前。同样,x,y 和 z 语句也可能会被重排序,只要它们都发生在 volatile 写操作之后。 volatile 操作通常称为内存栅栏(memory barrier)。先行发生保证确保了 volatile 变量的读写指令无法穿过内存栅栏而被重排序。

先行发生保证还有另一种效果:当一个线程对某个 volatile 变量执行写操作时,所有在该写操作之前被该线程修改的其他变量一一包括非 volatile 变量一一也都会被刷新到主存。当一个线程读取 volatile 变量的时候,也会读取到所有和 volatile 变量一起被刷到主存的变量,包括非 volatile 变量。虽然这个特性很重要,解决了 Java 5 之前非常隐蔽的一些 bug,但你不应该依赖该特性,“自动”将周围的变量隐式地变为 volatile 的。如果你希望某个变量是 volatile的,就应该让其他所有的代码维护人员明确地知道。

何时使用 volatile

如果你想要使用 volatile,原因很可能是你想要使一个变量变得线程安全,而又不带来同步导致的开销。由于 volatile 的使用需要掌握一些复杂的技巧,作者建议完全不要使用它,而应该使用本章稍后会介绍的一种 java.util.concurrent.atomic 类。这些类提供了完整的线程安全性,同时又比同步的开销低得多。

如果你想要调试其他人写的并发代码,首先就要找到用了 volatile 的地方,并将它们替换为 Atomic 变量。除非你确定代码的作者对并发有着深刻的理解,否则他们可能是在滥用 volatile。

原子性

在 Java 多线程的探讨中,有一个常见但错误的说法:“原子操作不需要同步”。原子操作指的是不会被线程调度器中断的操作,一旦操作开始,直到完成之前,中途就不可能发生任何上下文切换。依赖原生(innate)的原子性(原子性是某些特定数据类型内在的属性)会很复杂,也很危险。只有在你是并发专家,或者有并发专家帮助你的情况下,才应该用原生原子性来代替同步或者某些线程安全的数据结构。

理解原生原子性,并且知道它有助于实现更智能的 java.util.concurrent 库组件,这一点很有用。但是要强烈抵制住依赖它的冲动。

原子性适用于对基本类型(除了 long 和 double 以外)的“简单操作”。它保证了对基本类型变量(除了 long 和 double 以外)的读写操作是不可分割(原子)的。

因此原子操作不会被线程机制中断。专业的程序员可以利用这一点,编写出不需要使用同步的无锁代码(lock-free code)。但即便如此,这也是过度简化了。有时,即使原子操作貌似应该很安全,实际上也可能并不安全。试图移除同步,这通常是过早优化的信号,而目会给你带来很多麻烦。

相较于单处理器系统,在多核系统中,可见性(visibility)是比原子性更重要的问题。一个任务所做的修改,即使是某种意义上不会被中断的原子操作,也可能对其他任务是不可见的(比如该修改可能临时保存在本地处理器缓存中),因此不同的任务看到的程序状态是不同的。另一方面,在多处理器系统中,同步机制会强制某个任务所做的修改在整个程序中都是可见的。如果没有进行同步,修改何时能变得可见,这一点是不确定的。

怎样的操作才算得上是原子操作?字段的赋值和返回值操作可能算是。在 Java 中,下面两个操作一定不是原子操作:

如以下方法所生成的 JVM 指令所示:

每个指令都会生成一个“get”(读取)和一个“put”(写入),中间夹杂着一些其他指令。所以在读取和写人操作中间,另一个任务可能会修改该字段,因此该操作并不是原子的。

用下面这段代码来测试原子性的这个概念:定义一个抽象类,在其中写一个方法,将一个整型变量按偶数值自增,然后由 run() 方法持续调用该方法。

IntSupplier 是一个带有 getAsInt() 方法的函数式接口。

现在创建测试,以独立任务的方式启动 run(),然后读取值,检查它是否是偶数:

我们很容易盲目地应用原子性的概念。此处,getAsInt() 看似是安全的原子操作:

但是,还是在遇到非偶数值时失败了。虽然 return i 确实是原子操作,但这里未加上同步,这会导致该值能在对象处于不稳定的中间状态时被读到。此外,i 也不是 volatile 的,因此会存在可见性间题。getValue()evenIncrement() 都必须是 synchronized 的:(这样可以在 i 未被设为 volatile 的情况下,依然保证其线程安全性)

只有并发方面的专家才有足够的能力在这种情况下进行优化。再次强调,要应用 Brian 的同步法则。

Josh 的序列号

作为第二个示例,考虑一个生成序列号的类,每次调用 nextSerialNumber() 都必须返回一个唯一值:

因为 C++ 的自增操作可以实现为单处理器指令,所以该操作可以认为是原子的。但是如前所述,Java 的自增操作并不是原子性的,它分别涉及读和写,因此即使是如此简单的操作,也有发生线程问题的可能。

这个示例中用到了 volatile,只是为了试验它能否起到作用。然而真正的问题在于 nextSerialNumber() 方法没有通过同步的方式访问共享的可变值。

为了测试 SerialNumbers,我们会创建一个不会耗尽内存(导致溢出)的 set,以防检测出问题的时问过长。此处的 Circularset 复用了用来存储这些 int 的内存,最后覆盖了旧值(复制操作通常都很快,基本上也可以用 java.util.Set 来替代):

add() 方法和 contains() 方法是 synchronized 的,以防止线程冲突。

SerialNumberChecker 中含有一个持有最新一批序列号的 Circularset,以及用于填充 Circularset 并确保这些序列号唯一的 run() 方法:

test() 方法创建了多个 SerialNumberchecker 任务来争夺一个 SerialNumbers 对象。SerialNumberChecker 任务试图生成一个重复的序列号。(该过程在多核机器上会更快)

测试基础版本的 SerialNumbers 类,运行失败:

volatile 关键字并没有起作用,可以为 nextSerialNumber() 增加 synchronized 关键字。

不再需要 volatile 了,因为 synchronized 关键字确保实现了 volatile 的行为。

对基本类型的读取和赋值操作应该是安全的原子操作。但是如 UnsafeReturn.java 中所示,用原子操作访问处于不稳定中间状态的对象仍然很容易。要尝试解决该问题,处理起来很复杂,同时也很危险。最合理的做法就是遵循 Brian 的同步法则。(并且如果可以的话,首先就不要共享变量)

原子类

Java 5 引人了特殊的原子变量类,如 AtomicInteger、AtomicLong、AtomicReference 等。这些类提供了原子性的更新能力,充分利用了现代处理器的硬件级原子性,实现了快速、无锁的操作。

可以用 AtomicInteger 重写 UnsafeReturn.java:

通过使用 AtomicInteger,去除了 synchronized 关键字。

类似的,继续使用 Atomiclnteger 重写 SynchronizedEvenProducer.java:

通过 AtomicInteger,再一次去除了所有其他形式的同步。

下面是使用了 AtomicInteger 的 SerialNumber 实现:

这些都是只用到单个字段的简单例子,在创建更复杂的类时,必须确定哪些字段需要受到保护,而且在某些情况下,可能最终还是要在方法上使用 synchronized 关键字。

临界区

有时只是想防止多个线程同时访问方法中的部分代码,而不是(防止同时访问)整个方法。要隔离的代码区域称为临界区(critical section),它同样也是通过用来保护整个方法的 synchronized 关键字来创建的,但使用的语法不同。此处,synchronized 用于指定某个对象,该对象的锁用于对花括号内的代码进行同步控制:

这也称为同步控制块(synchronized block)。必须先获得 syncObject 的锁,才能进入这段代码。如果其他某个任务已经持有了该锁,则只有在锁被释放后才能进入该临界区。如果发生了这种情况,试图获取锁的任务就会被挂起。调度器会定期回来检查锁是否被释放,如果锁被释放,就会唤醒该任务。

使用 synchronized 块(而不是对整个方法都进行同步)的主要自的是提升性能(有时也是为了更灵活的算法一一但是在涉及并发的情况下,对灵活性要特别谨慎)。下面的示例演示了对代码块(而不是整个方法)进行同步可以极大地提升方法对于其他任务的可访问性(即并发处理能力)该示例会统计成功访问 method() 方法的次数,并启动相互竞争以试图调用 method() 的任务:

Guarded 在 callCount 中跟踪记录成功调用 method() 方法的次数。SynchronizedMethod 对整个 method() 方法进行了同步控制,而 CriticalSection 仅用 synchronized 同步控制块对方法的一部分进行了同步控制。这样,消耗时间的 Nap() 方法就可以被排除在 synchronized 控制块之外。从输出可以看出,CriticalSection 中的 method() 的可访问性要高得多。

记住,使用 synchronized 控制块也有风险:它要求你必须能够确定控制块外部的非 synchronized 代码的确是安全的。

Caller 任务试图在给定的时间段内尽可能多次调用 method()(并公布调用次数)。为了建立该时间段,我们使用了 java.util.Timer,虽然有点过时,但仍能起到很好的作用。它接收 TimerTask 为参数, TimerTask 并不是函数式接口,因此我们无法使用 lambda 表达式,只能显式地创建该类(这里使用了匿名内部类)。在时间段结束后,TimerTask 将 AtomicBoolean stop 标志设为 true,从而退出循环。

test() 方法接收 Guarded 参数,并创建了4个 Caller 任务,将其全部添加到同一个 Guarded 对象中,这样它们就会竞争获取 method() 使用的锁。

这通常也是要使用 synchronized 控制块而不是对整个方法进行同步控制的原因————使其他任务能够更多地访问。(只要能保证安全即可)

在其他对象上进行同步

synchronized 控制块必须在某个对象上进行同步。通常最合理的选择就是通过 synchronized(this) 使用当前对象,这也是之前的示例中 CriticalSection 所采用的方法。通过这种方式,在获得 synchronized 控制块的锁后,便无法调用同一对象中的其他 synchronized 方法和临界区了。因此,在 this 上进行同步控制时,临界区的作用是减少同步控制的范围。

有时必须在另一个对象上进行同步,但如果要这么做,就必须确保所有的相关任务都要在同一个对象上进行同步。下面这个示例演示了一个对象中的方法在不同的锁上进行同步时,两个任务都可以进入该对象。

DualSync.f()(通过对整个方法进行同步)在 this 上进行了同步控制,而 g() 则使用了在 syncObject 上进行同步的 synchronized 控制块。因此,这两个同步控制是相互独立的在 test() 中,通过运行两个分别调用 f()g() 的独立任务演示了这一点。fNap 和 gNap 标志分别指示了在 f()g() 中,哪一个应该在其 for 循环中调用 Nap()。例如在 f() 睡眠时,它仍然继续持有自己的锁,但你可以看到这并未阻止对 g() 的调用,反之亦然。

使用显式 Lock 对象

java.util.concurrent 库提供了显式的互斥机制,定义在 java.util.concurrent.locks 中。Lock 对象必须显式地创建、加锁以及解锁,因此它的语法不如内建的 synchronized 关键字优雅。不过它在解决某些类型的问题时更灵活。下面用显式的 Lock 重写 SynchronizedEvenProducer.java:

MutexEvenProducer 中增加了一个名为lock的互斥锁,并在 next() 中通过 lock()unlock() 方法创建了一个临界区。注意,在使用 Lock 对象时,在调用 lock() 后,必须放置一个 try-finally 语句,并在 finally 子句中调用 unlock()———这是唯一能保证锁一定会被释放的方法。注意,return 语句必须放在 try 子句中,以确保 unlock() 不会执行得太早,导致将数据暴露给下一个任务。

虽然 try-finally 比使用 synchronized 关键字所需的代码量更多,但也展现了显式 Lock 对象的一个优点。如果在使用 synchronized 关键字时出现了什么问题,便会抛出异常,但是并没有机会执行任何清理操作,以将系统维持在正常的状态。而如果使用显式 Lock 对象,便可以用 finally 子句来维护。

总体来说,在使用 synchronized 时,要写的代码会更少,同时用户错误发生的概率也会大大降低,因此通常只需要在解决特殊问题的时候才使用显式 Lock 对象。例如使用 synchronized 关键字时,你无法在开始尝试获取锁后,(如果没有获取成功)就立刻主动放弃,也无法等待一段时间(仍然没有获取成功)后再放弃——要想这样做,就必须使用 concurrent 库:

ReentrantLock 可以尝试获取锁,然后放弃,因此如果其他人已经持有了锁,你便可以决定转而去做其他事情,而不是一直等到锁释放,如 untimed()。在 timed() 中,先尝试获取锁,然后可以在2秒后放弃(注意 TimeUnit 类指定了单位)。在 main() 中以匿名类的方式创建了一个独立的 Thread,它获取了锁,因此 untimed()timed() 方法有了可以竞争的对象。

相较于内建的 synchronized 锁,显式 Lock 对象能够提供更细粒度的加锁和释放锁机制,这对于实现专门的同步结构非常有用,例如交替锁(hand-over-hand locking),也称为锁耦合(lock coupling),可用于遍历链表节点一一执行遍历的代码必须先捕获下一个节点的锁,然后才能释放当前节点的锁。

库组件

java.util.concurrent 库提供了大量用于解决并发问题的类,可以实现更简单、更稳健的并发程序。但是要注意,相比并行流或 CompletableFuture,这些工具属于更底层的机制。

这节会探讨一些使用了不同组件的示例以及无锁(lock-free)库。

延迟队列(DelayQueue)

这是一种由实现了 Delayed 接口的对象组成的无边界 BlockingQueue。其中的对象只有在延迟时间到期后才能从队列中取出。队列是有序的,按延迟时间排序的,延迟时间最短的元素排在队首(同时也是过期时间最久)。如果延迟时间还没到期,那么头部元素就相当于不存在,同时 poll() 会返回 null。(因此不能将 null 放入队列)

下面示例中,Delayed 对象自身就是任务,而 DelayedTaskConsumer 将最“紧急”的任务(即延迟时间最短的任务)从队列中取出并执行。因此,要注意 DelayQueue 是优先级队列的变体。

DelayedTask 中的 sequence 维护了创建任务的顺序,因此可以看到确实发生了排序。

Delayed 接口中有一个方法 getDelay(),它表明了延迟时间还有多久,或延迟时间已经过期了多久。该方法强制必须使用 TimeUnit 类,因为参效类型是这样定义的。这个类很好用,因为无须进行任何计算就可以转换单位。举例来说,delta 的值保存为毫秒但是 System.nanoTime() 生成的是纳秒。通过指定 delta 值的当前单位和想要转换成的目标单位,就可以转换它的值了,就像这样:

getDelay() 中,将目标单位作为 unit 参数传入方法,你可以使用该参数,在甚至不知道它是什么时间单位的情况下,就可以将触发时间的时差转换为调用者所请求的单位。(这是个策略设计模式的简单示例,将算法的一部分作为参数传入,从而提升了灵活性)

为了排序,Delayed 接口同样继承了 Comparable 接口。必须实现 compareTo() 方法。

优先级阻塞队列(PriorityBlockingQueue)

这是一种能够阻塞读取操作的优先级队列。下面示例中为 Prioritized 指定了各自优先级。一些 Producer 任务的实例将 Prioritized 对象插入 PrioritylockingQueue,但在插入操作之间加人了随机的延迟。然后,单个 Consumer 任务在执行 take() 时会呈现出多个选项,PriorityBlockingQueue 则将当前具有最高优先级的 Prioritized 对象传给它。

Prioritized 中的 static counter 是 AtomicInteger 类型。这是必要的,因为同时有多个 Producer 在并行运行,如果不使用 AtomicInteger,就会看到重复的 id 数值,这个问题在 5.12 节中介绍过。

与上个示例一样,List sequence 记住了 Prioritized 对象的创建顺序,以便与实际的执行顺序进行比较。EndSentinel 是一种特殊类型,它会告诉 Consumer 进行关闭。

Producer 用 AtomicInteger 来为 SplittableRandom 提供种子(seed),这样不同的 Producer 就可以生成不同的序列。因为多个 Producer 是并行创建的,所以这很有必要,否则构造过程就不是线程安全的了。

注意,Producer 和 Consumer 通过 PriorityBlockingQueue 相互关联起来。由于队列的阻塞特性提供了所有必要的同步控制,所以这里不需要任何显式的同步控制一一不用考虑在读取该队列的时候队列中是否含有元素。

无锁集合

早期的集合(如 Vector 和 Hashtable)有很多方法用到了 synchronized 机制,在非多线程程序中使用的时候会导致了不可接受的开销。在Java 1.2 中,新的集合库没有用到同步,而给 Collections 类中添加了多种声明为 static synchronized 的方法,来对不同的集合类型进行同步控制。这虽然是一项改进,因为可以让你选择是否对集合使用同步,但由于 synchronized 的锁定,开销依然存在。Java 5 增加了专门用来提升线程安全性能的新集合,使用一些技巧消除了锁定。

  1. 复制策略

利用“复制”策略,修改是在部分数据结构的一个单独副本上进行的(或者有时是整个数据结构的副本),该副本在修改过程中不可见。只有在修改完成后,修改后的结构才会安全地与“主”数据结构进行交换,然后读取方才能看到修改。

在 CopyOnWriteArrayList 中,写操作会复制整个底层数组。原始数组被留在原地,因此可以在修改副本数组的同时安全地读取(原始数组)。修改完成后,会通过原子操作将新数组交换进去,因此后续的读取操作可以看到新的信息。CopyOnwriteArrayList 的一个好处是,它不会在多个迭代器遍历及修改列表的时候抛出异常,因此你无须像过去那样编写专门的代码来避免发生这类异常。

CopyOnWriteArraySet 用 CopyOnWriteArrayList 实现了自身的无锁行为。

ConcurrentHashMap 和 ConcurrentLinkedQueue 使用类似的技巧支持并发读写操作,只对部分集合而不是整个集合进行了复制和修改。不过,读取方仍然不会看到任何未完成的修改。ConcurrentHashMap 不会抛出 ConcurrentModificationException。

  1. CAS操作

比较交换(Compare-And-Swap, CAS)操作中,你从内存中取出一个值,并在计算新值的同时继续用着原始值。然后通过 CAS 指令,将原始值和当前内存中的值进行比较,如果两者相等,则将旧的值替换为你的计算结果,以上所有操作都在单个原子操作中。如果原始值的比较失败了,替换就不会发生,因为这意味着另一个线程已经在此期间修改了内存,在这种情况下,你的代码必须重试,读取新的原始值并重复以上操作。

如果内存竞争不严重,CAS 操作基本不会重试,因此会很快。相反,synchronized 操作每次都需要获得锁和释放锁,这就导致开销要昂贵很多,而又没有额外的好处。随着内存竞争变得激烈,CAS 操作会变慢,因为它必须频繁地重试,但这是对更激烈的竞争的一种动态响应。

CAS 操作最好的地方在于,很多现代处理器的汇编语言中带有 CAS 指令,而 JVM 中的 CAS 操作(如 Atomic 类中的那些操作)利用了这些指令。CAS 指令是硬件级的原子操作,执行这样的操作就如你期望的那么快。

总结

这里介绍的只是让你能够对其有一定了解,这对该主题还远不够全面。要时刻记住,现在的语言设计者们仍旧在处理由早期语言设计者们的过度自信带来的麻烦事。(比如,可以看看 Thread 类中有多少方法被废弃了,而 volatile 直到 Java 5 才能正常工作)

并发编程需要遵循的步骤:

  1. 不要使用并发,尝试找別的方法来使程序运行得更快。

  2. 如果必须使用并发,就使用上一章演示过的最新的高级工具——并行流和 CompletableFuture。

  3. 不要在任务间共享变量。任何必须在任务间传递的信息都应该通过 java.util.concurrent 库中的并发数据结构来共享。

  4. 如果必须共享变量,要么用 java.util.concurrent.atomic 类型,要么对任何可以直接或间接访问这些变量的方法应用 synchronized。(很容易落入以为考虑全面的陷阱)

  5. 如果第 4 步生成结果的速度太慢,可以尝试用 volatile 或其他技巧来调整。

通常仅用 java.util.concurrent 库的组件就可以写出并发程序,从而完全避免挑战 volatile 和 synchronized。