—— Feellmoose

Java并发编程与设计

本文章主要带大家从:基本概念,Java 并发工具 JUC,并发与并发系统,与高并发设计等几个方面梳理 Java多线程编程。

引入

并发是一系列性能技术,专注于减少等待。 ——ON JAVA 8

并发与并行

  1. 并发是关于正确有效地控制对共享资源的访问。
  2. 并行是使用额外的资源来更快地产生结果。

这两个概念混合在一起的一个主要原因是包括Java在内的许多编程语言使用相同的机制线程来实现并发和并行。

同步并发编程

线程模型

线程是Java程序中执行代码的最小单元。

每个Java程序至少有一个主线程,可以创建额外的线程来执行并发的任务。守护线程是在后台提供服务的线程,当所有的非守护线程都终止时,守护线程会自动退出。典型的守护线程包括垃圾回收线程。

线程的生命周期包括几个不同的状态:NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITING,TERMINATED。

线程可以在这些状态之间转换,Java虚拟机(JVM)负责线程的调度和执行,线程调度器确定哪个线程应该在给定时间运行。

1
2
3
4
5
6
7
stateDiagram
[*] --> NEW
NEW --> RUNNABLE
RUNNABLE --> BLOCKED\nWAITING\nTIMED_WAITING
BLOCKED\nWAITING\nTIMED_WAITING --> RUNNABLE
RUNNABLE --> TERMINATED
TERMINATED --> [*]

多个线程访问共享资源时可能会引发竞态条件和数据不一致的问题,Java提供了线程间同步的工具。同时Java的线程模型通过Java内存模型提供了线程安全性的保证。

线程之间可以通过共享内存或显式通信来进行交互。Java提供了多种机制来实现与线程间通信。

互斥(Mutual Exclusion)

互斥是指确保某一资源同时只允许一个访问者对其进行访问,具有唯一性和排他性。如果有多个线程或进程试图同时访问同一资源,只有一个能够成功,其他的必须等待直到资源被释放。互斥的主要目的是防止数据不一致和冲突,确保资源在任何时刻都只被一个线程或进程访问。

同步(Synchronization)

同步是互斥的一个扩展,它在互斥的基础上增加了对访问者访问资源的顺序控制。

注意:线程同步不一定发生阻塞,只有当访问同一资源出现互相等待和互相唤醒会发生阻塞,即一个线 程主动等待另一线程释放资源。

为什么需要进行同步

CPU、内存、I/O 设备的速度是有极大差异的,为了合理利用 CPU 的高性能,平衡这三者的速度差异, 计算机体系结构、操作系统、编译程序都做出了贡献,主要体现为:

  1. CPU 增加了缓存,以均衡与内存的速度差异:导致可见性问题。
  2. 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异:导致原子性问题。
  3. 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用:导致有序性问题。

可见性:一个线程对共享变量的修改,另外一个线程一定能够立刻看到

原子性:一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行

有序性:即程序执行的顺序按照代码的先后顺序执行

顺序一致性(sequentially consistent)

如果程序是正确同步的,程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。这里的同 步是指广义上的同步,包括对常用同步原语(synchronized,volatile 和 final)的正确使用。

顺序一致性内存模型

顺序一致性内存模型是一个被计算机科学家理想化了的理论参考模型,它为程序员提供了极强的内存可 见性保证。

  1. 一个线程中的所有操作必须按照程序的顺序来执行。
  2. (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中, 每个操作都必须原子执行且立刻对所有线程可见。

JAVA内存模型(JMM)

Java内存模型规范了JVM如何提供按需禁用缓存和编译优化的方法,以达到顺序一致性。

  1. Java提供的同步原语(volatile,synchronized,final)
  2. JVM规范Happens-Before规则

JSR-133 Happens-Before规则

单一线程原则,在一个线程内,在程序前面的操作先行发生于后面的操作。

管程锁定规则,一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。

volatile 变量规则,对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作。

传递性,如果操作 A 先行发生于操作 B,操作 B 先行发生于操作 C,那么操作 A 先行发生于操作C。

线程启动规则,Thread 对象的 start() 方法调用先行发生于此线程的每一个动作。

线程加入规则,Thread 对象的结束先行发生于 join() 方法返回。

线程中断规则,对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过 interrupted() 方法检测到是否有中断发生。

对象终结规则,一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。

synchronized

synchronized是一种锁机制,用于实现线程同步。它可以应用于方法或代码块,防止多个线程同时访问 共享资源,从而避免数据竞争和并发问题。当一个线程进入同步代码块时,其他线程必须等待,直到该 线程释放锁,才能继续执行。

synchronized可以提供最高级别的线程安全:同步,从而保证了有序性,可见性,原子性。

锁的分类

  1. 互斥锁(Mutex):互斥锁是最基本的锁类型之一。它提供了两个状态:锁定和解锁。只有获得锁 的线程才能执行临界区代码,其他线程需要等待锁的释放。互斥锁用于保护共享资源,确保在任意 时刻只有一个线程可以访问该资源。

  2. 可重入锁 可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层 方法会自动获取锁,不会因为之前已经获取过还没释放而阻塞,可重入锁可一定程度避免死锁。

  3. 悲观锁,认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加 锁,适合写操作多的场景。(广义上的锁)

    乐观锁,认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时 候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的 数据成功写入,适合读操作多的场景。(原子类或叫做CAS)

  4. 公平锁,多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才 能获得锁。公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待

    队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。

    非公平锁,多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁 刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先 获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率 不阻塞直接获得锁,CPU不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很 久才会获得锁。

  5. 排他锁,一次只能被一个线程所持有。如果线程T对数据A加上排它锁后,则其他线程不能再对A加 任何类型的锁。获得排它锁的线程即能读数据又能修改数据。 共享锁,可被多个线程所持有。如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享 锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。

  6. 自旋锁,自旋锁是一种基于忙等待的锁,它不会使线程进入睡眠状态,而是在获取锁之前反复检查 锁的状态。如果锁已被其他线程占用,则当前线程会一直忙等待直到锁被释放。 使用自旋锁主要是由于阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换 需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码 执行的时间还要长。此时可以使用自旋锁,避免线程频繁地切换状态。

使用synchronized进行同步

  1. 同步方法:在方法声明中使用 synchronized 关键字,将整个方法体标记为同步代码块。当一个线 程进入同步方法时,它会获取该方法所属对象的锁,并在执行完毕后释放锁,从而确保同一时间只 有一个线程可以执行该方法。
  2. 同步代码块:使用 synchronized 关键字创建同步代码块,以确保一段代码在同一时间只能被一个 线程执行。同步代码块需要指定一个对象作为锁,只有持有该对象的线程才能执行代码块。

复杂条件下进行线程间同步:

  1. 保护性暂停(Guarded Suspension):一个线程等待另一个线程的某个条件满足时再继续执行, 通常使用观察者模式 wait() 和 notify() 或者 await() 和 signal() 方法来实现。通过将线 程挂起,可以使得线程在一段指定的时间内暂停执行,或者等待某个条件满足后再继续执行。这样 可以有效地控制线程的执行时间和资源占用,以提高系统的性能和响应性。
  2. 忙等待模式(Busy Waiting Pattern):线程在等待某个条件满足期间,通过循环忙等待的方式持 续检查条件,用于实现简单的同步和等待机制。这种循环会持续占用 CPU 资源,导致 CPU 忙于执 行循环而无法处理其他任务。这种循环通常是不可取的,因为它会浪费系统资源,并可能导致程序无法正常响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Ticket {
private int ticketCount;
public Ticket(int initialCount) {
this.ticketCount = initialCount;
}
public synchronized void sellTicket(int amount) {
if (ticketCount >= amount) {
System.out.println(Thread.currentThread().getName() + " sold " + amount + " tickets.");
ticketCount -= amount;
} else {
System.out.println(Thread.currentThread().getName() + " not enough tickets to sell " + amount);
}
}
}
class TicketSeller implements Runnable {
private Ticket ticket;
public TicketSeller(Ticket ticket) {
this.ticket = ticket;
}
@Override public void run() {
ticket.sellTicket(1);
}
}
public class TicketOffice {
public static void main(String[] args) {
Ticket ticket = new Ticket(10);
Thread seller1 = new Thread(new TicketSeller(ticket), "Seller1");
Thread seller2 = new Thread(new TicketSeller(ticket), "Seller2");
Thread seller3 = new Thread(new TicketSeller(ticket), "Seller3");
seller1.start();
seller2.start();
seller3.start();}
}

synchronized的原理

synchronized底层使用 MonitorenterMonitorexit 指令,加锁对象在执行时使其锁计数器加1或者减1。每一个对象在同一时间只与一个monitor(锁)相关联,而一个monitor在同一时间只能被一个线程获得。一个对象在尝试获得与这个对象相关联的Monitor锁的所有权的时候, monitorenter指令会发生如下3中情况之一:

  1. monitor计数器为0,意味着目前还没有被获得,那这个线程就会立刻获得然后把锁计数器+1,一旦+1,别的线程再想获取,就需要等待。
  2. 如果这个monitor已经拿到了这个锁的所有权,又重入了这把锁,那锁计数器就会累加变成2, 并且随着重入的次数,会一直累加。
  3. 这把锁已经被别的线程获取了,等待锁释放。

monitorexit :将monitor的计数器减1,如果减完以后,计数器不是0,则代表刚才是重入进来的,当 前线程还继续持有这把锁的所有权,如果计数器变成0,则代表当前线程不再拥有该monitor的所有权, 即释放锁。

任意线程对某被加锁对象的访问,首先要获得被加锁对象的monitor,如果获取失败,该线程就进入同步状态,线程状态变为阻塞,当被加锁对象的monitor占有者释放后,在同步队列中得线程就会有机会重新获取该监视器,若新来的线程立即获得监视器,则不会被加入同步队列。synchronized实际上是非公平可重入的锁。

synchronized锁优化

在JVM中monitorentermonitorexit字节码依赖于底层的操作系统的Mutex Lock来实现的,但是由于使用Mutex Lock需要将当前线程挂起并从用户态切换到内核态来执行,这种切换的代价是非常昂贵的。

然而在现实中的大部分情况下,同步方法是运行在单线程环境(无锁竞争环境)如果每次都调用Mutex Lock那么将严重的影响程序的性能。

Java1.6 中对锁的实现引入了大量的优化,如锁粗化(Lock Coarsening)、锁消除(Lock Elimination)、轻量级锁(Lightweight Locking)、偏向锁(Biased Locking)、适应性自旋(Adaptive Spinning)等技术来减少锁操作的开销。

  1. 锁粗化,减少不必要的紧连在一起的unlock,lock操作,将多个连续的锁扩展成一个范围更大的锁
  2. 锁消除,通过运行时JIT编译器的逃逸分析来消除一些没有在当前同步块以外被其他线程共享的数据的锁保护,通过逃逸分析也可以在线程本的Stack上进行对象空间的分配(同时还可以减少Heap上的垃圾收集开销)。
  3. 轻量级锁,这种锁实现的背后基于这样一种假设,即在真实的情况下我们程序中的大部分同步代码一般都处于无锁竞争状态(即单线程执行环境),在无锁竞争的情况下完全可以避免调用操作系统层面的重量级互斥锁,取而代之的是在monitorentermonitorexit中只需要依靠一条CAS原子指令就可以完成锁的获取及释放。当存在锁竞争的情况下,执行CAS指令失败的线程将调用操作系统互斥锁进入到阻塞状态,当锁被释放的时候被唤醒(具体处理步骤下面详细讨论)。
  4. 偏向锁,偏向锁通过对比 Mark Word 在无锁竞争的情况下避免在锁获取过程中执行不必要的CAS原子指令,避免执行CAS操作带来非常可观的本地延迟。
  5. 适应性自旋,当线程在获取轻量级锁的过程中执行CAS操作失败时,在进入与monitor相关联的操作系统重量级锁(mutex semaphore)前会进入忙等待(Spinning)然后再次尝试,当尝试一定的次数后如果仍然没有成功则调用与该monitor关联的semaphore(即互斥锁)进入到阻塞状态。

在Java SE 1.6里 synchronized 一共有四种状态:无锁、偏向锁、轻量级锁、重量级锁,它会随着竞争情况逐渐升级。锁可以升级但是不可以降级,目的是为了提供获取锁和释放锁的效率。

在JDK 15 移除了偏向锁,原因在于引入偏向锁,主要是为了优化 Vector,Hash Table 两个集合相关的代码,但是现在看来这两个集合很少有人用到,同时 JVM 撤销偏向锁状态比较消耗资源。 JDK 15 取消了偏向锁,锁升级流程改为无锁到轻量级锁再到重量级锁。

synchronized的缺陷

  1. 效率低,锁的释放情况少,只有代码执行完毕或者异常结束才会释放锁;试图获取锁的时候不能设定超时,不能中断一个正在使用锁的线程。
  2. 不够灵活,加锁和释放的时机单一,无法执行更加复杂的优化。由虚拟机负责调度,无法获取加锁相关信息。
  3. 暂不支持虚拟线程。

为什么我们还在使用synchronized

synchronized是JVM平台提供的并发原语,如果 synchronized 关键字适合你的程序, 那么请尽量使用它,这样可以减少编写代码的数量,减少出错的概率。因为一旦忘记在 finally 里 unlock,代码可能会出很大的问题,而使用 synchronized 在编码上更安全。

volatile

JSR-133 提供了更强大、更可靠的 volatile 变量语义。使用 volatile 关键字修饰的变量可以保证线程之间的可见性、有序性,并防止指令重排序引起的问题。

我们以单例的双重检查锁为例,以下是 Java 中安全性较好性能较高的单例模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Singleton {
public static volatile Singleton singleton;
private Singleton() {};
public static Singleton getInstance() {
if (singleton == null) {
synchronized (Singleton.class) {
if (singleton == null) {
singleton = new Singleton();
}
}
}
return singleton;
}
}

最外层的判定,使一次加载后无需在获取单例时继续加锁;内部加锁可以保证一次只有一个线程加载单例;再次判空可以防止获取锁失败的等待线程在获取锁后再次初始化单例。但是现在还有一个疑问,即为什么需要对单例添加volatile。

要理解这个问题,先要了解对象的构造过程,实例化一个对象其实可以分为三个步骤:分配内存空间,初始化对象,将内存空间的地址赋值给对应的引用。

但是由于操作系统可以对指令进行重排序,所以上面的过程也可能会变成如下过程:分配内存空间,将内存空间的地址赋值给对应的引用,初始化对象。

在将内存空间的地址赋值给对应的引用后,如果其他线程在进入第一层判空后通过,并拿到尚未完全初始化的对象,从而导致不可预料的结果。因此,为了防止这个过程的重排序,我们需要将变量设置为volatile类型的变量。通过使用 volatile 关键字,双重检查锁能够在多线程环境中正确地实现延迟初始化的单例模式,并提供线程安全的访问。

volatile提供有序性

happens-before 规则中有一条是 volatile 变量规则:对一个 volatile 域的写,happens-before 于任意后续对这个 volatile 域的读。

Java 编译器会在生成指令系列时在适当的位置会插入内存屏障指令,禁止在volatile变量的写操作和任何后续读操作之间进行重排序,以及在volatile变量的读操作和任何后续写操作之间进行重排序。这确保了volatile变量的读写操作的有序性。

volatile提供可见性

可见性问题主要指一个线程修改了共享变量值,而另一个线程却看不到。

引起可见性问题的主要原因是每个线程拥有自己的一个高速缓存区线程工作内存,变量的值在被读取后会被缓存在内存中,而不会被强制更新。

而volatile会强制将当前处理器缓存行的数据写回到系统内存,使各处理器的缓存失效。CPU为保证各个处理器的缓存是一致的,会实现缓存一致性协议,当处理器发现本地缓存失效后,就会从内存中重读该变量数据,即可以获取当前最新值。这样每个线程都能获得该变量的最新值。(CPU缓存一致性的工作机制,将会导致多线程/多核访问(原子)变量的竞争,从而使得一个操作/指令在一定情况下(因为缓存失效)需要相对更长的时间才能完成)。

同时volatile禁止了指令重排,确保在多线程环境下,对 volatile 变量的写操作先于读操作发生,从而确保其他线程能够看到最新的值。

volatile不提供原子性

虽然volatile可以提供可见性和有序性,但它并不能保证原子性。因此,对于复杂的同步需求,通常需要使用synchronized或其他的同步机制。

volatile常用模式

volatile 对于一些需要保证操作顺序的场景,如状态同步、标志位的读写等,非常有用。

  1. 状态标志,使用变量表示某个状态的标志位,并被多个线程共享。
  2. 一次性安全发布(one-time safe publication),确保初始化发生在引用暴露之前。
  3. 独立观察(independent observation),定期发布观察结果供程序内部使用。
  4. 开销较低的读写锁策略,使用 synchronized 确保增量操作是原子的,并使用 volatile 保证当前结果的可见性。因为读路径的开销仅仅涉及 volatile 读操作,可实现更好的性能。
  5. 双重检查(double-checked),单例模式。

final

final 字段的内存语义在 JSR-133 中得到增强,其修饰的成员字段的写入操作保证在构造函数中完成,防止了与其他线程的潜在冲突,提供了初始化安全保证。

常见同步机制

同步机制可以帮助确保线程之间的正确执行顺序、避免竞态条件和提高性能。我们可以使用锁、信号量、条件变量或其他同步机制来保证多个线程之间的互斥和同步,保证共享资源的一致性和可见性:

  1. 互斥量(Mutex):
    互斥是一种基本的并发协调模式,用于实现线程之间的互斥访问共享资源。它通过引入互斥锁来确保在给定时间只有一个线程可以访问共享资源,其他线程必须等待直到锁被释放。
  2. 信号量(Semaphore):
    信号量是一种计数器,用于控制对共享资源的访问。可以实现多个线程同时访问公共区域数据,原理与操作系统中PV操作类似,先设置一个访问公共区域的线程最大连接数,每有一个线程访问共享区资源数就减一,直到资源数小于等于零。它可以用来限制同时访问共享资源的线程数量,并提供互斥和同步机制。
  3. 条件变量(Condition Variable):条件变量用于在线程之间进行条件等待和通知。它允许线程在某个条件满足之前等待,并在条件满足时被通知继续执行。条件变量通常与互斥锁一起使用,以确保线程安全地等待和通知。
  4. 屏障(Barrier):
    屏障是一种同步机制,用于确保一组线程在达到屏障点之前都暂停执行,并在所有线程都到达屏障点后继续执行。通常用于将多个线程分成阶段进行并行计算,并在每个阶段结束时进行同步
  5. 相位器(Phaser):
    相位器是一种相较于屏障更灵活、功能更强大的同步机制,它也允许线程分阶段地协调和同步。其允许线程在多个同步点上等待,每个同步点被称为一个相位。在每个相位结束时,相位器可以自动将等待的线程解除阻塞,或者在指定的条件下进行选择性解除阻塞。同时相位器还提供了更多高级功能,例如注册和注销参与者、动态调整参与者的数量,以及在同步点上执行特定的操作。

Java并发编程工具(JUC)

JSR-166 于2004年提出,是 Java 社区对并发编程的规范要求,旨在提供更强大、更丰富的并发编程支持,其中包含了一组并发编程的API和工具类,它引入了大量的并发工具和类,用于处理多线程编程的常见问题。

  1. 同步工具类:用于实现线程间的协调和同步。
  2. 原子类:提供了一组原子操作类,用于实现线程安全的原子操作。
  3. 并发集合:提供线程安全的集合类,用于在多线程环境中进行安全的数据访问和操作。
  4. 线程池:用于管理和调度线程的池,通过复用线程来提高性能和资源利用率。
  5. 并发执行框架:提供了一种高级的任务执行和调度机制,通过任务和执行器的概念,将任务的提交和执行进行解耦,简化了并发编程的模型。

阻塞同步(Block synchronization)

当线程请求获取锁或从阻塞队列中获取元素时,如果资源不可用,线程将被阻塞,直到资源可用或等待超时。

锁(Lock)

Lock实现提供了比使用synchronized方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的Condition对象。

使用锁时应始终在try-finally块中确保在操作完成后释放锁,以确保锁能够正确释放,即使在操作过程中发生异常。这样可以避免死锁和资源泄漏等问题。

可重入锁(Reentrant Lock)

使用可重入锁来实现线程间的互斥和同步,支持线程的嵌套获取锁,它允许同一个线程多次获得该锁,而不会造成死锁。线程在每次获得锁时,需要相应地释放相同数量的锁才能完全释放它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;

public class Counter {
private final Lock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
return count;
}
}

public class CounterThread extends Thread {
private final Counter counter;
public CounterThread(Counter counter) {
this.counter = counter;
}
public void run() {
for (int i = 0; i < 1000; i++) {
counter.increment();
}
}
}

public class Main {
public static void main(String[] args) {
Counter counter = new Counter();
CounterThread t1 = new CounterThread(counter);
CounterThread t2 = new CounterThread(counter);
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final count is: " + counter.getCount());
}
}

可重入读写锁(ReentrantReadWriteLock)

用于优化读多写少的场景,使用读写锁允许多个线程同时读取共享数据,但在写入操作时需要互斥访问。将读操作和写操作分离,来实现读操作的并发执行,写操作的互斥执行,提高读多写少场景的并发性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CachedData {
private Object data;
private volatile boolean cacheValid;
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
//必须先释放读锁再获取写锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
//重新检查状态,因为其他线程可能已经获取了写锁并改变了状态
if (!cacheValid) {
data = ...; //获取数据
cacheValid = true;
}
rwl.readLock().lock();//降级为读锁
} finally {
rwl.writeLock().unlock(); // 释放写锁
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
void use(Object data) {
//使用数据...
}
}

条件变量(Condition)

条件变量是一种并发编程中的同步机制,通常与锁结合使用,用于线程之间的等待和通知。

Condition允许程序员更精细地控制线程的等待和唤醒,可以在满足特定条件之前使线程等待,以避免忙等待的浪费。一个Lock对象可以关联多个Condition对象,每个Condition对象可以表示不同的等待条件。这样,可以在不同的条件下对线程进行等待和唤醒操作,更灵活地实现线程间的协作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerExample {
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
private int[] buffer = new int[10];
private int count = 0;
public void produce(int value) {
lock.lock();
try {
while (count == buffer.length) {
// 缓冲区已满,等待 notFull 条件
notFull.await();
}
buffer[count] = value;
count++;
// 通知 notEmpty 条件的等待线程
notEmpty.signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public int consume() {
lock.lock();
try {
while (count == 0) {
// 缓冲区为空,等待 notEmpty 条件
notEmpty.await();
}
int value = buffer[--count];
// 通知 notFull 条件的等待线程
notFull.signal();
return value;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
} finally {
lock.unlock();
}
}
}

StampedLock

Java1.8后增加的锁工具,把读锁细分为乐观读和悲观读,能进一步提升并发效率。但StampedLock为不可重入锁,不能在一个线程中反复获取同一个锁,本身不直接支持Condition接口。使用其会使代码更加复杂。

其他同步工具类

CountDownLatch

创建CountDownLatch,指定等待的线程数量。在等待的线程中,可以调用await()方法阻塞等待,直到计数器归零。在其他线程中,通过调用countDown()方法来减少计数器的值。当计数器的值减为0时,等待该计数器的线程将被唤醒。

信号量(Semaphore)

创建一个Semaphore对象,指定资源的初始数量。在需要访问受限资源的线程中,使用acquire()方法获取资源的许可(P操作),当没有可用的资源许可时,线程将被阻塞,直到有一个资源许可可用。在访问受限资源的代码执行完毕后,使用release()方法释放资源的许可(V操作)。

循环屏障(CyclicBarrier)

创建一个CyclicBarrier对象,指定等待的线程数量。这些线程会在某个点上调用await()方法,阻塞并等待指定数量的线程到达该点,后释放所有阻塞线程并继续执行。

如果在创建时指定了回调函数,那么当所有线程都到达之后,最后一个到达的线程将执行回调函数。

相位器(Phaser)

创建一个Phaser对象,指定参与线程的数量(或手动注册参与线程)。在每个线程的关键点上,调用arriveAndAwaitAdvance()方法等待其他线程到达,直到其他线程到达相同的阶段。在线程完成所有关键点时,调用arriveAndDeregister()方法立即取消注册,并继续执行后续操作。通过取消注册,可以动态地调整参与阶段性同步的线程数量。

交换器(Exchanger)

创建一个特定类型的交换器对象,用于实现两个线程之间的数据交换。交换器提供了一个点,两个线程可以在该点上交换数据,并在交换完成后继续执行。在exchange()方法调用时,当前线程将会阻塞,直到另一个线程也调用了exchange()方法,然后进行数据交换。

阻塞队列

ArrayBlockingQueue

数组阻塞队列。满足FIFO原则,队列的头部是在队列中存在时间最长的元素。队列的尾部是在队列中存在时间最短的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
Producer(BlockingQueue<Integer> q) {
this.queue = q;
}
public void run() {
try {
for (int i = 0; i < 20; i++) {
System.out.println("Produced: " + i);
queue.put(i); // 如果队列满了,会阻塞
Thread.sleep(100); // 模拟耗时的生产过程
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}

class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
Consumer(BlockingQueue<Integer> q) {
this.queue = q;
}
public void run() {
try {
while (true) {
Integer take = queue.take(); // 如果队列空了,会阻塞
System.out.println("Consumed: " + take);
Thread.sleep(1000); // 模拟耗时的消费过程
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}

public class Setup {
public static void main(String[] args) {
BlockingQueue<Integer> q = new ArrayBlockingQueue<>(5);
Producer p = new Producer(q);
Consumer c = new Consumer(q);
new Thread(p).start();
new Thread(c).start();
}
}

LinkedBlockingQueue/Deque

链表阻塞队列/双端队列。链表的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。

PriorityBlockingQueue

优先级阻塞队列,使用Lock机制实现并发访问。依赖于comparable排序,不允许存放null值,不允许存放不可比较的对象类型。

DelayQueue

延迟队列,一个无界阻塞优先级队列,最先过期的元素将排在队列的头部,并使用Lock机制实现并发访问。可以将延迟队列看作定时任务调度器,其中的元素必须实现 Delayed 接口,以便按照预期进行延迟排序。当消费者从队列中获取元素时,如果元素还未过期,消费者将被阻塞直到元素过期。

SynchronousQueue

一个零容量的队列,不会存储任何元素,只用于在生产者和消费者之间进行元素的同步交换。

SynchronousQueue提供了一对一的同步的元素传输机制。当生产者线程尝试向队列插入元素时,它会被阻塞,直到有一个消费者线程尝试从队列中提取元素。当消费者线程尝试从队列中提取元素时,它也会被阻塞,直到有一个生产者线程尝试向队列中插入元素(单向的Exchanger,但通常拥有更好的效率)。

LinkedTransferQueue

一个无界的队列,结合LinkedBlockingQueue和SynchronousQueue的特性,提供了更强大的线程间数据传输能力。主要用于在生产者和消费者之间进行直接的数据传输,允许多个生产者和多个消费者参与。(Java 7)

LinkedTransferQueue提供了多对多异步的元素传输机制。它允许生产者线程使用 transfer() 方法将元素传输给消费者线程,如果没有消费者线程等待接收元素,则生产者线程可以选择等待或立即返回。

同步阻塞底层实现框架

LockSupport

LockSupport是用来支持锁和其他同步类的线程阻塞原语。当调用LockSupport.park()时,表示当前线程将会等待,直至获得许可,当调用LockSupport.unpark()时,必须把等待获得许可的线程作为参数进行传递,好让此线程继续运行。

AQS

Java中的大部分同步类都是基于AbstractQueuedSynchronizer框架(简称为AQS)实现的,AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能,并且依赖于先进先出 (FIFO) 等待队列的简单框架。

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

AQS定义两种资源共享方式

  • Exclusive(排他锁)(公平锁,非公平锁)
  • Share(共享)

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

非阻塞同步(Non-blocking Synchronization)

非阻塞同步是一种不依赖锁或其他阻塞操作的同步手段。它通常依赖于原子操作或其他的并发容器来实现。非阻塞同步的主要目标是避免线程在等待资源或同步操作时发生阻塞,从而提高程序的并发性和性能。

非阻塞同步的核心思想是在多线程环境中,不使用阻塞等待的方式来实现同步控制。线程在尝试访问共享资源或执行同步操作时,如果资源不可用或条件不满足,不会进入阻塞状态,而是立即返回并继续执行其他任务。这样,线程可以一直保持计算操作,不会被阻塞,从而提高了系统的吞吐量和响应能力。

原子类

JUC提供了一组原子操作的封装类,利用CAS通过无锁的方式实现的线程安全访问。JDK中大量使用了CAS来更新数据而防止加锁(synchronized 重量级锁)来保持原子更新。

CAS的全称为Compare-And-Swap,直译就是对比交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值。CAS靠底层硬件实现,JVM通过Unsafe直接封装了汇编调用从而保证了原子性。

原子类进一步封装了常用操作,并保证所有操作都具有原子性(一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行)。

原子类常见使用

  1. JDK使用Lambda强制要求传入变量具有不可变性(引用不可变),若为单线程可以使用单元素数组代替,在争抢严重的情况下,使用原子类可以是一种简单有效的并发控制手段,如果对性能有严格要求可以进一步合理优化。
  2. CAS自旋锁,又被称为乐观锁,可以在一定程度上防止加锁并且保证原子更新。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 自增并返回新值
}
public int getCount() {
return count.get(); // 获取当前值
}
}

public class CounterThread extends Thread {
private final Counter counter;
public CounterThread(Counter counter) {
this.counter = counter;
}
public void run() {
for (int i = 0; i < 1000; i++) {
counter.increment();
}
}
}

public class Main {
public static void main(String[] args) {
Counter counter = new Counter();
CounterThread t1 = new CounterThread(counter);
CounterThread t2 = new CounterThread(counter);
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final count is: " + counter.getCount());
}
}

并发安全集合

ConcurrentLinkedQueue/Deque

基于链表的并发队列,提供了非阻塞的线程安全操作,这意味着在多线程环境中,线程可以安全地从队列中添加或移除元素,而不会出现线程阻塞的情况。

CopyOnWriteArrayList

线程安全的ArrayList,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的。这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法更 有效。在不能或不想进行同步遍历,但又需要从并发线程中排除冲突时,它也很有用。

CopyOnWriteArraySet

线程安全的Set,底层使用CopyOnWriteArrayList。在add时,会调用addIfAbsent(),由于每次写时都要进行数组遍历,因此性能会比较低,但读操作非常快,因为不需要锁定。

ConcurrentSkipListSet

一个基于ConcurrentSkipListMap的线程安全Set,在并发环境下提供了较高的性能,特别是在有大量并发读写操作时。Set的元素可以根据它们的自然顺序进行排序,也可以根据创建Set时所提供的Comparator进行排序,具体取决于使用的构造方法。

ConcurrentSkipListMap

线程安全的TreeMap(有序),它基于跳表(Skip List)数据结构实现。这个类提供了一个可扩展的并发导航映射表实现,根据其键的自然排序或者在创建映射表时提供的比较器进行排序。适用于多线程环境中需要频繁进行搜索、插入和删除操作的场景。

ConcurrentHashMap

线程安全的HashMap(无序)。

ConcurrentHashMap在JDK 7之前是分段锁(Segment Locking)实现,分段锁可将共享资源分成多个独立的段,每个段使用独立的锁进行保护。这样,不同的线程可以同时访问不同的段,从而减少了锁竞争的范围,提高了并发性能。

自 JDK 8 更新以后,ConcurrentHashMap在内部实现上做了重大改进,以提高其在并发环境下的性能。

  1. 分段锁转为更细粒度的节点锁定:分段锁被移除,转而使用更细粒度的锁定策略,只有与特定键相关联的节点会被锁定,这减少了锁的范围并提高了并发性。
  2. CAS 操作:对于某些更新操作,ConcurrentHashMap使用了 CAS 操作,这是一种无锁的更新机制,可以在不阻塞其他线程的情况下安全地修改值。
  3. 树形化:在处理大量哈希冲突时,ConcurrentHashMap会将链表转换为红黑树,从而在极端情况下提高性能。

异步并发编程

换一个视角看问题

并发是一系列性能技术,专注于减少等待。 ——ON JAVA 8

Java 5引入的以同步阻塞为核心思想的一系列工具,虽然较好的保证了多线程的安全性,使得利用多线程平台的能力成为了可能。但这样是否让我们减少了等待呢?

答案显而易见,Java 5选择将工具全丢给程序员,让程序员自己解决问题,这无疑大大提高了并发编程的复杂性和难度。

Java 8的出现进一步解决了这个问题,Java 8在大量优化旧并发库的同时引入了lambda表达式,它的出现意味着Java在拥抱函数式编程的路上更近了一步。同时这意味着我们可以利用全新的视角来解决并发这个原本看起来非常复杂的问题。

通过使用lambda表达式和函数式编程的特性,可以更容易地编写简洁且可读性高的并发代码。程序员可以使用函数式接口和lambda表达式来定义并发任务,使用流式操作来处理数据集合,在并发编程中实现并行化和异步操作等。函数式编程可以让程序员更专注于问题的本质,而不是纠结于底层的线程同步和通信细节。

为什么推荐使用异步

虽然阻塞操作虽然使得一切看起来都是那么井然有序和协调,并且线程的个数并不受硬件限制,你的程序可以只有一个线程、也可以有成百上千个。操作系统会默默做好调度,让诸多线程共享有限的 CPU 时间片,这个调度过程是通过上下文切换实现的,操作系统利用软中断机制,把程序从任意位置打断,然后保存当前所有寄存器,整个过程会产生数微秒的开销。

这种切换带来了不少的代价,占用宝贵的 CPU 时间,切换的每个线程都会占用至少 1 页内存,这两个原因驱使我们尽可能避免创建太多的线程。C10K问题,在早期的Java企业级网络应用服务器中,连接独占线程或进程,线程/进程处理来自绑定连接的消息,在连接断开前不退也不做其他事情。当连接数逐渐增多时,线程/进程占用的资源和上下文切换成本会越来越大,这意味着在高并发情况下,如果我们使用传统的阻塞模型,当并发数越高时,性能越差。异步意味着没有阻塞,所有的线程资源都可以被最大程度的利用。

异步是声明式编程、响应式编程、函数式编程、面向方面编程的必要技术。和命令式编程不同,异步编程更优雅、可读性更强。尤其是对框架化的编程来说,异步编程可以让开发人员忽略不重要的细节,大大提高开发效率。如果异步函数是以闭包的形式提供的,还具有闭包自带的强大功能,比一般的同步函数在编程上要方便很多。

Java中的闭包通常是通过匿名内部类lambda表达式来实现的。这些结构允许你编写可以访问其外部作用域的变量的代码块,从而提供了类似闭包的行为。虽然Java中的闭包实现与其他语言如JavaScript或Python中的闭包有所不同,但它们仍然提供了将函数作为一等公民对待的能力,允许函数作为参数传递和返回值返回。

合理管理异步带来的复杂性

  1. 大多数情况下,使用异步会让程序流程走向变得复杂。
  2. 异步代码的调试难度通常远大于同步代码。如果异步函数使用了多线程技术,如果编程不当会导致子线程无声吞掉异步函数的异常,从而主线程无法捕获这个异常。
  3. 很多情况下异步编程复杂的引用和调用关系会带来各种风险,比如内存泄漏。开发人员需要根据自己的实际需要选择技术。

同步和异步并不是互斥的概念,它们可以结合使用。在复杂的应用程序中,可以使用同步并发和异步并发的组合来实现任务的最优调度和执行策略,以提高性能和响应能力。

常用异步编程风格

  1. 回调(Callbacks)通过定义回调函数(通常是一个闭包),将任务的执行结果或状态传递给相应的处理逻辑。在任务完成时,触发相应的回调函数进行后续处理

    Promise/Future是一种用于处理异步操作的对象,表示一个可能还未完成的值。通过Promise/Future对象,可以注册回调函数来处理异步操作的结果。

  2. 事件驱动(Event-driven Programming)基于事件和事件处理器的编程模型。任务通过触发事件,然后由相应的事件处理器处理。事件驱动编程通常使用事件循环来监听和分发事件。

    反应式(Reactive)最早源于函数式编程中的一种模式,Reactive 可以看作是对 Promise 的极大增强,相比 Promise,反应式引入了流(Flow)的概念。

  3. Continuation-passing style CPS 变换(Coroutine 与 async/await)函数不仅能从头运行,还能根据 Continuation 的指示继续某个点(比如调用 IO 的地方)运行,此时我们的函数已经不再是一个函数了,而是变成一个状态机。

  4. 用户态线程 用户态线程是把操作系统提供的线程机制完全抛弃,由于一切都在进程内部,切换代价要远远小于操作系统的上下文切换。

Future

Future表示了一个任务的生命周期,是一个可取消的异步运算,可以把它看作是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。同时,Future常用来封装Callable和Runnable,可以直接作为任务被提交到执行器中,并返回一个 Future对象来表示异步操作的结果。

FutureTask类是 Java 提供的一个实现了 RunnableFuture 接口的具体类。它可以用作一个可以执行的任务,并将计算结果封装在其中。

FutureTask通过NEW,RUNNING,COMPLETING,NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTED等状态的转换,确定任务的执行结果。同时使用volatile变量,CAS操作,LockSupport来保证多线程环境下的线程安全性。

CompletableFuture(Java 8)

CompletableFuture继承自Future并提供了一套强大的异步编程模型,它可以以链式的方式组合多个异步操作,支持任务间的串行化执行和组合,从而实现更复杂的异步流程控制,支持通过回调、组合、转换等方式处理异步操作的结果,同时提供了丰富的方法来手动完成一个异步操作或者触发异常。

CompletableFuture同样可以直接作为任务被提交到执行器中,以实现异步任务的并发执行。可以使用 supplyAsync()runAsync() 等方法将任务提交到指定的线程池中执行,并返回一个 CompletableFuture对象来表示异步操作的结果。

CompletableFuture内部使用一个FutureTask并对其进行代理和增强。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个CompletableFuture对象
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟长时间的计算任务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
// 返回结果
return "Result of the asynchronous computation";
});
// 添加回调函数,在计算完成时被调用
future.thenAccept(result -> System.out.println("Computation returned: " + result));
// 等待异步操作结束
future.get();
}
}

线程池

池化带来的优点

  1. 资源重用:线程池可以重复利用预创建的线程,避免了线程的创建和销毁所带来的性能开销。
  2. 提高响应速度:当任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:线程池可以根据系统的承载能力调整线程池中线程的数量,避免了大量并发线程同时执行时对系统资源的消耗和竞争。
  4. 提供更多功能:线程池提供了定时执行、定期执行、单线程执行和并发数控制等功能,这些都是普通线程所不具备的。

ThreadPoolExecutor

当提交一个新任务时,ThreadPoolExecutor 会根据以下规则处理

  1. 如果线程池未满,无则创建新线程执行任务。
  2. 若线程池满,首先查看有无空闲线程,有则交给空闲线程,无则将任务加入等待队列。
  3. 若线程池与队列都满,则根据拒绝策略来处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任务给线程池执行
for (int i = 0; i < 10; i++) {
Runnable worker = new MyRunnable("" + i);
executor.execute(worker);
}
// 关闭线程池
executor.shutdown();
while (!executor.isTerminated()) {
// 等待所有任务完成
}
System.out.println("所有任务已完成");
}
}

class MyRunnable implements Runnable {
private final String command;
MyRunnable(String s) {
this.command = s;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 开始. 命令 = " + command);
processCommand();
System.out.println(Thread.currentThread().getName() + " 结束.");
}
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承ThreadPoolExecutor,同时通过实现ScheduledExecutorSerivce来扩展基础线程池的功能,使其拥有了调度能力。其整个调度的核心在于内部类DelayedWorkQueue,一个延时工作队列(JDK8重新在内部实现了一个延时队列),它会通过每个任务按照距离下次执行时间间隔的大小来排序。

将任务封装成 ScheduledFutureTask 对象,ScheduledFutureTask基于相对时间,不受系统时间的改变所影响,其继承自compareTo和getDelay。其中compareTo方法用于比较任务之间的优先级关系,如果距离下次执行的时间间隔较短,则优先级高。而getDelay方法用于返回距离下次任务执行时间的时间间隔。

ScheduledFutureTask继承自FutureTask,同样可以通过返回Future对象来获取执行的结果。

Flow

Flow API(Java 9)

在 Java 9 中,JSR-384 的 Flow API 被纳入标准库中,它是为了引入一套基于发布-订阅模型的反应式流处理 API 而提出的。该 API 旨在简化异步编程和反应式编程模型,提供了一组接口和类,用于处理流式数据以及数据的异步处理。该 API 包含了几个关键的类型,例如 Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅关系)和 Processor(处理器),用于实现基于事件的异步流处理。

Flow API 为开发者提供了一种便捷的方式来处理异步流数据,支持背压(backpressure)机制、错误处理等重要功能。它在反应式编程、事件驱动编程、流式处理等场景下非常有用,可以帮助开发者处理异步操作和高吞吐量的数据处理需求。

Java内部自带的HttpClient,内部的异步和就是基于响应式和Flow实现的。还有其他常用的响应式编程如Spring-flux,Vert.x,Quarkus框架带来了更好的响应式编程体验,但目前响应式目前对响应式流的实现细节各不相同,大家可以自己查询相关网站学习和了解。

Project Loom

Project Loom 是 OpenJDK 社区的一个尝试,旨在为 Java 引入轻量级并发构造。

它的主要目标是支持高吞吐量的轻量级并发模型。Project Loom 通过引入用户模式线程和延续(Continuation),提出了一种替代内核线程的方法。这个项目的核心是虚拟线程(Virtual Threads),它们被设计为比操作系统的内核线程更轻量,可以大幅提升并发应用的资源效率,同时保持与 Java 线程的向后兼容性。

  1. 虚拟线程,这些是轻量级的线程,可以以数百万计的规模存在,而不是传统的数千个线程限制。
  2. 结构化并发,提供了新的语言构造来更好地管理线程和任务。

Project Loom 通过引入runtime支持的Continuation结构,重写网络库并且提供java.lang.Thread的子类VitrualThread,做到了只要简单替换线程池实现就可以获得类似于go但是是协作式的用户态线程的能力,同时也给予了旧有代码升级最小化改动的帮助。

Thread::currentThread,LockSupport::park,LockSupport::unpark,Thread::sleep,也对此做了适配,这意味着我们那些基于J.U.C包的并发工具仍旧可以使用。

但是当前Loom不支持当Java代码调用本机代码(JNI)然后调用回Java,以及在一个synchronized块或方法中进行阻塞操作。在执行这些操作时依旧会将承载其的内核级线程一起阻塞。这意味着用户级线程在这些情况下,会本退化为内核级线程(如MySQL-connector目前是切换到虚拟线程的主要性能瓶颈)。

虚拟线程(Virtual Thread)(Java 21)

虚拟线程是Java 19引入的一种轻量级线程,它在很多其他语言中被称为协程、纤程、绿色线程、用户态线程等。虚拟线程可以高效执行IO密集型任务,因为它们在执行IO操作时能够被挂起,从而允许其他虚拟线程继续执行,这样就大大减少了线程的等待时间和上下文切换的成本。

1
2
3
4
5
6
7
8
9
10
// 创建并启动一个虚拟线程
Thread vt = Thread.startVirtualThread(() -> {
System.out.println("虚拟线程开始运行...");
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("虚拟线程结束运行。");
});

作用域值(ScopedValue)(预览)

ScopedValue与ThreadLocal类似,提供了范围内安全的共享变量。(Java JEP 429 还处于孵化器阶段,并没有被正式纳入 Java 语言规范)。

作用域值是一种新的语言结构,它允许声明一个只能在当前范围(extent)内访问的变量。一个范围是一个代码块或一个方法调用栈,它可以包含多个线程。作用域值只能被当前范围内的代码读取,不能被其他范围内的代码读取或修改。作用域值是不可变的,并且可以安全地在线程之间共享。

结构化并发(StructuredTaskScope)(预览)

结构化并发是一种编程范式,它旨在简化并发编程的复杂性。在这种模型中,程序的并发部分被组织成可以清晰管理和理解的结构。

  1. 并发单元管理,结构化并发允许开发者以结构化的方式创建和管理并发单元,如线程或任务。
  2. 错误处理,它提供了一种机制来集中处理并发操作中出现的错误。
  3. 资源管理,结构化并发有助于更好地管理资源,例如线程池,从而提高程序的效率和性能。
  4. 代码可读性,这种方法通过减少并发相关的样板代码,提高了代码的可读性和可维护性。

结构化并发是现代编程语言中越来越受欢迎的概念,因为它有助于开发者更容易地编写正确和高效的并发代码。

非结构化并发的代码:

1
2
3
4
5
6
7
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = executorService.submit(() -> findUser());
Future<Integer> order = executorService.submit(() -> fetchOrder());
String theUser = user.get(); // 连接findUser
int theOrder = order.get(); // 连接fetchOrder
return new Response(theUser, theOrder);
}

ExecutorService 中的子任务独立运行,可能成功或失败。

  1. 即使父任务被中断,中断也不会被传播到子任务,因此会造成泄漏。
  2. 它没有父子关系。由于父任务和子任务将出现在线程转储不相关的线程调用堆栈上,因此调试也变得困难。

尽管代码看起来具有逻辑结构,但这种结构只停留在开发人员的头脑中,而不是在执行过程中。所以,它们是非结构化的并发代码。

结构化并发的代码:

1
2
3
4
5
6
7
8
9
10
11
Response handle() throws ExecutionException, InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<String> user = scope.fork(() -> findUser());
Supplier<Integer> order = scope.fork(() -> fetchOrder());
scope.join() // Join both subtasks
.throwIfFailed(); // ... and propagate errors
// Here, both subtasks have succeeded, so compose their results
return new Response(user.get(), order.get());
}
//...
}

结构化并发带来了很多好处。

  1. 它为调用者方法及其子任务创建了一种父子关系。整个代码块变成了原子代码。
  2. 它通过线程转储中的任务层次结构来提供可观察性。
  3. 它还可以在错误处理中实现短路,如果其中一个子任务失败,其他未完成的任务将被取消。如果父任务的线程在 join()调用之前或期间被中断,两个分支将在作用域退出时自动取消。

这让并发代码的结构变得更加清晰,开发人员现在可以推理和跟踪代码,就好像它们是在单线程环境中运行。

并行

Fork/Join框架

Fork/Join框架(Java 7)是一个用于并行计算的框架,旨在通过任务的拆分和合并,高效利用多核处理器的并行能力,提高计算密集型任务的执行效率。它基于分治思想,可以自动地将一个大任务拆分成多个小任务并行执行,提高任务处理的效率和并发性能。

  1. 创建任务,在任务定义任务的执行过程,先fork()出所有子任务,最后join()子任务并等待并计算返回最终结果。计算过程中可以使用工作窃取算法保证线程安全和计算能力被充分利用。
  2. 提交任务到Fork/Join框架的线程池,这个线程池默认的线程数量是你的处理器核心数。

工作窃取算法,将任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务。为了保证线程计算能力被充分利用,已经完成所有子任务的空闲线程,可以从其他队列尾部获取任务执行。

并行流

Java 并行流(Parallel Stream)与 Fork/Join 框架之间有着密切的关系。在Java 8中,引入了并行流的概念,它允许以并行方式处理流中的元素,以提高性能。当你使用并行流时,实际上是将流的任务分配给全局的ForkJoinPool线程池中执行。

Java 8将并行操作变得简单易用,不需要我们再手动Fork/Join任务,只需要将流转换为并行流就可以提高执行效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {
public static void main(String[] args) {
// 创建一个字符串列表
List<String> strings = Arrays.asList("one", "two", "three", "four", "five");
// 使用并行流来转换字符串为大写
strings.parallelStream()
.map(String::toUpperCase)
.forEach(System.out::println);
}
}

但使用并行流时需要注意的是,虽然它可以提高性能,但并不是所有情况下都适用。

  1. 并行流的使用需要确保线程安全,因为它们可能会在多个线程中同时操作数据。如果处理过程中包含对外部的访问,请确保线程安全。
  2. 如果任务之间有依赖关系(等待)或者涉及到 I/O 操作(阻塞),使用并行流可能不会带来预期的性能提升。

避免竞争

线程私有变量(ThreadLocal)

用于在多线程环境下为每个线程提供独立的变量副本。它允许每个线程都拥有自己的变量副本,互不干扰,从而避免了线程安全性问题。

线程私有变量的一个常见用例是在多线程环境下存储线程相关的上下文信息,例如用户身份认证信息、数据库连接等。每个线程可以通过线程私有变量获取自己的上下文信息,而无需进行显式的同步或共享变量。

创建一个特定类型的线程私有变量对象,在需要设置变量的地方,使用set()方法设置当前线程的变量副本。在需要使用变量的地方,使用get()方法获取当前线程的变量副本。在使用完变量后,请保证使用remove()方法清除当前线程的变量副本。

需要注意的是,线程私有变量并不能解决所有线程安全性问题,它只是提供了一种线程封闭的机制,用于在多线程环境下隔离变量。在使用线程私有变量时,仍然需要注意线程安全性,并确保在适当的时机清除变量副本,避免内存泄漏。

不可变变量

优先使用final保证变量的不可变性。

  1. 避免副作用,在并发编程中,不可变对象自然是线程安全的,因为它们的状态不会改变。
  2. 使用final可以明确地表达出你的设计意图,即这个变量是不应该被改变的。
  3. 防止意外修改变量的值,从而减少因状态改变引起的潜在错误。
  4. 增加编译器优化,JIT会尝试内联final与lambda相关代码,进一步优化以提高JVM执行函数效率。

通过特殊处理以减少或避免阻塞

非阻塞IO

传统的IO操作(如读写文件或网络操作)通常是阻塞的。使用Java NIO(New IO)可以实现非阻塞IO,这样程序可以在等待IO操作完成时继续执行。

超时机制

为可能会阻塞的操作设置超时时间。如果操作在指定时间内没有完成,就自动取消或报错,从而避免程序无限期地等待。

定时轮询

轮询本身是一种非阻塞操作,因为它在等待某个事件或条件发生时不会阻塞程序的执行。轮询通常涉及定期检查或请求状态更新,而不是持续等待一个操作完成。这意味着程序可以继续执行其他任务,直到轮询操作再次发生。

常见于前后端通讯或内核异步IO,通过轮询或定时检查的方式来处理某个条件的变化。

批量处理

遇到可能阻塞或长时间等待的任务,将任务合并成批量任务进行处理,减少线程间的切换开销和同步开销。将批量执行任务异步执行,减少对主线程的阻塞影响。

Disruptor模型

Disruptor 是一个高性能的并发编程框架,通过使用环形缓冲区和事件驱动的方式,实现了无锁的并发操作。它适用于需要高吞吐量和低延迟的并发场景,如高性能计算和消息传递系统。

Actor模型

虽然 Java 标准库中没有原生的 Actor 模型支持,但可以使用第三方库(如 Akka、Vert.x 等)来实现 Actor 模型。Actor 模型通过消息传递的方式实现并发实体之间的通信和协作,提供了一种更高级别的并发编程抽象。

事件驱动

事件驱动架构是一种软件架构模式,它将系统的行为和功能抽象为一系列事件和响应,以实现高度可扩展性和灵活性。在现代大数据和人工智能领域,事件驱动架构已经成为主流的设计模式之一,因为它能够有效地处理大量实时数据和复杂的业务流程。

结语

不断发展的并发编程技术使得更加可靠快速的完成更多任务成为可能,一切的都是为了更快的速度,更优的性能和更好的安全性。

并发编程和所有的软件设计一样,天生具有其特殊的复杂性和困难性。从古至今,多少学者大师为之呕心沥血,多少平台和语言为其提交了自己更加完美的解决方案。

如今,我们站在巨人的肩膀上,眺望远方又是更美好的风景。