轻量级 Java 应用消息通知中心 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Aidenboss
V2EX    Java

轻量级 Java 应用消息通知中心

  •  
  •   Aidenboss 2021-07-09 00:30:23 +08:00 2960 次点击
    这是一个创建于 1611 天前的主题,其中的信息可能已经有所发展或是发生改变。

    轻量级 Java 应用消息通知中心

    项目地址

    https://github.com/yemingfeng/kit-message

    项目背景
    1. 应用集群部署,并且使用了 local cache 。当要清除缓存时,通过 rpc / 消息队列清除,只能清除接收到消息的那个节点,无法清除整个应用集群的 local cache 导致,节点 、节点 3 存在脏数据。

    1. 应用集群部署,存在耗时的计算,为了减少计算资源浪费,某个节点更新后需要通知集群内其他节点更新

    这里的场景本质是,消息如何广播? 那么会有人问为什么不使用消息队列? 因为消息队列无法很优美的实现这里的场景。比如说 kafka,使用不同的 consumer_group 就可以实现,但不优雅。所以开启了 kit-message 这个项目。

    技术清单
    • java11
    • springboot
    • netty
    • redis
    核心逻辑

    • kit-message-center: 消息中心服务,接收 kit-message-client 订阅消息或者发布消息
    • kit-message-client:一个 Java 的轻量级 client 实现
    • kit-message-producer:基于 kit-message-client 发布消息
    • kit-message-consumer:基于 kit-message-client 订阅消息
    快速使用
    server 启动

    本地启动

    1. 修改 kit-message-center / application.yml 中 redis 的配置,配置遵循 springboot 规范
    2. mvn clean package
    3. java -jar kit-message-server/target/kit-message-center.jar
    4. server 会监听 8800 端口
    client 使用

    增加依赖

     <dependency> <groupId>io.github.yemingfeng</groupId> <artifactId>kit-message-client</artifactId> <version>1.0.0</version> </dependency> 

    发布消息

    MessageProducer messageProducer = new MessageProducer("127.0.0.1", 8800); messageProducer.pub("topic1", "topic1:" + i); messageProducer.close(); 

    订阅消息

    MessageConsumer messageCOnsumer= new MessageConsumer("127.0.0.1", 8800); messageConsumer.sub("topic1", new BiConsumer<String, String>() { @Override public void accept(String topic, String payload) { System.out.println("topic1_1:" + topic + "\t" + payload); } }); 
    线上环境部署

    kit-message-server 支持集群部署,建议使用 nginx 做转发。

    stream { upstream kit-message-server { server server_ip1:8800; server server_ip2:8800; } server { listen 8800; proxy_connect_timeout 1s; proxy_timeout 3s; proxy_pass kit-message-server; } } 
    Q&A
    1. 这个项目使用场景?使用消息队列等中间件不香吗?

    答:这个项目是基于服务间消息通知这个场景的。解决问题更加明确,也更加轻量。

    1. 为什么不直接封装一个 redis-client 进行消息的收发?而是使用 client/server 的模式?

    答:kit-message-server 让项目更加通用,接入更加方便,依赖更少,管理维护成本更低。

    欢迎提反馈

    14 条回复    2021-07-23 08:55:27 +08:00
    MidCoder
        1
    MidCoder  
       2021-07-21 14:01:42 +08:00
    所有的有点都是自己 YY 的,你有相关一个系统引入一个新的组件带来的各种风险吗?你能确保你的这个组件有多少个 9 的可用性?以及面对真实的生产场景,面对上亿的消息量,你的 redis 机器,和你的 message-center 集群如何实现横向扩展?能否通过简单的加机器就能解决?我看现在你的架构都不具备。你当前的架构就是一个 Toy,练手我觉得可以,但是千万不要用于实际业务场景,否则业务会被害惨的。所以上面说的第一点,纯属自己 YY,当前哪个消息中间件整个稳定性和横向扩展性没有 99%以上?请用成人思维去思考。

    架构的本质是去繁化简,多增加一个组件和一行代码都是需要经过仔细思考的,所以你上面说的第二点,无疑是为了自己的技术热情而引入的没必要的架构复杂度

    欣赏你对技术的热情,可以作为技术分享对你的技术理解,但不要轻易的将自己的东西当做框架或者开源推出去,这是一种对技术不负责任的态度。
    Aidenboss
        2
    Aidenboss  
    OP
       2021-07-21 20:21:41 +08:00
    @MidCoder
    1. 引入的风险就看各自的承接了。只要达到 99.9% 可用即可
    2. 横向拓展是通过 redis 做的,这里的逻辑推演下即可
    3. 可以通过简单的加机器完成

    请你先了解好以上再来评价。
    MidCoder
        3
    MidCoder  
       2021-07-21 22:23:37 +08:00
    @Aidenboss
    1 、你这里 99.9%的可用率在亿级别场景肯定达不到,首先你的 center 如何做到容错和负载?这一点你的架构一点都没实现,怎么保障你说的三个九的可用?至少机器挂了你这一点容错能力都没有。
    2 、你这里不只是 redis 水平,还有你的 center,你的 center 只是单点?单点你就敢承载亿级别的场景?太异想天开了。如果你 center 要做分布式,那就涉及到 center 的发现和负载,以及 center 和 redis 之间的归属如何分配?不是你想的这么简单。

    学习角度我是赞赏的,但是不要那这个去做实际生产场景,赞赏你的分享精神,但是如果你想证明自己,请把这个东西做到你说的三个九。如果这样简单搞一下就保障了三个九,你以为大厂哪些人是吃素的?
    Aidenboss
        4
    Adenboss  
    OP
       2021-07-21 22:52:02 +08:00
    @MidCoder
    你再仔细看下为啥 center 不是单点的吧
    MidCoder
        5
    MidCoder  
       2021-07-22 09:55:42 +08:00
    @Aidenboss 那整个架构 redis 将会是单点,虽然你的 center 可以通过 niginx 负载,但是如何解决单个 topic 消息量倾斜,导致 redis 集群负载不均衡,如果基于 redis,那 topic 的负载如何基于 reids 来实现,是你这个能否规模化的关键,至少这里没有实现或者体现。

    如果想真正研究大厂的基础架构,欢迎加 Vbieber-cn,来我厂一起搞事情
    Aidenboss
        6
    Aidenboss  
    OP
       2021-07-22 10:07:24 +08:00
    @MidCoder
    1. redis 单点就由 redis-cluster 解决。为啥要让应用层解决中间件单点的问题。
    2. center 是可以水平扩展的,已经解决了多 topic 的问题了。如果是单 topic 的消息负载,这点确实提醒了我,没有做。但使用 center + redis 参考 redis 模糊匹配的订阅模式即可。

    以上已经解决了你的问题。
    Aidenboss
        7
    Aidenboss  
    OP
       2021-07-22 11:31:56 +08:00
    Aidenboss
        8
    Aidenboss  
    OP
       2021-07-22 11:43:55 +08:00
    @Aidenboss
    重新补充下细节,topic 消息负载使用 topic 内消息分片解决。怎么解决呢?
    默认分成 8 个分片数「具体随意调整」
    前置条件:
    redis-cluster 保存每个 topic 的订阅者信息,以及订阅者的 key 。比如 key = topic_s_list,value = [c1:1,3,6,7,c2:2,4,5,8]
    每个消息都会按照 routing 算法计算出 key,每个消息都会发送到对应的 key 订阅通道,如:msg:1,代表发送到 msg1 发送给 topic1,只有 c1 才能接收到 msg1 的消息。


    当加入一个新的 topic 订阅者,就先发送 topic:stop 指令,c1 、c2 接收到指令后,之后的消息先缓存在 redis 的待发送 list topic_s_pengding 中
    c3 通过 lua 脚本,将 topic_s_list 修改为:[c1:1,3,4,c2:2,5,c3:6,7,8],并发送 sub_update 指令
    c1 、c2 接收指令后,重新订阅的 key 变成 topic:1 、topic:3 、topic:4 ; c2 的订阅的 key 是:topic:2 、topic:5 ; c3 订阅的 key 是 topic:6 、topic:7 、topic:8
    将 topic_s_pengding 中的 key 重新发送到 topic 中

    中间会有类似 kafka rebalance 的现象。但其实并不影响消息的生产。

    技术是为了解决问题的,提出没解决的点,照着点去设计就好。
    MidCoder
        9
    MidCoder  
       2021-07-22 13:23:12 +08:00
    @Aidenboss 是否通过真实的大规模场景验证你的这个方案?如果没有,如何验证你的方案是真的可行?
    第一:首先采用 redis 方案,看似把最难解决的消息存储交给 redis 已有解决方案来去解决。但是在真实大规模场景下,这会导致网络开销增加了一倍,因为多了一次 center 和 redis 的 request/response 。这种网络开销在亿级别的消息体量下,会严重影响性能
    第二:整个集群管理你如何保障?如何让全局感知整个 topic 分片的负载策略?以及当出现网络异常(你的 stop 命令都无法发出的时候)如何保障集群的一致性?以及消息的消费顺序如何保障,如何记录消息消费到了哪里?以及当消费端重启的时候,如何找到之前的消费位置?

    只能说你在逐步完善一个消息消费的最基本能力(消息通信),但是对于一个简单的 MQ 场景来说,这只是最简单的部分
    Aidenboss
        10
    Aidenboss  
    OP
       2021-07-22 16:47:58 +08:00
    @MidCoder
    你没理解这个场景,既然使用了 redis,就不需要保障消息可靠和一致性。
    这个场景只需要解决消息传递即可,这也是这个项目的本意。
    如果要做更牛的功能,直接用 RabittMQ 解决好了。
    不需要什么事情都推到大规模、亿万消息、机制性能。技术是为了解决问题而存在的。只要解决那个场景的问题,自然就有存在的价值。如果脱离的场景,只考虑技术难点,会忽略了解决这个问题的本质。
    MidCoder
        11
    MidCoder  
       2021-07-22 17:42:24 +08:00
    @Aidenboss 那你是如何解决消息消费者消费到一半,宕机或者重新部署,消息不丢失的?如果你不记录消息消费位置的?
    Aidenboss
        12
    Aidenboss  
    OP
       2021-07-22 19:04:47 +08:00
    @MidCoder 如果你需要解决,可以用: https://www.xuxueli.com/xxl-mq/
    MidCoder
        13
    MidCoder  
       2021-07-22 21:35:18 +08:00
    @Aidenboss 看来研究的开源项目不少,来不来我们这边搞事情?我们这边是搞基础架构的
    Dockerfile
        14
    Dockerfile  
       2021-07-23 08:55:27 +08:00
    好家伙,楼上是来面试的?
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1335 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 24ms UTC 17:03 PVG 01:03 LAX 09:03 JFK 12:03
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86