自荐 Java 多线程神器ThreadForge ,让多线程从此简单 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
HeyHudy
V2EX    Java

自荐 Java 多线程神器ThreadForge ,让多线程从此简单

  •  
  •   HeyHudy 2 月 13 日 2738 次点击
    这是一个创建于 69 天前的主题,其中的信息可能已经有所发展或是发生改变。

    新春临近,先住各位 V 友新年快乐~

    从场景切入

    传统的 ExecutorServiceFutureCompletableFuture 非常强大,但写起来比较麻烦:

    • 线程池要手动创建和关闭
    • 超时逻辑每个任务都要写一遍
    • 失败了要不要取消其他任务?得自己判断
    • 异常怎么传播?要么吞掉,要么手动包装
    • 想知道任务跑了多久?自己打日志

    我一直在思考怎么能让 Javaer 用多线程的时候能简单点,少点弯弯绕绕,于是诞生了 ThreadForge 。

    ThreadForge:把复杂度收敛到一个可推理的模型里

    ThreadForge 的设计哲学很简单:先降低认知成本,再追求性能。

    可以把它理解成一个结构化并发框架让你用写同步代码的思维写并发代码,同时自动处理那些容易遗漏的边界情况。

    也可以把它理解成对于 Java 内置并发工具的二次包装,目标是让 Java 并发更简单、更清晰。

    什么是结构化?

    看一个最简单的例子:

    try (ThreadScope scope = ThreadScope.open()) { Task<String> user = scope.submit("load-user", () -> fetchUser()); Task<Integer> orders = scope.submit("load-orders", () -> fetchOrders()); scope.await(user, orders); // 到这里,两个任务肯定都结束了(成功、失败或超时) String result = user.await() + ":" + orders.await(); } // scope 关闭时,所有任务自动取消、资源自动清理 

    这段代码有几个关键点:

    1. 所有任务都绑定在 ThreadScope,生命周期有边界,不会泄漏
    2. 默认就是安全的:默认超时、默认失败传播、自动取消
    3. 代码结构就是任务关系:读代码的人一眼就能看出两个任务是并发的,且必须都完成才能继续

    对比传统写法,你需要:

    • 创建线程池,配置核心线程数、队列大小
    • 提交任务,手动处理 Future
    • 写 try-finally 确保 shutdown
    • 手动处理超时和异常传播

    这里其实就能看出来 ThreadForge 的设计初衷和目标了,就是努力让我们省掉这些重复劳动,专注业务逻辑。

    五个让你省脑力的设计

    1. 默认行为就是正确的

    // 默认:FAIL_FAST + 30 秒超时 + 自动取消其他任务 try (ThreadScope scope = ThreadScope.open()) { Task<Integer> a = scope.submit(() -> riskyRpc()); Task<Integer> b = scope.submit(() -> anotherRpc()); scope.await(a, b); } catch (ScopeTimeoutException timeout) { // 超时了,所有任务已被自动取消 fallback(); } catch (FailurePropagationException failed) { // 某个任务失败了,其他任务已被自动取消 handleError(failed); } 

    不需要配置,不需要思考,开箱即用。

    2. 失败策略明确且统一

    不同场景对失败的容忍度不同,ThreadForge 提供了 5 种明确的策略:

    • FAIL_FAST:快速失败,立即取消其他任务(默认)
    • COLLECT_ALL:等所有任务结束,汇总所有失败
    • SUPERVISOR:不自动取消,失败信息收集到 Outcome
    • CANCEL_OTHERS:失败后取消其余任务,但不抛异常
    • IGNORE_ALL:忽略失败,只返回成功的结果
    // 场景:批量导入,即使部分失败也要知道哪些成功了 try (ThreadScope scope = ThreadScope.open() .withFailurePolicy(FailurePolicy.SUPERVISOR)) { List<Task<Void>> tasks = ids.stream() .map(id -> scope.submit(() -> importData(id))) .collect(toList()); Outcome outcome = scope.await(tasks); // 明确知道哪些成功、哪些失败 log.info("成功: {}, 失败: {}", outcome.successCount(), outcome.failureCount()); } 

    3. 并发度控制不再需要手动管理队列

    // 场景:调用外部 API,最多同时 50 个请求 try (ThreadScope scope = ThreadScope.open() .withConcurrencyLimit(50)) { List<Task<Result>> tasks = hugeIdList.stream() .map(id -> scope.submit(() -> externalApi.call(id))) .collect(toList()); List<Result> results = scope.awaitAll(tasks); } // 自动限流,不会把外部服务打爆 

    不需要自己写信号量,也不需要手动分批,框架自动处理。

    4. 生命周期观测统一收口

    ThreadScope scope = ThreadScope.open() .withHook(new ThreadHook() { @Override public void onStart(TaskInfo info) { metrics.taskStarted(info.name()); } @Override public void onSuccess(TaskInfo info, Duration duration) { metrics.taskSuccess(info.name(), duration.toMillis()); } @Override public void onFailure(TaskInfo info, Throwable error, Duration duration) { log.error("Task {} failed after {}", info.name(), duration, error); metrics.taskFailed(info.name()); } }); 

    不需要在每个任务里重复写日志和监控代码,同时新的 1.0.2 版本中内置了 ScopeMetricsSnapshot 作为观测点,直接 .toString() 就能看到完整的调用耗时等情况 。

    5. 跨 JDK 版本的一致体验

    // 同一套 API try (ThreadScope scope = ThreadScope.open()) { // JDK 21+: 自动使用虚拟线程 // JDK 8-20: 自动降级到线程池 Task<String> task = scope.submit(() -> longRunningTask()); return task.await(); } 

    不需要分叉代码,不需要写 if-else,框架自动适配。

    适用场景

    ThreadForge 特别适合这些场景:

    并发 RPC 聚合

    try (ThreadScope scope = ThreadScope.open()) { Task<User> user = scope.submit(() -> userService.get(uid)); Task<List<Order>> orders = scope.submit(() -> orderService.list(uid)); Task<Profile> profile = scope.submit(() -> profileService.get(uid)); scope.await(user, orders, profile); return buildResponse(user.await(), orders.await(), profile.await()); } 

    批量数据处理

    try (ThreadScope scope = ThreadScope.open() .withConcurrencyLimit(100) .withDeadline(Duration.ofMinutes(5))) { List<Task<Void>> tasks = records.stream() .map(r -> scope.submit(() -> process(r))) .collect(toList()); scope.awaitAll(tasks); } 

    生产者-消费者模式

    try (ThreadScope scope = ThreadScope.open()) { Channel<Data> channel = Channel.bounded(1000); scope.submit(() -> { for (Data d : datasource) { channel.send(d); } channel.close(); return null; }); List<Task<Void>> cOnsumers= IntStream.range(0, 4) .mapToObj(i -> scope.submit(() -> { for (Data d : channel) { process(d); } return null; })) .collect(toList()); scope.awaitAll(consumers); } 

    开始使用

    Maven:

    <dependency> <groupId>pub.lighting</groupId> <artifactId>threadforge-core</artifactId> <version>1.0.2</version> </dependency> 

    Gradle:

    implementation("pub.lighting:threadforge-core:1.0.2") 

    最小示例:

    try (ThreadScope scope = ThreadScope.open()) { Task<String> task = scope.submit(() -> "Hello, ThreadForge"); System.out.println(task.await()); } 

    GitHub: github.com/wuuJiawei/ThreadForge
    文档: 见项目 docs/api/README.md
    License: MIT

    最后

    感谢所有看到这里的朋友。

    JDK21 之后,官方团队也跟进了结构化并发类,可以称这个项目是又一个轮子,也可以称它是在工程化里面的一次探讨和另一种解决方案,毕竟给低版本的 JDK 也提供了可能性。

    欢迎点赞、评论,如果有任何问题,也欢迎提出您的宝贵意见。

    jimeng-2026-02-10-3757-「 ThreadForge 」 这是我新开发的开源项目,帮我做一个 logo 。Java....png

    这是让即梦画的 logo ,看起来有点意思,像是个老派的项目。

    4 条回复    2026-02-13 11:45:58 +08:00
    cloudzhou
        1
    cloudzhou  
       2 月 13 日
    大同小异,但是最终要走向 Java 结构化并发类,如果真要实现,那么就基于 Java 结构化并发类二次开发

    这是我以前的理念:
    t/916816#r_12705296
    keepfun
        2
    keepfun  
       2 月 13 日
    good
    1ffree
        3
    1ffree  
       2 月 13 日
    有点意思
    andforce
        4
    andforce  
       2 月 13 日
    Kotlin 的协程,我已经很久没有手动写多线程了
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5646 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 67ms UTC 01:44 PVG 09:44 LAX 18:44 JFK 21:44
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86