
方法client.downloadFile会不断发射FilePart,需要collec写入到文件中即可。
在接收FilePart期间会有网络等其他异常,现在直接用onErrorResume从 offset 开始请求返回新的 Flux 会有一个问题。 第一次异常会进入onErrorResume返回新的 Flux ,由于新的 Flux 没有声明onErrorResume就噶了
我也不可能在新的 Flux 里声明onErrorResume,无限套娃了属于是。
client.downloadFile(fileReferenceId) .publishOn(Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor())) .timeout(Duration.ofMinutes(3)) .onErrorResume(RpcException::class.java) { if (it.error.errorCode() == TIMEOUT_CODE && monitoredChannel.isDone().not()) { log.warn("Download timeout, resuming: $fileDownloadPath") return@onErrorResume client.downloadFile( fileReferenceId, monitoredChannel.getDownloadedBytes(), MAX_FILE_PART_SIZE, true ) } Flux.error(it) } .collect({ monitoredChannel }, { fc, filePart -> fc.write(filePart.bytes.nioBuffer()) }) .doOnSuccess { tempDownloadPath.moveTo(fileDownloadPath) downloadCounting.incrementAndGet() log.info("Downloaded file: $fileDownloadPath") } .doOnError { log.error("Error downloading file:$fileDownloadPath", it) } .onErrorMap { wrapRetryableExceptionIfNeeded(it) } .doFinally { runCatching { closePath(fileDownloadPath) }.onFailure { log.error("Error closing file channel", it) } hashingPathMapping.remove(hashing) } .block() 1 guyeu 2023 年 11 月 14 日 via iPhone retry 操作符? |
3 yuhongtai114514 2023 年 11 月 15 日 把 flux 中的动作先用操作符转成 mono ,然后把 retry 挂在 mono 上试试? |