
I've recently been testing low-latency Rust tokio multicast, using tokio and socket2.
The machine is an AWS c7gn.metal (ARM, 64 cores). 1 sender, n receivers; the sender sends timestamps, and the Linux route is set so the multicast traffic goes over the lo interface.
With 20 receivers or fewer, latency is controllable at roughly 40us or less. Once the receiver count grows to 50-100, latency gradually climbs to 100~200us or more. CPU usage stays very low, yet latency increases significantly. Why is that?
Question: how can 50+ receivers all receive simultaneously while still keeping latency below 50us?
```
use serde::{Deserialize, Serialize};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::{SystemTime, UNIX_EPOCH};

const MULTICAST_ADDR: Ipv4Addr = Ipv4Addr::new(239, 255, 0, 1);
const MULTICAST_PORT: u16 = 3001;
const BIND_ADDR: Ipv4Addr = Ipv4Addr::LOCALHOST;

/// Microseconds since the UNIX epoch, sent on the wire as 16 big-endian bytes.
pub fn timestamp16() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_micros()
}

/// Networking options.
#[derive(argh::FromArgs)]
struct Args {
    /// multicast address that the socket must join
    #[argh(option, short = 'a', default = "MULTICAST_ADDR")]
    addr: Ipv4Addr,
    /// specific port to bind the socket to
    #[argh(option, short = 'p', default = "MULTICAST_PORT")]
    port: u16,
    /// run this process as the multicast sender instead of a receiver
    #[argh(switch)]
    is_sender: bool,
}

fn main() -> std::io::Result<()> {
    use socket2::{Domain, Protocol, Socket, Type};

    let Args {
        addr,
        port,
        is_sender,
    } = argh::from_env();
    println!("{} {} is_sender: {}", addr, port, is_sender);

    // Build the UDP socket with socket2 so multicast options can be set
    // before handing it to tokio.
    let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
    socket.set_nonblocking(true)?;
    socket.set_reuse_address(true)?;
    socket.set_reuse_port(true)?;
    socket.set_multicast_loop_v4(true)?;
    socket.set_multicast_ttl_v4(1)?;
    socket.join_multicast_v4(&addr, &Ipv4Addr::LOCALHOST)?;

    let fin_addr = SocketAddrV4::new(addr, port);
    if is_sender {
        socket.bind(&SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into())?;
    } else {
        socket.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port).into())?;
    }

    let runtime = tokio::runtime::Builder::new_current_thread()
        .thread_name("network")
        .enable_all()
        .build()?;
    let udp = {
        let _guard = runtime.enter();
        tokio::net::UdpSocket::from_std(socket.into())?
    };

    runtime.block_on(async move {
        // The sender ticks once a second; receivers effectively never tick.
        let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(1000));
        if !is_sender {
            interval = tokio::time::interval(tokio::time::Duration::from_secs(60 * 60 * 24));
        }
        interval.tick().await;

        let mut buf = [0; 16];
        loop {
            tokio::select! {
                recv_res = udp.recv_from(&mut buf) => {
                    let (count, remote_addr) = recv_res.expect("cannot receive from socket");
                    let parsed = u128::from_be_bytes(buf[..count].try_into().unwrap());
                    // One-way latency in microseconds: receive time minus sent timestamp.
                    let cost = timestamp16() - parsed;
                    println!("recv {remote_addr} {parsed} {count} {cost}");
                }
                _ = interval.tick() => {
                    let cur = timestamp16();
                    let input = cur.to_be_bytes();
                    udp.send_to(&input, fin_addr)
                        .await
                        .expect("cannot send message to socket");
                    println!("send: {}", cur);
                }
            }
        }
    });
    Ok(())
}
```

1 lsk569937453 2024-03-26 10:48:43 +08:00 https://stackoverflow.com/questions/76589659/does-multithreaded-tokio-run-task-on-a-single-os-thread Try swapping tokio::runtime::Builder::new_current_thread() for new_multi_thread()?
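(For reference, a minimal sketch of that swap, assuming everything else in the program stays the same; the worker count here is an arbitrary example, not a tuned value:)

```
// Sketch only: multi-threaded runtime per the suggestion in #1.
// worker_threads(4) is an illustrative choice.
let runtime = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(4)
    .thread_name("network")
    .enable_all()
    .build()?;
```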
2 EricJia OP @lsk569937453 Tried that; in my testing new_current_thread actually gives lower latency.
3 EricJia OP Deployment: supervisor launches 60 receiver processes; sender vs. receiver is selected via the args.
```
[program:sender1]
stderr_logfile = /data/sender1
command = /home/ubuntu/test_udp --is_sender

[program:receiver0]
stderr_logfile = /data/receiver0
command = /home/ubuntu/test_udp

[program:receiver1]
stderr_logfile = /data/receiver1
command = /home/ubuntu/test_udp
...
```
4 rainmote Check the NIC softirq CPU affinity; on some machines the softirqs are all pinned to cpu0, which can become a performance bottleneck.
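(The softirq affinity itself is a sysctl/irqbalance check, but on the process side one rough experiment is to pin each sender/receiver off cpu0 before building the runtime. A minimal sketch using the libc crate's sched_setaffinity; the core index is just an example value:)

```
use libc::{cpu_set_t, sched_setaffinity, CPU_SET, CPU_ZERO};
use std::mem;

/// Pin the calling thread to one CPU core (sketch; `core` is an example value).
fn pin_to_core(core: usize) {
    unsafe {
        let mut set: cpu_set_t = mem::zeroed();
        CPU_ZERO(&mut set);
        CPU_SET(core, &mut set);
        // pid 0 means "the calling thread".
        sched_setaffinity(0, mem::size_of::<cpu_set_t>(), &set);
    }
}
```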
5 everfly 2024-03-27 10:08:40 +08:00 Could udp.send_to be what's adding the latency? You could try switching to an unbounded_channel and sending asynchronously.
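(A minimal sketch of what #5 seems to suggest; the function name is illustrative, and the channel only decouples the producer from send_to, the syscall itself still runs in the spawned task:)

```
use std::net::SocketAddrV4;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;

/// Sketch: push timestamps into an unbounded channel and let a dedicated
/// task do the actual send_to, so the producer never waits on the send.
fn spawn_sender(udp: UdpSocket, fin_addr: SocketAddrV4) -> mpsc::UnboundedSender<u128> {
    let (tx, mut rx) = mpsc::unbounded_channel::<u128>();
    tokio::spawn(async move {
        while let Some(ts) = rx.recv().await {
            udp.send_to(&ts.to_be_bytes(), fin_addr)
                .await
                .expect("cannot send message to socket");
        }
    });
    tx
}

// Usage inside block_on: let tx = spawn_sender(udp, fin_addr);
//                        tx.send(timestamp16()).unwrap();
```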
6 EricJia OP @everfly https://stackoverflow.com/questions/6866611/linux-multicast-sendto-performance-degrades-with-local-listeners The problem looks very similar to this one: as the number of listeners increases, latency grows by about 2us per listener. The socket is already non-blocking, so this isn't about a channel.
7 EricJia OP @rainmote https://stackoverflow.com/questions/6866611/linux-multicast-sendto-performance-degrades-with-local-listeners The problem looks very similar to this one: as listeners increase, latency rises linearly, roughly 2us per listener. So it doesn't seem to be CPU-related?
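(One way to check whether the per-listener cost described in that SO question really sits inside the send path would be to time send_to itself on the sender. A sketch of the send branch with that measurement added; if the SO answer applies, this duration should grow roughly linearly with the number of local receivers that have joined the group:)

```
// Sketch: measure how long the send_to call itself takes on the sender side.
let cur = timestamp16();
let input = cur.to_be_bytes();
let before = std::time::Instant::now();
udp.send_to(&input, fin_addr)
    .await
    .expect("cannot send message to socket");
println!("send: {} send_to took {}us", cur, before.elapsed().as_micros());
```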