
新春临近,先住各位 V 友新年快乐~
传统的 ExecutorService、Future、CompletableFuture 非常强大,但写起来比较麻烦:
我一直在思考怎么能让 Javaer 用多线程的时候能简单点,少点弯弯绕绕,于是诞生了 ThreadForge 。
ThreadForge 的设计哲学很简单:先降低认知成本,再追求性能。
可以把它理解成一个结构化并发框架让你用写同步代码的思维写并发代码,同时自动处理那些容易遗漏的边界情况。
也可以把它理解成对于 Java 内置并发工具的二次包装,目标是让 Java 并发更简单、更清晰。
看一个最简单的例子:
try (ThreadScope scope = ThreadScope.open()) { Task<String> user = scope.submit("load-user", () -> fetchUser()); Task<Integer> orders = scope.submit("load-orders", () -> fetchOrders()); scope.await(user, orders); // 到这里,两个任务肯定都结束了(成功、失败或超时) String result = user.await() + ":" + orders.await(); } // scope 关闭时,所有任务自动取消、资源自动清理 这段代码有几个关键点:
ThreadScope 内,生命周期有边界,不会泄漏对比传统写法,你需要:
这里其实就能看出来 ThreadForge 的设计初衷和目标了,就是努力让我们省掉这些重复劳动,专注业务逻辑。
// 默认:FAIL_FAST + 30 秒超时 + 自动取消其他任务 try (ThreadScope scope = ThreadScope.open()) { Task<Integer> a = scope.submit(() -> riskyRpc()); Task<Integer> b = scope.submit(() -> anotherRpc()); scope.await(a, b); } catch (ScopeTimeoutException timeout) { // 超时了,所有任务已被自动取消 fallback(); } catch (FailurePropagationException failed) { // 某个任务失败了,其他任务已被自动取消 handleError(failed); } 不需要配置,不需要思考,开箱即用。
不同场景对失败的容忍度不同,ThreadForge 提供了 5 种明确的策略:
FAIL_FAST:快速失败,立即取消其他任务(默认)COLLECT_ALL:等所有任务结束,汇总所有失败SUPERVISOR:不自动取消,失败信息收集到 OutcomeCANCEL_OTHERS:失败后取消其余任务,但不抛异常IGNORE_ALL:忽略失败,只返回成功的结果// 场景:批量导入,即使部分失败也要知道哪些成功了 try (ThreadScope scope = ThreadScope.open() .withFailurePolicy(FailurePolicy.SUPERVISOR)) { List<Task<Void>> tasks = ids.stream() .map(id -> scope.submit(() -> importData(id))) .collect(toList()); Outcome outcome = scope.await(tasks); // 明确知道哪些成功、哪些失败 log.info("成功: {}, 失败: {}", outcome.successCount(), outcome.failureCount()); } // 场景:调用外部 API,最多同时 50 个请求 try (ThreadScope scope = ThreadScope.open() .withConcurrencyLimit(50)) { List<Task<Result>> tasks = hugeIdList.stream() .map(id -> scope.submit(() -> externalApi.call(id))) .collect(toList()); List<Result> results = scope.awaitAll(tasks); } // 自动限流,不会把外部服务打爆 不需要自己写信号量,也不需要手动分批,框架自动处理。
ThreadScope scope = ThreadScope.open() .withHook(new ThreadHook() { @Override public void onStart(TaskInfo info) { metrics.taskStarted(info.name()); } @Override public void onSuccess(TaskInfo info, Duration duration) { metrics.taskSuccess(info.name(), duration.toMillis()); } @Override public void onFailure(TaskInfo info, Throwable error, Duration duration) { log.error("Task {} failed after {}", info.name(), duration, error); metrics.taskFailed(info.name()); } }); 不需要在每个任务里重复写日志和监控代码,同时新的 1.0.2 版本中内置了 ScopeMetricsSnapshot 作为观测点,直接 .toString() 就能看到完整的调用耗时等情况 。
// 同一套 API try (ThreadScope scope = ThreadScope.open()) { // JDK 21+: 自动使用虚拟线程 // JDK 8-20: 自动降级到线程池 Task<String> task = scope.submit(() -> longRunningTask()); return task.await(); } 不需要分叉代码,不需要写 if-else,框架自动适配。
ThreadForge 特别适合这些场景:
并发 RPC 聚合
try (ThreadScope scope = ThreadScope.open()) { Task<User> user = scope.submit(() -> userService.get(uid)); Task<List<Order>> orders = scope.submit(() -> orderService.list(uid)); Task<Profile> profile = scope.submit(() -> profileService.get(uid)); scope.await(user, orders, profile); return buildResponse(user.await(), orders.await(), profile.await()); } 批量数据处理
try (ThreadScope scope = ThreadScope.open() .withConcurrencyLimit(100) .withDeadline(Duration.ofMinutes(5))) { List<Task<Void>> tasks = records.stream() .map(r -> scope.submit(() -> process(r))) .collect(toList()); scope.awaitAll(tasks); } 生产者-消费者模式
try (ThreadScope scope = ThreadScope.open()) { Channel<Data> channel = Channel.bounded(1000); scope.submit(() -> { for (Data d : datasource) { channel.send(d); } channel.close(); return null; }); List<Task<Void>> cOnsumers= IntStream.range(0, 4) .mapToObj(i -> scope.submit(() -> { for (Data d : channel) { process(d); } return null; })) .collect(toList()); scope.awaitAll(consumers); } Maven:
<dependency> <groupId>pub.lighting</groupId> <artifactId>threadforge-core</artifactId> <version>1.0.2</version> </dependency> Gradle:
implementation("pub.lighting:threadforge-core:1.0.2") 最小示例:
try (ThreadScope scope = ThreadScope.open()) { Task<String> task = scope.submit(() -> "Hello, ThreadForge"); System.out.println(task.await()); } GitHub: github.com/wuuJiawei/ThreadForge
文档: 见项目 docs/api/README.md
License: MIT
感谢所有看到这里的朋友。
JDK21 之后,官方团队也跟进了结构化并发类,可以称这个项目是又一个轮子,也可以称它是在工程化里面的一次探讨和另一种解决方案,毕竟给低版本的 JDK 也提供了可能性。
欢迎点赞、评论,如果有任何问题,也欢迎提出您的宝贵意见。
这是让即梦画的 logo ,看起来有点意思,像是个老派的项目。
1 cloudzhou 2 月 13 日 |
2 keepfun 2 月 13 日 good |
3 1ffree 2 月 13 日 有点意思 |
4 andforce 2 月 13 日 Kotlin 的协程,我已经很久没有手动写多线程了 |