目录
  1. 1. jsonrpc
    1. 1.1. 目录结构
    2. 1.2. 请求响应参数结构体
    3. 1.3. RPC Server端
      1. 1.3.1. Server端RPC方法
      2. 1.3.2. 创建RPC Server服务
    4. 1.4. RPC Client端
      1. 1.4.1. 建立连接 & 远程调用
    5. 1.5. jsonrpc 调用
  2. 2. gRPC
    1. 2.1. 安装
      1. 2.1.1. Install Go
      2. 2.1.2. Install Protocol Buffers v3
        1. 2.1.2.1. Install protoc
        2. 2.1.2.2. Install proctoc-gen-go
      3. 2.1.3. Install gRPC
    2. 2.2. 目录结构
    3. 2.3. 定义protobuf文件
    4. 2.4. 服务端
    5. 2.5. 客户端

RPC(Remote Procedure Call),远程调用。RPC消息传输可通过TCP、HTTP和UDP等。
Golang中实现RPC有多种方式:

  • net/rpc: 官方RPC库,支持TCP和HTTP方式传输,使用gob编解码(导致无法跨语言调用)
  • net/rpc/jsonrpc: 官方RPC库,仅支持TCP传输,使用json编解码(可跨语言调用)
  • gRPC: Google开源RPC库,基于HTTP/2传输,使用ProtoBuf编解码

当然除上面3种外还是许多RPC库,目前有所接触的是net/rpc/jsonrpcgRPC,主要记录这2种,后续若有接触其他实现方式会持续更新。

jsonrpc

使用net/rpc/jsonrpc库,想要方法能被远程访问,需要满足以下条件:
可输出在Golang中即首字母大写

  1. 方法的类型可输出 (the method’s type is exported) *T
  2. 方法可输出 (the method is exported)MethodName
  3. 方法必须有两个参数,且必须是输出类型或者是内建类型 (the method has two arguments, both exported or builtin types) T1 *T2
  4. 方法的第二个参数是指针类型 (the method’s second argument is a pointer) *T2
  5. 方法返回类型为 error (the method has return type error) error
1
func (t *T) MethodName(argType T1, replyType *T2) error

目录结构

演示实例目录结构

1
2
3
4
5
6
7
8
9
10
.
├── client // client rpc封装调用方法
│   └── rpc.go
├── common // 公共结构体
│   └── rpc.go
├── jsonrpcClient.go
├── jsonrpcServer.go
└── server // server rpc封装方法
├── agent.go
└── rpc.go

请求响应参数结构体

定义请求参数、响应参数的结构体,client和server都需要使用
common/rpc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package common

// RPC请求参数
type AgentReportRequest struct {
Hostname string `json:"hostname"`
IP string `json:"ip"`
Version string `json:"version"`
}

// RPC响应参数
// code == 0 -> success
// code == 1 -> fail
type RpcResponse struct {
Code int `json:"code"`
Msg string `json:"msg"`
}

RPC Server端

Server端RPC方法

Server端的RPC方法都需要符合之前提到的5个条件方可别远程调用,方法接收的请求参数和响应参数为之前预先定义好的
server/agent.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package server

import (
"github.com/mogl/jsonrpc/common"
"log"
)

type Agent int

// 方法需要满足之前提到的5个条件
// 从client传入的args中 获取请求参数,进行逻辑处理,并构造reply结构返回
func (ag *Agent) ReportStatus(args *common.AgentReportRequest, reply *common.RpcResponse) error {
if args.Hostname == "" {
reply.Code = 1
return nil
}

log.Printf("rpc server receive, hostname:%s, ip:%s, version:%s", args.Hostname, args.IP, args.Version)
reply.Code = 0
reply.Msg = "call rpc server successfully."

return nil
}

创建RPC Server服务

监听TCP端口、注册服务
server/rpc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package server

import (
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"time"
)

// 创建rpc server服务
func Start() {
addr := ":2333"

// 创建server
server := rpc.NewServer()

// 注册处理器
server.Register(new(Agent)) // server/agent.go

// 监听TCP端口
l, e := net.Listen("tcp", addr)
if e != nil {
log.Fatalln("listen error:", e)
} else {
log.Println("listerning", addr)
}

// 等待请求、每个client请求新建goroutine处理
for {
conn, err := l.Accept()
if err != nil {
log.Fatalln("listener accept fail:", err)
time.Sleep(time.Duration(100) * time.Millisecond)
continue
}
// 绑定rpc的编码器
// 新建jsonrpc编码器,并将该编码器绑定给http处理器
go server.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}

RPC Client端

建立连接 & 远程调用

在进行远程调用前,client需要先和server建立连接。同步方式使用Call()函数调用,异步方式使用Go()函数调用(异步调用需要使用channel取回返回数据,Done)
client/rpc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package client

import (
"fmt"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"sync"
"time"
)

// 自定义client连接用的结构体
type ConnRpcClient struct {
sync.RWMutex
rpcClient *rpc.Client
RpcServer string
Timeout time.Duration
}

// client和server建立连接
func (c *ConnRpcClient) rpcServerConn() error {
if c.rpcClient != nil {
return nil
}

// 重试次数
retry := 1

for {
if c.rpcClient != nil {
return nil
}

// TCP连接
conn, err := net.DialTimeout("tcp", c.RpcServer, c.Timeout)
if err != nil {
log.Printf("dial %s fail: %v", c.RpcServer, err)
// 最多尝试建立连接3次
if retry > 3 {
log.Printf("dial %s fail(retry %d times): %v", c.RpcServer, retry, err)
return err
}
time.Sleep(time.Second << uint(retry)) // 指数退避重试
retry++
continue
} else {
c.rpcClient = jsonrpc.NewClient(conn) // 基于TCP连接建立rpcClient实例
}
return err
}
}

// client调用server方法
func (c *ConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
// 一个client实例如果要串行调用,同步阻塞,则加锁
//c.Lock()
//defer c.Unlock()

// client和server建立连接
err := c.rpcServerConn()
if err != nil {
log.Fatalf("connect rpc server %s fail: %v", c.RpcServer, err)
return err
}

// 远程调用超时
timeout := time.Duration(15) * time.Second
done := make(chan error, 1)

// client远程调用方法
go func() {
// 同步方式调用
err := c.rpcClient.Call(method, args, reply)
done <- err
}()

// 调用超时机制
select {
case <-time.After(timeout):
log.Printf("rpc call timeout: %s -> %s", c.rpcClient, c.RpcServer)
c.rpcClient.Close()
return fmt.Errorf("rpc call timeout: %s -> %s", c.rpcClient, c.RpcServer)
case err := <-done:
if err != nil {
c.rpcClient.Close()
return err
}
}

return nil
}

jsonrpc 调用

jsonrpcServer.go

1
2
3
4
5
6
7
8
9
10
11
package main

import (
"github.com/mogl/jsonrpc/server"
)

func main() {
go server.Start() // 创建jsonrpc server服务

select {}
}

jsonrpcClient.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"github.com/mogl/jsonrpc/client"
"github.com/mogl/jsonrpc/common"
"log"
"time"
)

func main() {
// 请求参数
req := &common.AgentReportRequest{
Hostname: "sai",
IP: "localhost",
Version: "1.0",
}

// 响应参数
resp := &common.RpcResponse{}

// client实例
rpcClient := client.ConnRpcClient{
RpcServer: ":2333",
Timeout: time.Duration(10) * time.Second,
}

time.Sleep(time.Duration(3 * time.Second))
rpcClient.Call("Agent.ReportStatus", req, resp) // jsonrpc 调用

log.Println("response code:", resp.Code)
log.Println("response msg:", resp.Msg)
}

gRPC

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制。用 proto files 创建 gRPC 服务,用 protocol buffers 消息类型来定义方法参数和返回类型。你可以在 Protocol Buffers 文档找到更多关于 Protocol Buffers 的资料。
在使用gRPC前需要进行一些比较麻烦的安装

安装

Install Go

Go version >= 1.6

Install Protocol Buffers v3

Install protoc

protoc-[version]-[platform].zip

1
2
unzip protoc-3.7.1-linux-x86_64.zip -d /usr/local/protoc/
echo 'PATH=/usr/local/protoc/bin:$PATH' >> /etc/profile && source /etc/profile
Install proctoc-gen-go
1
go get -u github.com/golang/protobuf/protoc-gen-go

Install gRPC

1
2
# fail
go get -u google.golang.org/grpc
1
2
3
4
5
6
7
8
git clone https://github.com/grpc/grpc-go.git $GOPATH/src/google.golang.org/grpc
#git clone https://github.com/golang/net.git $GOPATH/src/golang.org/x/net
#git clone https://github.com/golang/text.git $GOPATH/src/golang.org/x/text
#git clone https://github.com/golang/sys.git $GOPATH/src/golang.org/x/sys
#go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
#git clone https://github.com/google/go-genproto.git $GOPATH/src/google.golang.org/genproto
cd $GOPATH/src/
go install google.golang.org/grpc

目录结构

演示实例目录结构

1
2
3
4
5
6
7
8
9
.
└── helloworld
├── client
│   └── main.go
├── protos
│   ├── helloworld.pb.go
│   └── helloworld.proto
└── server
└── main.go

定义protobuf文件

gRPC能定义4种类型服务方法

  • 单项 RPC: 客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用

    1
    2
    3
    service serviceName {
    rpc SayHello(HelloRequest) returns (HelloResponse){}
    }

  • 服务端流式 RPC: 客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。

    1
    2
    3
    service serviceName {
    rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){}
    }

  • 客户端流式 RPC: 客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。

    1
    2
    3
    service serviceName {
    rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) {}
    }

  • 双向流式 RPC: 两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写。

    1
    2
    3
    service serviceName {
    rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){}
    }

protobuf3的消息类型定义规则,详细protobuf3语法参见: Protobuf3 语法指南

1
2
3
message <MessageName> {
<FieldType> <FieldName> = <UniqueNumber>;
}

protobuf 文件编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 协议版本
syntax = "proto3";

package helloworld;

// 定义服务及方法
service Greeter {
// 定义SayHello方法, 接收HelloRequest参数, 返回HelloReply
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// 定义传入参数
message HelloRequest {
string name = 1; // 1 是每个字段的唯一标识符,新增一个字段则递增

// int32 age = 2;
}

// 定义返回参数
message HelloReply {
string msg = 1;
}

将protobuf文件编译成Go文件

1
2
3
4
5
# pwd: helloworld
protoc -I protos protos/helloworld.proto --go_out=plugins=grpc:./protos/

# pwd helloworld/protos
#protoc --go_out=plugins=grpc:. helloworld.proto

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"log"
"net"

pb "github.com/mogl/gRPC/helloworld/protos"
"golang.org/x/net/context"
"google.golang.org/grpc"
//"google.golang.org/grpc/reflection"
)

const (
address = ":50051"
)

// 定义server结构体, 用于实现SayHello方法
type server struct{}

// 实现SayHello方法
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Msg: "Hello " + req.Name}, nil
}

func main() {
// 监听TCP
listener, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("listen %v failed.", address)
}

// new rpc server
s := grpc.NewServer()
// RegisterGreeterServer(), proto文件定义Greeter服务, protoc自动生成
pb.RegisterGreeterServer(s, &server{})

// Serve接收来自listener的连接,为每个连接创建ServerTransport和service goroutine处理连接请求
if err := s.Serve(listener); err != nil {
log.Fatalf("Failed to server: %v", err)
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"log"
"os"

pb "github.com/mogl/gRPC/helloworld/protos"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

const (
address = "localhost:50051"
defaultName = "World"
)

func main() {
// 与server建立连接
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("can't connect to the server: %v", address)
}
defer conn.Close()

// new client
client := pb.NewGreeterClient(conn)

name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}

// 连接server调用SayHello()函数
resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("client call SayHello() failed: %v", err)
}

log.Printf("server response: %v", resp.Msg)
}

Powered: Hexo, Theme: Nadya remastered from NadyMain