gRPC 笔记

gRPC

概述

这篇笔记主要介绍gRPC流、拦截器、middleware和grpcgateway。
当数据量大或者需要不断传输数据的时候,就应该使用流式RPC,它允许我们一边处理一边传输数据。流式RPC分为服务端流式RPC和客户端流式RPC服务端流式RPC过程是:客户端发送请求到服务器,拿到一个流读取返回的消息队列。客户端读取返回的流,直到里面没有任何消息。客户端流式RPC的过程是:客户端不断向服务端发送数据流,在发送结束后由服务端返回一个响应。

实践

|实现服务端流RPC

义proto文件

syntax ="proto3";
option go_package = ".;StreamServer";
// 定义发送请求消息
message SimpleRequest{
  string data = 1;
}
// 定义流式相应消息
message StreamResponse{
  string stream_value = 1;
}

// 定义服务方法ListValue
service StreamServer {
    // 流式服务端RPC,因此在returns的参数天 stream
  rpc ListValue(SimpleRequest) returns(stream StreamResponse){};
}

编译proto文件

protoc --go_out=. *.proto
protoc --go-grpc_out=. *.proto

编写Server端的程序

  • • 主要是实现定义的ListValue方法

func (s *StreamServer) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
    for n := 0; n < 5; n++ {
        err := srv.Send(&pb.StreamResponse{
            StreamValue: req.Data + strconv.Itoa(n),
        })
        if err != nil {
            return err
        }
    }
    return nil
}

编写Client端的程序

func listValue() {
    req := pb.SimpleRequest{
        Data: "stream server grpc ",
    }
    stream, err := grpcClient.ListValue(context.Background(), &req)
    if err != nil {
        log.Fatalf("Call listvalue error:", err)
    }

    for {
        res, err := stream.Recv()

        // 判断消息流是否已经结束
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("ListStr get stream error :", err)
        }
        log.Println(res.StreamValue)
    }
}

完整代码在这里 https://github.com/FengZeHe/LearngRPC/tree/main/grpc-stream-server

|实现客户端流RPC

编写proto文件

syntax = "proto3";
option  go_package = ".;ClientStream";


message StreamRequest{
  string stream_data = 1;
}

message SimpleResponse {
  int32 code = 1;
  string value = 2;
}

service StreamClient{
  rpc RouteList(stream StreamRequest) returns ( SimpleResponse){};
}

编写服务端程序

func (s *SimpleService) RouteList(srv pb.StreamClient_RouteListServer) error {
    //从流中获取消息
    for {
        res, err := srv.Recv()
    //如果已经读完了
        if err == io.EOF {
            return srv.SendAndClose(&pb.SimpleResponse{Value: "ok"})
        }
        if err != nil {
            return err
        }

        log.Println(res.StreamData)
    }
}


func main(){
  ...
  //在grpc中注册服务
  pb.RegisterStreamClientServer(grpcServer, &SimpleService{})
  ...
}

编写客户端程序

func routeList() {
    stream, err := streamClient.RouteList(context.Background())
    if err != nil {
        log.Fatalf("upload list err", err)
    }
    for n := 0; n < 5; n++ {
        err = stream.Send(&pb.StreamRequest{StreamData: "stream client rpc " + strconv.Itoa(n)})
        if err != nil {
            log.Fatalf("stream request err:", err)
        }
    }
    //关闭流并获取返回的消息
    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("routelist get response err", err)
    }
    log.Println(res.Value)
}

完整代码在这里 https://github.com/FengZeHe/LearngRPC/tree/main/grpc-stream-client

|实现双向流式RPC

概述

双向流式RPC的意思是客户端和服务端双方读写流发送消息序列,两个流单独操作,双方可以同时发送和接受消息。

编写proto文件

syntax = "proto3";
option go_package = ".;StreamConversations";

message StreamRequest {
     string question = 1;
}

message StreamResponse {
  string answer = 1;
}

service StreamConversations {
  rpc Conversations(stream StreamRequest) returns (stream StreamResponse){};
}

编译文件

protoc --go_out=. *.proto
protoc --go-grpc_out=. *.proto

编写服务端程序

//实现Conversations()方法
func (s *StreamService) Conversations(srv pb.StreamConversations_ConversationsServer) error {
    n := 1
    for {
        req, err := srv.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        err = srv.Send(&pb.StreamResponse{
            Answer: "from stream server answer: the " + strconv.Itoa(n) + " question is " + req.Question,
        })
        if err != nil {
            return err
        }
        n++
        log.Printf("from stream client question: %s", req.Question)
    }
}

func main(){
  ...
  //在gRPC中注册服务
    pb.RegisterStreamConversationsServer(grpcServer, &StreamService{})
  ...
}

编写客户端程序

// 实现conversations方法
func conversations() {
    stream, err := streamClient.Conversations(context.Background())
    if err != nil {
        log.Fatalf("stream failure")
    }
    for n := 0; n < 5; n++ {
        err := stream.Send(&pb.StreamRequest{Question: "stream client rpc " + strconv.Itoa(n)})
        if err != nil {
            log.Fatalf("stream request err: %v", err)
        }
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("Conversations get stream err: %v", err)
        }
        // 打印返回值
        log.Println(res.Answer)
    }
    err = stream.CloseSend()
    if err != nil {
        log.Fatalf("Conversations close stream err: %v", err)
    }
}


func main(){
  ...
  //建立gRPC连接
  streamClient = pb.NewStreamConversationsClient(conn)
  conversations()
  ...
}

完整代码在这里https://github.com/FengZeHe/LearngRPC/tree/main/grpc-stream-conversations

gRPC 拦截器

概述

gRPC提供了拦截器(Interceptor)功能,包括客户端拦截器和服务端拦截器。可以在接收到请求或者发起请求之前优先对请求中的数据做一些处理后再转交给指定服务处理相应;很适合做处理验证、日志等流程。

拦截器有哪些类型

  • • UnaryServerInterceptor 服务端拦截,在服务端接收请求的时候进行拦截。

  • • UnaryClientInterceptor 客户端拦截器,在客户端真正发起调用之前,进行拦截。

  • • StreamClientInterceptor 在流式客户端调用时,通过拦截 clientstream 的创建,返回一个自定义的 clientstream, 可以做一些额外的操作。

  • • StreamServerInterceptor 在服务端接收到流式请求的时候进行拦截。

客户端

客户端一元拦截器(Client Interceptor)

客户端的一元拦截器类型为UnaryClientInterceptor,实现分为预处理(pre-poressing)调用RPC方法(invoking RPC method)后处理(post-processing)三个阶段。

参数含义如下:

  • • ctx :Go语言中的上下文,一般和Goroutine配合使用,起到超时控制的效果。

  • • method: 当前调用的RPC方法名

  • • req :本次请求的参数,只有在处理前阶段修改才有效

  • • reply :本次请求响应,需要在处理后 阶段才能获得

  • • cc : gRPC连接信息

  • • invoker :可以看做是当前RPC方法,一般在拦截器中调用invoker能达到调用RPC方法的效果,底层也是RPC处理

  • • opts :本次调用指定的options信息

type UnaryClientInterceptor func(
    ctx context.Context, 
    method string
    req, 
    reply interface{}, 
    cc *ClientConn, 
    invoker UnaryInvoker, 
    opts ...CallOption,
error

客户端流拦截器 (Stream Interceptor)

客户端流拦截器的实现包括预处理和流操作拦截,并不能在事后进行RPC方法调用和后处理,而是拦截用户对流的操作。拦截器的区别也体现在请求参数上,如req参数变成了streamer。
type StreamClientInterceptor func(
    ctx context.Context, 
    desc *StreamDesc, 
    cc *ClientConn, 
    method string
    streamer Streamer, 
    opts ...CallOption,
) (ClientStream, error)

异同

流式拦截器同样分为三个阶段:预处理、调用RPC方法、后处理预处理阶段和一元拦截器类似,但后面两个阶段则不同;StreamAPI的请求和响应都是通过Stream进行传递的,更进一步是通过Streamer调用SendMsg和RecvMsg这两个方法获取的。然后Streamer又是低啊用RPC方法来获得,所以在流拦截器中我们可以对streamer进行包装,进而实现SendMsg和RecvMsg这两个方法。

服务端

服务端一元拦截器

服务端一元拦截器类型为UnaryServerInterceptor ,一共包含4个参数,包括RPC上下文、RPC请求参数、RPC方法的所有信息、RPC方法真正执行的逻辑。
type UnaryServerInterceptor func(
    ctx context.Context, 
    req interface{}, 
    info *UnaryServerInfo, 
    handler UnaryHandler,
) (resp interface{}, err error)

服务端流拦截器

服务端流拦截器类型为StreamServerInterceptor

type StreamClientInterceptor func(
    ctx context.Context, 
    desc *StreamDesc, 
    cc *ClientConn, 
    method string, 
    streamer Streamer, 
    opts ...CallOption,
) (ClientStream, error)

实践

实现客户端和服务端的一元拦截器

  • • 客户端实现一元拦截器在grpc.Dail()指定即可

  //  实现客户端一元拦截器
conn, err := grpc.Dial("127.0.0.1:9097", grpc.WithTransportCredentials(creds), grpc.WithUnaryInterceptor(LogUnaryClientIntercrptor()))
    
  // 定义客户端一元拦截器 LogUnaryClientInterceptor()
func LogUnaryClientIntercrptor() grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{},
        cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        ...xxx
    }
  }
  • • 服务端实现一元拦截器在grpc.NewServer()指定即可

//服务端实现一元拦截器
grpcserver := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(LogUnaryServerInterceptor()))
  
// 服务端定义一元拦截器 定义名字LogUnaryServerInterceptor的一元拦截器
func LogUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
   xxxx...
}

完整代码在这里:https://github.com/FengZeHe/LearngRPC/tree/main/go-grpc-interceptor

客户端的流拦截器

  • • 同样需要在grpc.Dial中指定流拦截器

// wrappedStream  用于包装 grpc.ClientStream 结构体并拦截其对应的方法。
type wrappedStream struct {
   grpc.ClientStream
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
    return &wrappedStream{s}
}

// 实现RecvMsg方法
func (w *wrappedStream) RecvMsg(m interface{}) error {
    fmt.Printf("Receive a message (Type: %T) at %v n", m, time.Now().Format(time.RFC3339))
    return w.ClientStream.RecvMsg(m)
}

// 实现SendMsg方法
func (w *wrappedStream) SendMsg(m interface{}) error {
    fmt.Printf("Send a message (Type: %T) at %v n", m, time.Now().Format(time.RFC3339))
    return w.ClientStream.SendMsg(m)
}

// streamInterceptor 一个简单的 stream interceptor 示例。
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    s, err := streamer(ctx, desc, cc, method, opts...)
    if err != nil {
        return nil, err
    }
    // 返回的是自定义的封装过的 stream
    return newWrappedStream(s), nil
}

服务端的流拦截器

  • • 服务端实现流拦截器也是类似的

type wrappedStream struct {
    grpc.ServerStream
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
    return &wrappedStream{s}
}

func (w *wrappedStream) RevcMsg(m interface{}) (err error) {
    fmt.Printf("Receive a message (Type: %T) at %s ", m, time.Now().Format(time.RFC3339))
    return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) (err error) {
    fmt.Printf("Send a message (Type %T) at %v", m, time.Now().Format(time.RFC3339))
    return w.ServerStream.SendMsg(m)
}

func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    //包装 grpc.ServerStream 以替换 RecvMsg SendMsg这两个方法。
    err := handler(srv, newWrappedStream(ss))
    if err != nil {
        fmt.Printf("RPC failed with error %v", err)
    }
    return err
}

完整代码在这里 https://github.com/FengZeHe/LearngRPC/tree/main/go-grpc-stream-interceptor

拦截器执行过程异同

一元拦截器

  • • 1)预处理

  • • 2)调用RPC方法

  • • 3)后处理

流拦截器

  • • 1)预处理

  • • 2)调用RPC方法 获取 Streamer

  • • 3)后处理

    • • 调用 SendMsg 、RecvMsg 之前

    • • 调用 SendMsg 、RecvMsg

    • • 调用 SendMsg 、RecvMsg 之后

Go gRPC Middleware

概述

Go gRPC Middleware是一个gRPC中间件,提供了拦截器的链式功能,常用来做身份认证、日志记录、监控、客户端重连等等。

项目地址

https://github.com/grpc-ecosystem/go-grpc-middleware

实践

  • • 使用Middleware实现多个拦截器

  // 在项目中引用这个包
  import "github.com/grpc-ecosystem/go-grpc-middleware"
  
  // 在server端定义两个Log一元拦截器
  func LogUnaryIntercptor() grpc.UnaryServerInterceptor {
      return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
          ...
    }
  }
  
  func LogUnaryIntercptorTwo() grpc.UnaryServerInterceptor {
      return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
          ...
      }
  }
  
  // 在main函数中加入这两个拦截器
  grpcServer := grpc.NewServer(
          grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
              LogUnaryIntercptorTwo(),
              LogUnaryIntercptor(),
          )),
      )

go-grpc-gateway

gRPC 笔记

|概述

gRPC-Gateway是一个protoc插件。它读取gRPC服务定义并生成一个反向代理服务器,该服务器将RESTFUL JSON API转换为gRPC;gRPC-Gateway 能同时提供 gRPC 和 RESTful 风格的 API。

|步骤

  1. 1. 写一个grpc服务器(编写proto文件+编译+客户端程序+服务端程序)

  2. 2. 引用两个文件 annotations.proto 和 http.proto从官方存储库googleapis下载: 

  3. https://github.com/googleapis/googleapis/blob/master/google/api/annotations.proto

  4. https://github.com/googleapis/googleapis/blob/master/google/api/http.proto

 ├── google
 │   └── api
 │       ├── annotations.proto
 │       └── http.proto
 └── hello
     ├── hello.pb.go
     ├── hello.proto
     └── hello_grpc.pb.go
  • • 在proto文件中导入 anntotations.protoimport "google/api/annotations.proto";

  1. proto文件添加gRPC注释(注释定义gRPC服务映射到JSON请求和响应,使用protobuf 时每个RPC服务必须使用google.api.HTTP来注释定义HTTP定义和路径)

 service Hello {
   rpc sayhello(HelloRequest) returns (HelloResponse){
   //添加注释
     option (google.api.http) = {
       get: "/v1/hello/sayhello"
     };
   };
 }
  1. 再次编译

 protoc -I ./proto 
    --go_out ./proto --go_opt paths=source_relative 
    --go-grpc_out ./proto --go-grpc_opt paths=source_relative 
    --grpc-gateway_out ./proto --grpc-gateway_opt paths=source_relative 
    ./proto/hello/hello.proto

编译过后会生成hello.pb.gw.go文件

 .
 ├── google
 │   └── api
 │       ├── annotations.proto
 │       └── http.proto
 └── hello
     ├── hello.pb.go
     ├── hello.pb.gw.go
     ├── hello.proto
     └── hello_grpc.pb.go
  1. 在main函数中添加并提供gRPC-Gateway mux

     func main(){
     xxxx...
     conn, err := grpc.DialContext(
       context.Background(),
       "0.0.0.0:9099",
       grpc.WithBlock(),
       grpc.WithTransportCredentials(insecure.NewCredentials()),
     )
     if err != nil {
       log.Fatalf("Failed to dial server:", err)
     }
     gwmux := runtime.NewServeMux()
     err = pb.RegisterHelloHandler(context.Background(), gwmux, conn)
     if err != nil {
       log.Fatalln("Failed to register gateway:", err)
     }
     
     xxx...
     }
  • 用postman测试接口

    gRPC 笔记


完整代码在这里:https://github.com/FengZeHe/LearngRPC/tree/main/go-grpc-gateway

|引用

[1] https://www.lixueduan.com/posts/grpc/05-interceptor/

[2] https://github.com/iamrajiv/helloworld-grpc-gateway

[3]https://grpc-ecosystem.github.io/grpc-gateway/docs/tutorials/adding_annotations/#using-protoc

原文始发于微信公众号(ProgrammerHe):gRPC 笔记

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/207966.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!