Go gRPC 编程教程
📖 关于本教程本教程系统讲解 gRPC 全栈开发:Proto 语法、protoc 工具链、序列化性能、Server/Client 实现、拦截器、TLS 加密、身份验证、Streaming、公网部署等,配合完整可运行代码。
1. Proto 语法与 protoc 工具
1.1 安装工具链
shell
# ==================== 安装 protoc 编译器 ====================
# macOS
brew install protobuf
# Ubuntu
sudo apt install -y protobuf-compiler
# 验证
protoc --version
# libprotoc 25.x
# ==================== 安装 Go 插件 ====================
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# 确保 $GOPATH/bin 在 PATH 中
export PATH="$PATH:$(go env GOPATH)/bin"
# ==================== 安装 Go 依赖 ====================
go get google.golang.org/grpc
go get google.golang.org/protobuf1.2 Proto3 语法详解
protobuf
// user.proto
syntax = "proto3"; // 必须第一行声明
// Go 包路径(生成的 Go 代码的 package)
option go_package = "github.com/myproject/pb/user";
// proto 包名(逻辑命名空间,防止类型名冲突)
package user;
// ==================== 导入其他 proto 文件 ====================
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto"; // 可空基本类型
// ==================== 枚举 ====================
enum UserStatus {
USER_STATUS_UNSPECIFIED = 0; // 第一个值必须为 0(默认值)
USER_STATUS_ACTIVE = 1;
USER_STATUS_DISABLED = 2;
USER_STATUS_DELETED = 3;
}
enum Gender {
GENDER_UNSPECIFIED = 0;
GENDER_MALE = 1;
GENDER_FEMALE = 2;
}
// ==================== 消息类型 ====================
message User {
// 字段格式:类型 名称 = 字段编号;
// 字段编号一旦使用就不能更改(向后兼容)
// 编号 1-15 占 1 字节(高频字段用小编号)
// 编号 16-2047 占 2 字节
int64 id = 1;
string username = 2;
string email = 3;
int32 age = 4;
UserStatus status = 5;
Gender gender = 6;
// ==================== 常用类型 ====================
double balance = 7; // float64
float score = 8; // float32
bool is_vip = 9;
bytes avatar = 10; // []byte
// ==================== 复合类型 ====================
repeated string tags = 11; // 切片 []string
map<string, string> meta = 12; // map[string]string
// ==================== 嵌套消息 ====================
Address address = 13;
// ==================== Well-Known Types ====================
google.protobuf.Timestamp created_at = 14;
google.protobuf.StringValue nickname = 15; // 可空字符串(区分""和未设置)
// ==================== oneof(互斥字段)====================
oneof contact {
string phone = 16;
string wechat = 17;
}
// ==================== reserved 保留已废弃的字段 ====================
reserved 99, 100;
reserved "old_field", "deprecated_field";
}
message Address {
string province = 1;
string city = 2;
string street = 3;
string zip_code = 4;
}
// ==================== 请求 / 响应消息 ====================
message CreateUserRequest {
string username = 1;
string email = 2;
int32 age = 3;
}
message CreateUserResponse {
User user = 1;
}
message GetUserRequest {
int64 id = 1;
}
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
string keyword = 3;
UserStatus status = 4;
}
message ListUsersResponse {
repeated User users = 1;
int64 total = 2;
}
message UpdateUserRequest {
int64 id = 1;
string username = 2;
string email = 3;
int32 age = 4;
}
message DeleteUserRequest {
int64 id = 1;
}
// ==================== 服务定义 ====================
service UserService {
// 一元 RPC(普通请求-响应)
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
rpc GetUser(GetUserRequest) returns (User);
rpc UpdateUser(UpdateUserRequest) returns (User);
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
// 服务端流(Server Streaming)
rpc WatchUser(GetUserRequest) returns (stream User);
// 客户端流(Client Streaming)
rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
// 双向流(Bidirectional Streaming)
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message ChatMessage {
string from = 1;
string content = 2;
}1.3 Proto 类型映射
text
Proto 类型 Go 类型 零值 说明
──────────────────────────────────────────────────────────────
double float64 0 双精度浮点
float float32 0 单精度浮点
int32 int32 0 变长编码(负数效率低)
int64 int64 0 变长编码
uint32 uint32 0 变长编码
uint64 uint64 0 变长编码
sint32 int32 0 变长编码(负数更高效)
sint64 int64 0 变长编码(负数更高效)
fixed32 uint32 0 固定 4 字节(值 > 2^28 时更高效)
fixed64 uint64 0 固定 8 字节
sfixed32 int32 0 固定 4 字节
sfixed64 int64 0 固定 8 字节
bool bool false
string string ""
bytes []byte nil
repeated T []T nil 切片
map<K,V> map[K]V nil
message *MessageType nil 指针
enum EnumType 0 第一个枚举值
Timestamp *timestamppb.Timestamp nil 时间1.4 protoc 编译
shell
# 基本编译命令
protoc --go_out=. --go-grpc_out=. proto/user.proto
# 指定输出路径(推荐:module 模式)
protoc \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/user.proto
# 编译多个文件
protoc \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/*.proto
# 指定 import 搜索路径
protoc \
-I proto \
-I third_party \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/user.proto项目目录结构:
text
myproject/
├── go.mod
├── proto/
│ └── user.proto
├── pb/ ← protoc 生成的代码
│ ├── user.pb.go ← 消息类型(protoc-gen-go)
│ └── user_grpc.pb.go ← gRPC 服务接口(protoc-gen-go-grpc)
├── server/
│ └── main.go
├── client/
│ └── main.go
└── Makefilemakefile
# Makefile
.PHONY: proto clean
proto:
protoc \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/*.proto
clean:
rm -f pb/*.pb.go2. PB 序列化方式和性能对比
2.1 Protobuf 编码原理
text
Protobuf 使用 Tag-Length-Value(TLV)二进制编码:
┌─────────┬────────┬──────────────┐
│ Tag │ Length │ Value │
│ (Varint) │ (可选) │ (字节序列) │
└─────────┴────────┴──────────────┘
Tag = (字段编号 << 3) | wire_type
Wire Type:
0 Varint int32, int64, uint32, uint64, bool, enum
1 64-bit fixed64, sfixed64, double
2 Length-delim string, bytes, message, repeated
5 32-bit fixed32, sfixed32, float
例子:字段 id = 1, 值 = 150
Tag = (1 << 3) | 0 = 0x08
Value = Varint(150) = 0x96 0x01
总共只需 3 字节!
对比 JSON:{"id":150} = 10 字节2.2 序列化性能对比
go
package main
import (
"encoding/json"
"fmt"
"testing"
"time"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/myproject/pb"
)
// 构建测试数据
func buildTestUser() *pb.User {
return &pb.User{
Id: 12345678,
Username: "alice_wonderland",
Email: "alice@example.com",
Age: 28,
Status: pb.UserStatus_USER_STATUS_ACTIVE,
Balance: 99999.99,
IsVip: true,
Tags: []string{"golang", "grpc", "microservice"},
Meta: map[string]string{
"department": "engineering",
"level": "senior",
},
CreatedAt: timestamppb.Now(),
}
}
// 等效的 JSON 结构
type UserJSON struct {
ID int64 `json:"id"`
Username string `json:"username"`
Email string `json:"email"`
Age int32 `json:"age"`
Status int32 `json:"status"`
Balance float64 `json:"balance"`
IsVIP bool `json:"is_vip"`
Tags []string `json:"tags"`
Meta map[string]string `json:"meta"`
CreatedAt string `json:"created_at"`
}
func main() {
user := buildTestUser()
// ==================== Protobuf 序列化 ====================
pbData, _ := proto.Marshal(user)
fmt.Printf("Protobuf 大小: %d 字节\n", len(pbData))
// 反序列化
user2 := &pb.User{}
proto.Unmarshal(pbData, user2)
// ==================== JSON 序列化 ====================
userJSON := UserJSON{
ID: 12345678, Username: "alice_wonderland", Email: "alice@example.com",
Age: 28, Status: 1, Balance: 99999.99, IsVIP: true,
Tags: []string{"golang", "grpc", "microservice"},
Meta: map[string]string{"department": "engineering", "level": "senior"},
CreatedAt: time.Now().Format(time.RFC3339),
}
jsonData, _ := json.Marshal(userJSON)
fmt.Printf("JSON 大小: %d 字节\n", len(jsonData))
fmt.Printf("压缩比: Protobuf 比 JSON 小 %.0f%%\n",
(1-float64(len(pbData))/float64(len(jsonData)))*100)
// ==================== 性能基准测试 ====================
const N = 100000
fmt.Printf("\n序列化 %d 次:\n", N)
// Protobuf Marshal
start := time.Now()
for i := 0; i < N; i++ {
proto.Marshal(user)
}
pbMarshalTime := time.Since(start)
fmt.Printf(" Protobuf Marshal: %s\n", pbMarshalTime)
// JSON Marshal
start = time.Now()
for i := 0; i < N; i++ {
json.Marshal(userJSON)
}
jsonMarshalTime := time.Since(start)
fmt.Printf(" JSON Marshal: %s\n", jsonMarshalTime)
// Protobuf Unmarshal
start = time.Now()
for i := 0; i < N; i++ {
u := &pb.User{}
proto.Unmarshal(pbData, u)
}
pbUnmarshalTime := time.Since(start)
fmt.Printf(" Protobuf Unmarshal: %s\n", pbUnmarshalTime)
// JSON Unmarshal
start = time.Now()
for i := 0; i < N; i++ {
u := &UserJSON{}
json.Unmarshal(jsonData, u)
}
jsonUnmarshalTime := time.Since(start)
fmt.Printf(" JSON Unmarshal: %s\n", jsonUnmarshalTime)
fmt.Printf("\n速度对比: Protobuf Marshal 快 %.1fx, Unmarshal 快 %.1fx\n",
float64(jsonMarshalTime)/float64(pbMarshalTime),
float64(jsonUnmarshalTime)/float64(pbUnmarshalTime))
}
// 典型结果(仅供参考):
// Protobuf 大小: 142 字节
// JSON 大小: 298 字节
// 压缩比: Protobuf 比 JSON 小 52%
//
// 序列化 100000 次:
// Protobuf Marshal: 45ms
// JSON Marshal: 180ms
// Protobuf Unmarshal: 65ms
// JSON Unmarshal: 350ms
//
// 速度对比: Protobuf Marshal 快 4.0x, Unmarshal 快 5.4xtext
Protobuf vs JSON 总结:
维度 Protobuf JSON
──────────────────────────────────────────────────
体积 小(约 50-70%) 大
序列化速度 快 3-5x 慢
反序列化速度 快 5-10x 慢
可读性 ❌ 二进制 ✅ 人类可读
Schema ✅ 强制(.proto 文件) ❌ 无
向后兼容 ✅ 按字段编号 ⚠️ 需要约定
浏览器支持 ⚠️ 需要 grpc-web ✅ 原生
调试 ⚠️ 需要工具 ✅ 直接看3. 高级 protoc 命令选项
3.1 常用选项
shell
# ==================== 输出路径控制 ====================
# paths=source_relative:输出路径相对于 proto 文件位置
protoc --go_out=. --go_opt=paths=source_relative proto/user.proto
# 输出: proto/user.pb.go(和 proto 文件同目录)
# paths=import:输出路径基于 go_package(默认行为)
protoc --go_out=./gen --go_opt=paths=import proto/user.proto
# 输出: gen/github.com/myproject/pb/user/user.pb.go
# ==================== 多文件编译 ====================
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/user.proto proto/order.proto proto/common.proto
# 或用通配符
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/*.proto
# ==================== Import 路径 ====================
# -I(--proto_path):指定 import 搜索路径
protoc -I proto -I third_party/googleapis \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
user.proto
# ==================== gRPC 选项 ====================
# require_unimplemented_servers=false:不强制嵌入 Unimplemented 服务
protoc --go-grpc_out=. \
--go-grpc_opt=paths=source_relative,require_unimplemented_servers=false \
proto/user.proto
# ==================== 只生成消息不生成 gRPC ====================
protoc --go_out=. --go_opt=paths=source_relative proto/common.proto
# 不加 --go-grpc_out,只生成 .pb.go 不生成 _grpc.pb.go
# ==================== 查看中间表示(调试)====================
protoc --decode=user.User proto/user.proto < binary_data.bin
protoc --decode_raw < binary_data.bin
# ==================== 输出描述符文件 ====================
protoc --descriptor_set_out=user.desc --include_imports proto/user.proto3.2 使用 buf 工具(protoc 的现代替代)
shell
# 安装 buf
go install github.com/bufbuild/buf/cmd/buf@latest
# 初始化
buf config init
# buf.yaml
version: v2
modules:
- path: proto
lint:
use:
- STANDARD
breaking:
use:
- FILE
# buf.gen.yaml
version: v2
plugins:
- remote: buf.build/protocolbuffers/go
out: pb
opt: paths=source_relative
- remote: buf.build/grpc/go
out: pb
opt: paths=source_relative
# 生成代码
buf generate
# lint 检查
buf lint
# 向后兼容性检查(对比 git 上一次提交)
buf breaking --against '.git#branch=main'4. gRPC Server 和 Client
4.1 gRPC Server
go
package main
import (
"context"
"fmt"
"log"
"net"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/myproject/pb"
)
// UserServer 实现 pb.UserServiceServer 接口
type UserServer struct {
pb.UnimplementedUserServiceServer // 嵌入未实现的服务(向前兼容)
mu sync.RWMutex
users map[int64]*pb.User
nextID atomic.Int64
}
func NewUserServer() *UserServer {
s := &UserServer{
users: make(map[int64]*pb.User),
}
s.nextID.Store(1)
return s
}
// CreateUser 创建用户
func (s *UserServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
// 参数验证
if req.Username == "" {
return nil, status.Errorf(codes.InvalidArgument, "username is required")
}
if req.Email == "" {
return nil, status.Errorf(codes.InvalidArgument, "email is required")
}
id := s.nextID.Add(1) - 1
user := &pb.User{
Id: id,
Username: req.Username,
Email: req.Email,
Age: req.Age,
Status: pb.UserStatus_USER_STATUS_ACTIVE,
CreatedAt: timestamppb.Now(),
}
s.mu.Lock()
s.users[id] = user
s.mu.Unlock()
log.Printf("创建用户: id=%d, username=%s", id, req.Username)
return &pb.CreateUserResponse{User: user}, nil
}
// GetUser 查询用户
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
s.mu.RLock()
user, ok := s.users[req.Id]
s.mu.RUnlock()
if !ok {
// 使用 gRPC 标准错误码
return nil, status.Errorf(codes.NotFound, "user %d not found", req.Id)
}
return user, nil
}
// ListUsers 用户列表
func (s *UserServer) ListUsers(ctx context.Context, req *pb.ListUsersRequest) (*pb.ListUsersResponse, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var users []*pb.User
for _, u := range s.users {
users = append(users, u)
}
// 简单分页
page := int(req.Page)
pageSize := int(req.PageSize)
if pageSize <= 0 {
pageSize = 20
}
if page <= 0 {
page = 1
}
start := (page - 1) * pageSize
if start >= len(users) {
return &pb.ListUsersResponse{Total: int64(len(users))}, nil
}
end := start + pageSize
if end > len(users) {
end = len(users)
}
return &pb.ListUsersResponse{
Users: users[start:end],
Total: int64(len(users)),
}, nil
}
// UpdateUser 更新用户
func (s *UserServer) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.User, error) {
s.mu.Lock()
defer s.mu.Unlock()
user, ok := s.users[req.Id]
if !ok {
return nil, status.Errorf(codes.NotFound, "user %d not found", req.Id)
}
if req.Username != "" {
user.Username = req.Username
}
if req.Email != "" {
user.Email = req.Email
}
if req.Age > 0 {
user.Age = req.Age
}
return user, nil
}
// DeleteUser 删除用户
func (s *UserServer) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*emptypb.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.users[req.Id]; !ok {
return nil, status.Errorf(codes.NotFound, "user %d not found", req.Id)
}
delete(s.users, req.Id)
return &emptypb.Empty{}, nil
}
func main() {
// 监听端口
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("监听失败: %v", err)
}
// 创建 gRPC Server
grpcServer := grpc.NewServer()
// 注册服务
pb.RegisterUserServiceServer(grpcServer, NewUserServer())
log.Println("gRPC Server 启动: :50051")
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("服务失败: %v", err)
}
}4.2 gRPC Client
go
package main
import (
"context"
"fmt"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
pb "github.com/myproject/pb"
)
func main() {
// 建立连接
conn, err := grpc.NewClient(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()), // 无 TLS(开发用)
)
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
// 创建客户端
client := pb.NewUserServiceClient(conn)
// 带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// ==================== 创建用户 ====================
createResp, err := client.CreateUser(ctx, &pb.CreateUserRequest{
Username: "alice",
Email: "alice@example.com",
Age: 25,
})
if err != nil {
// 解析 gRPC 错误
st, ok := status.FromError(err)
if ok {
fmt.Printf("gRPC 错误: code=%s, msg=%s\n", st.Code(), st.Message())
}
log.Fatal(err)
}
fmt.Printf("创建成功: %+v\n", createResp.User)
// ==================== 查询用户 ====================
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: createResp.User.Id})
if err != nil {
log.Fatal(err)
}
fmt.Printf("查询结果: %+v\n", user)
// ==================== 更新用户 ====================
updated, err := client.UpdateUser(ctx, &pb.UpdateUserRequest{
Id: user.Id,
Age: 26,
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("更新后: %+v\n", updated)
// ==================== 用户列表 ====================
listResp, err := client.ListUsers(ctx, &pb.ListUsersRequest{
Page: 1,
PageSize: 10,
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("总数: %d\n", listResp.Total)
for _, u := range listResp.Users {
fmt.Printf(" - %s (%s)\n", u.Username, u.Email)
}
// ==================== 删除用户 ====================
_, err = client.DeleteUser(ctx, &pb.DeleteUserRequest{Id: user.Id})
if err != nil {
log.Fatal(err)
}
fmt.Println("删除成功")
}4.3 gRPC 错误码
text
gRPC 标准错误码(codes 包):
错误码 数值 含义 对应 HTTP
──────────────────────────────────────────────────────────────
OK 0 成功 200
Canceled 1 客户端取消 499
Unknown 2 未知错误 500
InvalidArgument 3 无效参数 400
DeadlineExceeded 4 超时 504
NotFound 5 未找到 404
AlreadyExists 6 已存在 409
PermissionDenied 7 权限不足 403
ResourceExhausted 8 资源耗尽(限流) 429
FailedPrecondition 9 前提条件不满足 400
Aborted 10 操作中止(并发冲突) 409
Unimplemented 12 未实现 501
Internal 13 内部错误 500
Unavailable 14 服务不可用 503
Unauthenticated 16 未认证 4015. 拦截器
go
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
// ==================== 服务端一元拦截器 ====================
// 日志拦截器
func loggingInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
// 获取客户端信息
clientIP := "unknown"
if p, ok := peer.FromContext(ctx); ok {
clientIP = p.Addr.String()
}
// 调用实际处理器
resp, err := handler(ctx, req)
// 记录日志
duration := time.Since(start)
code := codes.OK
if err != nil {
if st, ok := status.FromError(err); ok {
code = st.Code()
}
}
log.Printf("[gRPC] %s | %s | %s | %s",
info.FullMethod, clientIP, code, duration)
return resp, err
}
// 认证拦截器
func authInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 从 metadata 获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "no metadata")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "no token")
}
token := tokens[0]
// 验证 token(示例:简单校验)
if token != "Bearer valid-token-123" {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
return handler(ctx, req)
}
// Recovery 拦截器
func recoveryInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("[PANIC] %s: %v", info.FullMethod, r)
err = status.Errorf(codes.Internal, "internal server error")
}
}()
return handler(ctx, req)
}
// ==================== 服务端流拦截器 ====================
func streamLoggingInterceptor(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
start := time.Now()
log.Printf("[Stream Start] %s", info.FullMethod)
err := handler(srv, ss)
log.Printf("[Stream End] %s | %s", info.FullMethod, time.Since(start))
return err
}
// ==================== 客户端一元拦截器 ====================
func clientLoggingInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
log.Printf("[Client] %s | %s | err=%v", method, time.Since(start), err)
return err
}
// 客户端自动注入 token
func clientAuthInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
// 注入 metadata
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer valid-token-123")
return invoker(ctx, method, req, reply, cc, opts...)
}
// ==================== 注册拦截器 ====================
func serverExample() {
server := grpc.NewServer(
// 链式一元拦截器(按顺序执行)
grpc.ChainUnaryInterceptor(
recoveryInterceptor,
loggingInterceptor,
authInterceptor,
),
// 链式流拦截器
grpc.ChainStreamInterceptor(
streamLoggingInterceptor,
),
)
_ = server
}
func clientExample() {
conn, _ := grpc.NewClient(
"localhost:50051",
grpc.WithChainUnaryInterceptor(
clientLoggingInterceptor,
clientAuthInterceptor,
),
)
_ = conn
}6. gRPC 数据加密传输
6.1 生成自签名证书
shell
# 生成 CA 私钥和证书
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt \
-subj "/C=CN/ST=Beijing/O=MyOrg/CN=MyCA"
# 生成服务端私钥和 CSR
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr \
-subj "/C=CN/ST=Beijing/O=MyOrg/CN=localhost"
# 创建 SAN 扩展文件(支持域名和 IP)
cat > server_ext.cnf << EOF
[v3_req]
subjectAltName = @alt_names
[alt_names]
DNS.1 = localhost
DNS.2 = *.example.com
IP.1 = 127.0.0.1
IP.2 = 0.0.0.0
EOF
# CA 签发服务端证书
openssl x509 -req -days 365 -in server.csr \
-CA ca.crt -CAkey ca.key -CAcreateserial \
-out server.crt -extfile server_ext.cnf -extensions v3_req6.2 TLS Server
go
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"net"
"os"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
pb "github.com/myproject/pb"
)
func main() {
// ==================== 方式1:单向 TLS(客户端验证服务端)====================
serverCert, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
log.Fatalf("加载证书失败: %v", err)
}
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{serverCert},
MinVersion: tls.VersionTLS13,
})
server := grpc.NewServer(grpc.Creds(creds))
pb.RegisterUserServiceServer(server, NewUserServer())
lis, _ := net.Listen("tcp", ":50051")
log.Println("gRPC TLS Server 启动: :50051")
server.Serve(lis)
}
// ==================== 方式2:双向 TLS(mTLS,互相验证)====================
func startMTLSServer() {
// 加载服务端证书
serverCert, _ := tls.LoadX509KeyPair("server.crt", "server.key")
// 加载 CA 证书(用于验证客户端证书)
caCert, _ := os.ReadFile("ca.crt")
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{serverCert},
ClientAuth: tls.RequireAndVerifyClientCert, // 要求客户端提供证书
ClientCAs: caPool,
MinVersion: tls.VersionTLS13,
})
server := grpc.NewServer(grpc.Creds(creds))
pb.RegisterUserServiceServer(server, NewUserServer())
lis, _ := net.Listen("tcp", ":50051")
fmt.Println("gRPC mTLS Server 启动: :50051")
server.Serve(lis)
}6.3 TLS Client
go
package main
import (
"crypto/tls"
"crypto/x509"
"log"
"os"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
func main() {
// ==================== 方式1:单向 TLS ====================
// 客户端信任 CA 签发的证书
caCert, _ := os.ReadFile("ca.crt")
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)
creds := credentials.NewTLS(&tls.Config{
RootCAs: caPool,
ServerName: "localhost", // 必须和证书的 CN 或 SAN 匹配
MinVersion: tls.VersionTLS13,
})
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(creds))
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
log.Println("TLS 连接成功")
}
// ==================== 方式2:mTLS 客户端 ====================
func mtlsClient() {
// 加载客户端证书
clientCert, _ := tls.LoadX509KeyPair("client.crt", "client.key")
// 加载 CA 证书(用于验证服务端证书)
caCert, _ := os.ReadFile("ca.crt")
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: caPool,
ServerName: "localhost",
MinVersion: tls.VersionTLS13,
})
conn, _ := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(creds))
defer conn.Close()
}7. gRPC Client 身份验证
go
package main
import (
"context"
"fmt"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// ==================== 方式1:PerRPCCredentials 接口 ====================
// 每次调用自动注入凭证
// TokenAuth 实现 credentials.PerRPCCredentials 接口
type TokenAuth struct {
Token string
}
func (t *TokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": "Bearer " + t.Token,
}, nil
}
func (t *TokenAuth) RequireTransportSecurity() bool {
return false // 生产环境应该返回 true(强制 TLS)
}
// 客户端使用
func clientWithPerRPC() {
conn, _ := grpc.NewClient(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithPerRPCCredentials(&TokenAuth{Token: "my-jwt-token"}),
)
defer conn.Close()
// 之后所有 RPC 调用会自动携带 Authorization header
}
// ==================== 方式2:手动通过 metadata 传递 ====================
func clientWithMetadata() {
conn, _ := grpc.NewClient(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
defer conn.Close()
// 创建带 metadata 的 context
md := metadata.New(map[string]string{
"authorization": "Bearer my-jwt-token",
"x-request-id": "req-001",
"x-client-ver": "1.0.0",
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 或者追加到已有 context
ctx = metadata.AppendToOutgoingContext(ctx, "x-trace-id", "trace-abc")
// 用这个 ctx 发起调用
client := pb.NewUserServiceClient(conn)
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
_ = user
_ = err
}
// ==================== 服务端验证 ====================
func authServerInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 跳过不需要认证的方法
if info.FullMethod == "/user.UserService/Health" {
return handler(ctx, req)
}
// 从 metadata 提取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
values := md.Get("authorization")
if len(values) == 0 {
return nil, status.Error(codes.Unauthenticated, "missing token")
}
token := values[0]
// 验证 JWT(示例简化)
userID, err := validateJWT(token)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "invalid token: %v", err)
}
// 将用户信息注入 context,后续 handler 可以使用
type ctxKey string
ctx = context.WithValue(ctx, ctxKey("user_id"), userID)
// 读取服务端 metadata
header := metadata.Pairs("x-served-by", "node-1")
grpc.SetHeader(ctx, header)
return handler(ctx, req)
}
func validateJWT(token string) (int64, error) {
// 实际项目中解析 JWT
if token == "Bearer valid-token" {
return 42, nil
}
return 0, fmt.Errorf("invalid")
}⚠️ 身份验证最佳实践
- 生产环境必须使用 TLS,
RequireTransportSecurity返回true - 推荐使用 JWT(无状态)或 OAuth2 Token
- 拦截器中跳过健康检查等公开接口
- 敏感信息不要放在 metadata 的 value 中明文传输(TLS 会加密整个通道)
8. gRPC Streaming
8.1 三种流模式
text
模式 定义 典型场景
──────────────────────────────────────────────────────────────
Server Streaming client 发 1 个请求 → server 返回流 实时推送、日志流
Client Streaming client 发送流 → server 返回 1 个响应 文件上传、批量导入
Bidirectional client 和 server 双向流 聊天、协作编辑8.2 Server Streaming 实现
go
// ==================== Proto 定义 ====================
// rpc WatchUser(GetUserRequest) returns (stream User);
// ==================== Server 端 ====================
func (s *UserServer) WatchUser(req *pb.GetUserRequest, stream pb.UserService_WatchUserServer) error {
// 每 2 秒推送一次用户最新状态
for i := 0; i < 10; i++ {
// 检查客户端是否断开
if err := stream.Context().Err(); err != nil {
log.Printf("客户端断开: %v", err)
return err
}
s.mu.RLock()
user, ok := s.users[req.Id]
s.mu.RUnlock()
if !ok {
return status.Errorf(codes.NotFound, "user %d not found", req.Id)
}
// 发送一条消息
if err := stream.Send(user); err != nil {
return err
}
time.Sleep(2 * time.Second)
}
return nil // 返回 nil 表示流正常结束
}
// ==================== Client 端 ====================
func watchUserClient(client pb.UserServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := client.WatchUser(ctx, &pb.GetUserRequest{Id: 1})
if err != nil {
log.Fatal(err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
fmt.Println("流结束")
break
}
if err != nil {
log.Fatal("接收错误:", err)
}
fmt.Printf("收到推送: %s (age=%d)\n", user.Username, user.Age)
}
}8.3 Client Streaming 实现
go
// ==================== Proto 定义 ====================
// rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
// ==================== Server 端 ====================
func (s *UserServer) BatchCreateUsers(stream pb.UserService_BatchCreateUsersServer) error {
var users []*pb.User
for {
req, err := stream.Recv()
if err == io.EOF {
// 客户端发送完毕,返回结果
log.Printf("批量创建: 共 %d 个用户", len(users))
return stream.SendAndClose(&pb.ListUsersResponse{
Users: users,
Total: int64(len(users)),
})
}
if err != nil {
return err
}
// 处理每条消息
id := s.nextID.Add(1) - 1
user := &pb.User{
Id: id,
Username: req.Username,
Email: req.Email,
Age: req.Age,
Status: pb.UserStatus_USER_STATUS_ACTIVE,
CreatedAt: timestamppb.Now(),
}
s.mu.Lock()
s.users[id] = user
s.mu.Unlock()
users = append(users, user)
}
}
// ==================== Client 端 ====================
func batchCreateClient(client pb.UserServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.BatchCreateUsers(ctx)
if err != nil {
log.Fatal(err)
}
// 逐条发送
names := []string{"user_a", "user_b", "user_c", "user_d", "user_e"}
for _, name := range names {
err := stream.Send(&pb.CreateUserRequest{
Username: name,
Email: name + "@example.com",
Age: 20,
})
if err != nil {
log.Fatal("发送失败:", err)
}
fmt.Println("发送:", name)
}
// 关闭发送端,等待服务端响应
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
fmt.Printf("批量创建完成: %d 个用户\n", resp.Total)
}8.4 Bidirectional Streaming 实现
go
// ==================== Proto 定义 ====================
// rpc Chat(stream ChatMessage) returns (stream ChatMessage);
// ==================== Server 端 ====================
func (s *UserServer) Chat(stream pb.UserService_ChatServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("[Chat] %s: %s", msg.From, msg.Content)
// 回复
reply := &pb.ChatMessage{
From: "server",
Content: fmt.Sprintf("收到你的消息: %s", msg.Content),
}
if err := stream.Send(reply); err != nil {
return err
}
}
}
// ==================== Client 端 ====================
func chatClient(client pb.UserServiceClient) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.Chat(ctx)
if err != nil {
log.Fatal(err)
}
// 启动接收协程
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Println("接收错误:", err)
return
}
fmt.Printf("← [%s]: %s\n", msg.From, msg.Content)
}
}()
// 发送消息
messages := []string{"你好", "gRPC 双向流", "结束"}
for _, text := range messages {
err := stream.Send(&pb.ChatMessage{
From: "client",
Content: text,
})
if err != nil {
log.Fatal("发送失败:", err)
}
fmt.Printf("→ 发送: %s\n", text)
time.Sleep(time.Second)
}
stream.CloseSend() // 关闭发送端
time.Sleep(time.Second)
}9. gRPC 公网调用
9.1 直接暴露 gRPC(TLS + 域名)
go
package main
import (
"crypto/tls"
"log"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"time"
)
func startPublicGRPCServer() {
// 加载 TLS 证书(Let's Encrypt 或购买的证书)
cert, err := tls.LoadX509KeyPair("/etc/letsencrypt/live/grpc.example.com/fullchain.pem",
"/etc/letsencrypt/live/grpc.example.com/privkey.pem")
if err != nil {
log.Fatal(err)
}
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS13,
})
server := grpc.NewServer(
grpc.Creds(creds),
// 公网必须设置 keepalive
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute, // 空闲连接超时
MaxConnectionAge: 30 * time.Minute, // 连接最长存活
MaxConnectionAgeGrace: 10 * time.Second, // 关闭前的宽限期
Time: 1 * time.Minute, // 心跳间隔
Timeout: 20 * time.Second, // 心跳超时
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 30 * time.Second, // 客户端最小心跳间隔
PermitWithoutStream: true,
}),
// 限制消息大小
grpc.MaxRecvMsgSize(4*1024*1024), // 4MB
grpc.MaxSendMsgSize(4*1024*1024),
// 拦截器
grpc.ChainUnaryInterceptor(
recoveryInterceptor,
loggingInterceptor,
authInterceptor,
),
)
// pb.RegisterUserServiceServer(server, NewUserServer())
lis, _ := net.Listen("tcp", ":443")
log.Println("公网 gRPC Server: :443")
server.Serve(lis)
}9.2 通过 Nginx 反向代理
nginx
# /etc/nginx/conf.d/grpc.conf
# Nginx 1.13.10+ 支持 gRPC 反向代理
upstream grpc_backend {
server 127.0.0.1:50051;
# 多实例负载均衡
# server 127.0.0.1:50052;
# server 127.0.0.1:50053;
}
server {
listen 443 ssl http2; # gRPC 基于 HTTP/2
server_name grpc.example.com;
ssl_certificate /etc/letsencrypt/live/grpc.example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/grpc.example.com/privkey.pem;
ssl_protocols TLSv1.3;
# gRPC 反向代理
location / {
grpc_pass grpc://grpc_backend;
# 超时设置
grpc_read_timeout 300s;
grpc_send_timeout 300s;
# 错误处理
error_page 502 = /error502grpc;
}
location = /error502grpc {
internal;
default_type application/grpc;
add_header grpc-status 14;
add_header grpc-message "unavailable";
return 204;
}
}9.3 gRPC-Gateway(REST + gRPC 共存)
text
gRPC-Gateway 架构:
浏览器 / curl gRPC Client
(REST / JSON) (Protobuf)
│ │
▼ │
┌─────────────┐ │
│ gRPC-Gateway │ HTTP/JSON │
│ (反向代理) │───────────────│
└──────┬──────┘ │
│ gRPC/Protobuf │
▼ ▼
┌──────────────────────────────┐
│ gRPC Server │
└──────────────────────────────┘
优点:
- 一套 Proto 定义同时支持 gRPC 和 REST
- 浏览器可以直接调用
- 渐进式迁移:先 REST 后 gRPCprotobuf
// 在 proto 中添加 HTTP 映射
import "google/api/annotations.proto";
service UserService {
rpc GetUser(GetUserRequest) returns (User) {
option (google.api.http) = {
get: "/v1/users/{id}"
};
}
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse) {
option (google.api.http) = {
post: "/v1/users"
body: "*"
};
}
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse) {
option (google.api.http) = {
get: "/v1/users"
};
}
}shell
# 安装 gRPC-Gateway 插件
go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest
go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2@latest
# 生成 Gateway 代码
protoc -I proto \
-I third_party/googleapis \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
--grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative \
--openapiv2_out=. \
proto/user.protogo
package main
import (
"context"
"log"
"net"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "github.com/myproject/pb"
)
func main() {
ctx := context.Background()
// 启动 gRPC Server
go func() {
lis, _ := net.Listen("tcp", ":50051")
grpcServer := grpc.NewServer()
pb.RegisterUserServiceServer(grpcServer, NewUserServer())
log.Println("gRPC Server: :50051")
grpcServer.Serve(lis)
}()
// 启动 gRPC-Gateway(HTTP 反向代理到 gRPC)
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
err := pb.RegisterUserServiceHandlerFromEndpoint(ctx, mux, "localhost:50051", opts)
if err != nil {
log.Fatal(err)
}
log.Println("HTTP Gateway: :8080")
log.Println(" GET /v1/users/{id}")
log.Println(" POST /v1/users")
log.Println(" GET /v1/users")
http.ListenAndServe(":8080", mux)
}
// 现在可以用两种方式调用:
// gRPC: grpcClient.GetUser(ctx, &pb.GetUserRequest{Id: 1})
// HTTP: curl http://localhost:8080/v1/users/19.4 公网部署清单
⚠️ gRPC 公网部署清单
- 必须启用 TLS(Let's Encrypt 免费证书)
- 配置合理的 Keepalive 参数(公网延迟高)
- 限制消息大小(
MaxRecvMsgSize) - 添加认证拦截器(JWT / OAuth2)
- 添加限流拦截器(防刷)
- 配置 Recovery 拦截器(防 panic 崩溃)
- 考虑用 Nginx/Envoy 做反向代理和负载均衡
- 如需浏览器直接调用,使用 gRPC-Gateway 或 gRPC-Web
附录:gRPC 常用 API 速查
text
Server 端:
grpc.NewServer(opts...) 创建 gRPC 服务
pb.RegisterXxxServer(s, impl) 注册服务实现
server.Serve(lis) 启动服务
server.GracefulStop() 优雅关闭
grpc.Creds(creds) TLS 凭证
grpc.ChainUnaryInterceptor(...) 链式一元拦截器
grpc.ChainStreamInterceptor(...) 链式流拦截器
grpc.MaxRecvMsgSize(n) 最大接收消息大小
grpc.KeepaliveParams(params) 心跳配置
Client 端:
grpc.NewClient(addr, opts...) 创建连接
pb.NewXxxClient(conn) 创建客户端
grpc.WithTransportCredentials(creds) TLS 凭证
grpc.WithPerRPCCredentials(auth) 自动注入认证信息
grpc.WithChainUnaryInterceptor(...) 链式一元拦截器
Metadata:
metadata.New(map) 创建 metadata
metadata.NewOutgoingContext(ctx, md) 设置出站 metadata
metadata.FromIncomingContext(ctx) 读取入站 metadata
metadata.AppendToOutgoingContext(ctx, k,v) 追加 metadata
错误处理:
status.Errorf(code, msg, args...) 创建 gRPC 错误
status.FromError(err) 解析 gRPC 错误
codes.NotFound / codes.Internal / ... 错误码常量
Protobuf:
proto.Marshal(msg) 序列化
proto.Unmarshal(data, msg) 反序列化
proto.Clone(msg) 深拷贝
proto.Equal(a, b) 相等比较
timestamppb.Now() 当前时间戳