一个生产者消费者问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
Sign Up Now
For Existing Member  Sign In
sununiq
V2EX    Java

一个生产者消费者问题

  •  1
     
  •   sununiq Dec 22, 2017 3075 views
    This topic created in 3049 days ago, the information mentioned may be changed or developed.
    目前有这样一个场景,每次会以文件流的形式推送一批数据(大概 500W 条)给服务器解析入库,该数据会先保存在磁盘上。因为读取数据到内存解析比较快,入库是 IO 操作比较慢,所以目前想到的思路是采用单生产者多消费者模型,其中多消费者采用线程池管理,Java 中使用 BlockingQueue 来做中间管道。

    目前有这么一个需求,如果其中一个消费者入库出现异常,需要通知其他消费者停止消费,同时还要告知生产者放弃解析这批数据,直到下一批数据过来在此解析,请问下这种有什么好的方式来实现?
    6 replies    2018-01-12 20:37:16 +08:00
    zhx1991
        1
    zhx1991  
       Dec 22, 2017
    不同数据本身有特征(比如 id)可以分开吗?

    同类的数据(比如相同的 id, 需要严格保持先后顺序)可以走同一个管道, 然后这样某个管道挂了别的管道也依然可以正常处理别的数据
    qk3z
        2
    qk3z  
       Dec 23, 2017
    生产者和消费者共用一个标识符,生产者在每次解析的时候判断,消费者在每次入库的时候判断,但是这样感觉性能跟不上啊 -_-!
    sununiq
        3
    sununiq  
    OP
       Dec 23, 2017
    @zhx1991 需求就是出现问题,整个这批数据都不处理
    sununiq
        4
    sununiq  
    OP
       Dec 23, 2017
    @qk3z 是啊,没有想到啥好办法
    crossoverJie
        5
    crossoverJie  
       Jan 12, 2018
    实时性要求高嘛?可以借助 MQ 失败之后发个消息,所有线程收到这个消息后停止工作。
    sununiq
        6
    sununiq  
    OP
       Jan 12, 2018
    @crossoverJie 实时性要求不高,但是增加组件的话会增加整个系统的复杂度,有没有其他不增加其他组件的方法?
    About     Help     Advertise     Blog     API     FAQ     Solana     3247 Online   Highest 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 96ms UTC 13:22 PVG 21:22 LAX 06:22 JFK 09:22
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86