
http 请求 stream 响应时,response body 打印出来是正确的结果:
{"result":{"code":1,"msg":"1111"}} {"result":{"code":2,"msg":"2222"}} {"result":{"code":3,"msg":"3333"}} {"result":{"code":4,"msg":"4444"}} {"result":{"code":5,"msg":"5555"}} {"result":{"code":6,"msg":"6666"}} 但是使用 runtime.JSONPb 的 Decoder 调用 Decode 时,虽然同样会得到六个结果,但每个 decode 出来的值都是 nil:
=== RUN TestHttpRespStream service_test.go:147: resp: <nil> service_test.go:147: resp: <nil> service_test.go:147: resp: <nil> service_test.go:147: resp: <nil> service_test.go:147: resp: <nil> service_test.go:147: resp: <nil> service_test.go:149: EOF --- PASS: TestHttpRespStream (0.62s) 这是 proto 文件:
// ./pb/test.proto
syntax = "proto3";

package pb;

import "google/api/annotations.proto";

option go_package = "/pb;pb";

// Req is the request message shared by every TestService RPC.
message Req {
  int32 id = 1;
  string name = 2;
}

// Resp is the response message shared by every TestService RPC.
message Resp {
  int32 code = 1;
  string msg = 2;
}

// TestService declares one RPC per streaming shape. Each RPC is bound to an
// HTTP POST route through the google.api.http option, with the whole request
// body (body: "*") mapped onto the request message.
service TestService {
  // QueryStreamResp takes a single Req and returns a stream of Resp.
  rpc QueryStreamResp(Req) returns (stream Resp) {
    option (google.api.http) = {
      post: "/query-stream-resp"
      body: "*"
    };
  }

  // QueryStreamReq takes a stream of Req and returns a single Resp.
  rpc QueryStreamReq(stream Req) returns (Resp) {
    option (google.api.http) = {
      post: "/query-stream-req"
      body: "*"
    };
  }

  // Query takes a stream of Req and returns a stream of Resp.
  rpc Query(stream Req) returns (stream Resp) {
    option (google.api.http) = {
      post: "/query"
      body: "*"
    };
  }
}
// grpc 服务端 (gRPC server):
func (ts *TestService) QueryStreamResp(req *pb.Req, stream pb.TestService_QueryStreamRespServer) error { log.Printf("QueryStreamResp|start...|req: %+v\n", req) result := []*pb.Resp{ {Code: 1, Msg: "1111"}, {Code: 2, Msg: "2222"}, {Code: 3, Msg: "3333"}, {Code: 4, Msg: "4444"}, {Code: 5, Msg: "5555"}, {Code: 6, Msg: "6666"}, } // header := make(metadata.MD) // header.Append("content-type", "application/json") // stream.SendHeader(header) for i := range result { log.Printf("resp: %+v", result[i]) if err := stream.Send(result[i]); err != nil { log.Fatal(err) } time.Sleep(100 * time.Millisecond) } log.Println("QueryStreamResp|stop...") return nil } 单元测试:
func TestHttpRespStream(t *testing.T) { url := "http://127.0.0.1:8080/query-stream-resp" reqData := &pb.Req{Id: 1, Name: "111"} var buffer bytes.Buffer encoder := (&runtime.JSONPb{}).NewEncoder(&buffer) if err := encoder.Encode(reqData); err != nil { t.Fatal(err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, &buffer) if err != nil { t.Fatal(err) } resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatal(err) } defer resp.Body.Close() // body, err := ioutil.ReadAll(resp.Body) // if err != nil { // t.Fatal(err) // } // t.Logf("body: %s", string(body)) jsonb := new(runtime.JSONPb) dencoder := jsonb.NewDecoder(resp.Body) for { var result *pb.Resp err := dencoder.Decode(result) if err == nil { t.Logf("resp: %+v", result) } else { t.Logf("%+v", err) break } } } 1 HUNYXV OP |