请教一个 Java 多线程嵌套使用的问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
season8
V2EX    问与答

请教一个 Java 多线程嵌套使用的问题

  •  
  •   season8 2020-11-05 00:26:47 +08:00 4402 次点击
    这是一个创建于 1837 天前的主题,其中的信息可能已经有所发展或是发生改变。

    同事碰到一个问题,我写了个 demo 复现,研究了好几天还是没头绪,多线程程场景也没有调试思路,干脆发个帖,想看看有没有大佬可以指点一二。

    模拟场景:三个消费组消费异步消费,每组有三个任务,任务之间异步执行,但必须都执行完毕后消费组才算结束。 设计上,消费组线程池给 3 个线程,控制每次只有三个组能消费。 任务线程池给的是大于 3*3,按我的理解是,外层 3 个消费组,每组三个任务,实时任务应该不会超过 9 个。

    但程序执行一会儿就发现会有消费组批量涌入,导致里层线程池触发 reject 。

    Demo 如下:

    public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor outter = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(197)); ThreadPoolExecutor inner = new ThreadPoolExecutor(9, 12, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1)); for (int i = 0; i < 200; i++) { int group = i; outter.execute(() -> { System.out.println("开始第 " +group+" 组消费"); CountDownLatch countDownLatch = new CountDownLatch(3); for (int j = 0; j < 3; j++) { int task = j; inner.execute(() -> { System.out.println(group + "--消费数据:" + task); countDownLatch.countDown(); }); } try { countDownLatch.await(); System.out.println(group + "--消费完成"); } catch (InterruptedException e) { e.printStackTrace(); } }); } } 
    第 1 条附言    2020-11-06 23:25:47 +08:00

    晚上从头到尾读了一下ThreadPoolExecutor源码,从注释开始读起,对线程池有了一个更加明确的认识,同时也发现昨天的理解任然有些不对。

    昨天提到的 execute 中的 这段:

     if (workerCountOf(c) < corePoolSize) { @1 if (addWorker(command, true)) @2 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { @3 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) @4 reject(command); else if (workerCountOf(recheck) == 0) @5 addWorker(null, false); } 
    • @1 这里workerCountOf 就是拿当前的工作线程数(不是active 线程数,见原话:The workerCount is the number of workers that have been permitted to start and not permitted to stop),当小于核心线程数,走@2
    • @2 直接增加worker,worker是需要分配线程的,这里第二个参数给的true表示分配的线程作为核心线程。
    • @3 这里的isRunning不是判断线程是否在执行,而是判断线程池状态,Running状态可以添加worker和queue(这里由于已经不满足@1,所以是入队列)
    • @4 double check ,加完worker发现线程池状态变了,吐出来。。
    • @5 double check 发现线程池状态是running ,且 没有工作线程(corePoolSize可为0),直接加worder,第二个参数为false,表示额度上限为maximumPoolSize。

    还有很多细节,并为理解的很透彻,这里结论也写的比较简洁和枯燥,如有错误的还请大佬不吝指正。

    13 条回复    2020-11-06 00:11:04 +08:00
    micean
        1
    micean  
       2020-11-05 02:21:44 +08:00
    个人猜测
    countDownLatch.countDown() 之后,outter 完成了这次任务并开始下一个了,但是 inner 还没有完成,队列塞不下
    把 inner 的队列从 1 调高一些就行了
    season8
        2
    season8  
    OP
       2020-11-05 09:23:24 +08:00
    @micean
    countDownLatch 不是最后一个 inner 线程执行完成后唤醒 outter 线程吗?那 outter 线程结束应该就意味着有三个 inner 结束。

    而且,尝试过,countDownLatch.await();之后 sleep,也是存在这个问题
    Vedar
        3
    Vedar  
       2020-11-05 09:27:07 +08:00
    你这个 outer 不停的在刷,第五组消费的时候就已经超过 inner 的容纳能力了 肯定会 reject 掉,主要原因还是你 outer 没有阻塞
    Vedar
        4
    Vedar  
       2020-11-05 09:29:19 +08:00
    @season8 你是在 outer 线程池里面开一个线程去 await 的 这根本没阻塞 outer
    micean
        5
    micean  
       2020-11-05 10:31:38 +08:00
    @season8
    outter 线程结束并不意味着有三个 inner 结束,countDownLatch 释放之后,outter 和 3 个 inner 可没有先后执行顺序
    lancelee01
        6
    lancelee01  
       2020-11-05 10:33:24 +08:00
    3L 正解,countDownLatch 没什么用,感觉你的场景需要的是限流器,全局限流即可。同时 LinkedBlockingQueue 这个队列的可能和你想的不太一样,网上的八股文不对,你试试-_-!
    wysnylc
        7
    wysnylc  
       2020-11-05 10:42:27 +08:00
    别用 countDownLatch,换成 Completablefuture
    1194129822
        8
    1194129822  
       2020-11-05 12:00:48 +08:00
    建议创建线程池时传 ThreadFactory 参数,打印不要用 System.out.println,请换成 log 可以查看是具体那条线程。inner 线程池最大任务数 = 12 + 1 (建议不要设置非核心线程)。出现这个问题并没有什么高深的原理,仅仅是线程运行的不确定性,第一轮 outter 给 inner 提交了 9 个 task,此时 inner 正常,outter 三个线程被正确的阻塞。当 inner 运行所有 countDown 后,第一轮 inner 执行还没完全结束,outter 三个线程被唤醒,**注意**,此时线程执行没有了先后顺序和逻辑关系,完全靠 os 调度器调度,如果第二轮 outter 线程三个线程先提交任务,此时 inner 线程池最多可以接受 4 个任务,就是说这一轮已经可能出现错误了。而且一旦触发 RejectedExecutionException,try-catch 没有捕获这个异常,则直接杀死 outter 的核心线程,造成 outter 线程池,execute->Rejected->kill thread->create thread->execute 的恶性循环。代码根本没走到 await,所以一旦 Rejected 就不再阻塞了。
    micean
        9
    micean  
       2020-11-05 12:51:15 +08:00
    源码里是这样的:
    final void runWorker(Worker w) {
    Runnable task = w.firstTask
    ...
    try {
    while (task != null || (task = getTask()) != null) { //queueSize-1 (返回 null 时 workerSize-1 )
    ...
    task.run(); // 任务跑完了
    ...
    }
    } finally {
    processWorkerExit(w, completedAbruptly); // workerSize-1
    }
    }

    而 reject 的条件是 [queue 满] 或者 [worker 满] ,你觉得 countDown 结束了,其实只是跑完了 run()而已
    zoharSoul
        10
    zoharSoul  
       2020-11-05 13:37:58 +08:00
    这直接用 rxjava 多方便啊...
    yexiangyang
        11
    yexiangyang  
       2020-11-05 14:16:03 +08:00
    @micean 这个源码分析很有道理啊!
    season8
        12
    season8  
    OP
       2020-11-06 00:09:19 +08:00
    @Vedar @1194129822 @lancelee01 @micean 感谢各位的热情解答,我很受启发。再结合朋友给的例子,我仔细读了下源码,已经大致能复盘这个错误了。

    **inner 线程池 reject 的原因:**

    1. 主要原因:队列太小,这里给的是 1,实际每个 outer 线程要产生 3 个任务
    2. 次要原因:outter 线程里面使用 countdownlatch 确实不能起到很好的限流作用,

    **次要原因分析:**
    如 runWorker()源码所示,run 执行完毕并不能代表线程任务执行完毕。这意味着 outter 线程与 inner 线程的空闲线程数可能不是 1:3 的关系。但这里可以通过让 outter 线程 sleep 等待 inner 先执行完成,规避这个因素的影响。规避后,问题还是会存在,说明不是主要原因。

    **主要原因分析:**
    先来看个案例
    ```
    static class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
    public MyLinkedBlockingQueue(int capacity) {
    super(capacity);
    }

    @Override
    public boolean offer(E o) {
    System.out.println("任务加入,当前队列数:" + this.size());
    return super.offer(o);
    }
    }

    public static void main(String[] args) throws InterruptedException {
    BlockingQueue queue = new MyLinkedBlockingQueue<>(1);

    // 3 个线程的线程池
    ThreadPoolExecutor taskPoolExecutor = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, queue);

    // 先将线程池拉满
    for (int i = 0; i < 3; i++) {
    final int finalI = i;
    taskPoolExecutor.execute(() -> {
    logger.info("{}", finalI);
    });
    }

    // 等待全部任务执行完
    Thread.sleep(1000);

    // 再次执行任务,发现每一个任务都触发加入队列操作。
    for (int i = 10; i < 12; i++) {
    final int finalI = i;
    // 多线程更容易触发 reject
    // new Thread(()-> taskPoolExecutor.execute(() -> logger.info("{}", finalI))).start();
    taskPoolExecutor.execute(() -> logger.info("{}", finalI));
    }
    }
    ```

    执行结果:

    > 23:12:39.988 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$main$0:34 - 2
    23:12:39.988 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$main$0:34 - 1
    23:12:39.988 [pool-1-thread-1] INFO c.r.s.Demo8.lambda$main$0:34 - 0
    任务加入,当前队列数:0
    23:12:40.997 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$null$1:46 - 10
    任务加入,当前队列数:0
    23:12:41.000 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$null$1:46 - 11

    跑完这个案例我感觉我根本不懂线程池,我翻了下源码:
    ```
    public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }

    // 线程池满了后,直接不创建核心线程了
    // 这里 isRunning 看的我懵逼,明明任务都执行完了,为啥还是 isRunning,先接受,后面再研究 [1]
    // 然后就触发入队列
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    else if (!addWorker(command, false))
    reject(command);
    ```

    我以为的线程池是:只要有空闲线程,任务是直接丢给线程去执行的。
    **实际情况是:当核心线程数满,不管已有线程是否空闲,任务是先丢到队列,然后空闲线程从队列里面自取。**

    案例中,我给的队列大小是 1,当队列满的时候,会扩容线程池到最大线程池大小到 12,此时如果队列是满的(不管线程是否空闲),继续添加就会 reject 。案例中每组有三个任务,只要线程从队列 take 任务不及时,队列很容易满,从而触发 reject 。

    **验证:**
    1. countDownLatch.await(); 后面加上 sleep,让 outter 线程等 inner 线程结束,排除最开始说的第二个因素的影响。
    2. 将队列改成 3,适当调整线程执行时间(也可以不调),reject 很少触发或不触发。
    3. 将队列改成 9,没有触发 reject

    **总结:**
    1. 这个任务表面是多线程嵌套调用,内外线程调度不确定性导致的线程池问题,其实本质是对线程池理解不对导致线程池滥用的问题。
    2. 任务是添加到队列,空闲线程调用 take()获取,而不是有空闲线程就直接丢到空闲线程(实际任务也难以主动去找空闲线程,还容易造成等待,让线程自取则是生产消费的模式。)
    3. isRunning(c) 这个方法以及相关机制,还要再研究一下。


    再次感谢各位,如有不对的地方,还请指出。。
    season8
        13
    season8  
    OP
       2020-11-06 00:11:04 +08:00
    啊。。评论不支持 md,排版好丑,又有点长,各位见谅。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2441 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 34ms UTC 06:42 PVG 14:42 LAX 22:42 JFK 01:42
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86