RPC(Remote Procedure Call),远程调用。RPC消息传输可通过TCP、HTTP和UDP等。
Golang中实现RPC有多种方式:
net/rpc
: 官方RPC库,支持TCP和HTTP方式传输,使用gob
编解码(导致无法跨语言调用)
net/rpc/jsonrpc
: 官方RPC库,仅支持TCP传输,使用json
编解码(可跨语言调用)
gRPC
: Google开源RPC库,基于HTTP/2传输,使用ProtoBuf
编解码
当然除上面3种外还是许多RPC库,目前有所接触的是net/rpc/jsonrpc
、gRPC
,主要记录这2种,后续若有接触其他实现方式会持续更新。
jsonrpc
使用net/rpc/jsonrpc
库,想要方法能被远程访问,需要满足以下条件:
可输出在Golang中即首字母大写
- 方法的类型可输出 (the method’s type is exported)
*T
- 方法可输出 (the method is exported)
MethodName
- 方法必须有两个参数,且必须是输出类型或者是内建类型 (the method has two arguments, both exported or builtin types)
T1
*T2
- 方法的第二个参数是指针类型 (the method’s second argument is a pointer)
*T2
- 方法返回类型为 error (the method has return type error)
error
1
| func (t *T) MethodName(argType T1, replyType *T2) error
|
目录结构
演示实例目录结构
1 2 3 4 5 6 7 8 9 10
| . ├── client // client rpc封装调用方法 │ └── rpc.go ├── common // 公共结构体 │ └── rpc.go ├── jsonrpcClient.go ├── jsonrpcServer.go └── server // server rpc封装方法 ├── agent.go └── rpc.go
|
请求响应参数结构体
定义请求参数、响应参数的结构体,client和server都需要使用
common/rpc.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| package common
type AgentReportRequest struct { Hostname string `json:"hostname"` IP string `json:"ip"` Version string `json:"version"` }
type RpcResponse struct { Code int `json:"code"` Msg string `json:"msg"` }
|
RPC Server端
Server端RPC方法
Server端的RPC方法都需要符合之前提到的5个条件方可别远程调用,方法接收的请求参数和响应参数为之前预先定义好的
server/agent.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package server
import ( "github.com/mogl/jsonrpc/common" "log" )
type Agent int
func (ag *Agent) ReportStatus(args *common.AgentReportRequest, reply *common.RpcResponse) error { if args.Hostname == "" { reply.Code = 1 return nil }
log.Printf("rpc server receive, hostname:%s, ip:%s, version:%s", args.Hostname, args.IP, args.Version) reply.Code = 0 reply.Msg = "call rpc server successfully."
return nil }
|
创建RPC Server服务
监听TCP端口、注册服务
server/rpc.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package server
import ( "log" "net" "net/rpc" "net/rpc/jsonrpc" "time" )
func Start() { addr := ":2333"
server := rpc.NewServer()
server.Register(new(Agent))
l, e := net.Listen("tcp", addr) if e != nil { log.Fatalln("listen error:", e) } else { log.Println("listerning", addr) }
for { conn, err := l.Accept() if err != nil { log.Fatalln("listener accept fail:", err) time.Sleep(time.Duration(100) * time.Millisecond) continue } go server.ServeCodec(jsonrpc.NewServerCodec(conn)) } }
|
RPC Client端
建立连接 & 远程调用
在进行远程调用前,client需要先和server建立连接。同步方式使用Call()
函数调用,异步方式使用Go()
函数调用(异步调用需要使用channel取回返回数据,Done
)
client/rpc.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| package client
import ( "fmt" "log" "net" "net/rpc" "net/rpc/jsonrpc" "sync" "time" )
type ConnRpcClient struct { sync.RWMutex rpcClient *rpc.Client RpcServer string Timeout time.Duration }
func (c *ConnRpcClient) rpcServerConn() error { if c.rpcClient != nil { return nil }
retry := 1
for { if c.rpcClient != nil { return nil }
conn, err := net.DialTimeout("tcp", c.RpcServer, c.Timeout) if err != nil { log.Printf("dial %s fail: %v", c.RpcServer, err) if retry > 3 { log.Printf("dial %s fail(retry %d times): %v", c.RpcServer, retry, err) return err } time.Sleep(time.Second << uint(retry)) retry++ continue } else { c.rpcClient = jsonrpc.NewClient(conn) } return err } }
func (c *ConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
err := c.rpcServerConn() if err != nil { log.Fatalf("connect rpc server %s fail: %v", c.RpcServer, err) return err }
timeout := time.Duration(15) * time.Second done := make(chan error, 1)
go func() { err := c.rpcClient.Call(method, args, reply) done <- err }()
select { case <-time.After(timeout): log.Printf("rpc call timeout: %s -> %s", c.rpcClient, c.RpcServer) c.rpcClient.Close() return fmt.Errorf("rpc call timeout: %s -> %s", c.rpcClient, c.RpcServer) case err := <-done: if err != nil { c.rpcClient.Close() return err } }
return nil }
|
jsonrpc 调用
jsonrpcServer.go
1 2 3 4 5 6 7 8 9 10 11
| package main
import ( "github.com/mogl/jsonrpc/server" )
func main() { go server.Start()
select {} }
|
jsonrpcClient.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package main
import ( "github.com/mogl/jsonrpc/client" "github.com/mogl/jsonrpc/common" "log" "time" )
func main() { req := &common.AgentReportRequest{ Hostname: "sai", IP: "localhost", Version: "1.0", }
resp := &common.RpcResponse{}
rpcClient := client.ConnRpcClient{ RpcServer: ":2333", Timeout: time.Duration(10) * time.Second, }
time.Sleep(time.Duration(3 * time.Second)) rpcClient.Call("Agent.ReportStatus", req, resp)
log.Println("response code:", resp.Code) log.Println("response msg:", resp.Msg) }
|
gRPC
gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制。用 proto files 创建 gRPC 服务,用 protocol buffers 消息类型来定义方法参数和返回类型。你可以在 Protocol Buffers 文档找到更多关于 Protocol Buffers 的资料。
在使用gRPC前需要进行一些比较麻烦的安装
安装
Install Go
Go version >= 1.6
Install Protocol Buffers v3
Install protoc
protoc-[version]-[platform].zip
1 2
| unzip protoc-3.7.1-linux-x86_64.zip -d /usr/local/protoc/ echo 'PATH=/usr/local/protoc/bin:$PATH' >> /etc/profile && source /etc/profile
|
Install proctoc-gen-go
1
| go get -u github.com/golang/protobuf/protoc-gen-go
|
Install gRPC
1 2
| go get -u google.golang.org/grpc
|
1 2 3 4 5 6 7 8
| git clone https://github.com/grpc/grpc-go.git $GOPATH/src/google.golang.org/grpc
cd $GOPATH/src/ go install google.golang.org/grpc
|
目录结构
演示实例目录结构
1 2 3 4 5 6 7 8 9
| . └── helloworld ├── client │ └── main.go ├── protos │ ├── helloworld.pb.go │ └── helloworld.proto └── server └── main.go
|
定义protobuf文件
gRPC能定义4种类型服务方法
单项 RPC: 客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用
1 2 3
| service serviceName { rpc SayHello(HelloRequest) returns (HelloResponse){} }
|
服务端流式 RPC: 客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。
1 2 3
| service serviceName { rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){} }
|
客户端流式 RPC: 客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
1 2 3
| service serviceName { rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) {} }
|
双向流式 RPC: 两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写。
1 2 3
| service serviceName { rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){} }
|
protobuf3的消息类型定义规则,详细protobuf3语法参见: Protobuf3 语法指南
1 2 3
| message <MessageName> { <FieldType> <FieldName> = <UniqueNumber>; }
|
protobuf 文件编写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| syntax = "proto3";
package helloworld;
service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} }
// 定义传入参数 message HelloRequest { string name = 1; }
message HelloReply { string msg = 1; }
|
将protobuf文件编译成Go文件
1 2 3 4 5
| protoc -I protos protos/helloworld.proto --go_out=plugins=grpc:./protos/
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package main
import ( "log" "net"
pb "github.com/mogl/gRPC/helloworld/protos" "golang.org/x/net/context" "google.golang.org/grpc" )
const ( address = ":50051" )
type server struct{}
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) { return &pb.HelloReply{Msg: "Hello " + req.Name}, nil }
func main() { listener, err := net.Listen("tcp", address) if err != nil { log.Fatalf("listen %v failed.", address) }
s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{})
if err := s.Serve(listener); err != nil { log.Fatalf("Failed to server: %v", err) } }
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package main
import ( "log" "os"
pb "github.com/mogl/gRPC/helloworld/protos" "golang.org/x/net/context" "google.golang.org/grpc" )
const ( address = "localhost:50051" defaultName = "World" )
func main() { conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatalf("can't connect to the server: %v", address) } defer conn.Close()
client := pb.NewGreeterClient(conn)
name := defaultName if len(os.Args) > 1 { name = os.Args[1] }
resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: name}) if err != nil { log.Fatalf("client call SayHello() failed: %v", err) }
log.Printf("server response: %v", resp.Msg) }
|