
flask 项目需要访问阿里云上部署的 kubeflow api 服务, 这个 api 参数需要使用 query 传递, 因 query 构造过长报了 413,后考虑拆分 query 并发访问服务.
大致代码抽象如下
import asyncio import json import re import functools from typing import Dict, List import aiohttp from loguru import logger def kubeflow_auth_with_async(func): """做 kubeflow 的 auth""" @functools.wraps(func) async def wrapper(*args, **kwargs): async with aiohttp.ClientSession() as session: # login payload = { "username": conf.USERNAME, "password": conf.PASSWORD, } await session.post(conf.AIX_AUTH_URL, data=payload) # get req # async with session.get(conf.PIPELINE_URL) as response: # text = await response.text() respOnse= await session.get(conf.PIPELINE_URL) text = await response.text() pattern = r"/dex/auth/aix\?req=" index_beg = re.search(pattern, text).span() index_end = text.find('"', index_beg[1]) req = text[index_beg[1] : index_end] params = {"req": req} # login kubeflow with aix await session.get(conf.DEX_AUTH_URL, params=params) return await func(session, *args, **kwargs) return wrapper async def async_get_runs( session: aiohttp.ClientSession, page_size=1, page_token=None, experiment_id=None, filters=None, sort_by="created_at desc", ): if not filters: filters = {} query = { "page_size": page_size, "page_token": page_token, "sort_by": sort_by, "resource_reference_key.type": "EXPERIMENT", "resource_reference_key.id": experiment_id, "filter": json.dumps(filters), } respOnse= await session.get( url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query ) # async with session.get( # url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query # ) as response: # response.raise_for_status() # return await response.json() response.raise_for_status() return await response.json() @kubeflow_auth_with_async async def gather_fetch_runs( session: aiohttp.ClientSession, runs_lst:List[str], ): exp_id = "xxxxx" tasks = [] for sub_runs in runs_lst: filters: dict = { "predicates": [ { "key": "id", "op": FILTER_OPERATIONS.IN.value, "string_values": {"values": sub_runs}, }, ] } tasks.append( async_get_runs( session, page_size=len(sub_runs), experiment_id=exp_id, filters=filters, ) ) return await asyncio.gather(*tasks) res = [] for r in asyncio.run(gather_fetch_runs(["xxx","xxx","xx"])): res.extend(r.get("runs", [])) 出现的报错,不知道怎么贴图,手动概括一下异常
服务稳定运行一段时间后,会出现突然 500 刷新又可以坚挺一段时间 aiohttp.client_exceptions.ContentypTypeError 从报错信息上看像是 session 失效导致的,被 auth 重定向到了登录页. contentType 变成了 text/html 而请求的 url 重定向到了登录的 url,已经不是我传入的那个 因为没试过 async 函数的装饰器,不知道是不是这个问题, 还有就是 clientSession 的连接池是不是保存了 session 的状态. 装饰器每次都重新请求了.这个 session 按道理不应该还是一样的. 也试了,没有这个问题,不过访问次数多起来之后容易莫名 GG,大概率是被自己家部署的 kubeflow 反爬了?
1 amlee 2022 年 1 月 15 日 你可能是想在装饰函数里面获取 token ,然后 session 发出的每次请求的请求头都要带上这个 token ? 如果我理解没错的话,装饰器里面的 wrapper 只执行一次的,你可以看看服务端的 token 过期时间。 |
2 hadugen OP @amlee emm 服务器每次获取请求后, 内部实现都会用这个装饰器, 您指的 wrapper 执行一次, 我不是很理解, 表达的是 wrapper 函数外的作用域只执行一次么? 如果是这个意思 , 对于目前这个方式来讲 它应该是不影响的对么? 不知道我理解的对不对. |
3 amlee 2022 年 1 月 18 日 @tomtao00001 之前看你代码不仔细,我上面那个回答是错误的,不要理会上面那个回答了。 我重新读了一遍你的代码,你说的“每次获取请求,内部函数都会用这个装饰器”跟你的代码行为有误差。 你通过 gather_fetch_runs 构建一组了协程对象并发执行,但这一组协程对象通过 gather_fetch_runs 的 @kubeflow_auth_with_async 装饰器赋予了同一个 session 对象。当这一组协程并发运行足够长的时间,登录会超时。而你的登录状态是保存在同一个 session 中的,这一组协程共同使用这个 session ,所以登录超时以后这个 session 失效,你这一组并发的协程也会请求失败。 而你说的刷新又好了的情况,或许是重新运行了一遍 gather_fetch_runs ,这会导致重新构建一组协程,重新构建一个已登录的 session |