[小白]关于 http 与 MQ 消息监听同异步转换 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
lchqfnu
V2EX    Java

[小白]关于 http 与 MQ 消息监听同异步转换

  •  
  •   lchqfnu 2022-09-06 18:29:52 +08:00 2908 次点击
    这是一个创建于 1138 天前的主题,其中的信息可能已经有所发展或是发生改变。

    近期偶刷到一篇文章,讲 java 的 http 异步调用,但实际上服务端还是连的同步服务。

    想到之前遇到的一个问题,诚心求教: 有个业务通过 MQ 消息队列交互,请求和响应分别通过 MQ 进行消息传递,异步传递,时间间隔小于 1 秒。 前端希望该业务通过 http 接口调用,那么有什么方式可以实现呢? 我们因为有个产品有这块功能,所以当时就通过产品实现了 http 同步转 MQ 异步,但希望了解下有哪些开源方案呢?最好是基于 springboot 的方案。

    感谢!

    第 1 条附言    2022-09-07 13:06:35 +08:00
    语文水平差,问题描述得可能不够清楚,补充一个图给各位大佬看下,感谢!
    xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    xxxxx 同步调用 xxxxxxxxxxxxx MQ 异步(发送) xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    前端 - (HTTP) -> 后端 - (send to mq) -> SEND_QUE -(mq channel)> EXTERNEL Mgr
    前端 - (HTTP) <- 后端 - (listen from mq) <- LISTEN_QUE <(mq channel)- EXTERNEL Mgr
    xxxxx 同步调用 xxxxxxxxxxxxx MQ 异步(监听) xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    21 条回复    2022-09-07 17:40:32 +08:00
    nulIptr
        1
    nulIptr  
       2022-09-06 18:59:10 +08:00
    问题过于小白
    mq 主要用来解耦,异步 /广播这些都是解耦带来的好处。
    你这用一个 http 请求相当于又回去了,mq 还有啥用呢。


    举个例子,前端界面有个导出报表的功能,报表导出又耗资源又耗时,所以就做成点完按钮发个导出的消息,导出服务作为消费者可以在有资源的时候执行导出,然后导出完了再发个导出完成的消息,
    再有个邮件推送服务可以消费导出完成的这个消息,给用户发个 email 附上下载链接啥的。这时候业务就完了,当然具体实现复杂多了。


    如果照你说的,业务实际上进行的很快,导出 1 秒就完事了,那不如不发 mq ,直接内存里面构造好数据直接返回了
    lchqfnu
        2
    lchqfnu  
    OP
       2022-09-06 19:16:20 +08:00 via iPhone
    @nulIptr 有些架构是变不了的,是现有的,比如 MQ 的异步消息机制。需要利用现有的东西,实现目标的机制,比如案例中的 http 接口。
    CEBBCAT
        3
    CEBBCAT  
       2022-09-06 19:20:56 +08:00
    不好意思,问题可能有一点点多,因为我现在的确有点困惑。

    1. 开头提到的「 Java 的 HTTP 异步调用」和后面的问题是什么关系呢?也有一点没懂“服务端还是连的同步服务”的意思。
    2. “前端希望该业务通过 http 接口调用,那么有什么方式可以实现呢?”听起来有点奇怪,前后端最常见的就是通过 HTTP 接口调用来完成吧?你是指前端希望通过阻塞式接口来获取响应吗?
    3. “我们因为有个产品有这块功能”是说你们开发过一个 toC 的软件恰好可以帮自己解决这个问题?但听起来好像也不太对。因为后面又在问有没有开源方案。

    假如说你是想问如何实现 [通过阻塞式 HTTP 接口服务异步式业务并返回业务结果] 的话,我觉得其实只要以“有结果后再 return”为中心来设计就可以了。比如你可以通过在服务端轮训、监听事件等等方式来实现结果的获取
    lchqfnu
        4
    lchqfnu  
    OP
       2022-09-06 19:38:28 +08:00 via iPhone
    @CEBBCAT
    感谢回复。
    1 、前后没有必然联系,java 的 http 异步调用提到的是非非阻塞式调用,实质上调用的服务是普通的 http 接口。
    2 、问题场景是前后端希望是标准 restful 接口,但后端接口实现了 MQ 消息发送,以及发送后 MQ 异步响应消息监听,获取结果后加工成前端要的结构 http 同步返回。
    3 、提到某产品是指的公司采购的产品,实现了 2 的功能,实现逻辑不明。
    4 、springboot 提供了 jms 组件可以比较方便的监听消息队列,以及操作队列管理群发送队列消息,但是将发送消息和监听结合起来形成一个后端服务,没想到合适的方式。
    ryanbuu
        5
    ryanbuu  
       2022-09-06 19:44:23 +08:00 via iPhone
    当时有个分布式服务,处理方式是使用了 redis 的 pubsub ,http 写 mq 的时候生成一个 uuid 为 key ,生成一个锁为 value ,之后把 uuid 透传下去。mq 处理完之后调用服务的 callback 接口,写入结果。callback 接口通过 uuid 获取锁,unlock 之后拿内容返回就行了。
    ryanbuu
        6
    ryanbuu  
       2022-09-06 19:45:45 +08:00 via iPhone   1
    上边儿说的有误。当时有个分布式服务,处理方式是使用了 redis 的 pubsub ,http 写 mq 的时候生成一个 uuid 为 key ,生成一个锁为 value ,之后把 uuid 透传下去。mq 处理完之后写入 pubsub 。callback 接口通过 uuid 获取锁,unlock 之后拿内容返回就行了。
    FrankAdler
        7
    FrankAdler  
       2022-09-06 19:51:29 +08:00 via iPhone
    不如试试 websocket ,请求响应分开
    lchqfnu
        8
    lchqfnu  
    OP
       2022-09-06 19:54:46 +08:00 via iPhone
    @q1angch0u 有意使用 redis 这种方式尝试一下,不过听起来也挺复杂的,主要是监听以后这个 callback ,这种异步方式平时没做过。老哥有 github 么,学习一下。
    yazinnnn
        9
    yazinnnn  
       2022-09-06 20:40:53 +08:00
    guava 的 cache map 配合 CountDownLatch 不是很容易么...
    只是是同步阻塞方案,性能比较差
    mango88
        10
    mango88  
       2022-09-06 21:26:17 +08:00   1
    各大网站 web 端的扫码登录 跟你提到的场景类似,你可以研究一下
    lchqfnu
        11
    lchqfnu  
    OP
       2022-09-06 21:46:42 +08:00 via iPhone
    @mango88 之前看过知乎的扫码登录好像是轮询,就是选择二维码登录以后,页面在一定时间内间隔轮询结果,超时后停止然后显示二维码失效过期。
    或也有其他种类。
    yazinnnn
        12
    yazinnnn  
       2022-09-06 21:52:50 +08:00   1
    https://gist.github.com/yazinnnn/14d8946a0e1278a2a269a3453a98e7ef

    一个用 webflux 和 guava 的简单反应式实现, 可以根据你的业务自行修改
    yazinnnn
        13
    yazinnnn  
       2022-09-06 22:00:11 +08:00
    业务超时 2 秒, rpc 响应时间 1-2.5 秒, 2000 链接同时请求,rps 在 800 以上, 能满足一般业务需求了

    wrk -t 6 -c2000 -d10s http://localhost:8080/async
    Running 10s test @ http://localhost:8080/async
    6 threads and 2000 connections
    Thread Stats Avg Stdev Max +/- Stdev
    Latency 10.10ms 43.17ms 1.31s 99.89%
    Req/Sec 504.96 501.88 1.42k 72.29%
    8939 requests in 10.05s, 717.49KB read
    Socket errors: connect 0, read 0, write 0, timeout 7988
    Requests/sec: 889.10
    Transfer/sec: 71.36KB
    szzadkk
        14
    szzadkk  
       2022-09-07 09:35:20 +08:00
    dubbo 里面好像有这种异步转同步的方案,可以参考下
    lchqfnu
        15
    lchqfnu  
    OP
       2022-09-07 12:49:51 +08:00
    @yazinnnn kotlin 虽然没有太看懂,但是看起来这个 demo 实现的是一个非阻塞式异步响应,关于响应这块,还是没明白如何结合监听的方式去应答。
    lchqfnu
        16
    lchqfnu  
    OP
       2022-09-07 12:53:31 +08:00
    @yazinnnn webflux 看起来功能好强大,但需要时间学习。不晓得 webflux 是否能够实现监听队列消息后响应给前端;或者说是否可以在 mq listener 中往 mono 或者什么的里面设置返回内容,然后 webflux 会从 mono 里把返回内容响应给前端。
    lchqfnu
        17
    lchqfnu  
    OP
       2022-09-07 13:08:12 +08:00
    @q1angch0u callback 接口通过 uuid 获取锁,unlock 之后拿内容返回就行了。
    lchqfnu
        18
    lchqfnu  
    OP
       2022-09-07 13:08:45 +08:00
    @q1angch0u callback 接口通过 uuid 获取锁,unlock 之后拿内容返回就行了。
    @q1angch0u 这个通过 uuid 获取锁,是要轮询查 redis 吗?
    ryanbuu
        19
    ryanbuu  
       2022-09-07 13:11:39 +08:00
    @lchqfnu 消费 mq 或者 subscribe 啊……
    258
        20
    258  
       2022-09-07 15:01:41 +08:00
    前端使用长轮询方案,当 http 请求时不立即返回,hold 住链接。等 mq 消息过来之后,将消息返回给前端。也可以使用 websocket 长链接
    ymy3232
        21
    ymy3232  
       2022-09-07 17:40:32 +08:00   1
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     912 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 31ms UTC 20:05 PVG 04:05 LAX 13:05 JFK 16:05
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86