grpc-stream go版简解
基础知识介绍
gRPC (gRPCRemote Procedure Calls) 是Google发起的一个开源的远程过程调用 (Remote procedure call) 系统。该系统基于 HTTP/2 协议传输,使用Protocol Buffers 工具来序列化结构化数据。
protobuf
protobuf 全称 Google Protocol Buffers,是 google 开发的的一套用于数据存储,网络通信时用于协议编解码的工具库,proto 文件是以 xxx.proto 命名。
stream
Stream 顾名思义就是一种流,可以源源不断的推送数据,很适合大数据传输,或者服务端和客户端长时间数据交互的场景。
使用场景
1:股票app:客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端
2:数据上传:几百万的数据上传处理,若普通rpc则需要全部传输成功后才进行处理,而stream则会再接受第一条数据后就开始处理
3:数据交换:比如LOL,王者荣耀等竞技游戏,client和server之间需要非常频繁地交换数据
ServiceAPI类型
gRPC 中的 Service API 有如下4种类型:
SimpleAPI:普通rpc
ServerStreaming:服务器流式响应
ClientStreaming:客户端流式发送
BidirectionalStreaming:双向流
操作实战
代码说明:通过传入名字查询对应的结果
下载protoc
wget https://github.com/protocolbuffers/protobuf/releases/download/v21.2/protoc-21.2-linux-x86_64.zip
unzip protoc-21.2-linux-x86_64.zip
mv protoc-21.2-linux-x86_64 /usr/local/protoc
写入环境变量
将protoc命令写入path中,编辑
/etc/profile
文件
export PROTOC_HOME=/usr/local/protoc
export PATH=$PROTOC_HOME/bin:$PATH
目录说明
创建一个grpc-stream-example目录存放演示代码
tree
.
├── client
│ ├── go.mod
│ ├── go.sum
│ ├── main.go
│ └── protos
│ └── ServerStream.proto
├── README.md
└── server
├── controllers
│ └── Message.go
├── go.mod
├── go.sum
├── main.go
└── protos
└── ServerStream.proto
5 directories, 10 files
创建client和server目录,分别存放server端和client端代码,进入各自的目录初始化go mod文件,以及创建protos目录存放proto文件
定义proto文件
syntax = "proto3";
package protos;
option go_package ="/protos";
message SendMessage {
string name = 1;
}
message ReceiveMessage {
string name = 1;
int32 age =2;
string address=3;
}
service Message {
rpc Send(SendMessage) returns (ReceiveMessage) {}
rpc SendServerStream(SendMessage) returns (stream ReceiveMessage) {}
rpc SendClientStream(stream SendMessage) returns (ReceiveMessage) {}
rpc SendBidirectionalStream(stream SendMessage) returns (stream ReceiveMessage) {}
}
syntax:声明proto的版本 只有 proto3 才支持 gRPC
option go_package: 将编译后文件输出在/protos目录
package: 指定当前proto文件属于protos包
message:定义具体的请求或响应的参数,类似于struct的定义
service: 定义方法,这里我们定义四个方法来分别测试普通的api和具有不同stream api的功能,类似于interface的定义
要实现流式响应或请求,只需在方法中对请求或响应使用stream标记即可
定义一个请求体为SendMessage的message,其中字段为name,通过传一个name来获取对应的信息
定义一个响应体为ReceiveMessage的message,其中定义其对象的属性信息,包括名称,年龄和地址
我们定义一个方法,其传入一个SendMessage结构的message,返回一个ReceiveMessage结构的message
Send: 普通api
SendServerStream: 服务器流式响应
SendClientStream:客户端流式发送
SendBidirectionalStream:双向流
我们这里测试,server端和client端使用的同一个proto文件即可,定义完成后编译生成go代码
编译文件
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
protoc --go_out=./server --go-grpc_out=./server ./server/protos/*.proto
protoc --go_out=./client --go-grpc_out=./client ./client/protos/*.proto
此时,我们可以看到protos目录下编译生成的go文件
├── client
│ ├── go.mod
│ ├── go.sum
│ ├── main.go
│ └── protos
│ ├── ServerStream_grpc.pb.go
│ ├── ServerStream.pb.go
│ └── ServerStream.proto
├── README.md
└── server
├── controllers
│ └── Message.go
├── go.mod
├── go.sum
├── main.go
└── protos
├── ServerStream_grpc.pb.go
├── ServerStream.pb.go
└── ServerStream.proto
5 directories, 14 files
我们可以看见在protos目录下自动生成了一个
_grpc.pb.go
的文件中,自动生成了一个名字叫MessageServer
的interface接口,我们只需要实现里面对应的定义好的方法即可。
type MessageServer interface {
Send(context.Context, *SendMessage) (*ReceiveMessage, error)
SendServerStream(*SendMessage, Message_SendServerStreamServer) error
SendClientStream(Message_SendClientStreamServer) error
SendBidirectionalStream(Message_SendBidirectionalStreamServer) error
mustEmbedUnimplementedMessageServer()
}
注意每个方法传入的参数不一样,我们在实现这些方法的时候也要传入对应类型的参数,这些代码都是自动生成的,不用管,我们只需要实现它们即可
使用普通api
创建controllers目录存放具体的业务逻辑代码
server端代码
controllers/Message.go
package controllers
import (
"context"
"grpc.server/protos"
"io"
"log"
"strconv"
"strings"
"time"
)
type Message struct {
protos.MessageServer
}
func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
resp := &protos.ReceiveMessage{
Name: req.Name,
Age: 18,
Address: "beijing",
}
return resp, nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
return nil
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
return nil
}
这里我们先实现普通普通api的方法
Send
,我们在里面直接返回ReceiveMessage并填充其字段,其他方法先直接返回一个nil即可
main.go
package main
import (
"google.golang.org/grpc"
"grpc.server/controllers"
"grpc.server/protos"
"log"
"net"
)
func main() {
address := ":" + "8888"
listener, err := net.Listen("tcp", address)
if err != nil {
log.Println(err)
}
server := grpc.NewServer()
protos.RegisterMessageServer(server, &controllers.Message{})
err = server.Serve(listener)
if err != nil {
log.Println(err)
}
}
使用
net.Listen
启动一个tcp socket,并监听到8888端口,然后使用grpc.NewServer()
启动一个grpc服务,接下来我们写一个client去测试
client端代码
将server端定义好的
.proto
文件拷贝过来,同样执行protoc命令进行编译生成go文件
main.go
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpc.client/protos"
"io"
"log"
"strconv"
"time"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer conn.Close()
c := protos.NewMessageClient(conn)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
send(c)
}
func send(c protos.MessageClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.Send(ctx, &protos.SendMessage{Name: "SimpleAPI"})
if err != nil {
panic(err)
}
log.Printf("Revice Ms: %s", r)
}
我们这里通过
grpc.Dial
来建立一个连接,并忽略证书验证,然后直接调用protos下的生成好的c.Send
方法并传入对应字段即可。我定义send函数和c.Send没关系,我只是把名字取成一样的了
测试
启动server端:go run main.go
启动client端:go run main.go
2022/07/08 10:07:02 Revice Ms: name:"SimpleAPI" age:18 address:"beijing"
我们会发现他返回了对应的数据
服务器端流式响应
股票app:客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端
图片搜索:客户端发送一张图片,陆续返回给它相似的图片
server 端代码
controllers/Message.go
package controllers
import (
"context"
"grpc.server/protos"
"io"
"log"
"strconv"
"strings"
"time"
)
type Message struct {
protos.MessageServer
}
func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
return nil,nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
var i int64
for i = 0; i < 10; i++ {
resp := &protos.ReceiveMessage{
Name: req.Name + strconv.FormatInt(i, 10),
Age: 18,
Address: "beijing",
}
err := stream.Send(resp)
if err != nil {
return err
}
time.Sleep(1 * time.Second)
}
return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
return nil
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
return nil
}
每隔一秒钟,响应一次客户端,一共响应十次,这里有个stream,这个不是关键字,我是用来区分这个时流方式还是普通方式,在流式响应的时候,使用了
stream.Sned
方法来返回给客户端,而不是直接return
client 端代码
main.go
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpc.client/protos"
"io"
"log"
"strconv"
"time"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer conn.Close()
c := protos.NewMessageClient(conn)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
//send(c)
sendServerStream(c)
//sendClientStream(c)
//sendBidirectionalStream(c)
}
func send(c protos.MessageClient) {}
func sendServerStream(c protos.MessageClient) {
stream, err := c.SendServerStream(context.Background(), &protos.SendMessage{Name: "ServerStream"})
if err != nil {
panic(err)
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Recv error:%v", err)
continue
}
log.Printf("Recv Ms:%v", resp)
}
}
func sendClientStream(c protos.MessageClient) {}
func sendBidirectionalStream(c protos.MessageClient) {}
此时,我们调用一下,看下结果
go run main.go
2022/07/08 10:15:18 Recv Ms:name:"ServerStream0" age:18 address:"beijing"
2022/07/08 10:15:19 Recv Ms:name:"ServerStream1" age:18 address:"beijing"
2022/07/08 10:15:20 Recv Ms:name:"ServerStream2" age:18 address:"beijing"
2022/07/08 10:15:21 Recv Ms:name:"ServerStream3" age:18 address:"beijing"
2022/07/08 10:15:22 Recv Ms:name:"ServerStream4" age:18 address:"beijing"
2022/07/08 10:15:23 Recv Ms:name:"ServerStream5" age:18 address:"beijing"
2022/07/08 10:15:24 Recv Ms:name:"ServerStream6" age:18 address:"beijing"
2022/07/08 10:15:25 Recv Ms:name:"ServerStream7" age:18 address:"beijing"
2022/07/08 10:15:26 Recv Ms:name:"ServerStream8" age:18 address:"beijing"
2022/07/08 10:15:27 Recv Ms:name:"ServerStream9" age:18 address:"beijing"
我们会发现我们执行了一次,服务的陆续返回了十次结果,并且在接受数据的时候使用了
stream.Recv()
来接受流数据,使用err == io.EOF
来判断流是否发送完成
客户端流式发送
数据上传:几百万的数据上传处理,若普通rpc则需要全部传输成功后才进行处理,而stream则会再接受第一条数据后就开始处理
server 端代码
controllers/Message.go
package controllers
import (
"context"
"grpc.server/protos"
"io"
"log"
"strconv"
"strings"
"time"
)
type Message struct {
protos.MessageServer
}
func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
return nil, nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
var names []string
for {
resp := &protos.ReceiveMessage{
Name: "ClientStream Processing Completed" + strings.Join(names, ","),
Age: 18,
Address: "beijing",
}
res, err := stream.Recv()
log.Println(names)
if err == io.EOF {
err := stream.SendAndClose(resp)
if err != nil {
return err
}
return nil
}
if err != nil {
log.Printf("failed to recv: %v", err)
return err
}
names = append(names, res.Name)
}
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
return nil
}
我们这里写个死循环来接受客户端的请求,使用
stream.Recv()
方法来接受数据,err == io.EOF
则表示数据已经全部获取了,最后通过stream.SendAndClose()
返回响应。这里并不会关闭连接,一直在循环处理请求的数据,关闭连接是在客户端做的
client 端代码
main.go
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpc.client/protos"
"io"
"log"
"strconv"
"time"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer conn.Close()
c := protos.NewMessageClient(conn)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
//send(c)
//sendServerStream(c)
sendClientStream(c)
//sendBidirectionalStream(c)
}
func send(c protos.MessageClient) {}
func sendServerStream(c protos.MessageClient) {}
func sendClientStream(c protos.MessageClient) {
stream, err := c.SendClientStream(context.Background())
if err != nil {
panic(err)
}
for i := 0; i < 10; i++ {
err := stream.Send(&protos.SendMessage{Name: "ClientStream" + strconv.Itoa(i)})
if err != nil {
return
}
time.Sleep(1 * time.Second)
}
resp, err := stream.CloseAndRecv()
if err != nil {
log.Printf("failed to recv: %v", err)
}
log.Printf("Recv Ms: %s", resp)
}
func sendBidirectionalStream(c protos.MessageClient) {}
通过多次调用
stream.Send()
像服务端推送多个数据,最后调用stream.CloseAndRecv()
关闭stream并接收服务端响应。
我们在客户端执行go run main.go后,服务端每秒都会输出信息:
2022/07/08 10:45:25 []
2022/07/08 10:45:26 [ClientStream0]
2022/07/08 10:45:27 [ClientStream0 ClientStream1]
2022/07/08 10:45:28 [ClientStream0 ClientStream1 ClientStream2]
2022/07/08 10:45:29 [ClientStream0 ClientStream1 ClientStream2 ClientStream3]
2022/07/08 10:45:30 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4]
2022/07/08 10:45:31 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5]
2022/07/08 10:45:32 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5 ClientStream6]
2022/07/08 10:45:33 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5 ClientStream6 ClientStream7]
2022/07/08 10:45:34 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5 ClientStream6 ClientStream7 ClientStream8]
2022/07/08 10:45:35 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5 ClientStream6 ClientStream7 ClientStream8 ClientStream9]
客户端在十秒后,会收到以下信息:
2022/07/08 10:57:10 Recv Ms: name:"ClientStream Processing CompletedClientStream0,ClientStream1,ClientStream2,ClientStream3,ClientStream4,ClientStream5,ClientStream6,ClientStream7,ClientStream8,ClientStream9" age:18 address:"beijing"
此时我们就会看见客户端在发送第一条数据开始,服务端就进行了响应,一直到发送完成后返回了一个最后的完成的消息
双向流
数据交换:比如LOL,王者荣耀等竞技游戏,client和server之间需要非常频繁地交换数据
server 端代码
controllers/Message.go
package controllers
import (
"context"
"grpc.server/protos"
"io"
"log"
"strconv"
"strings"
"time"
)
type Message struct {
protos.MessageServer
}
func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
return nil, nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
return nil
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
log.Printf("failed to recv: %v", err)
return err
}
log.Println(in.Name)
resp := &protos.ReceiveMessage{
Name: "BidirectionalStream" + in.Name,
Age: 18,
Address: "beijing",
}
err = stream.Send(resp)
if err != nil {
return err
}
}
}
server端和client端都需要同时使用
stream.Recv()
和stream.Send()
两个方法同时接受和发送数据,并且都需要使用err==io.EOF
来判断数据是否传输完成
client 端代码
main.go
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpc.client/protos"
"io"
"log"
"strconv"
"time"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer conn.Close()
c := protos.NewMessageClient(conn)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
//send(c)
//sendServerStream(c)
//sendClientStream(c)
sendBidirectionalStream(c)
}
func send(c protos.MessageClient) {}
func sendServerStream(c protos.MessageClient) {}
func sendClientStream(c protos.MessageClient) {}
func sendBidirectionalStream(c protos.MessageClient) {
stream, err := c.SendBidirectionalStream(context.Background())
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
close(done)
return
}
if err != nil {
log.Fatal(err)
}
log.Printf("recv-server-%s", resp)
}
}()
var i int64
for i = 1; i < 10; i++ {
err := stream.Send(&protos.SendMessage{
Name: strconv.FormatInt(i, 10),
})
if err != nil {
log.Fatal(err)
}
time.Sleep(1 * time.Second)
}
_ = stream.CloseSend()
<-done
}
我们在客户端使用协程来进行发送和接受流,服务端也能这样写,我这就在客户端写了,然后启动server端和client端的程序
服务端终端输出以下信息:
2022/07/08 11:04:35 1
2022/07/08 11:04:36 2
2022/07/08 11:04:37 3
2022/07/08 11:04:38 4
2022/07/08 11:04:39 5
2022/07/08 11:04:40 6
2022/07/08 11:04:41 7
2022/07/08 11:04:42 8
2022/07/08 11:04:43 9
客户端终端输出以下信息:
2022/07/08 11:04:35 recv-server-name:"BidirectionalStream1" age:18 address:"beijing"
2022/07/08 11:04:36 recv-server-name:"BidirectionalStream2" age:18 address:"beijing"
2022/07/08 11:04:37 recv-server-name:"BidirectionalStream3" age:18 address:"beijing"
2022/07/08 11:04:38 recv-server-name:"BidirectionalStream4" age:18 address:"beijing"
2022/07/08 11:04:39 recv-server-name:"BidirectionalStream5" age:18 address:"beijing"
2022/07/08 11:04:40 recv-server-name:"BidirectionalStream6" age:18 address:"beijing"
2022/07/08 11:04:41 recv-server-name:"BidirectionalStream7" age:18 address:"beijing"
2022/07/08 11:04:42 recv-server-name:"BidirectionalStream8" age:18 address:"beijing"
2022/07/08 11:04:43 recv-server-name:"BidirectionalStream9" age:18 address:"beijing"
到此,game over,我们来总结以下最后的代码
总代码
proto文件
syntax = "proto3";
package protos;
option go_package ="/protos";
message SendMessage {
string name = 1;
}
message ReceiveMessage {
string name = 1;
int32 age =2;
string address=3;
}
service Message {
rpc Send(SendMessage) returns (ReceiveMessage) {}
rpc SendServerStream(SendMessage) returns (stream ReceiveMessage) {}
rpc SendClientStream(stream SendMessage) returns (ReceiveMessage) {}
rpc SendBidirectionalStream(stream SendMessage) returns (stream ReceiveMessage) {}
}
server端
controllers/Message.go
package controllers
import (
"context"
"grpc.server/protos"
"io"
"log"
"strconv"
"strings"
"time"
)
type Message struct {
protos.MessageServer
}
func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
resp := &protos.ReceiveMessage{
Name: req.Name,
Age: 18,
Address: "beijing",
}
return resp, nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
var i int64
for i = 0; i < 10; i++ {
resp := &protos.ReceiveMessage{
Name: req.Name + strconv.FormatInt(i, 10),
Age: 18,
Address: "beijing",
}
err := stream.Send(resp)
if err != nil {
return err
}
time.Sleep(1 * time.Second)
}
return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
var names []string
for {
resp := &protos.ReceiveMessage{
Name: "ClientStream Processing Completed" + strings.Join(names, ","),
Age: 18,
Address: "beijing",
}
res, err := stream.Recv()
log.Println(names)
if err == io.EOF {
err := stream.SendAndClose(resp)
if err != nil {
return err
}
return nil
}
if err != nil {
log.Printf("failed to recv: %v", err)
return err
}
names = append(names, res.Name)
}
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
log.Printf("failed to recv: %v", err)
return err
}
log.Println(in.Name)
resp := &protos.ReceiveMessage{
Name: "BidirectionalStream" + in.Name,
Age: 18,
Address: "beijing",
}
err = stream.Send(resp)
if err != nil {
return err
}
}
}
main.go
package main
import (
"google.golang.org/grpc"
"grpc.server/controllers"
"grpc.server/protos"
"log"
"net"
)
func main() {
address := ":" + "8888"
listener, err := net.Listen("tcp", address)
if err != nil {
log.Println(err)
}
server := grpc.NewServer()
protos.RegisterMessageServer(server, &controllers.Message{})
err = server.Serve(listener)
if err != nil {
log.Println(err)
}
}
client端代码
main.go
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpc.client/protos"
"io"
"log"
"strconv"
"time"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer conn.Close()
c := protos.NewMessageClient(conn)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
send(c)
sendServerStream(c)
sendClientStream(c)
sendBidirectionalStream(c)
}
func send(c protos.MessageClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.Send(ctx, &protos.SendMessage{Name: "SimpleAPI"})
if err != nil {
panic(err)
}
log.Printf("Revice Ms: %s", r)
}
func sendServerStream(c protos.MessageClient) {
stream, err := c.SendServerStream(context.Background(), &protos.SendMessage{Name: "ServerStream"})
if err != nil {
panic(err)
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Recv error:%v", err)
continue
}
log.Printf("Recv Ms:%v", resp)
}
}
func sendClientStream(c protos.MessageClient) {
stream, err := c.SendClientStream(context.Background())
if err != nil {
panic(err)
}
for i := 0; i < 10; i++ {
err := stream.Send(&protos.SendMessage{Name: "ClientStream" + strconv.Itoa(i)})
if err != nil {
return
}
time.Sleep(1 * time.Second)
}
resp, err := stream.CloseAndRecv()
if err != nil {
log.Printf("failed to recv: %v", err)
}
log.Printf("Recv Ms: %s", resp)
}
func sendBidirectionalStream(c protos.MessageClient) {
stream, err := c.SendBidirectionalStream(context.Background())
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
close(done)
return
}
if err != nil {
log.Fatal(err)
}
log.Printf("recv-server-%s", resp)
}
}()
var i int64
for i = 1; i < 10; i++ {
err := stream.Send(&protos.SendMessage{
Name: strconv.FormatInt(i, 10),
})
if err != nil {
log.Fatal(err)
}
time.Sleep(1 * time.Second)
}
_ = stream.CloseSend()
<-done
}
小结
Client发送完成后需要手动调用Close()或者CloseSend()方法关闭stream,Server端则return nil就会自动 Close。
1)ServerStream
服务端处理完成后 return nil 代表响应完成
客户端通过 err == io.EOF判断服务端是否响应完成
2)ClientStream
客户端发送完毕通过 CloseAndRecv 关闭stream 并接收服务端响应
服务端通过 err == io.EOF 判断客户端是否发送完毕,完毕后使用SendAndClose关闭 stream并返回响应。
3)BidirectionalStream
客户端服务端都通过 stream 向对方推送数据
客户端推送完成后通过 CloseSend 关闭流,通过 err == io.EOF 判断服务端是否响应完成
服务端通过 err == io.EOF 判断客户端是否响应完成,通过 return nil 表示已经完成响应
通过 err == io.EOF 来判定是否把对方推送的数据全部获取到了。
客户端通过 CloseAndRecv 或者 CloseSend 关闭 Stream,服务端则通过 SendAndClose 或者直接 return nil 来返回响应。
参考链接
https://lixueduan.com/post/grpc/03-stream/
https://colobu.com/2017/04/06/dive-into-gRPC-streaming/
好了,到此结束,有啥问题看主页加联系方式讨论,源码在这 https://github.com/JoeyX-u/grpc-stream-example