自己尝试实现了,但是结果不是自己想要的,而且如果多个异步轮询( A 轮询 -> B 轮询 -> C 轮询),可能就会使代码变得嵌套层级多,以下是实现的具体代码,有大佬指教指教么?
import { from, Observable, Subscriber } from 'rxjs'; import { delay, mapTo, repeatWhen } from 'rxjs/operators'; interface RetryOptions<T = any, P = any> { try: (tryRequest: P) => Promise<T>; tryRequest: P; retryUntil: (response: T) => boolean; maxTimes?: number; tick?: number; } export const polling = <T = any, P = any>(options: RetryOptions<T, P>) => { optiOns= Object.assign( { maxTimes: 20, tick: 1000 }, options ); let result = null; const notifier = () => { // 计数最大尝试次数 let count = 0; const loop = (producer: Subscriber<any>) => { // 超过最大次数强制退出轮询 if (count >= options.maxTimes) { producer.complete(); } else { options .try(options.tryRequest) .then(res => { producer.next(count++); // 满足条件则退出轮询 if (options.retryUntil(res)) { producer.complete(); } else { // 不满足条件则继续轮询 loop(producer); } // 保存请求结果 result = res; }) .catch(err => { producer.error(err); }); } }; return new Observable(producer => { loop(producer); }); }; return from([0]).pipe( delay(options.tick), // 当满足条件是,进行一下轮轮询 repeatWhen(notifier), // 转换结果 mapTo(() => result) ); };
import { polling } from './polling'; let count = 0; const mockRequest = (): Promise<string> => { return new Promise((resolve, reject) => { setTimeout(() => { if (count < 6) { resolve('pending'); } else { resolve('finish'); } count++; }, 1000); }); }; polling<string, number>({ try: mockRequest, tryRequest: count, retryUntil: res => { return res === 'finish'; } }).subscribe((response) => { const result = response(); console.log(result); if (result === 'finish') { console.log('轮询结束'); } // 这个轮询结束后应该怎么继续轮询比较好? // 继续在这里 polling 下一个轮询吗?容易回调地狱啊 });
null pending pending pending pending pending pending finish // 上面的都不输出,只输出最后一个结果,因为上面的我并不关注 轮询结束
![]() | 1 leemove 2019-08-23 16:50:31 +08:00 是不是有点把问题复杂化了. |
![]() | 3 momocraft 2019-08-23 16:54:29 +08:00 concatMap ? |
4 b1anker OP 发现有个 last 操作符,可以解决最后只输出最终结果 |
![]() | 5 leemove 2019-08-23 17:48:30 +08:00 楼主可以看看我写的这个简单 demo 其实 rxjs 的操作符还是很强大的,而且 rxjs 的重试操作符是很强的.请求被我简化了,最大重试次数的逻辑没加. 地址: https://stackblitz.com/edit/rxjs-playground-test |
![]() | 6 leemove 2019-08-23 17:50:14 +08:00 @leemove 不好意思上一条地址发错了 正确地址: https://stackblitz.com/edit/rxjs-playground-test-t8quzt |
![]() | 7 wawaforya 2019-08-23 18:34:49 +08:00 献丑了,有什么错误请轻拍 ``` typescript import { Observable, of, race, timer } from 'rxjs'; import { concatMapTo, skipWhile, take, tap } from 'rxjs/operators'; type Result = 'pending' | 'success'; const getResult: () => Observable<Result> = () => of<Result>('success').pipe(tap(() => console.log('Requested.'))); // 向接口请求数据的函数 const limit = 20; const schedule = timer(0, 1000); const requestSource = schedule.pipe(concatMapTo(getResult()), skipWhile(result => result === 'pending')); const upperBound = schedule.pipe(skipWhile(value => value < limit)); race([requestSource, upperBound]).pipe( take(1) ).subscribe( result => console.log(result), // 这里需要判断类型,如果是数字,说明 20 次了都还没有 success error => console.error(error), () => console.log('Completed') ); ``` |
![]() | 8 wawaforya 2019-08-23 18:44:41 +08:00 #7 好像不会生成一个新的请求,要把 `concatMapTo(getResult())` 改成 `concatMap(() => getResult())` 哈哈 |
![]() | 10 b1anker OP @wawaforya 你这个 timer,其实跟 interval 差不多,其实我一开始也是这么搞得,但是得考虑一种情况,有可能请求完成超过 1s,这样子就不好控制了 |
![]() | 11 ibufu 2019-08-23 19:46:30 +08:00 via Android 所以关键点是解决竞态。你 google 一下 rxjs 竞态,应该能搜到很多。 |
![]() | 12 b1anker OP |