
import datetime import random import string import asyncio impot time import websockets from multiprocessing import Manager from concurrent import futures # 忽略警告 import warnings warnings.filterwarnings("ignore") strLen = 30 def putmsg(que, gvar): cc = 0 while True: cc += 1 if gvar['flag'] == True: break ranStr = '' for s in range(strLen): ranStr = ranStr + random.choice(string.ascii_letters + string.digits) # slTime = random.uniform(0.01,0.2) logStr = str(cc) + ' ' + "{:.2f}".format(slTime) + ' ' + str(datetime.datetime.now().replace(microsecOnd=0)) + ' ' + ranStr # print(logStr) # test # print(cc, '#', que.qsize()) que.put(logStr) # time.sleep(slTime) time.sleep(1.5) def wsock(queu, gvar): loop = asyncio.get_event_loop() async def stoploop(): loop.stop() # Maintain a list of connected clients connected_clients = set() async def register(websocket): # Add a new client to the list connected_clients.add(websocket) print('connected_clients.add(websocket)') async def unregister(websocket): # Remove a client from the list connected_clients.remove(websocket) print('connected_clients.remove(websocket)') async def broadcast(message): # Send a message to all connected clients if connected_clients: await asyncio.gather(*(client.send(message) for client in connected_clients)) async def echo(websocket, que=queu): await register(websocket) while True: if queu.qsize(): msgStr = queu.get() if connected_clients: try: await asyncio.gather(*(client.send(msgStr) for client in connected_clients)) except Exception as e: print(e) # echo:received 1001 (going away); then sent 1001 (going away) break else: asyncio.sleep(0.3) async for message in websocket: # Broadcast the received message to all clients if message == 'stop': gvar['flag'] = True await stoploop() await broadcast(message) await unregister(websocket) start_server = websockets.serve(echo, "172.17.0.2", 25299) asyncio.set_event_loop(loop) # loop.create_task(start_server) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() if __name__ == '__main__': # 队列 msgQue = Manager().Queue() # 全局变量 glovar = Manager().dict() # 启停开关 glovar['flag'] = False # 处理进程 proc = futures.ProcessPoolExecutor(max_workers=2) wsockRet = proc.submit(wsock, msgQue, glovar) putmsgRet = proc.submit(putmsg, msgQue, glovar) 问题是 websocket.serve 使用 echo 方法作为 handle ,
只有在 websocket 接口有事件的时候,才会调用 echo 进行处理,(被动式)
echo 的被动方法,队列里的日志越来越多,
想有一个永久循环,如果有 client(s),send 取出的队列内容,没有 client ,取出就 pass 了,
websocket.serve 被动调用不适合这个场合,看官方也没有更好的提示,
请教大家这里怎么换个方式实现呢?
1 julyclyde 2023-10-24 16:01:58 +08:00 看了一下这个库的说明,感觉好奇怪 为什么把“处理”和“IO”合在一个库里面啊 |
2 qazwsxkevin OP @julyclyde #1 是啊,官网上异步和线程的范例逻辑思想,是做了很高的包装,要用在其它场景,很难适合,打算过几天有时间再去仔细地看看 python websocket 官网的 API renference ,看看能不能粒度化用在我的场景上。。。 |