基础知识介绍

gRPC (gRPCRemote Procedure Calls) 是Google发起的一个开源的远程过程调用 (Remote procedure call) 系统。该系统基于 HTTP/2 协议传输,使用Protocol Buffers 工具来序列化结构化数据

protobuf

protobuf 全称 Google Protocol Buffers,是 google 开发的的一套用于数据存储,网络通信时用于协议编解码的工具库,proto 文件是以 xxx.proto 命名。

stream

Stream 顾名思义就是一种流,可以源源不断的推送数据,很适合大数据传输,或者服务端和客户端长时间数据交互的场景。

使用场景

1:股票app:客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端

2:数据上传:几百万的数据上传处理,若普通rpc则需要全部传输成功后才进行处理,而stream则会再接受第一条数据后就开始处理

3:数据交换:比如LOL,王者荣耀等竞技游戏,client和server之间需要非常频繁地交换数据

ServiceAPI类型

gRPC 中的 Service API 有如下4种类型:

SimpleAPI:普通rpc
ServerStreaming:服务器流式响应
ClientStreaming:客户端流式发送
BidirectionalStreaming:双向流

操作实战

代码说明:通过传入名字查询对应的结果

下载protoc

wget https://github.com/protocolbuffers/protobuf/releases/download/v21.2/protoc-21.2-linux-x86_64.zip
unzip protoc-21.2-linux-x86_64.zip
mv protoc-21.2-linux-x86_64 /usr/local/protoc

写入环境变量

将protoc命令写入path中,编辑 /etc/profile文件

export PROTOC_HOME=/usr/local/protoc
export PATH=$PROTOC_HOME/bin:$PATH

目录说明

创建一个grpc-stream-example目录存放演示代码

tree 
.
├── client
│   ├── go.mod
│   ├── go.sum
│   ├── main.go
│   └── protos
│       └── ServerStream.proto
├── README.md
└── server
    ├── controllers
    │   └── Message.go
    ├── go.mod
    ├── go.sum
    ├── main.go
    └── protos
        └── ServerStream.proto

5 directories, 10 files

创建client和server目录,分别存放server端和client端代码,进入各自的目录初始化go mod文件,以及创建protos目录存放proto文件

定义proto文件

syntax = "proto3";
package protos;
option go_package ="/protos";
message  SendMessage {
  string name = 1;
}
message  ReceiveMessage {
  string name = 1;
  int32 age =2;
  string address=3;
}
service Message {
  rpc Send(SendMessage) returns (ReceiveMessage) {}
  rpc SendServerStream(SendMessage) returns (stream ReceiveMessage) {}
  rpc SendClientStream(stream SendMessage) returns (ReceiveMessage) {}
  rpc SendBidirectionalStream(stream SendMessage) returns (stream ReceiveMessage) {}
}
syntax:声明proto的版本 只有 proto3 才支持 gRPC
option go_package: 将编译后文件输出在/protos目录
package: 指定当前proto文件属于protos包
message:定义具体的请求或响应的参数,类似于struct的定义
service: 定义方法,这里我们定义四个方法来分别测试普通的api和具有不同stream api的功能,类似于interface的定义
         要实现流式响应或请求,只需在方法中对请求或响应使用stream标记即可
定义一个请求体为SendMessage的message,其中字段为name,通过传一个name来获取对应的信息
定义一个响应体为ReceiveMessage的message,其中定义其对象的属性信息,包括名称,年龄和地址

我们定义一个方法,其传入一个SendMessage结构的message,返回一个ReceiveMessage结构的message
Send: 普通api
SendServerStream: 服务器流式响应
SendClientStream:客户端流式发送
SendBidirectionalStream:双向流

我们这里测试,server端和client端使用的同一个proto文件即可,定义完成后编译生成go代码

编译文件

go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
protoc --go_out=./server --go-grpc_out=./server ./server/protos/*.proto
protoc --go_out=./client --go-grpc_out=./client ./client/protos/*.proto

此时,我们可以看到protos目录下编译生成的go文件

├── client
│   ├── go.mod
│   ├── go.sum
│   ├── main.go
│   └── protos
│       ├── ServerStream_grpc.pb.go
│       ├── ServerStream.pb.go
│       └── ServerStream.proto
├── README.md
└── server
    ├── controllers
    │   └── Message.go
    ├── go.mod
    ├── go.sum
    ├── main.go
    └── protos
        ├── ServerStream_grpc.pb.go
        ├── ServerStream.pb.go
        └── ServerStream.proto

5 directories, 14 files

我们可以看见在protos目录下自动生成了一个_grpc.pb.go的文件中,自动生成了一个名字叫MessageServer的interface接口,我们只需要实现里面对应的定义好的方法即可。

type MessageServer interface {
	Send(context.Context, *SendMessage) (*ReceiveMessage, error)
	SendServerStream(*SendMessage, Message_SendServerStreamServer) error
	SendClientStream(Message_SendClientStreamServer) error
	SendBidirectionalStream(Message_SendBidirectionalStreamServer) error
	mustEmbedUnimplementedMessageServer()
}

注意每个方法传入的参数不一样,我们在实现这些方法的时候也要传入对应类型的参数,这些代码都是自动生成的,不用管,我们只需要实现它们即可

使用普通api

创建controllers目录存放具体的业务逻辑代码

server端代码

controllers/Message.go

package controllers

import (
	"context"
	"grpc.server/protos"
	"io"
	"log"
	"strconv"
	"strings"
	"time"
)
type Message struct {
	protos.MessageServer
}
func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
	resp := &protos.ReceiveMessage{
		Name:    req.Name,
		Age:     18,
		Address: "beijing",
	}
	return resp, nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
    return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
	return nil
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
	return nil
}

这里我们先实现普通普通api的方法Send,我们在里面直接返回ReceiveMessage并填充其字段,其他方法先直接返回一个nil即可

main.go

package main
import (
	"google.golang.org/grpc"
	"grpc.server/controllers"
	"grpc.server/protos"
	"log"
	"net"
)

func main() {
	address := ":" + "8888"
	listener, err := net.Listen("tcp", address)
	if err != nil {
		log.Println(err)
	}
	server := grpc.NewServer()
	protos.RegisterMessageServer(server, &controllers.Message{})
	err = server.Serve(listener)
	if err != nil {
		log.Println(err)
	}
}

使用net.Listen启动一个tcp socket,并监听到8888端口,然后使用grpc.NewServer()启动一个grpc服务,接下来我们写一个client去测试

client端代码

将server端定义好的.proto文件拷贝过来,同样执行protoc命令进行编译生成go文件

main.go

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"grpc.client/protos"
	"io"
	"log"
	"strconv"
	"time"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
	defer conn.Close()
	c := protos.NewMessageClient(conn)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	send(c)
}
func send(c protos.MessageClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := c.Send(ctx, &protos.SendMessage{Name: "SimpleAPI"})
	if err != nil {
		panic(err)
	}
	log.Printf("Revice Ms: %s", r)
}

我们这里通过grpc.Dial来建立一个连接,并忽略证书验证,然后直接调用protos下的生成好的c.Send方法并传入对应字段即可。我定义send函数和c.Send没关系,我只是把名字取成一样的了

测试

启动server端:go run main.go
启动client端:go run main.go
2022/07/08 10:07:02 Revice Ms: name:"SimpleAPI" age:18 address:"beijing"

我们会发现他返回了对应的数据

服务器端流式响应

股票app:客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端

图片搜索:客户端发送一张图片,陆续返回给它相似的图片

server 端代码

controllers/Message.go

package controllers

import (
	"context"
	"grpc.server/protos"
	"io"
	"log"
	"strconv"
	"strings"
	"time"
)
type Message struct {
	protos.MessageServer
}
func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
    return nil,nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
	var i int64
	for i = 0; i < 10; i++ {
		resp := &protos.ReceiveMessage{
			Name:    req.Name + strconv.FormatInt(i, 10),
			Age:     18,
			Address: "beijing",
		}
		err := stream.Send(resp)
		if err != nil {
			return err
		}
		time.Sleep(1 * time.Second)
	}
	return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
	return nil
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
	return nil
}

每隔一秒钟,响应一次客户端,一共响应十次,这里有个stream,这个不是关键字,我是用来区分这个时流方式还是普通方式,在流式响应的时候,使用了stream.Sned方法来返回给客户端,而不是直接return

client 端代码

main.go

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"grpc.client/protos"
	"io"
	"log"
	"strconv"
	"time"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
	defer conn.Close()
	c := protos.NewMessageClient(conn)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	//send(c)
	sendServerStream(c)
	//sendClientStream(c)
	//sendBidirectionalStream(c)
}
func send(c protos.MessageClient) {}
func sendServerStream(c protos.MessageClient) {
	stream, err := c.SendServerStream(context.Background(), &protos.SendMessage{Name: "ServerStream"})
	if err != nil {
		panic(err)
	}
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Printf("Recv error:%v", err)
			continue
		}
		log.Printf("Recv Ms:%v", resp)
	}
}
func sendClientStream(c protos.MessageClient) {}
func sendBidirectionalStream(c protos.MessageClient) {}

此时,我们调用一下,看下结果

go run main.go 
2022/07/08 10:15:18 Recv Ms:name:"ServerStream0"  age:18  address:"beijing"
2022/07/08 10:15:19 Recv Ms:name:"ServerStream1"  age:18  address:"beijing"
2022/07/08 10:15:20 Recv Ms:name:"ServerStream2"  age:18  address:"beijing"
2022/07/08 10:15:21 Recv Ms:name:"ServerStream3"  age:18  address:"beijing"
2022/07/08 10:15:22 Recv Ms:name:"ServerStream4"  age:18  address:"beijing"
2022/07/08 10:15:23 Recv Ms:name:"ServerStream5"  age:18  address:"beijing"
2022/07/08 10:15:24 Recv Ms:name:"ServerStream6"  age:18  address:"beijing"
2022/07/08 10:15:25 Recv Ms:name:"ServerStream7"  age:18  address:"beijing"
2022/07/08 10:15:26 Recv Ms:name:"ServerStream8"  age:18  address:"beijing"
2022/07/08 10:15:27 Recv Ms:name:"ServerStream9"  age:18  address:"beijing"

我们会发现我们执行了一次,服务的陆续返回了十次结果,并且在接受数据的时候使用了stream.Recv()来接受流数据,使用err == io.EOF来判断流是否发送完成

客户端流式发送

数据上传:几百万的数据上传处理,若普通rpc则需要全部传输成功后才进行处理,而stream则会再接受第一条数据后就开始处理

server 端代码

controllers/Message.go

package controllers

import (
	"context"
	"grpc.server/protos"
	"io"
	"log"
	"strconv"
	"strings"
	"time"
)

type Message struct {
	protos.MessageServer
}

func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
	return nil, nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
	return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
	var names []string
	for {
		resp := &protos.ReceiveMessage{
			Name:    "ClientStream Processing Completed" + strings.Join(names, ","),
			Age:     18,
			Address: "beijing",
		}
		res, err := stream.Recv()
         log.Println(names)
		if err == io.EOF {
			err := stream.SendAndClose(resp)
			if err != nil {
				return err
			}
			return nil
		}
		if err != nil {
			log.Printf("failed to recv: %v", err)
			return err
		}
		names = append(names, res.Name)
	}
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
    return nil
}

我们这里写个死循环来接受客户端的请求,使用stream.Recv()方法来接受数据,err == io.EOF 则表示数据已经全部获取了,最后通过 stream.SendAndClose()返回响应。这里并不会关闭连接,一直在循环处理请求的数据,关闭连接是在客户端做的

client 端代码

main.go

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"grpc.client/protos"
	"io"
	"log"
	"strconv"
	"time"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
	defer conn.Close()
	c := protos.NewMessageClient(conn)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	//send(c)
	//sendServerStream(c)
	sendClientStream(c)
	//sendBidirectionalStream(c)
}
func send(c protos.MessageClient) {}
func sendServerStream(c protos.MessageClient) {}
func sendClientStream(c protos.MessageClient) {
	stream, err := c.SendClientStream(context.Background())
	if err != nil {
		panic(err)
	}
	for i := 0; i < 10; i++ {
		err := stream.Send(&protos.SendMessage{Name: "ClientStream" + strconv.Itoa(i)})
		if err != nil {
			return
		}
		time.Sleep(1 * time.Second)
	}
	resp, err := stream.CloseAndRecv()
	if err != nil {
		log.Printf("failed to recv: %v", err)
	}
	log.Printf("Recv Ms: %s", resp)
}
func sendBidirectionalStream(c protos.MessageClient) {}

通过多次调用 stream.Send() 像服务端推送多个数据,最后调用 stream.CloseAndRecv() 关闭stream并接收服务端响应。

我们在客户端执行go run main.go后,服务端每秒都会输出信息:
2022/07/08 10:45:25 []
2022/07/08 10:45:26 [ClientStream0]
2022/07/08 10:45:27 [ClientStream0 ClientStream1]
2022/07/08 10:45:28 [ClientStream0 ClientStream1 ClientStream2]
2022/07/08 10:45:29 [ClientStream0 ClientStream1 ClientStream2 ClientStream3]
2022/07/08 10:45:30 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4]
2022/07/08 10:45:31 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5]
2022/07/08 10:45:32 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5 ClientStream6]
2022/07/08 10:45:33 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5 ClientStream6 ClientStream7]
2022/07/08 10:45:34 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5 ClientStream6 ClientStream7 ClientStream8]
2022/07/08 10:45:35 [ClientStream0 ClientStream1 ClientStream2 ClientStream3 ClientStream4 ClientStream5 ClientStream6 ClientStream7 ClientStream8 ClientStream9]

客户端在十秒后,会收到以下信息:
2022/07/08 10:57:10 Recv Ms: name:"ClientStream Processing CompletedClientStream0,ClientStream1,ClientStream2,ClientStream3,ClientStream4,ClientStream5,ClientStream6,ClientStream7,ClientStream8,ClientStream9"  age:18  address:"beijing"

此时我们就会看见客户端在发送第一条数据开始,服务端就进行了响应,一直到发送完成后返回了一个最后的完成的消息

双向流

数据交换:比如LOL,王者荣耀等竞技游戏,client和server之间需要非常频繁地交换数据

server 端代码

controllers/Message.go

package controllers

import (
	"context"
	"grpc.server/protos"
	"io"
	"log"
	"strconv"
	"strings"
	"time"
)

type Message struct {
	protos.MessageServer
}

func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
	return nil, nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
	return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
	return nil
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			log.Printf("failed to recv: %v", err)
			return err
		}
        log.Println(in.Name)
		resp := &protos.ReceiveMessage{
			Name:    "BidirectionalStream" + in.Name,
			Age:     18,
			Address: "beijing",
		}
		err = stream.Send(resp)
		if err != nil {
			return err
		}
	}
}

server端和client端都需要同时使用stream.Recv()stream.Send()两个方法同时接受和发送数据,并且都需要使用err==io.EOF来判断数据是否传输完成

client 端代码

main.go

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"grpc.client/protos"
	"io"
	"log"
	"strconv"
	"time"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
	defer conn.Close()
	c := protos.NewMessageClient(conn)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	//send(c)
	//sendServerStream(c)
	//sendClientStream(c)
	sendBidirectionalStream(c)
}
func send(c protos.MessageClient) {}
func sendServerStream(c protos.MessageClient) {}
func sendClientStream(c protos.MessageClient) {}
func sendBidirectionalStream(c protos.MessageClient) {
	stream, err := c.SendBidirectionalStream(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	done := make(chan struct{})
	go func() {
		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				close(done)
				return
			}
			if err != nil {
				log.Fatal(err)
			}
			log.Printf("recv-server-%s", resp)
		}
	}()
	var i int64
	for i = 1; i < 10; i++ {
		err := stream.Send(&protos.SendMessage{
			Name: strconv.FormatInt(i, 10),
		})
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(1 * time.Second)
	}
	_ = stream.CloseSend()
	<-done
}

我们在客户端使用协程来进行发送和接受流,服务端也能这样写,我这就在客户端写了,然后启动server端和client端的程序

服务端终端输出以下信息:
2022/07/08 11:04:35 1
2022/07/08 11:04:36 2
2022/07/08 11:04:37 3
2022/07/08 11:04:38 4
2022/07/08 11:04:39 5
2022/07/08 11:04:40 6
2022/07/08 11:04:41 7
2022/07/08 11:04:42 8
2022/07/08 11:04:43 9

客户端终端输出以下信息:
2022/07/08 11:04:35 recv-server-name:"BidirectionalStream1" age:18 address:"beijing"
2022/07/08 11:04:36 recv-server-name:"BidirectionalStream2" age:18 address:"beijing"
2022/07/08 11:04:37 recv-server-name:"BidirectionalStream3" age:18 address:"beijing"
2022/07/08 11:04:38 recv-server-name:"BidirectionalStream4" age:18 address:"beijing"
2022/07/08 11:04:39 recv-server-name:"BidirectionalStream5" age:18 address:"beijing"
2022/07/08 11:04:40 recv-server-name:"BidirectionalStream6" age:18 address:"beijing"
2022/07/08 11:04:41 recv-server-name:"BidirectionalStream7" age:18 address:"beijing"
2022/07/08 11:04:42 recv-server-name:"BidirectionalStream8" age:18 address:"beijing"
2022/07/08 11:04:43 recv-server-name:"BidirectionalStream9" age:18 address:"beijing"

到此,game over,我们来总结以下最后的代码

总代码

proto文件

syntax = "proto3";
package protos;
option go_package ="/protos";

message  SendMessage {
  string name = 1;
}
message  ReceiveMessage {
  string name = 1;
  int32 age =2;
  string address=3;
}
service Message {
  rpc Send(SendMessage) returns (ReceiveMessage) {}
  rpc SendServerStream(SendMessage) returns (stream ReceiveMessage) {}
  rpc SendClientStream(stream SendMessage) returns (ReceiveMessage) {}
  rpc SendBidirectionalStream(stream SendMessage) returns (stream ReceiveMessage) {}
}

server端

controllers/Message.go

package controllers

import (
	"context"
	"grpc.server/protos"
	"io"
	"log"
	"strconv"
	"strings"
	"time"
)

type Message struct {
	protos.MessageServer
}

func (m *Message) Send(ctx context.Context, req *protos.SendMessage) (*protos.ReceiveMessage, error) {
	resp := &protos.ReceiveMessage{
		Name:    req.Name,
		Age:     18,
		Address: "beijing",
	}
	return resp, nil
}
func (m *Message) SendServerStream(req *protos.SendMessage, stream protos.Message_SendServerStreamServer) error {
	var i int64
	for i = 0; i < 10; i++ {
		resp := &protos.ReceiveMessage{
			Name:    req.Name + strconv.FormatInt(i, 10),
			Age:     18,
			Address: "beijing",
		}
		err := stream.Send(resp)
		if err != nil {
			return err
		}
		time.Sleep(1 * time.Second)
	}
	return nil
}
func (m *Message) SendClientStream(stream protos.Message_SendClientStreamServer) error {
	var names []string
	for {
		resp := &protos.ReceiveMessage{
			Name:    "ClientStream Processing Completed" + strings.Join(names, ","),
			Age:     18,
			Address: "beijing",
		}
		res, err := stream.Recv()
		log.Println(names)
		if err == io.EOF {
			err := stream.SendAndClose(resp)
			if err != nil {
				return err
			}
			return nil
		}
		if err != nil {
			log.Printf("failed to recv: %v", err)
			return err
		}
		names = append(names, res.Name)
	}
}
func (m *Message) SendBidirectionalStream(stream protos.Message_SendBidirectionalStreamServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			log.Printf("failed to recv: %v", err)
			return err
		}
		log.Println(in.Name)
		resp := &protos.ReceiveMessage{
			Name:    "BidirectionalStream" + in.Name,
			Age:     18,
			Address: "beijing",
		}
		err = stream.Send(resp)
		if err != nil {
			return err
		}
	}
}

main.go

package main
import (
	"google.golang.org/grpc"
	"grpc.server/controllers"
	"grpc.server/protos"
	"log"
	"net"
)

func main() {
	address := ":" + "8888"
	listener, err := net.Listen("tcp", address)
	if err != nil {
		log.Println(err)
	}
	server := grpc.NewServer()
	protos.RegisterMessageServer(server, &controllers.Message{})
	err = server.Serve(listener)
	if err != nil {
		log.Println(err)
	}
}

client端代码

main.go

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"grpc.client/protos"
	"io"
	"log"
	"strconv"
	"time"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
	defer conn.Close()
	c := protos.NewMessageClient(conn)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	send(c)
	sendServerStream(c)
	sendClientStream(c)
	sendBidirectionalStream(c)
}
func send(c protos.MessageClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := c.Send(ctx, &protos.SendMessage{Name: "SimpleAPI"})
	if err != nil {
		panic(err)
	}
	log.Printf("Revice Ms: %s", r)
}
func sendServerStream(c protos.MessageClient) {
	stream, err := c.SendServerStream(context.Background(), &protos.SendMessage{Name: "ServerStream"})
	if err != nil {
		panic(err)
	}
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Printf("Recv error:%v", err)
			continue
		}
		log.Printf("Recv Ms:%v", resp)
	}
}
func sendClientStream(c protos.MessageClient) {
	stream, err := c.SendClientStream(context.Background())
	if err != nil {
		panic(err)
	}
	for i := 0; i < 10; i++ {
		err := stream.Send(&protos.SendMessage{Name: "ClientStream" + strconv.Itoa(i)})
		if err != nil {
			return
		}
		time.Sleep(1 * time.Second)
	}
	resp, err := stream.CloseAndRecv()
	if err != nil {
		log.Printf("failed to recv: %v", err)
	}
	log.Printf("Recv Ms: %s", resp)
}
func sendBidirectionalStream(c protos.MessageClient) {
	stream, err := c.SendBidirectionalStream(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	done := make(chan struct{})
	go func() {
		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				close(done)
				return
			}
			if err != nil {
				log.Fatal(err)
			}
			log.Printf("recv-server-%s", resp)
		}
	}()
	var i int64
	for i = 1; i < 10; i++ {
		err := stream.Send(&protos.SendMessage{
			Name: strconv.FormatInt(i, 10),
		})
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(1 * time.Second)
	}
	_ = stream.CloseSend()
	<-done
}

小结

Client发送完成后需要手动调用Close()或者CloseSend()方法关闭stream,Server端则return nil就会自动 Close。
1)ServerStream
    服务端处理完成后 return nil 代表响应完成
    客户端通过 err == io.EOF判断服务端是否响应完成
2)ClientStream
    客户端发送完毕通过 CloseAndRecv 关闭stream 并接收服务端响应
    服务端通过 err == io.EOF 判断客户端是否发送完毕,完毕后使用SendAndClose关闭 stream并返回响应。
3)BidirectionalStream
    客户端服务端都通过 stream 向对方推送数据
    客户端推送完成后通过 CloseSend 关闭流,通过 err == io.EOF 判断服务端是否响应完成
    服务端通过 err == io.EOF 判断客户端是否响应完成,通过 return nil 表示已经完成响应
    通过 err == io.EOF 来判定是否把对方推送的数据全部获取到了。
    客户端通过 CloseAndRecv 或者 CloseSend 关闭 Stream,服务端则通过 SendAndClose 或者直接 return nil 来返回响应。

参考链接

https://lixueduan.com/post/grpc/03-stream/
https://colobu.com/2017/04/06/dive-into-gRPC-streaming/

好了,到此结束,有啥问题看主页加联系方式讨论,源码在这 https://github.com/JoeyX-u/grpc-stream-example