前提
JDK19于2022-09-20发布GA版本,该版本提供了虚拟线程的预览功能。下载JDK19之后翻看了一下有关虚拟线程的一些源码,跟早些时候的Loom项目构建版本基本并没有很大出入,也跟第三方JDK如鹅厂的Kona虚拟线程实现方式基本一致,这里分析一下虚拟线程设计与源码实现。
Platform Thread与Virtual Thread
因为引入了虚拟线程,原来JDK存在java.lang.Thread类,俗称线程,为了更好地区分虚拟线程和原有的线程类,引入了一个全新类java.lang.VirtualThread(Thread类的一个子类型),直译过来就是"虚拟线程"。
题外话:在Loom项目早期规划里面,核心API其实命名为Fiber,直译过来就是"纤程"或者"协程",后来成为了废案,在一些历史提交的Test类或者文档中还能看到类似于下面的代码:
// java.lang.Fiber Fiber f = Fiber.execute({ out.println("Good morning"); readLock.lock(); try{ out.println("Good night"); } finally{ readLock.unlock(); } out.println("Good night"); });
Thread在此基础上做了不少兼容性工作。此外,还应用了建造者模式引入了线程建造器,提供了静态工厂方法Thread#ofPlatform()和Thread#ofVirtual()分别用于实例化Thread(工厂)建造器和VirtualThread(工厂)建造器,顾名思义,两种建造器分别用于创建Thread或者VirtualThread,例如:
// demo-1 build platform thread Thread platformThread = Thread.ofPlatform().daemon().name("worker").unstarted(runnable); // demo-2 create platform thread factory ThreadFactory platformThreadFactory = Thread.ofPlatform().daemon().name("worker-", 0).factory(); // demo-3 build virtual thread Thread virtualThread = Thread.ofVirtual().name("virtual-worker").unstarted(runnable); // demo-4 create virtual thread factory ThreadFactory virtualThreadFactory = Thread.ofVirtual().name("virtual-worker-", 0).factory();
更新的JDK文档中也把原来的Thread称为Platform Thread,可以更明晰地与Virtual Thread区分开来。这里Platform Thread直译为"平台线程",其实就是"虚拟线程"出现之前的老生常谈的"线程"。
后文会把Platform Thread称为平台线程,Virtual Thread称为虚拟线程,或者直接用其英文名称
那么平台线程与虚拟线程的联系和区别是什么?JDK中的每个java.lang.Thread实例也就是每个平台线程实例都在底层操作系统线程上运行Java代码,并且平台线程在运行代码的整个生命周期内捕获系统线程。可以得出一个结论,平台线程与底层系统线程是一一对应的,平台线程实例本质是由系统内核的线程调度程序进行调度,并且平台线程的总数量受限于系统线程的总数量。
总的来说,平台线程有下面的一些特点或者说限制:
资源有限导致系统线程总量有限,进而导致与系统线程一一对应的平台线程有限
平台线程的调度依赖于系统的线程调度程序,当平台线程创建过多,会消耗大量资源用于处理线程上下文切换
每个平台线程都会开辟一块私有的栈空间,大量平台线程会占据大量内存
这些限制导致开发者不能极大量地创建平台线程,为了满足性能需要,需要引入池化技术、添加任务队列构建消费者-生产者模式等方案去让平台线程适配多变的现实场景。显然,开发者们迫切需要一种轻量级线程实现,刚好可以弥补上面提到的平台线程的限制,这种轻量级线程可以满足:
可以大量创建,例如十万级别、百万级别,而不会占据大量内存
由JVM进行调度和状态切换,并且与系统线程"松绑"
用法与原来平台线程差不多,或者说尽量兼容平台线程现存的API
Loom项目中开发的虚拟线程就是为了解决这个问题,看起来它的运行示意图如下:
当然,平台线程不是简单地与虚拟线程进行1:N的绑定,后面的章节会深入分析虚拟线程的运行原理。
虚拟线程实现原理
虚拟线程是一种轻量级(用户模式)线程,这种线程是由Java虚拟机调度,而不是操作系统。虚拟线程占用空间小,任务切换开销几乎可以忽略不计,因此可以极大量地创建和使用。总体来看,虚拟线程实现如下:
virtual thread = continuation + scheduler
虚拟线程会把任务(一般是java.lang.Runnable)包装到一个Continuation实例中:
当任务需要阻塞挂起的时候,会调用Continuation的yield操作进行阻塞
当任务需要解除阻塞继续执行的时候,Continuation会被继续执行
Scheduler也就是执行器,会把任务提交到一个载体线程池中执行:
执行器是java.util.concurrent.Executor的子类
虚拟线程框架提供了一个默认的ForkJoinPool用于执行虚拟线程任务
下文会把carrier thread称为"载体线程",指的是负责执行虚拟线程中任务的平台线程,或者说运行虚拟线程的平台线程称为它的载体线程
操作系统调度系统线程,而Java平台线程与系统线程一一映射,所以平台线程被操作系统调度,但是虚拟线程是由JVM调度。JVM把虚拟线程分配给平台线程的操作称为mount(挂载),反过来取消分配平台线程的操作称为unmount(卸载):
mount操作:虚拟线程挂载到平台线程,虚拟线程中包装的Continuation栈数据帧或者引用栈数据会被拷贝到平台线程的线程栈,这是一个从堆复制到栈的过程
unmount操作:虚拟线程从平台线程卸载,大多数虚拟线程中包装的Continuation栈数据帧会留在堆内存中
这个mount -> run -> unmount过程用伪代码表示如下:
mount(); try { Continuation.run(); } finally { unmount(); }
从Java代码的角度来看,虚拟线程和它的载体线程暂时共享一个OS线程实例这个事实是不可见,因为虚拟线程的堆栈跟踪和线程本地变量与平台线程是完全隔离的。JDK中专门是用了一个FIFO模式的ForkJoinPool作为虚拟线程的调度程序,从这个调度程序看虚拟线程任务的执行流程大致如下:
调度器(线程池)中的平台线程等待处理任务
一个虚拟线程被分配平台线程,该平台线程作为运载线程执行虚拟线程中的任务
虚拟线程运行其Continuation,从而执行基于Runnable包装的用户任务
虚拟线程任务执行完成,标记Continuation终结,标记虚拟线程为终结状态,清空一些上下文变量,运载线程"返还"到调度器(线程池)中作为平台线程等待处理下一个任务
上面是描述一般的虚拟线程任务执行情况,在执行任务时候首次调用Continuation#run()获取锁(ReentrantLock)的时候会触发Continuation的yield操作让出控制权,等待虚拟线程重新分配运载线程并且执行,见下面的代码:
public class VirtualThreadLock { public static void main(String[] args) throws Exception { ReentrantLock lock = new ReentrantLock(); Thread.startVirtualThread(() -> { lock.lock(); // <------ 这里确保锁已经被另一个虚拟线程持有 }); Thread.sleep(1000); Thread.startVirtualThread(() -> { System.out.println("first"); lock.lock(); try { System.out.println("second"); } finally { lock.unlock(); } System.out.println("third"); }); Thread.sleep(Long.MAX_VALUE); } }
虚拟线程中任务执行时候首次调用Continuation#run()执行了部分任务代码,然后尝试获取锁,会导致Continuation的yield操作让出控制权(任务切换),也就是unmount,运载线程栈数据会移动到Continuation栈的数据帧中,保存在堆内存,虚拟线程任务完成(但是虚拟线程没有终结,同时其Continuation也没有终结和释放),运载线程被释放到执行器中等待新的任务;如果Continuation的yield操作失败,则会对运载线程进行park调用,阻塞在运载线程上
当锁持有者释放锁之后,会唤醒虚拟线程获取锁(成功后),虚拟线程会重新进行mount,让虚拟线程任务再次执行,有可能是分配到另一个运载线程中执行,Continuation栈会的数据帧会被恢复到运载线程栈中,然后再次调用Continuation#run()恢复任务执行:
最终虚拟线程任务执行完成,标记Continuation终结,标记虚拟线程为终结状态,清空一些上下文变量,运载线程"返还"到调度器(线程池)中作为平台线程等待处理下一个任务
Continuation组件十分重要,它既是用户真实任务的包装器,也是任务切换虚拟线程与平台线程之间数据转移的一个句柄,它提供的yield操作可以实现任务上下文的中断和恢复。由于Continuation被封闭在java.base/jdk.internal.vm下,可以通过增加编译参数--add-exports java.base/jdk.internal.vm=ALL-UNNAMED暴露对应的功能,从而编写实验性案例,IDEA中可以按下图进行编译参数添加:
然后编写和运行下面的例子:
import jdk.internal.vm.Continuation; import jdk.internal.vm.ContinuationScope; public class ContinuationDemo { public static void main(String[] args) { ContinuationScope scope = new ContinuationScope("scope"); Continuation continuation = new Continuation(scope, () -> { System.out.println("Running before yield"); Continuation.yield(scope); System.out.println("Running after yield"); }); System.out.println("First run"); // 第一次执行Continuation.run continuation.run(); System.out.println("Second run"); // 第二次执行Continuation.run continuation.run(); System.out.println("Done"); } } // 运行代码,神奇的结果出现了 First run Running before yield Second run Running after yield Done
这里可以看出Continuation的奇妙之处,Continuation实例进行yield调用后,再次调用其run方法就可以从yield的调用之处往下执行,从而实现了程序的中断和恢复。
源码分析
主要包括:
Continuation
VirtualThread
线程建造器
Continuation
Continuation直译为"连续",一般来说表示一种语言构造,使语言可以在任意点保存执行状态并且在之后的某个点返回。在JDK中对应类jdk.internal.vm.Continuation,这个类只有一句类注释A one-shot delimited continuation,直译为一个只能执行一次的回调函数。由于Continuation的成员和方法缺少详细的注释,并且大部分功能由JVM实现,这里只能阅读其一些骨干源码和上一小节编写的Continuation相关例子去了解其实现(笔者C语言比较薄弱,有兴趣的可以翻阅JVM的源码)。先看成员变量和构造函数:
// 判断是否需要保留当前线程的本地缓存,由系统参数jdk.preserveExtentLocalCache决定 private static final boolean PRESERVE_EXTENT_LOCAL_CACHE; // 真正要被执行的任务实例 private final Runnable target; // 标识Continuation的范围, private final ContinuationScope scope; // Continuation的父节点,如果为空的时候则为本地线程栈 private Continuation parent; // Continuation的子节点,非空时候说明在子Continuation中进行了yield操作 private Continuation child; // 猜测为Continuation栈结构,由JVM管理,无法得知其真实作用 private StackChunk tail; // 标记Continuation是否已经完成 private boolean done; // 标记是否进行了mount操作 private volatile boolean mounted = false; // yield操作时候设置的信息 private Object yieldInfo; // 标记一个未挂载的Continuation是否通过强制抢占式卸载 private boolean preempted; // 保留当前线程的本地缓存的副本 private Object[] extentLocalCache; // 构造函数,要求传入范围和任务包装实例 public Continuation(ContinuationScope scope, Runnable target) { this.scope = scope; this.target = target; }
Continuation是一个双向链表设计,它的唯一一组构造参数是ContinuationScope和Runnable:
这里不深入研究内部StackChunk、Pinned等实现,直接看run、enter系列方法和yield方法:
// Continuation.run() public final void run() { // 设置死循环 while (true) { // 进行mount操作 mount(); JLA.setExtentLocalCache(extentLocalCache); // 如果Continuation已完成则抛出异常 if (done) throw new IllegalStateException("Continuation terminated"); // 获取当前虚拟线程分配的运载线程 Thread t = currentCarrierThread(); if (parent != null) { if (parent != JLA.getContinuation(t)) throw new IllegalStateException(); } else this.parent = JLA.getContinuation(t); // 运载线程设置当前Continuation实例 JLA.setContinuation(t, this); try { // 判断ContinuationScope是否虚拟线程范围 boolean isVirtualThread = (scope == JLA.virtualThreadContinuationScope()); if (!isStarted()) { // is this the first run? (at this point we know !done) // 激活enter系列方法,标记isContinue为false,标记是否虚拟线程范围 enterSpecial(this, false, isVirtualThread); } else { assert !isEmpty(); // 激活enter系列方法,标记isContinue为true,标记是否虚拟线程范围 enterSpecial(this, true, isVirtualThread); } } finally { // 设置内存屏障 fence(); try { assert isEmpty() == done : "empty: " + isEmpty() + " done: " + done + " cont: " + Integer.toHexString(System.identityHashCode(this)); // 当前Continuation执行完成后,把运载线程的Continuation指向父Continuation JLA.setContinuation(currentCarrierThread(), this.parent); if (parent != null) parent.child = null; // 进行后置的yield清理工作 postYieldCleanup(); // 进行unmount操作 unmount(); // 判断是否需要保留当前线程的本地缓存并处理 if (PRESERVE_EXTENT_LOCAL_CACHE) { extentLocalCache = JLA.extentLocalCache(); } else { extentLocalCache = null; } JLA.setExtentLocalCache(null); } catch (Throwable e) { e.printStackTrace(); System.exit(1); } } // we're now in the parent continuation assert yieldInfo == null || yieldInfo instanceof ContinuationScope; // 父Continuation的yieldInfo缓存当前的scope实例,清空当前Continuation的父节点和yieldInfo if (yieldInfo == null || yieldInfo == scope) { this.parent = null; this.yieldInfo = null; // 这个位置是死循环的唯一跳出点 return; } else { // 执行到这个位置说明在当前是子Continuation并且进行了yield操作,需要跳转到父Continuation进行yield操作 parent.child = this; parent.yield0((ContinuationScope)yieldInfo, this); parent.child = null; } } } // Continuation.enter()系列方法 // 这是一个native方法,它最终会根据判断回调到enter()方法 private native static void enterSpecial(Continuation c, boolean isContinue, boolean isVirtualThread); // Continuation的入口方法,用户任务回调的入口 @DontInline @IntrinsicCandidate private static void enter(Continuation c, boolean isContinue) { // This method runs in the "entry frame". // A yield jumps to this method's caller as if returning from this method. try { c.enter0(); } finally { c.finish(); } } // 真正任务包装器执行的回调方法 private void enter0() { target.run(); } // Continuation完成,标记done为true private void finish() { done = true; assert isEmpty(); } // Continuation.yield()方法,静态方法 public static boolean yield(ContinuationScope scope) { // 获取当前运载线程的Continuation实例 Continuation cont = JLA.getContinuation(currentCarrierThread()); Continuation c; // 基于Continuation实例当前向父节点遍历,直到匹配虚拟线程类型的ContinuationScope的Continuation,如果没有匹配的Continuation会抛出异常中断流程 for (c = cont; c != null && c.scope != scope; c = c.parent) ; if (c == null) throw new IllegalStateException("Not in scope " + scope); // 把当前的Continuation挂起到给定的ContinuationScope return cont.yield0(scope, null); } // 透过上下文猜测是当前的Continuation实例挂起到给定的ContinuationScope private boolean yield0(ContinuationScope scope, Continuation child) { // 强制抢占式卸载标记为false preempted = false; // 如果当前Continuation实例的yieldInfo不等于传入的ContinuationScope实例,则进行更新,相等的情况下yieldInfo会保持是一个空值 if (scope != this.scope) this.yieldInfo = scope; // 最终的yield调用,最终当前Continuation就是阻塞在此方法,从下文源码猜测,当该方法唤醒后,res值为0的时候,当前Continuation实例会继续执行,返回其他值的时候则会打印pined线程栈 int res = doYield(); // 放置内存屏障防止指令重排,后面注释提到是防止编译器进行某些转换 U.storeFence(); // needed to prevent certain transformations by the compiler assert scope != this.scope || yieldInfo == null : "scope: " + scope + " this.scope: " + this.scope + " yieldInfo: " + yieldInfo + " res: " + res; assert yieldInfo == null || scope == this.scope || yieldInfo instanceof Integer : "scope: " + scope + " this.scope: " + this.scope + " yieldInfo: " + yieldInfo + " res: " + res; if (child != null) { // TODO: ugly <----- 这个位置还有一句吐槽的代码注释:丑陋的代码 if (res != 0) { child.yieldInfo = res; } else if (yieldInfo != null) { assert yieldInfo instanceof Integer; child.yieldInfo = yieldInfo; } else { child.yieldInfo = res; } this.yieldInfo = null; } else { if (res == 0 && yieldInfo != null) { res = (Integer)yieldInfo; } this.yieldInfo = null; if (res == 0) // Continuation实例继续执行前回调 onContinue(); else // Continuation固定在运载线程前回调,res是pined的级别 onPinned0(res); } assert yieldInfo == null; // 返回布尔值结果表示当前Continuation实例是否会继续执行 return res == 0; } // 最终的yield调用,看实现是抛出异常,猜测是由JVM实现 @IntrinsicCandidate private static int doYield() { throw new Error("Intrinsic not installed"); }
说实话,Continuation源码的可读性比想象中低,连代码注释也留下了"丑陋的"这句吐槽。通过上面源码分析和上一节Continuation的一个例子,可以得知Continuation#yield()可以让程序代码中断,然后再次调用Continuation#run()可以从上一个中断位置继续执行,JVM在这个过程中为使用者屏蔽了Continuation和运行此Continuation的平台线程之间的交互细节,让使用者可以专注实际的任务开发即可。
VirtualThread
前面花了不少篇幅介绍Continuation,它是一个全新的API。已有的JUC类库已经十分完善,如果可以把Continuation融入到已有的JUC体系,那么就可以通过线程池技术去管理运载线程,原有的大多数并发相关API也能直接在协程体系中使用。从这个背景来看,创造一个Thread类的全新子类用于融合JUC和Continuation是十分合适的,这样通过很小的改造成本就能通过Java继承特性把这个全新子类适配JUC体系,也能扩展一些API让它适配协程新引入的特性,这个全新的子类就是java.lang.VirtualThread:
VirtualThread类的继承体系如下:
package java.lang; final class VirtualThread extends BaseVirtualThread { // ... } package java.lang; sealed abstract class BaseVirtualThread extends Thread permits VirtualThread, ThreadBuilders.BoundVirtualThread { // ... }
VirtualThread是BaseVirtualThread的子类,而BaseVirtualThread是一个"密封类",它是Thread的子类,只对VirtualThread和ThreadBuilders.BoundVirtualThread开放,并且VirtualThread是包私有访问权限的同时用final关键字修饰,无法被继承。接着看VirtualThread的成员变量和构造函数:
// java.lang.VirtualThread // Unsafe实例 private static final Unsafe U = Unsafe.getUnsafe(); // 虚拟线程的ContinuationScope静态常量 private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads"); // 调度器,或者说执行器,默认就是用此调度器运行虚拟线程 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler(); // 调度线程池实例,用于唤醒带超时阻塞的虚拟线程实例,主要用于sleep的唤醒 private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler(); // pin模式,也就是pined thread的跟踪模式,决定打印堆栈的详细程度,来自于系统参数jdk.tracePinnedThreads,full表示详细,short表示简略 private static final int TRACE_PINNING_MODE = tracePinningMode(); // 下面几个都是成员地址,用于Unsafe直接操作成员 private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state"); private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit"); private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread"); private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination"); // 调度器实例 private final Executor scheduler; // Continuation实例 private final Continuation cont; // Continuation实例的Runnable包装实例 private final Runnable runContinuation; // 虚拟线程状态,这个值由JVM访问和修改 private volatile int state; // 下面的状态集合 private static final int NEW = 0; private static final int STARTED = 1; private static final int RUNNABLE = 2; // runnable-unmounted private static final int RUNNING = 3; // runnable-mounted private static final int PARKING = 4; private static final int PARKED = 5; // unmounted private static final int PINNED = 6; // mounted private static final int YIELDING = 7; // Thread.yield private static final int TERMINATED = 99; // final state // 虚拟线程unmount后可以从调度过程中挂起的状态 private static final int SUSPENDED = 1 << 8; private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED); private static final int PARKED_SUSPENDED = (PARKED | SUSPENDED); // park操作许可 private volatile boolean parkPermit; // 运载线程实例 private volatile Thread carrierThread; // 终结倒数栅栏实例,主要用于join操作 private volatile CountDownLatch termination; // 唯一构造函数 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { // 默认标记bound为false,当bound为true的时候标记为绑定到系统线程 super(name, characteristics, /*bound*/ false); Objects.requireNonNull(task); // 如果传入的调度器实例非空则直接使用 // 否则,如果父线程是虚拟线程,则使用父虚拟线程的调度器实例 // 如果传入的调度器实例为空,父线程为平台线程,那么使用默认的调度器 // choose scheduler if not specified if (scheduler == null) { Thread parent = Thread.currentThread(); if (parent instanceof VirtualThread vparent) { scheduler = vparent.scheduler; } else { scheduler = DEFAULT_SCHEDULER; } } // 赋值调度器 this.scheduler = scheduler; // 封装和初始化Continuation this.cont = new VThreadContinuation(this, task); // 初始化Continuation的Runnable包装器,最终提交到调度器中执行 this.runContinuation = this::runContinuation; } // 虚拟线程Continuation的专有子类,默认为ContinuationScope("VirtualThreads"),从而实现Continuation.enter()执行时候实际上执行的是VirtualThread.run()方法 // 也就是 Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task] private static class VThreadContinuation extends Continuation { VThreadContinuation(VirtualThread vthread, Runnable task) { super(VTHREAD_SCOPE, () -> vthread.run(task)); } // pin之前回调的方法,基于TRACE_PINNING_MODE的返回值决定pinned线程栈的打印详略 @Override protected void onPinned(Continuation.Pinned reason) { if (TRACE_PINNING_MODE > 0) { boolean printAll = (TRACE_PINNING_MODE == 1); PinnedThreadPrinter.printStackTrace(System.out, printAll); } } } // 在当前线程上运行或继续Continuation的执行,必须由平台线程运行此方法,最终会封装为Runnble包装器提交到执行器中运行 private void runContinuation() { // the carrier must be a platform thread if (Thread.currentThread().isVirtual()) { throw new WrongThreadException(); } // set state to RUNNING boolean firstRun; int initialState = state(); // 当前为STARTED状态并且CAS更新为RUNNING状态则标记首次运行为true if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) { // first run firstRun = true; } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) { // 当前为RUNNABLE状态并且CAS更新为RUNNING状态则标记首次运行为false,并且设置park许可为false // consume parking permit setParkPermit(false); firstRun = false; } else { // not runnable return; } // notify JVMTI before mount if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun); try { // 执行Continuation.run() cont.run(); } finally { // Continuation执行完成,回调钩子方法afterTerminate if (cont.isDone()) { afterTerminate(/*executed*/ true); } else { // Continuation没有执行完成,说明调用了Continuation.yield或者pin到运载线程中进行了park操作 afterYield(); } } } // Continuation执行完成回调的钩子方法 private void afterTerminate(boolean executed) { assert (state() == TERMINATED) && (carrierThread == null); if (executed) { if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(true); } // 如果有其他线程阻塞等待虚拟线程的返回,例如调用了join方法,那么在这里解除阻塞 CountDownLatch termination = this.termination; if (termination != null) { assert termination.getCount() == 1; termination.countDown(); } // 如果执行成功则通知线程容器当前线程实例退出,清空线程本地变量引用 if (executed) { // notify container if thread executed threadContainer().onExit(this); // clear references to thread locals clearReferences(); } } // 由于Continuation的yield操作或者调用了Thread.yield()导致Continuation挂起,需要重新把Continuation的包装器"懒提交"到调度器中 private void afterYield() { int s = state(); assert (s == PARKING || s == YIELDING) && (carrierThread == null); // 如果是PARKING状态,这种对应于Continuation的yield操作调用 if (s == PARKING) { // 更变为PARKED状态 setState(PARKED); // notify JVMTI that unmount has completed, thread is parked if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false); // 得到park许可,并且CAS为RUNNABLE状态 if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) { // 进行懒提交,如果可能的话,用当前线程作为运载线程继续执行任务 lazySubmitRunContinuation(); } } else if (s == YIELDING) { // 如果是YIELDING状态,这种对应于调用了Thread.yield // 更变为RUNNABLE状态 setState(RUNNABLE); // notify JVMTI that unmount has completed, thread is runnable if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false); // 进行懒提交,如果可能的话,用当前线程作为运载线程继续执行任 lazySubmitRunContinuation(); } }
这里唯一的构造函数是比较复杂的,抛开一些钩子接口,最终想达到的效果就是:
Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]
用户任务实际被包裹了很多层,在最里面一层才会回调。VirtualThread中提供了两个静态全局的线程池实例,一个用于调度,一个用于唤醒,这里看看两个线程池是如何构造的:
// java.lang.VirtualThread private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler(); private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler(); // 创建默认的调度器 private static ForkJoinPool createDefaultScheduler() { // 线程工厂,默认创建CarrierThread实例,CarrierThread是ForkJoinWorkerThread的一个子类 ForkJoinWorkerThreadFactory factory = pool -> { PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool); return AccessController.doPrivileged(pa); }; PrivilegedAction<ForkJoinPool> pa = () -> { int parallelism, maxPoolSize, minRunnable; String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism"); String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize"); String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable"); if (parallelismValue != null) { parallelism = Integer.parseInt(parallelismValue); } else { parallelism = Runtime.getRuntime().availableProcessors(); } if (maxPoolSizeValue != null) { maxPoolSize = Integer.parseInt(maxPoolSizeValue); parallelism = Integer.min(parallelism, maxPoolSize); } else { maxPoolSize = Integer.max(parallelism, 256); } if (minRunnableValue != null) { minRunnable = Integer.parseInt(minRunnableValue); } else { minRunnable = Integer.max(parallelism / 2, 1); } Thread.UncaughtExceptionHandler handler = (t, e) -> { }; boolean asyncMode = true; // FIFO return new ForkJoinPool(parallelism, factory, handler, asyncMode, 0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS); }; return AccessController.doPrivileged(pa); } // 创建调度线程池,用于虚拟线程带超时时间的unpark操作 private static ScheduledExecutorService createDelayedTaskScheduler() { String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize"); int poolSize; if (propValue != null) { poolSize = Integer.parseInt(propValue); } else { // 确保至少有一个工作线程 poolSize = 1; } ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(poolSize, task -> { return InnocuousThread.newThread("VirtualThread-unparker", task); }); // 任务取消后马上从工作队列移除 stpe.setRemoveOnCancelPolicy(true); return stpe; }
对于默认调度器(DEFAULT_SCHEDULER)的创建,它是一个ForkJoinPool实例,构造参数的选取如下:
parallelism参数由系统变量jdk.virtualThreadScheduler.parallelism决定,默认值为Runtime.getRuntime().availableProcessors(),如果配置了系统参数jdk.virtualThreadScheduler.maxPoolSize则取min(parallelism,maxPoolSize)
maxPoolSize参数由系统变量jdk.virtualThreadScheduler.maxPoolSize决定,默认值为min(parallelism, maxPoolSize)
minRunnable参数由系统变量jdk.virtualThreadScheduler.minRunnable决定,默认值为max(parallelism / 2, 1)
asyncMode参数固定值true,也就是选用FIFO模式
keepAliveTime参数为固定值30秒
saturate参数在JDK17引入,是一个Predicate函数,在此固定返回true,用于忽略minRunnable值允许线程池饱和
线程工厂用于创建CarrierThread实例,CarrierThread是ForkJoinWorkerThread的子类
在Intel 4C8T开发机器环境中,该ForkJoinPool实例创建时候的几个参数分别为:parallelism = 8, maxPoolSize = 256, minRunnable = 4。
对于调度线程池(UNPARKER)的创建,它是一个ScheduledThreadPoolExecutor实例,构造参数的选取如下:
corePoolSize参数由系统变量jdk.unparker.maxPoolSize决定,并且确保最小值为1
线程工厂用于创建InnocuousThread实例,线程名称为VirtualThread-unparker
接着看虚拟线程的启动方法start():
// java.lang.VirtualThread @Override public void start() { start(ThreadContainers.root()); } // 调度虚拟线程让之运行 @Override void start(ThreadContainer container) { // CAS由NEW转换为STARTED状态 if (!compareAndSetState(NEW, STARTED)) { throw new IllegalThreadStateException("Already started"); } // 绑定当前虚拟线程到线程容器 setThreadContainer(container); // 标记为未启动 boolean started = false; // 回调start钩子方法 container.onStart(this); // may throw try { // 从给定容器继承extent-local绑定参数 inheritExtentLocalBindings(container); // 提交'runContinuation'任务到调度器 submitRunContinuation(); // 标记为启动完成 started = true; } finally { // 如果启动失败,则标记最终状态和回调终结钩子方法 if (!started) { setState(TERMINATED); container.onExit(this); afterTerminate(/*executed*/ false); } } } // 提交'runContinuation'任务到调度器 private void submitRunContinuation() { submitRunContinuation(false); } // 提交'runContinuation'任务到调度器,lazySubmit参数决定是否"懒提交" private void submitRunContinuation(boolean lazySubmit) { try { if (lazySubmit && scheduler instanceof ForkJoinPool pool) { // ForkJoinPool类型调度器并且lazySubmit为true,对runContinuation这个Runnable实例适配为ForkJoinTask类型,进行"懒提交"到ForkJoinPool pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); } else { // 非ForkJoinPool类型调度器或者lazySubmit为false,直接使用Executor.execute()提交任务 scheduler.execute(runContinuation); } } catch (RejectedExecutionException ree) { // 线程池拒绝接收任务,发布提交失败事件到JVM var event = new VirtualThreadSubmitFailedEvent(); if (event.isEnabled()) { event.javaThreadId = threadId(); event.exceptionMessage = ree.getMessage(); event.commit(); } throw ree; } }
ForkJoinPool#lazySubmit()是JDK19新增的一个API,它的方法注释如下:
提交给定的任务,但不保证它最终会在没有可用活动线程的情况下执行。在某些上下文中,这种方法可以通过依赖于特定于上下文的知识来减少竞争和开销,即现有线程(如果在此池中操作,则可能包括调用线程)最终将可用来执行任务
使用此方法提交的目的就是希望可以用当前调用线程去执行任务,对于首次提交Continuation任务可能作用不明显,但是对于Continuation.yield()调用后的再次提交意义比较重大,因为这样就可以把运行的Continuation.run()方法链分配到同一个运载线程实例,在开发者的角度就是虚拟线程任务执行中断后恢复执行,执行任务的运载线程没有改变。
源码中还可以发现,run()方法覆盖了Thread#run()替换为空实现,因为VirtualThread最终是触发Continuation#run(),这一点已经在start()方法进行提交和调度。最后分析虚拟线程的阻塞(不带超时,也就是timeout = 0)、限时阻塞(timeout > 0)、join的实现。先看相对简单的joinNanos():
// java.lang.VirtualThread // Thread.join() --> VirtualThread.joinNanos() // 虚拟线程join调用 boolean joinNanos(long nanos) throws InterruptedException { // 如果状态为TERMINATED直接返回true if (state() == TERMINATED) return true; // 获取数栅栏实例 CountDownLatch termination = getTermination(); // 再次验证如果状态为TERMINATED直接返回true if (state() == TERMINATED) return true; // 如果nanos为0则调用CountDownLatch.await()阻塞 if (nanos == 0) { termination.await(); } else { // 如果nanos大于0则调用CountDownLatch.await(nanos,TimeUnit)限时阻塞 boolean terminated = termination.await(nanos, NANOSECONDS); if (!terminated) { // 阻塞到超时时限过了返回,非解除阻塞下的正常返回 return false; } } assert state() == TERMINATED; // 解除阻塞下的正常返回 return true; } // 懒创建终结倒数栅栏实例,设置资源值为1,这里用到CAS是考虑之前已经创建和保存到成员变量,如果已创建则直接选用成员变量的那个实例 private CountDownLatch getTermination() { CountDownLatch termination = this.termination; if (termination == null) { termination = new CountDownLatch(1); if (!U.compareAndSetReference(this, TERMINATION, null, termination)) { termination = this.termination; } } return termination; }
接着看虚拟线程阻塞和限时阻塞的现实:
// java.lang.VirtualThread // Thread.sleep() --> VirtualThread.sleepNanos() // 给定休眠时间让当前虚拟线程休眠 void sleepNanos(long nanos) throws InterruptedException { assert Thread.currentThread() == this; // n标签:
留言评论