基于 C++20 协程编写 gRPC 客户端与服务端 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Hackerl
V2EX    C++

基于 C++20 协程编写 gRPC 客户端与服务端

  •  
  •   Hackerl 1 天前 463 次点击

    基于 C++20 协程编写 gRPC 客户端与服务端

    完整代码

    gRPCC++ 异步接口十分不友好,尤其是对于需要支持并发的服务端来说,官方的例子是实现一个小型状态机进行请求处理。是否可以将 gRPC 与 C++20 的协程结合,编写出简单易懂的代码呢?

    实现细节请阅读博客,在此仅展示最终代码。

    协议:

    syntax = "proto3"; package sample; service SampleService { // Unary RPC:最简单的请求-响应模式 rpc Echo(EchoRequest) returns (EchoResponse); // 服务端流式:服务端持续向客户端推送数据 rpc GetNumbers(GetNumbersRequest) returns (stream Number); // 客户端流式:客户端持续向服务端发送数据,服务端返回一个结果 rpc Sum(stream Number) returns (SumResponse); // 双向流式:双方都可以持续发送和接收数据 rpc Chat(stream ChatMessage) returns (stream ChatMessage); } message EchoRequest { string message = 1; } message EchoResponse { string message = 1; int64 timestamp = 2; } message GetNumbersRequest { int32 value = 1; int32 count = 2; } message Number { int32 value = 1; } message SumResponse { int32 total = 1; int32 count = 2; } message ChatMessage { string user = 1; string cOntent= 2; int64 timestamp = 3; } 

    客户端:

    class Client final : public GenericClient<sample::SampleService> { public: using GenericClient::GenericClient; static Client make(const std::string &address) { return Client{sample::SampleService::NewStub(grpc::CreateChannel(address, grpc::InsecureChannelCredentials()))}; } asyncio::task::Task<sample::EchoResponse> echo( sample::EchoRequest request, std::unique_ptr<grpc::ClientContext> cOntext= std::make_unique<grpc::ClientContext>() ) { co_return co_await call(&sample::SampleService::Stub::async::Echo, std::move(context), std::move(request)); } asyncio::task::Task<void> getNumbers( sample::GetNumbersRequest request, asyncio::Sender<sample::Number> sender, std::unique_ptr<grpc::ClientContext> cOntext= std::make_unique<grpc::ClientContext>() ) { co_await call( &sample::SampleService::Stub::async::GetNumbers, std::move(context), std::move(request), std::move(sender) ); } asyncio::task::Task<sample::SumResponse> sum( asyncio::Receiver<sample::Number> receiver, std::unique_ptr<grpc::ClientContext> cOntext= std::make_unique<grpc::ClientContext>() ) { co_return co_await call(&sample::SampleService::Stub::async::Sum, std::move(context), std::move(receiver)); } asyncio::task::Task<void> chat( asyncio::Receiver<sample::ChatMessage> receiver, asyncio::Sender<sample::ChatMessage> sender, std::unique_ptr<grpc::ClientContext> cOntext= std::make_unique<grpc::ClientContext>() ) { co_return co_await call( &sample::SampleService::Stub::async::Chat, std::move(context), std::move(receiver), std::move(sender) ); } }; asyncio::task::Task<void> asyncMain(const int argc, char *argv[]) { auto client = Client::make("localhost:50051"); co_await all( // Unary RPC asyncio::task::spawn([&]() -> asyncio::task::Task<void> { sample::EchoRequest req; req.set_message("Hello gRPC!"); const auto resp = co_await client.echo(req); fmt::print("Echo: {}\n", resp.message()); }), // 服务端流 + 客户端流,用 channel 串联 asyncio::task::spawn([&]() -> asyncio::task::Task<void> { sample::GetNumbersRequest req; req.set_value(1); req.set_count(5); auto [sender, receiver] = asyncio::channel<sample::Number>(); const auto result = co_await all( client.getNumbers(req, std::move(sender)), client.sum(std::move(receiver)) ); const auto &resp = std::get<sample::SumResponse>(result); fmt::print("Sum: {}, count: {}\n", resp.total(), resp.count()); }), // 双向流 asyncio::task::spawn([&]() -> asyncio::task::Task<void> { auto [inSender, inReceiver] = asyncio::channel<sample::ChatMessage>(); auto [outSender, outReceiver] = asyncio::channel<sample::ChatMessage>(); co_await all( client.chat(std::move(outReceiver), std::move(inSender)), asyncio::task::spawn([&]() -> asyncio::task::Task<void> { sample::ChatMessage msg; msg.set_content("Hello server!"); co_await asyncio::error::guard(outSender.send(std::move(msg))); outSender.close(); }), asyncio::task::spawn([&]() -> asyncio::task::Task<void> { const auto msg = co_await asyncio::error::guard(inReceiver.receive()); fmt::print("Chat reply: {}\n", msg.content()); }) ); }) ); } 

    服务端:

    class Server final : public GenericServer<sample::SampleService> { public: using GenericServer::GenericServer; static Server make(const std::string &address) { auto service = std::make_unique<sample::SampleService::AsyncService>(); grpc::ServerBuilder builder; builder.AddListeningPort(address, grpc::InsecureServerCredentials()); builder.RegisterService(service.get()); auto completiOnQueue= builder.AddCompletionQueue(); auto server = builder.BuildAndStart(); return {std::move(server), std::move(service), std::move(completionQueue)}; } private: // Unary:直接返回 Response ,错误自动转换为 gRPC 错误状态 static asyncio::task::Task<sample::EchoResponse> echo(sample::EchoRequest request) { sample::EchoResponse response; response.set_message(request.message()); response.set_timestamp(std::time(nullptr)); co_return response; } // 服务端流:接受 Writer ,逐个写入 static asyncio::task::Task<void> getNumbers(sample::GetNumbersRequest request, Writer<sample::Number> writer) { for (int i = 0; i < request.count(); ++i) { sample::Number number; number.set_value(request.value() + i); co_await writer.write(number); } } // 客户端流:接受 Reader ,读取并聚合 static asyncio::task::Task<sample::SumResponse> sum(Reader<sample::Number> reader) { int total{0}, count{0}; while (const auto number = co_await reader.read()) { total += number->value(); ++count; } sample::SumResponse response; response.set_total(total); response.set_count(count); co_return response; } // 双向流:读一条,回一条 static asyncio::task::Task<void> chat(Stream<sample::ChatMessage, sample::ChatMessage> stream) { while (const auto message = co_await stream.read()) { sample::ChatMessage response; response.set_user("Server"); response.set_timestamp(std::time(nullptr)); response.set_content(fmt::format("Echo: {}", message->content())); co_await stream.write(response); } } // 将方法指针和 handler 绑定,启动各 RPC 的监听循环 asyncio::task::Task<void> dispatch() override { co_await all( handle(&sample::SampleService::AsyncService::RequestEcho, echo), handle(&sample::SampleService::AsynService::RequestGetNumbers, getNumbers), handle(&sample::SampleService::AsyncService::RequestSum, sum), handle(&sample::SampleService::AsyncService::RequestChat, chat) ); } }; asyncio::task::Task<void> asyncMain(const int argc, char *argv[]) { auto server = Server::make("0.0.0.0:50051"); auto signal = asyncio::Signal::make(); co_await race( asyncio::task::spawn([&]() -> asyncio::task::Task<void> { asyncio::sync::Event event; co_await asyncio::task::Cancellable{ all( server.run(), asyncio::task::spawn([&]() -> asyncio::task::Task<void> { co_await asyncio::error::guard(event.wait()); co_await server.shutdown(); // 通知 gRPC 服务器关闭 }) ), [&]() -> std::expected<void, std::error_code> { event.set(); // 触发 shutdown 流程 return {}; } }; }), asyncio::task::spawn([&]() -> asyncio::task::Task<void> { co_await asyncio::error::guard(signal.on(SIGINT)); }) ); } 

    正常运行:

    正常运行截图

    连接失败:

    连接失败截图

    错误信息友好,包含原始的 gRPC 错误信息,以及协程调用栈。

    目前尚无回复
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1679 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 35ms UTC 16:24 PVG 00:24 LAX 09:24 JFK 12:24
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86