
gRPC 的 C++ 异步接口十分不友好,尤其是对于需要支持并发的服务端来说,官方的例子是实现一个小型状态机进行请求处理。是否可以将 gRPC 与 C++20 的协程结合,编写出简单易懂的代码呢?
实现细节请阅读博客,在此仅展示最终代码。
协议:
syntax = "proto3";

package sample;

// Sample service covering the four gRPC call shapes.
service SampleService {
    // Unary RPC: the simplest request-response pattern.
    rpc Echo(EchoRequest) returns (EchoResponse);

    // Server streaming: the server keeps pushing data to the client.
    rpc GetNumbers(GetNumbersRequest) returns (stream Number);

    // Client streaming: the client keeps sending data and the server returns a single result.
    rpc Sum(stream Number) returns (SumResponse);

    // Bidirectional streaming: both sides can keep sending and receiving data.
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message EchoRequest {
    string message = 1;
}

message EchoResponse {
    string message = 1;
    int64 timestamp = 2;
}

message GetNumbersRequest {
    int32 value = 1;
    int32 count = 2;
}

message Number {
    int32 value = 1;
}

message SumResponse {
    int32 total = 1;
    int32 count = 2;
}

message ChatMessage {
    string user = 1;
    // Field name must be `content`: the C++ code calls set_content() / content().
    string content = 2;
    int64 timestamp = 3;
}
客户端:
class Client final : public GenericClient<sample::SampleService> { public: using GenericClient::GenericClient; static Client make(const std::string &address) { return Client{sample::SampleService::NewStub(grpc::CreateChannel(address, grpc::InsecureChannelCredentials()))}; } asyncio::task::Task<sample::EchoResponse> echo( sample::EchoRequest request, std::unique_ptr<grpc::ClientContext> cOntext= std::make_unique<grpc::ClientContext>() ) { co_return co_await call(&sample::SampleService::Stub::async::Echo, std::move(context), std::move(request)); } asyncio::task::Task<void> getNumbers( sample::GetNumbersRequest request, asyncio::Sender<sample::Number> sender, std::unique_ptr<grpc::ClientContext> cOntext= std::make_unique<grpc::ClientContext>() ) { co_await call( &sample::SampleService::Stub::async::GetNumbers, std::move(context), std::move(request), std::move(sender) ); } asyncio::task::Task<sample::SumResponse> sum( asyncio::Receiver<sample::Number> receiver, std::unique_ptr<grpc::ClientContext> cOntext= std::make_unique<grpc::ClientContext>() ) { co_return co_await call(&sample::SampleService::Stub::async::Sum, std::move(context), std::move(receiver)); } asyncio::task::Task<void> chat( asyncio::Receiver<sample::ChatMessage> receiver, asyncio::Sender<sample::ChatMessage> sender, std::unique_ptr<grpc::ClientContext> cOntext= std::make_unique<grpc::ClientContext>() ) { co_return co_await call( &sample::SampleService::Stub::async::Chat, std::move(context), std::move(receiver), std::move(sender) ); } }; asyncio::task::Task<void> asyncMain(const int argc, char *argv[]) { auto client = Client::make("localhost:50051"); co_await all( // Unary RPC asyncio::task::spawn([&]() -> asyncio::task::Task<void> { sample::EchoRequest req; req.set_message("Hello gRPC!"); const auto resp = co_await client.echo(req); fmt::print("Echo: {}\n", resp.message()); }), // 服务端流 + 客户端流,用 channel 串联 asyncio::task::spawn([&]() -> asyncio::task::Task<void> { sample::GetNumbersRequest req; 
req.set_value(1); req.set_count(5); auto [sender, receiver] = asyncio::channel<sample::Number>(); const auto result = co_await all( client.getNumbers(req, std::move(sender)), client.sum(std::move(receiver)) ); const auto &resp = std::get<sample::SumResponse>(result); fmt::print("Sum: {}, count: {}\n", resp.total(), resp.count()); }), // 双向流 asyncio::task::spawn([&]() -> asyncio::task::Task<void> { auto [inSender, inReceiver] = asyncio::channel<sample::ChatMessage>(); auto [outSender, outReceiver] = asyncio::channel<sample::ChatMessage>(); co_await all( client.chat(std::move(outReceiver), std::move(inSender)), asyncio::task::spawn([&]() -> asyncio::task::Task<void> { sample::ChatMessage msg; msg.set_content("Hello server!"); co_await asyncio::error::guard(outSender.send(std::move(msg))); outSender.close(); }), asyncio::task::spawn([&]() -> asyncio::task::Task<void> { const auto msg = co_await asyncio::error::guard(inReceiver.receive()); fmt::print("Chat reply: {}\n", msg.content()); }) ); }) ); } 服务端:
/// Coroutine-based server for sample::SampleService, derived from GenericServer.
///
/// Each RPC is implemented as a static coroutine handler; dispatch() pairs every
/// AsyncService::Request* method pointer with its handler through handle(...).
class Server final : public GenericServer<sample::SampleService> {
public:
    using GenericServer::GenericServer;

    /// Builds and starts a gRPC server listening on `address` with insecure
    /// credentials, registering the async service and one completion queue.
    static Server make(const std::string &address) {
        auto service = std::make_unique<sample::SampleService::AsyncService>();

        grpc::ServerBuilder builder;

        builder.AddListeningPort(address, grpc::InsecureServerCredentials());
        builder.RegisterService(service.get());

        auto completionQueue = builder.AddCompletionQueue();
        auto server = builder.BuildAndStart();

        return {std::move(server), std::move(service), std::move(completionQueue)};
    }

private:
    // Unary: return the response directly; errors are automatically converted
    // into gRPC error statuses.
    static asyncio::task::Task<sample::EchoResponse> echo(sample::EchoRequest request) {
        sample::EchoResponse response;
        response.set_message(request.message());
        response.set_timestamp(std::time(nullptr));
        co_return response;
    }

    // Server streaming: takes a Writer and writes the items one by one.
    // Emits request.count() numbers starting at request.value().
    static asyncio::task::Task<void> getNumbers(sample::GetNumbersRequest request, Writer<sample::Number> writer) {
        for (int i = 0; i < request.count(); ++i) {
            sample::Number number;
            number.set_value(request.value() + i);
            co_await writer.write(number);
        }
    }

    // Client streaming: takes a Reader, reads until it yields a falsy result,
    // and aggregates the values into a sum and a count.
    static asyncio::task::Task<sample::SumResponse> sum(Reader<sample::Number> reader) {
        int total{0}, count{0};

        while (const auto number = co_await reader.read()) {
            total += number->value();
            ++count;
        }

        sample::SumResponse response;
        response.set_total(total);
        response.set_count(count);
        co_return response;
    }

    // Bidirectional streaming: read one message, reply with one.
    static asyncio::task::Task<void> chat(Stream<sample::ChatMessage, sample::ChatMessage> stream) {
        while (const auto message = co_await stream.read()) {
            sample::ChatMessage response;
            response.set_user("Server");
            response.set_timestamp(std::time(nullptr));
            response.set_content(fmt::format("Echo: {}", message->content()));
            co_await stream.write(response);
        }
    }

    // Bind each method pointer to its handler and start the listening loop for
    // every RPC.
    asyncio::task::Task<void> dispatch() override {
        co_await all(
            handle(&sample::SampleService::AsyncService::RequestEcho, echo),
            handle(&sample::SampleService::AsyncService::RequestGetNumbers, getNumbers),
            handle(&sample::SampleService::AsyncService::RequestSum, sum),
            handle(&sample::SampleService::AsyncService::RequestChat, chat)
        );
    }
};

/// Server entry point: serves on 0.0.0.0:50051 and races the server task against
/// a task that waits for SIGINT.
asyncio::task::Task<void> asyncMain(const int argc, char *argv[]) {
    auto server = Server::make("0.0.0.0:50051");
    auto signal = asyncio::Signal::make();

    co_await race(
        asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
            asyncio::sync::Event event;

            // The callback below sets `event` (presumably invoked when this task
            // is cancelled), which lets the inner task call server.shutdown().
            co_await asyncio::task::Cancellable{
                all(
                    server.run(),
                    asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
                        co_await asyncio::error::guard(event.wait());
                        co_await server.shutdown(); // tell the gRPC server to shut down
                    })
                ),
                [&]() -> std::expected<void, std::error_code> {
                    event.set(); // trigger the shutdown flow
                    return {};
                }
            };
        }),
        asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
            co_await asyncio::error::guard(signal.on(SIGINT));
        })
    );
}
正常运行:

连接失败:

错误信息友好,包含原始的 gRPC 错误信息,以及协程调用栈。