求助!关于流程引擎框架和执行自定义用户代码 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
BeUself
V2EX    程序员

求助!关于流程引擎框架和执行自定义用户代码

  •  
  •   BeUself 2024-03-27 10:05:20 +08:00 1996 次点击
    这是一个创建于 620 天前的主题,其中的信息可能已经有所发展或是发生改变。

    公司有个项目,主要需求就是 能够执行用户上传的插件(就是 pyton2 代码),并且按照一定的流程执行,然后要支持集群模式。

    先说问题:

    1. 写插件的人调试困难,基本上属于黑盒了
    2. 改插件代码之后再重新执行,特别的麻烦
    3. 我还需要写检查代码语法的函数(虽然利用 ast 模块已经实现了),比如:必须定义 main 函数,并且仅有一个入参。但是使用中还是有其他的一些问题,比如一些我没考虑到的情况出现。就会导致后面执行的时候报错或者根本无法载入进去。
    4. 插件可能会出现内存泄露的情况,导致服务区器存吃满。别人每次都会直接找我们,虽然是因为插件导致的。可能需要能自定监测插件是否出现内存泄露的办法。

    具体是:

    有个 web 界面上传插件,然后在界面上面配置几个插件执行。这个任务会按照设置好的周期执行。

    插件的流程是具体的 为:查询数据--处理数据--发送数据

    查询到的数据每次大概有 1000 条,占用 10 兆左右的内存,每天大概有 7000 万条数据
    现在我的做法:
    1. 用户上传插件之后,各个机器收到 kafka 消息,然后用importlib将插件动态导入进内存中 以便后面调用
    2. 单独有个进程去执行查询数据的脚本
    3. 将查询到的数据通过 kafka 发送至处理数据的进程
    4. 处理数据的进程收到 kafka 消息 根据消息内要执行的处理数据插件发送数据插件 来分别按照流程调用插件
    5. 最后需要将最后一步的数据写进 ES ,并且通过 kafka 进行通知回调
    上面为什么要用 importlib 载入到内存中,是因为一开始尝试用命令行直接 python xxx.py 执行插件。但是这个调用行为属于高频操作,每天大概要调用几百万次。每次重新开启 python 虚拟机,速度根本就跟不上。所以后来改成直接载入到内存,通过函数调用。

    求助大佬们,有没有开源的框架能够胜任这种情况的?已经尽可能精简了,希望大佬们留下一点点意见。

    11 条回复    2024-03-27 20:21:59 +08:00
    musi
        1
    musi  
       2024-03-27 10:12:17 +08:00
    打开一下思路
    引入 faas ,想稳定就云厂商,想自建就找开源的
    然后就变成了两个服务之间的交互 rpc/http 看喜好
    BeUself
        2
    BeUself  
    OP
       2024-03-27 10:16:27 +08:00
    @musi 多谢大佬留言,我先去看看 faas 。
    然后我们这个要给客户线下物理机部署。不能上云
    Dongxiaohao
        3
    Dongxiaohao  
       2024-03-27 11:33:27 +08:00
    没写过你这种流程引擎,但是有类似的户上传 Python 算法,给服务端去执行;
    Python 那边起了一个 Flask 的 webserver ,提供 deploy 和 uninstall 和调用算法 predict 方法的接口。
    大致流程就是 web 服务器部署算法之后,Java 把算法文件传给 Flask ,Flask 动态加载这个 Python 算法,存在内存中。
    然后等待 Java 调用 predict 的请求就行了。
    要和算法编写人员约定好算法文件的部分格式,比如主类名称,和方法名,不能乱写。
    foolishcrab
        4
    foolishcrab  
       2024-03-27 12:09:38 +08:00 via iPhone
    你看一下 perfect hq
    SmiteChow
        5
    SmiteChow  
       2024-03-27 12:31:14 +08:00
    核心问题:动态代码应该动态编译执行,而不是存为文件。以下是实战中的代码片段,供参考
    ```
    code = compile(python_code, run_file_path, 'exec')
    space = globals()
    space['__builtins__'].update({
    'asql_runtime': self.runtime,
    'asql_types': data_types,
    'asql_stdlib': stdlib,
    })
    func = FunctionType(code.co_consts[0], space)
    ```

    其他的问题:
    1. 调试,可以提供命令行 sdk
    2. 动态编译执行已解决
    3. 具体问题具体分析,语法检查目前实现方式没问题
    4. 单独开进程去跑才能控制开销,一下是实战中的代码片段,供参考
    ```
    recv_end, send_end = multiprocessing.Pipe(False)

    process = multiprocessing.Process(target=self.execute_python_method, args=(send_end, func_name, *args))
    process.start()

    bag = {'process': process, 'timeout': False}
    timer = threading.Timer(self.configure.hook_method_max_execute_time, self.terminate_python_method, args=(bag,))
    timer.start()

    process.join()

    # 超时结束
    if bag['timeout']:
    raise HookMethodExecuteTimeOut(func_name)

    # 在规定时间内结束
    timer.cancel()
    result, ok = recv_end.recv()
    ```
    BeUself
        6
    BeUself  
    OP
       2024-03-27 13:55:11 +08:00
    @SmiteChow 感谢大佬留言
    BeUself
        7
    BeUself  
    OP
       2024-03-27 13:55:35 +08:00
    @foolishcrab 好的,我去看看
    BeUself
        8
    BeUself  
    OP
       2024-03-27 14:04:46 +08:00
    @Dongxiaohao 估计和我这个差不多的实现方式
    imaple
        9
    imaple  
       2024-03-27 15:00:05 +08:00
    听着好像 xxljob 就能解决, 定时调度自定义 python 代码
    BeUself
        10
    BeUself  
    OP
       2024-03-27 17:08:34 +08:00
    @imaple 多谢大佬,我去看看
    ryulxy
        11
    ryulxy  
       2024-03-27 20:21:59 +08:00
    我做过的事情和这个有点像,是 C++开发的引擎里嵌入 Python 虚拟机载入用户编写的 Python 脚本执行,可以动态载入 Python 脚本,脚本修改后可以热重载不用重启,然后捕获 Python 脚本编译阶段的报错不运行,和游戏引擎脚本差不多
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     3272 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 42ms UTC 00:45 PVG 08:45 LAX 16:45 JFK 19:45
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86