gRPC
概述
实践
|实现服务端流RPC
定义proto文件
syntax ="proto3";
option go_package = ".;StreamServer";
// 定义发送请求消息
message SimpleRequest{
string data = 1;
}
// 定义流式相应消息
message StreamResponse{
string stream_value = 1;
}
// 定义服务方法ListValue
service StreamServer {
// 流式服务端RPC,因此在returns的参数天 stream
rpc ListValue(SimpleRequest) returns(stream StreamResponse){};
}
编译proto文件
protoc --go_out=. *.proto
protoc --go-grpc_out=. *.proto
编写Server端的程序
-
• 主要是实现定义的ListValue方法
func (s *StreamServer) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
for n := 0; n < 5; n++ {
err := srv.Send(&pb.StreamResponse{
StreamValue: req.Data + strconv.Itoa(n),
})
if err != nil {
return err
}
}
return nil
}
编写Client端的程序
func listValue() {
req := pb.SimpleRequest{
Data: "stream server grpc ",
}
stream, err := grpcClient.ListValue(context.Background(), &req)
if err != nil {
log.Fatalf("Call listvalue error:", err)
}
for {
res, err := stream.Recv()
// 判断消息流是否已经结束
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("ListStr get stream error :", err)
}
log.Println(res.StreamValue)
}
}
完整代码在这里 https://github.com/FengZeHe/LearngRPC/tree/main/grpc-stream-server
|实现客户端流RPC
编写proto文件
syntax = "proto3";
option go_package = ".;ClientStream";
message StreamRequest{
string stream_data = 1;
}
message SimpleResponse {
int32 code = 1;
string value = 2;
}
service StreamClient{
rpc RouteList(stream StreamRequest) returns ( SimpleResponse){};
}
编写服务端程序
func (s *SimpleService) RouteList(srv pb.StreamClient_RouteListServer) error {
//从流中获取消息
for {
res, err := srv.Recv()
//如果已经读完了
if err == io.EOF {
return srv.SendAndClose(&pb.SimpleResponse{Value: "ok"})
}
if err != nil {
return err
}
log.Println(res.StreamData)
}
}
func main(){
...
//在grpc中注册服务
pb.RegisterStreamClientServer(grpcServer, &SimpleService{})
...
}
编写客户端程序
func routeList() {
stream, err := streamClient.RouteList(context.Background())
if err != nil {
log.Fatalf("upload list err", err)
}
for n := 0; n < 5; n++ {
err = stream.Send(&pb.StreamRequest{StreamData: "stream client rpc " + strconv.Itoa(n)})
if err != nil {
log.Fatalf("stream request err:", err)
}
}
//关闭流并获取返回的消息
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("routelist get response err", err)
}
log.Println(res.Value)
}
完整代码在这里 https://github.com/FengZeHe/LearngRPC/tree/main/grpc-stream-client
|实现双向流式RPC
概述
编写proto文件
syntax = "proto3";
option go_package = ".;StreamConversations";
message StreamRequest {
string question = 1;
}
message StreamResponse {
string answer = 1;
}
service StreamConversations {
rpc Conversations(stream StreamRequest) returns (stream StreamResponse){};
}
编译文件
protoc --go_out=. *.proto
protoc --go-grpc_out=. *.proto
编写服务端程序
//实现Conversations()方法
func (s *StreamService) Conversations(srv pb.StreamConversations_ConversationsServer) error {
n := 1
for {
req, err := srv.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
err = srv.Send(&pb.StreamResponse{
Answer: "from stream server answer: the " + strconv.Itoa(n) + " question is " + req.Question,
})
if err != nil {
return err
}
n++
log.Printf("from stream client question: %s", req.Question)
}
}
func main(){
...
//在gRPC中注册服务
pb.RegisterStreamConversationsServer(grpcServer, &StreamService{})
...
}
编写客户端程序
// 实现conversations方法
func conversations() {
stream, err := streamClient.Conversations(context.Background())
if err != nil {
log.Fatalf("stream failure")
}
for n := 0; n < 5; n++ {
err := stream.Send(&pb.StreamRequest{Question: "stream client rpc " + strconv.Itoa(n)})
if err != nil {
log.Fatalf("stream request err: %v", err)
}
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Conversations get stream err: %v", err)
}
// 打印返回值
log.Println(res.Answer)
}
err = stream.CloseSend()
if err != nil {
log.Fatalf("Conversations close stream err: %v", err)
}
}
func main(){
...
//建立gRPC连接
streamClient = pb.NewStreamConversationsClient(conn)
conversations()
...
}
完整代码在这里https://github.com/FengZeHe/LearngRPC/tree/main/grpc-stream-conversations
gRPC 拦截器
概述
拦截器有哪些类型
-
•
UnaryServerInterceptor
服务端拦截,在服务端接收请求的时候进行拦截。 -
•
UnaryClientInterceptor
客户端拦截器,在客户端真正发起调用之前,进行拦截。 -
•
StreamClientInterceptor
在流式客户端调用时,通过拦截 clientstream 的创建,返回一个自定义的 clientstream, 可以做一些额外的操作。 -
•
StreamServerInterceptor
在服务端接收到流式请求的时候进行拦截。
客户端
客户端一元拦截器(Client Interceptor)
客户端的一元拦截器类型为UnaryClientInterceptor
,实现分为预处理(pre-poressing)
、调用RPC方法(invoking RPC method)
和后处理(post-processing)
三个阶段。
参数含义如下:
-
•
ctx
:Go语言中的上下文,一般和Goroutine配合使用,起到超时控制的效果。 -
•
method
: 当前调用的RPC方法名 -
•
req
:本次请求的参数,只有在处理前阶段修改才有效 -
•
reply
:本次请求响应,需要在处理后 阶段才能获得 -
•
cc
: gRPC连接信息 -
•
invoker
:可以看做是当前RPC方法,一般在拦截器中调用invoker能达到调用RPC方法的效果,底层也是RPC处理 -
•
opts
:本次调用指定的options信息
type UnaryClientInterceptor func(
ctx context.Context,
method string,
req,
reply interface{},
cc *ClientConn,
invoker UnaryInvoker,
opts ...CallOption,
) error
客户端流拦截器 (Stream Interceptor)
type StreamClientInterceptor func(
ctx context.Context,
desc *StreamDesc,
cc *ClientConn,
method string,
streamer Streamer,
opts ...CallOption,
) (ClientStream, error)
异同
服务端
服务端一元拦截器
UnaryServerInterceptor
,一共包含4个参数,包括RPC上下文、RPC请求参数、RPC方法的所有信息、RPC方法真正执行的逻辑。type UnaryServerInterceptor func(
ctx context.Context,
req interface{},
info *UnaryServerInfo,
handler UnaryHandler,
) (resp interface{}, err error)
服务端流拦截器
服务端流拦截器类型为StreamServerInterceptor
,
type StreamClientInterceptor func(
ctx context.Context,
desc *StreamDesc,
cc *ClientConn,
method string,
streamer Streamer,
opts ...CallOption,
) (ClientStream, error)
实践
实现客户端和服务端的一元拦截器
-
• 客户端实现一元拦截器在
grpc.Dail()
指定即可
// 实现客户端一元拦截器
conn, err := grpc.Dial("127.0.0.1:9097", grpc.WithTransportCredentials(creds), grpc.WithUnaryInterceptor(LogUnaryClientIntercrptor()))
// 定义客户端一元拦截器 LogUnaryClientInterceptor()
func LogUnaryClientIntercrptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
...xxx
}
}
-
• 服务端实现一元拦截器在
grpc.NewServer()
指定即可
//服务端实现一元拦截器
grpcserver := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(LogUnaryServerInterceptor()))
// 服务端定义一元拦截器 定义名字LogUnaryServerInterceptor的一元拦截器
func LogUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
xxxx...
}
完整代码在这里:https://github.com/FengZeHe/LearngRPC/tree/main/go-grpc-interceptor
客户端的流拦截器
-
• 同样需要在
grpc.Dial
中指定流拦截器
// wrappedStream 用于包装 grpc.ClientStream 结构体并拦截其对应的方法。
type wrappedStream struct {
grpc.ClientStream
}
func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
return &wrappedStream{s}
}
// 实现RecvMsg方法
func (w *wrappedStream) RecvMsg(m interface{}) error {
fmt.Printf("Receive a message (Type: %T) at %v n", m, time.Now().Format(time.RFC3339))
return w.ClientStream.RecvMsg(m)
}
// 实现SendMsg方法
func (w *wrappedStream) SendMsg(m interface{}) error {
fmt.Printf("Send a message (Type: %T) at %v n", m, time.Now().Format(time.RFC3339))
return w.ClientStream.SendMsg(m)
}
// streamInterceptor 一个简单的 stream interceptor 示例。
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
// 返回的是自定义的封装过的 stream
return newWrappedStream(s), nil
}
服务端的流拦截器
-
• 服务端实现流拦截器也是类似的
type wrappedStream struct {
grpc.ServerStream
}
func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s}
}
func (w *wrappedStream) RevcMsg(m interface{}) (err error) {
fmt.Printf("Receive a message (Type: %T) at %s ", m, time.Now().Format(time.RFC3339))
return w.ServerStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m interface{}) (err error) {
fmt.Printf("Send a message (Type %T) at %v", m, time.Now().Format(time.RFC3339))
return w.ServerStream.SendMsg(m)
}
func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
//包装 grpc.ServerStream 以替换 RecvMsg SendMsg这两个方法。
err := handler(srv, newWrappedStream(ss))
if err != nil {
fmt.Printf("RPC failed with error %v", err)
}
return err
}
完整代码在这里 https://github.com/FengZeHe/LearngRPC/tree/main/go-grpc-stream-interceptor
拦截器执行过程异同
一元拦截器
-
• 1)预处理
-
• 2)调用RPC方法
-
• 3)后处理
流拦截器
-
• 1)预处理
-
• 2)调用RPC方法 获取 Streamer
-
• 3)后处理
-
• 调用 SendMsg 、RecvMsg 之前
-
• 调用 SendMsg 、RecvMsg
-
• 调用 SendMsg 、RecvMsg 之后
Go gRPC Middleware
概述
项目地址
https://github.com/grpc-ecosystem/go-grpc-middleware
实践
-
• 使用
Middleware
实现多个拦截器
// 在项目中引用这个包
import "github.com/grpc-ecosystem/go-grpc-middleware"
// 在server端定义两个Log一元拦截器
func LogUnaryIntercptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
...
}
}
func LogUnaryIntercptorTwo() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
...
}
}
// 在main函数中加入这两个拦截器
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
LogUnaryIntercptorTwo(),
LogUnaryIntercptor(),
)),
)
go-grpc-gateway
|概述
|步骤
-
1. 写一个grpc服务器(编写proto文件+编译+客户端程序+服务端程序)
-
2. 引用两个文件
annotations.proto
和http.proto
从官方存储库googleapis下载: -
https://github.com/googleapis/googleapis/blob/master/google/api/annotations.proto
-
https://github.com/googleapis/googleapis/blob/master/google/api/http.proto
├── google
│ └── api
│ ├── annotations.proto
│ └── http.proto
└── hello
├── hello.pb.go
├── hello.proto
└── hello_grpc.pb.go
-
• 在proto文件中导入
anntotations.proto
import "google/api/annotations.proto";
-
proto文件添加gRPC注释(注释定义gRPC服务映射到JSON请求和响应,使用protobuf 时每个RPC服务必须使用google.api.HTTP来注释定义HTTP定义和路径)
service Hello {
rpc sayhello(HelloRequest) returns (HelloResponse){
//添加注释
option (google.api.http) = {
get: "/v1/hello/sayhello"
};
};
}
-
再次编译
protoc -I ./proto
--go_out ./proto --go_opt paths=source_relative
--go-grpc_out ./proto --go-grpc_opt paths=source_relative
--grpc-gateway_out ./proto --grpc-gateway_opt paths=source_relative
./proto/hello/hello.proto
编译过后会生成hello.pb.gw.go
文件
.
├── google
│ └── api
│ ├── annotations.proto
│ └── http.proto
└── hello
├── hello.pb.go
├── hello.pb.gw.go
├── hello.proto
└── hello_grpc.pb.go
-
在main函数中添加并提供gRPC-Gateway mux
func main(){
xxxx...
conn, err := grpc.DialContext(
context.Background(),
"0.0.0.0:9099",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("Failed to dial server:", err)
}
gwmux := runtime.NewServeMux()
err = pb.RegisterHelloHandler(context.Background(), gwmux, conn)
if err != nil {
log.Fatalln("Failed to register gateway:", err)
}
xxx...
}
完整代码在这里:https://github.com/FengZeHe/LearngRPC/tree/main/go-grpc-gateway
|引用
[1] https://www.lixueduan.com/posts/grpc/05-interceptor/
[2] https://github.com/iamrajiv/helloworld-grpc-gateway
[3]https://grpc-ecosystem.github.io/grpc-gateway/docs/tutorials/adding_annotations/#using-protoc
原文始发于微信公众号(ProgrammerHe):gRPC 笔记
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/207966.html