Spark 如何精准控制消息的发送速率? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
leon0318
V2EX    Java

Spark 如何精准控制消息的发送速率?

  •  
  •   leon0318 2022 年 9 月 17 日 2583 次点击
    这是一个创建于 1316 天前的主题,其中的信息可能已经有所发展或是发生改变。
    目前会用 Spark 去计算一些业务逻辑,然后将处理成功的记录以 id 的形式发送 MQ 给下游,请问如何精准控制 Spark 发送消息的速率呢?

    单机可以使用 RateLimiter ,分布式情况下有什么解决方式呢?
    16 条回复    2022-09-22 15:02:30 +08:00
    ruanimal
        1
    ruanimal  
       2022 年 9 月 17 日   1
    基于外部存储(比如 redis )实现流控算法

    或者使用每个 worker 流控worker 数目
    leon0318
        2
    leon0318  
    OP
       2022 年 9 月 17 日
    @ruanimal #1 感谢回答
    1 、基于外部存储限流是一个挺好的解决方案,但不确定是不是最佳实践。另外,对 Spark 不太熟悉,不知道它有没有内置一些流控的方式呢?
    2 、如何在运行过程中动态的获取 worker 的数目呢?感觉占用的 executor 也是运行过程中根据实际资源占用动态分配的
    noparking188
        3
    noparking188  
       2022 年 9 月 17 日   1
    请问是用 Spark Streaming 吗?以我的理解 Spark 适合批量写,不知道你这个场景是不是适合用流处理
    以前我有个需求,设置一定速率来读取数据库、文件等来源的数据,发送到 Redis 队列里,不能超过队列预定的容量,我是手写 Python 处理的
    当然这个得根据你的数据量来考虑了
    kkeep
        4
    kkeep  
       2022 年 9 月 18 日 via Android   1
    还好,把速率控制交给消费端做,丰俭由人,人家想加速就多开几个,不想留少开几个。你控流了别人还不愿意呢
    kkeep
        5
    kkeep  
       2022 年 9 月 18 日 via Android   1
    更何况 MQ 本来就是给你这种发消息用的,存起来就是了
    winglight2016
        6
    winglight2016  
       2022 年 9 月 18 日
    这需求有点迷,一般都是控制流入速度,怎么会去控制流出速度呢,lz 是希望能够控制速度,预留优化空间吗?
    leon0318
        7
    leon0318  
    OP
       2022 年 9 月 18 日
    @winglight2016 #6 或者再假设一个场景,比如 Spark 将最终处理结果写入 DB ,但是 DB 的写入 QPS 有限,这块该如何限制呢?
    leon0318
        8
    leon0318  
    OP
       2022 年 9 月 18 日
    @kkeep #4 或者再假设一个场景,比如 Spark 将最终处理结果写入 DB ,但是 DB 的写入 QPS 有限,这块该如何限制呢?
    noparking188
        9
    noparking188  
       2022 年 9 月 18 日   1
    @leonme #7 给个参考:
    1. Spark 写 Parquet 文件,这个写完很快,不会占用太久集群资源
    2. DataX 之类工具读 Parquet 写 DB ,可以设置并发和 Batch Size ,开很小的资源就够了
    以上是离线处理的场景,你的场景是什么?
    leon0318
        10
    leon0318  
    OP
       2022 年 9 月 18 日
    @noparking188 #9 感谢,一般实践中是不会去控制"Spark 写 Parquet 文件"的速度,二是控制"DataX 之类工具读 Parquet 写 DB"的速度,是吧?

    所以到我这块是要控制 MQ 消费的速度,而不是 Spark 发送 MQ 的速度
    leon0318
        11
    leon0318  
    OP
       2022 年 9 月 18 日
    @noparking188 #9 是这样的,有个问题在于,MQ 采用 mafka ,有多个消费组订阅 topic ,对于某个消费组来说,有很多无关消息(上亿),导致瞬态流量很大(虽然直接 return ),cpu 瞬态使用率过高
    leon0318
        12
    leon0318  
    OP
       2022 年 9 月 18 日
    @kkeep #5 是这样的,MQ 采用 mafka ,有多个消费组订阅 topic ,对于某个消费组来说,有很多无关消息(上亿),导致瞬态流量很大(虽然直接 return ),cpu 瞬态使用率过高
    noparking188
        13
    noparking188  
       2022 年 9 月 19 日   1
    @leonme #11 老哥,我看你主题描述的是想控制 Spark 写 MQ 的速度,这边回复里说是想控制消费端消费 MQ 的速度

    我没有裸写程序消费 Kafka 的经验,不过我有用过 Flink 消费 Kafka ,可以限制消费速度,比如隔多久 fetch 一次,fetch size 啥的,也许可以参考。补充一下,我也是所有数据都推同一个 topic ,多个 flink 应用消费同一个 topic ,根据条件过滤无关的消息,几千万数据倒是没有不稳定的。

    可能我经验不足,有点不明白的是,你的消费端程序难道不是按一定频率通过偏移量读消息(比如等待几秒再更新一次),而是只要来了就立马消费?
    leon0318
        14
    leon0318  
    OP
       2022 年 9 月 19 日
    @noparking188 #13 是这样的,mafka 某个 topic 下面有几十个消费组(只有几台消费实例(机器)),spark 瞬间往 mafka 发了几千万消息,导致机器网卡瞬态流量非常大(网络流量约等于消费组数量*消息数*每条消息大小),cpu 瞬态使用率过高

    所以在想是不是通过对 Spark 写入进行分布式限流,往 topic 里写慢点,这样消费组就不会出现瞬态负载过高的问题

    消费端目前是只要来了立马消费
    noparking188
        15
    noparking188  
       2022 年 9 月 20 日
    @leonme #14 看上去
    1. 不要用 Spark 直接写 MQ (瞬间写几千万到队列也不是很合适的样子,除非实时性要求高,下游可以瞬间消费完)
    2. 调整消费端的消费策略(推荐)

    一点建议,仅供参考
    lmshl
        16
    lmshl  
       2022 年 9 月 22 日
    @leonme
    针对“或者再假设一个场景,比如 Spark 将最终处理结果写入 DB ,但是 DB 的写入 QPS 有限,这块该如何限制呢?”回复

    我们 Scalaer 通常做法是充分利用 backpressure ,下游消费速率慢的时候就不会从上游拉取太多任务了,至于精确速率并不是很关心,只关心能否充分利用当前资源。

    比如当前业务数据在 Stream 中经过 group -> batch 以后,并行写入 DB 可以在 20 个 connection 上占用 DB 50% 的 IOPS/CPU/Memory... 上限,那我并行度就设定在 20 ,也不会影响其他人访问 DB

    https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html#back-pressure-explained
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2858 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 38ms UTC 12:47 PVG 20:47 LAX 05:47 JFK 08:47
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86