搞死 MPP 的时空碰撞问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
Sign Up Now
For Existing Member  Sign In
爱意满满的作品展示区。
jingwei8340885

搞死 MPP 的时空碰撞问题

  •  
  •   jingwei8340885 Dec 11, 2023 1779 views
    This topic created in 873 days ago, the information mentioed may be changed or developed.

    问题描述

    时空碰撞定义

    某时间区间(例如 7 天)被分成多个固定时长(如 15 分钟)的时间切片,对象 a 和对象 b 在同一时间切片内的相同位置出现过,称为一次碰撞。

    规则 1:相同时间切片内,多次碰撞只记一次。

    规则 2:相同时间切片内,最后出现位置不同的称为不匹配,不匹配的时间切片数量不超过 20 时,(包括其它时间切片的)碰撞才被认为有效。

    要求:已知对象 a ,查找出指定时间区间内,满足两条规则下,与 a 发生有效碰撞次数最多的前 20 个对象 b 。

    数据结构与规模

    单一数据表,每天的数据量约 80 亿条记录,每个对象平均 1000 条记录,每条记录存储对象的时空信息(对象标识、时间戳、空间标记),当时间区间为 7 天时,总数据量有 560 亿行,数据结构如下:

    字段名称 字段类型 字段注释 示例数据
    no String 唯一对象标识 100000000009
    ct Int 时间戳,精确到秒 1690819200
    lac String 空间标记 1 40000
    ci String 空间标记 2 66000000

    no 由全数字构成。lac 、ci 总是一起出现,为了描述方便起见,我们可以把 lac 和 ci 并称为一个字段 loc ,已知 loc 去重计数的范围不超过 27 万。

    环境和期望

    在 5 台 64C256G 服务器构成的集群下,期望 1 小时内计算出结果,使用某世界知名 MPP 数据库无法达到预期。

    问题分析

    这个问题用关系数据库确实不容易快速计算,我们尝试用 SQL 写出不考虑规则 2 的运算:

    WITH DT AS ( SELECT DISTINCT no, loc, int(ct/15 分钟) as ct FROM T ) SELECT TOP 20 * FROM ( SELECT B.no, COUNT(DISTINCT B.loc) cnt FROM DT AS A JOIN DT AS B ON A.loc=B.loc AND A.ct=B.ct WHERE A.no=a AND B.no<>a GROUP BY B.no) ORDER BY cnt DESC 

    SQL 中的 DISTINCT 和 JOIN 计算会涉及 HASH 和比对,数据量很大时计算量也会很大,都会严重拖累性能。而且这些运算都涉及随机访问,通常要在内存进行,数据量太大还要使用缓存,性能更会急剧下降甚至可能溢出。仅是规则 1 用 SQL 计算已经很慢了,再加上规则 2 ,MPP 算不出来也不奇怪了。

    如果把对象 a 、b 在时间区间内的相关记录都取出成内存中的集合,然后来统计 a 和 b 发生有效碰撞的次数,并不会很困难。每个对象涉及的记录数并不多,即使 7 天区间也不到 1 万条,内存放下毫无压力。

    设 a 的记录集合是 A ,b 的是 B ,将 A 按时间切片分组为 A1,…,An ,B 分为 B1,…Bn 。所有 Ai,Bi 内成员都按 ct 从小到大排序。

    时间切片 i 内,a,b 发生(不考虑两条规则时的)碰撞的次数,可用

    Ci=Bi.count(Ai.(loc).contain(loc)) 

    计算出来,即统计 Bi 中有多少 loc 在 Ai 中出现过。

    不过,这种两层循环计算会较慢,而我们知道 a 以及 Ai 相对于 b 是确定的,这样可以事先对 Ai 中的 loc 去重后建索引,改为

    Ai’=Ai.id(loc).key@i(loc) Ci=Bi.switch@i(loc,Ai’).len() 

    用 switch@i 过滤掉在 Ai 中找不到 loc 的 Bi 成员,同样可以得到碰撞次数。

    我们只要统计 Ci>0 的时间切片个数即可得到满足规则 1 的碰撞次数。

    类似地,可用

    Di=Ai.m(-1).loc!=Bi.m(-1).loc //m(-1)表示取集合的最后成员 

    判断出在时间切片 i 中 a,b 是否发生过不匹配。

    有了 Ci,Di ,a,b 的有效碰撞次数就很容易计算了

    if(count(Di)<=20,count(Ci>0)) 

    剩下就是针对该值计算 TopN 的常规任务了。

    如果数据对 no,ct 有序,也很容易实现这个思路。A 可以用二分法一把取出,然后从头遍历对象 b ,因为数据有序,每次取出对应的 B 很容易。A 和 B 都对 ct 有序时,可以用有序分组计算出 Ai,Bi ,且保证上述 m(-1)的正确性。

    可惜关系数据库无法保证数据有序存储,也没有相关的有序计算方法,只能写出非常绕的嵌套 SQL 。

    SPL 有这种有序存储和相关的计算机制,容易实现。

    基于这个思路,还有一些工程上的优化手段。

    数据转换

    将 no 变成数,两个位置 lac 、ci 合并成一个 loc ,并且序号化(原来是字符串,数字化时就顺便处理为序号了)。

    转换后的数据结构如下

    字段名称 数据类型 字段含义 示例数据
    no Long 唯一对象标识 100000000009
    ct Int 时间戳,精确到秒 1690819200
    loc Int 空间标记 10282

    相比原数据结构,转存时做了以下两点变动:

    1 、将 lac 、ci 两个字段合并为 loc 字段,并转换成 Int 型序号。原 lac 、ci 作为维表单独存储。

    2 、将数字串 no 的数据类型变为 Long 型整数。

    关联与序号化

    前面分析中提到的每个时间切片的 Ai 建索引,但 Ai 太小了(平均长度在 10 左右),对于过小的集合使用索引的效果不明显。所以,我们在工程上改造成对整个 A (长度约有 1000 )建索引,这样要把时间切片序号 i 也加到主键上,大致代码:

    A’=A.derive((ct-st)\900:i).groups(i,loc).index() 

    其中 st 是时间区间的起点,即每 900 秒分出一个时间切片。

    这时 Ci 的计算要变成先关联(过滤)再分组了:

    B.derive((ct-st)\900:i).join@i(i:loc,A).groups(i;count(1):C) 

    这样就可以计算出以 i 和 Ci 为字段的序表,未碰撞的情况被 join@i 过滤掉了。

    join@i 使用索引实现关联过滤时,还是要计算 HASH 并比对,仍然有一定的计算量。其实我们知道,全部 i,loc 组合最多有 7 天96 (每天 96 个 15 分钟)27 万种可能,这并不是很大。如果用一个布尔值数组(序列)表示 A 在各个时间切片中是否在某个 loc 出现过,其长度最多也就是 79627 万,内存完全可以装得下。这样,我们就可以用对位序列技术来实现关联过滤,避免 HASH 计算和比对时间,能更快速地计算 Ci 。

    用 aloc 表示 A 的对位序列:

    aloc=A.align@a(672,(ct-st)\900+1).(x=270000.(false),~.run(x(loc)=true),x) 

    因为有时间切片和位置两个维度,这里也使用了二层的对位序列。将 A 按时间切片分成 672 ( 7*96 )个 组,每组是个 27 万个布尔值成员的序列,对于时间切片 i 中在位置 loc 出现过的对象,可以简单地用 aloc(i)(loc)迅速判断出是否与 a 发生过碰撞(即 a 是否也在时间切片 i 中在位置 loc 出现过)。

    a 在每个时间切片的最后位置,也可以用一个序列表示为:

    alast=A.align@a(672,(ct-st)\900+1).(~.m(-1).loc) 

    alast(i)就是 a 在时间切片 i 的最后位置,同样可以简单地用时间切片序号访问,以便快速计算 Di 。

    按天分表

    以上讲的算法要求数据对 no,ct 有序。但数据每天会新增,新增数据通常只会对 ct 有序甚至彻底无序。如果每次都要所有数据大排序就非常慢,即使只把新增数据排序再归并也要重写 560 亿行的数据,过于耗时。

    SPL 复组表可以将多个有序的组表逻辑上合并成一个更大的有序组表,这样每天一个分组表存储,计算时用复组表归并分表数据,归并后的数据也可以支持并行计算。避免了全量数据每天重写,复组表读取时会损失少量归并时间,但获得数据维护的灵活性还是值得的。

    当历史数据过期时,直接将相应日期的分表文件删除就可以了,非常简单。

    实践过程

    准备实验数据

    将数据按天存储,每天内数据 no 、ct 有序,保存为列存组表,例如将 7 天数据,分别存为:1.day.ctx,…,7.day.ctx ,由这 7 个分表可构成复组表,造数据脚本可以这样写:

    A B C
    1 =rand@s(1)
    2 for n =file("day"/A2/".btx")
    3 =movefile(B2)
    4 =elapse@s(sd,(A2-1)*86400)
    5 =long(B4)\1000
    6 for nm =1000000.new(100000000000+rand(8000000):no,int(B5+rand(86400)):ct,int(rand(270000)+1):loc)
    7 =B2.export@ab(C6)
    8 =file(A2/".day.ctx").create@py(#no,#ct,loc)
    9 =B2.cursor@b().sortx(#1,#2)
    10 >B8.append@i(B9)
    11 =movefile(B2)

    参数值有 3 个:

    1 、n ,几天,举例:1 ,代表 1 天

    2 、nm ,每天几百万,举例:1000 ,代表 10 亿

    3 、sd ,起始日期,举例:2023-08-01

    B8 建立组表时用了 @p 选项,表示按第一个字段 no 作为分段键。并行计算时需要对组表分段,不能把相同 no 的记录分到两段,使用 @p 选项可以在组表分段时保证这一点。

    计算脚本

    A
    1 =now()
    2 270000
    3 =n*24*3600\pt
    4 =file("day.ctx":to(n)).open()
    5 =A4.cursor@m(ct,loc;no==src_no).fetch().align@a(A3,(ct-st)\pt+1)
    6 =alast=A5.(~.m(-1).loc)
    7 =aloc=A5.(x=A2.(false),~.run(x(loc)=true),x)
    8 =A4.cursor@m(;no!=src_no).derive((ct-st)\pt+1:tn,aloc(tn)(loc):loca,alast(tn):lasta)
    9 =A8.group@s(no,tn;lasta,count(loca):cnt,top@1(-1,0,loc):lastb)
    10 =A9.group@s(no;count(cnt>0):cnt,count(lasta && lastb && lastb!=lasta):dcnt)
    11 =A10.select(cnt>0 && dcnt<=A3).total(top(-20;cnt))
    12 =file("app2_result.csv").export@ct(A11.new(src_no,no:dst_no,cnt:count))
    13 =interval@ms(A1,now())

    参数值有 4 个:

    1 、src_no 为对象 a 的特征号,举例:100000000009 2 、st 为起始时间戳(秒),举例:1690819200 ,对应 2023-08-01 00:00:00 3 、n 为统计天数,举例 7 4 、pt 为切片时间的秒数,举例 900

    A3:为统计时间区间内的总时间切片数

    A5:读出对象 a 的数据,产生时间切片序号并按该序号分组。组表对 no 有序时,用 no==src_no 的条件可以迅速定位到目标数据。

    A6:基于 A5 计算 a 在每个时间切片的最后空间值

    A7:基于 A5 计算 a 在对位序列,前面已经解释过计算原理

    A8:遍历其他(非 a 的)对象,生成时间切片序号 tn (使用新符号与 a 区别)。对于第一记录,从 aloc 中取出当前对象是否在时间切片 tn 和位置 loc 上和 a 发生碰撞记入 loca ,从 alast 中取出时间切片 tn 中对象 a 的最后空间值。

    A9:按对象和时间切片分组,可以用 lasta 计算每个对象在时间切片中与 a 的碰撞次数 cnt ,即前面分析的 Ci ;并计算出该对象在该时间切片中最后的 loc 记为 lastb 。

    A10:进一步按对象分组,计算出该对象与 a 的(考虑规则 1 后的)碰撞次数和不匹配次数。每个时间切片中的 Ci>0 即认定为一次碰撞,所以是 count(cnt>0),记入新的 cnt ;最后位置不同时计算一次不匹配,记入 dcnt 。

    A11:过滤掉无效碰撞的对象后取有效碰撞次数最多的前 20 名。这里做实验时采用的条件 dcnt<=A3 ,实际应该是 dcnt<=20 ,因为随机生成的数据中几乎没有 count(Di)<=20 的,就会算出空集。而 count(Di)最大值就是 A3 ,可以保证总能统计出结果。这样的计算量会比针对实际数据会更大,用于测试性能只会吃亏。

    编号化及还原

    以上代码在造数据时,是按 no 已经整数化,且 lac,ci 被合为序号来写的,实际上先要做转换整理,完成计算后还要还原。具体介绍可以参考:数据转存时的整数化

    实验效果

    SPL 使用单机( 8C64G ),计算总时间跨度 7 天(总数据量有 560 亿行),时间切片为 15 分钟,耗时为 121 秒。

    实际上,达到这个性能还会少量使用 SPL 企业版中的列式运算选项,但因为不涉及原理分析,这里就不详述了。

    后记

    这是个典型的对象统计问题,这类问题一般有如下几个特点:

    1. 统计满足某种条件的对象的个数

    2. 对象的数量非常多,但每个对象涉及的数据量不多

    3. 条件非常复杂,通常还和次序有关,需要一定的步骤才能判断出来

    面对这类问题,一个常见的思路就是把数据按对象排序,逐步取出每个对象的数据进入内存,再来做复杂的条件判断。

    现实中这种运算很常用,银行的帐户统计、电商的用户漏斗分析等等都是这种运算。

    SQL 很难实现这种运算,不保证有序存储,也缺乏有序运算,也很难写这些复杂判断。经常要写成很绕的嵌套语句,或者使用存储过程,无论如何,执行性能都会很差。

    SPL 则提供了有序存储及有序计算,也支持复杂的过程计算,能够很方便实现这类统计。

    SPL 资料

    1 replies    2023-12-12 08:55:11 +08:00
    XhivaW
        1
    XhivaW  
       Dec 12, 2023
    这东西的宣传过去半年看了不知道多少次。。。 有用过的 v 友锐评一波吗?
    个人是感觉没啥兴趣也没啥需求
    About     Help     Advertise     Blog     API     FAQ     Solana     751 Online   Highest 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 60ms UTC 21:36 PVG 05:36 LAX 14:36 JFK 17:36
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86