大数据量、高并发业务怎么优化?(一) - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
wayn111

大数据量、高并发业务么优化?(一)

  •  
  •   wayn111
    wayn111 2022 年 12 月 8 日 1998 次点击
    这是一个创建于 1235 天前的主题,其中的信息可能已经有所发展或是发生改变。

    博主这里的大数据量、高并发业务处理优化基于博主线上项目实践以及全网资料整理而来,在这里分享给大家

    一. 大数据量上传写入优化

    线上业务后台项目有一个消息推送的功能,通过上传包含用户 id 的文件,给指定用户推送系统消息

    1.1 如上功能描述很简单,但是对于技术侧想要做好这个功能,保证大用户量(比如达到百万级别)下,系统正常运行,功能正常其实是需要仔细思考的,博主这里给出思路:

    1. 上传文件类型选择

    通常情况下大部分用户都会使用 excel 文件,但是相比 excel 文件还有一种更加推荐的文件格式,那就是 csv 文件,相比 excel 文件它可以直接在记事本编辑,excel 也可以打开 cvs 文件,且占用内存更少(画重点),对于上传的 csv 文件过于庞大,也可以采用流式读取,读一部分写一部分

    1. 消息推送成功与否状态保存

    由于大批量数据插入是一个耗时操作(可能几秒也可能几分钟),所以需要保存批量插入是否成功的状态,在后台中可以显现出这条消息推送记录是成功还是失败,方便运营回溯消息推送状态

    1. 批量写入启不启用事务

    博主这里给出两种方案利弊:

    • 启用事务:好处在于如批量插入过程中,异常情况可以保证原子性,但是性能比不开事务低,在特大数据量下会明显低一个档次
    • 不启用事务:好处就是写入性能高,特大数据量写入性能提升明显,但是无法保证原子性,但是对于已经批量插入的新增数据,只是会产生脏数据而已,在功能设计合理的情况下是不影响业务的,如下面第四点

    综上:在大数据量下,我们要是追求极致性能可以不启用事务,具体选择也需各位结合自身业务情况

    1. 推送异常失败的消息处理

    建议功能设计上,可以屏蔽对失败消息再进行操作,这样不需要再处理之前推送失败写入的脏数据,直接新增消息推送即可

    1.2 批量写入代码优化

    1. jdbc 参数携带 rewriteBatchedStatements=true 在 jdbc 驱动上启动批量写入功能,如下
    spring.datasource.master.jdbc-url=jdbc:mysql://localhost:3306/test_db?allowMultiQueries=true&characterEncoding=utf8&autoRecOnnect=true&useSSL=false&rewriteBatchedStatements=true 
    1. 启用 insert into table(id, name) values(1, 'tom'),(2, 'jack') 模式,建议一次写入个数不要太多,MySQL 对于 sql 长度是有限制的,对于这种字段少的表,一次写入 500 - 1000 问题不大,字段多了需要降低这个写入量
    insert into im_notice_app_ref(notice_id, app_id, create_time) values <foreach collection="list" separator="," item="item"> (#{item.noticeId}, #{item.appId}, #{item.createTime}) </foreach> 

    一般情况下大家都知道第二条优化,但是可能会忽略 jdbc 参数携带rewriteBatchedStatements=true,这个参数能在第二条的基础上启用批量执行 SQL ,进一步提升写入性能

    二. 大事务优化,减小影响范围,提升系统处理能力

    @Transactional 大于 Spring 提供得事务注解,许多人都知道,但是在高并发下,不建议使用,推荐通过编程式事务来手动控制事务提交或者回滚,减少事务影响范围

    如下是一段订单超时未支付回滚业务数据得代码,采用 @Transactional 事务注解

    @Transactional(rollbackFor = Exception.class) public void doUnPaidTask(Long orderId) { // 1. 查询订单是否存在 Order order = orderService.getById(orderId); if (order == null) { throw new BusinessException(String.format("订单不存在,orderId:%s", orderId)); } if (order.getOrderStatus() != OrderStatusEnum.ORDER_PRE_PAY.getOrderStatus()) { throw new BusinessException(String.format("订单状态错误,order:%s", order)); } // 2. 设置订单为已取消状态 order.setOrderStatus((byte) OrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus()); order.setUpdateTime(new Date()); if (!orderService.updateById(order)) { throw new BusinessException("更新数据已失效"); } // 3.商品货品数量增加 LambdaQueryWrapper<OrderItem> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.eq(OrderItem::getOrderId, orderId); List<OrderItem> orderItems = orderItemService.list(queryWrapper); for (OrderItem orderItem : orderItems) { if (orderItem.getSeckillId() != null) { // 秒杀单商品项处理 Long seckillId = orderItem.getSeckillId(); SeckillService seckillService = SpringContextUtil.getBean(SeckillService.class); if (!seckillService.addStock(seckillId)) { throw new BusinessException("秒杀商品货品库存增加失败"); } } else { // 普通单商品项处理 Long goodsId = orderItem.getGoodsId(); Integer goodsCount = orderItem.getGoodsCount(); if (!goodsDao.addStock(goodsId, goodsCount)) { throw new BusinessException("秒杀商品货品库存增加失败"); } } } // 4. 返还优惠券 couponService.releaseCoupon(orderId); log.info("---------------订单 orderId:{},未支付超时取消成功", orderId); } 

    采用编程式事务对其优化,代码如下:

    @Resource private PlatformTransactionManager platformTransactionManager; @Resource private TransactionDefinition transactionDefinition; public void doUnPaidTask(Long orderId) { // 启用编程式事务 // 1. 在开启事务钱查询订单是否存在 Order order = orderService.getById(orderId); if (order == null) { throw new BusinessException(String.format("订单不存在,orderId:%s", orderId)); } if (order.getOrderStatus() != OrderStatusEnum.ORDER_PRE_PAY.getOrderStatus()) { throw new BusinessException(String.format("订单状态错误,order:%s", order)); } // 2. 开启事务 TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition); try { // 3. 设置订单为已取消状态 order.setOrderStatus((byte) OrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus()); order.setUpdateTime(new Date()); if (!orderService.updateById(order)) { throw new BusinessException("更新数据已失效"); } // 4. 商品货品数量增加 LambdaQueryWrapper<OrderItem> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.eq(OrderItem::getOrderId, orderId); List<OrderItem> orderItems = orderItemService.list(queryWrapper); for (OrderItem orderItem : orderItems) { if (orderItem.getSeckillId() != null) { // 秒杀单商品项处理 Long seckillId = orderItem.getSeckillId(); SeckillService seckillService = SpringContextUtil.getBean(SeckillService.class); RedisCache redisCache = SpringContextUtil.getBean(RedisCache.class); if (!seckillService.addStock(seckillId)) { throw new BusinessException("秒杀商品货品库存增加失败"); } redisCache.increment(Constants.SECKILL_GOODS_STOCK_KEY + seckillId); redisCache.deleteCacheSet(Constants.SECKILL_SUCCESS_USER_ID + seckillId, order.getUserId()); } else { // 普通单商品项处理 Long goodsId = orderItem.getGoodsId(); Integer goodsCount = orderItem.getGoodsCount(); if (!goodsDao.addStock(goodsId, goodsCount)) { throw new BusinessException("秒杀商品货品库存增加失败"); } } } // 5. 返还优惠券 couponService.releaseCoupon(orderId); // 6. 所有更新操作完成后,提交事务 platformTransactionManager.commit(transaction); log.info("---------------订单 orderId:{},未支付超时取消成功", orderId); } catch (Exception e) { log.info("---------------订单 orderId:{},未支付超时取消失败", orderId, e); // 7. 发生异常,回滚事务 platformTransactionManager.rollback(transaction); } } 

    可以看到采用编程式事务后,我们将查询逻辑排除在事务之外,减小了其影响范围,也就提升了性能,在高并发场景下,性能优先的场景,我们甚至可以考虑不适用事务

    三. 客户端海量日志上报优化

    线上项目客户端,采用 tcp 协议与日志采集服务建立连接,上报日志数据。业务高峰期下,会有同时成千个客户端建立连接实时上报日志数据

    如上场景,高峰期下,对日志采集服务会造成不小的压力,处理服务处理不当,会造成高峰期下,服务卡顿、CPU 占用过高、内存溢出等。

    这里给出海量日志高并发下优化点:

    1. 上报日志进行异步化处理,
    • 普通版:采用阻塞队列 ArrayBlockingQueue 得生产者消费者模式,对日志数据进行异步批量处理,在此场景下,通过生产者将数据缓存再内存中,然后再消费者中批量保存入库。
    • 进阶版:采用 Disruptor 队列,也是基于内存队列的生产者消费者模型,消费速度对比 ArrayBlockingQueue 有一个数量级得性能提升,附简介说明: https://www.jianshu.com/p/bad7b4b44e48
    • 终极版:采用 kfaka 消息队列中间件,持久日志数据,慢慢消费。虽然引入第三方依赖会增加系统复杂度,但是相比 kfaka/code> 在大数据场景下提供的优秀表现,这一点也是值得。

    如上三种方案:大家可以结合自身项目实际体量选择

    1. 采集日志压缩

    对上报后的日志如果要再发送给其他服务,推荐是对其进行压缩处理,避免消耗过多网络带宽以及最终数据落库选型:

    • 网络传输,在 Java 里通常是指序列化方式,Jdk 自带得序列化方式对比 Protobuf 、fst 、Hession 等在序列化速度和大小的表现上都没有优势,甚至可以用垃圾形容,博主这里直接给出 Java 得几种序列化方式对比链接: https://segmentfault.com/a/1190000039934578 , 建议对传输大小要求较高可以使用 Avro 序列化, 对综合要求较高可采用 Protobuf
    • 落库选型,像日志这种大数据量落库,都是新增且无修改得场景建议使用 Clickhouse 进行存储,相同数据量下对比 MySql 占用存储更少,查询性能更高

    最后,附博主 github 地址: https://github.com/wayn111

    byte10
        1
    byte10  
       2022 年 12 月 8 日
    想的有点复杂了吧,一个带事务的 MQ 就可以解决你的问题了??
    wayn111
        2
    wayn111  
    OP
       2022 年 12 月 8 日
    @byte10 老哥讲讲
    byte10
        3
    byte10  
       2022 年 12 月 8 日
    1 、上传的数据写入 MQ ,消息推送事务,回滚 mq 消息。2 、读取 mq 数据写入数据库,这个瓶颈一般在于数据库上,多个客户端同时消费。3 、日志那边没啥问题
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     866 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 54ms UTC 22:13 PVG 06:13 LAX 15:13 JFK 18:13
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86