OpenTracing
微服务诊断分析系统
- 集中式日志系统(logging):ELK
- 集中式度量系统(metrics):Prometheus
- 分布式全链路追踪系统(tracing):Jaeger
- 完全实现OpenTracing
- Golang编写
- UDP传输
分布式全链路追踪系统核心步骤
- 代码埋点
- 数据存储
- 查询展示
OpenTracing 术语
为了解决不同的分布式追踪系统 API 不兼容的问题,诞生了 OpenTracing规范
一条 Trace(调用链)可以被认为是一个由多个 Span 组成的有向无环图(DAG图),Span 与 Span 的关系被命名为 References
Span,可以被翻译为跨度,可以被理解为一次方法调用, 一个程序块的调用, 或者一次RPC/数据库访问.只要是一个具有完整时间周期的程序访问,都可以被认为是一个 Span
一个Span包含以下状态:
- An operation name: 操作名称
- A start timestamp: 起始时间
- A finish timestamp: 结束时间
- Span Tag: 一组键值对构成的 Span 标签集合。键值对中,键必须为 string,值可以是字符串,布尔,或者数字类型。Tag不会被子Span继承
- Span Log: 一组span的日志集合。 每次log操作包含一个键值对,以及一个时间戳。 键值对中,键必须为string,值可以是任意类型。 但是需要注意,不是所有的支持OpenTracing的Tracer,都需要支持所有的值类型。
- SpanContext: Span 上下文对象。SpanContext代表跨越进程边界,传递到下级span的状态,并用于封装Baggage。
- References: Span间关系,相关的零个或者多个 Span(Span 间通过 SpanContext 建立这种关系)
- 一个Span可以与一个或者多个Span存在因果关系。OpenTracing目前定义了两种关系:
ChildOf
和FollowsFrom
- 一个Span可以与一个或者多个Span存在因果关系。OpenTracing目前定义了两种关系:
其他术语:
- Baggage: 存储在SpanContext中的键值对,在整个trace链路中全局传输,会被子Span继承。(全局传输代价大,过多baggage会影响性能)
- Inject & Extract: SpanContexts通过Inject/Extract实现跨进程通讯数据。SpanContexts通过Inject操作向Carrier(例如:HTTP header)增加数据,通过EXtract从Carrier中获取数据。
- Carrier: 用于在进场间携带SpanContext,对Carrier可进行Inject及Extract操作。OpenTracing支持2种Carrier格式(允许自定义Carrier格式):
- text map,平台惯用的map格式,基于unicode编码的
字符串
对字符串
键值对 - binary,不透明的二进制数组
- text map,平台惯用的map格式,基于unicode编码的
Jaeger
Jaeger 核心组件
jaeger-client: 为不同语言实现了符合 OpenTracing 标准的 SDK
jaeger-agent: 监听在 UDP 端口上接收 span 数据的网络守护进程,将数据批量发送给 jaeger-collector
端口号 | 协议 | 功能 |
---|---|---|
5775 | UDP | 通过兼容性 thrift 协议,接收 zipkin thrift 类型的数据 |
6831 | UDP | 通过兼容性 thrift 协议,接收 jaeger thrift 类型的数据 |
6832 | UDP | 通过二进制 thrift 协议,接收 jaeger thrift 类型的数据 |
5778 | HTTP | 可用于配置采样策略 |
- jaeger-collector: 接收 jaeger-agent 发送来的数据,然后将数据写入后端存储。无状态
端口号 | 协议 | 功能 |
---|---|---|
14267 | TChannel | 用于接收 jaeger-agent 发送来的 jaeger.thrift 格式的 span |
14268 | HTTP | 能直接接收来自客户端的 jaeger.thrift 格式的 span |
14269 | HTTP | 健康检测/ |
14250 | gRPC | 用于接收 jaeger-agent 发送来的 model.proto 格式的 span |
9411 | HTTP | 能通过 JSON 或 Thrift 接收 Zipkin spans,默认关闭 |
Data Store: Cassandra、ElasticSearch
jaeger-query: 接收查询请求,然后从后端存储系统中检索 trace 并通过 UI 进行展示。无状态
端口号 | 协议 | 功能 |
---|---|---|
16686 | HTTP | 1. /api/* - API 端口路径 2. / - Jaeger UI 路径 |
16687 | HTTP | 健康检测/ |
- jaeger-ui: 查询结果展示
jaeger-agent 和 jaeger-collecor 之间使用的是Uber自研的 TChannel协议
可以支持 Thrift 和 HTTP+JSON 等, 该协议的设计目标之一是将分布式追踪能力融入协议中, 为了实现这一目标,TChannel协议规范将追踪字段直接定义到了二进制格式中, 形如:
1 | spanid:8 parentid:8 traceid:8 traceflags:1 |
各字段含义如下:
字段 | 类型 | 描述 |
---|---|---|
spanid | int64 | Span 标识 |
parentid | int64 | 父Span标识 |
traceid | int64 | 负责分配的原始操作方 |
traceflags | uint8 | 标志位 |
Jaeger 二进制部署
Supervisor
使用Supervisor管理,主配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# /etc/supervisord.conf
[unix_http_server]
file=/tmp/supervisor.sock ; the path to the socket file
[supervisord]
logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10 ; # of main logfile backups; 0 means none, default 10
loglevel=info ; log level; default info; others: debug,warn,trace
pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false ; start in foreground if true; default false
minfds=655350 ; min. avail startup file descriptors; default 1024
minprocs=655350 ; min. avail process descriptors;default 200
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket
[include]
files = /etc/supervisord.d/*.confjaeger-collect
1
2
3
4
5
6
7
8
9
10
11
12
13# /etc/supervisord.d/jaeger-collect.conf
[program:jaeger-collect]
command = /usr/local/jaeger/jaeger-collector --es.server-urls "http://192.168.1.180:9200" --es.timeout 30s --es.num-replicas 1 --es.num-shards 5
directory = /usr/local/jaeger/
user = root
startsecs = 3
stopwaitsecs= 5
redirect_stderr = true
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 10
stdout_logfile = /var/log/jaeger/jaeger-collect.log
environment=SPAN_STORAGE_TYPE="elasticsearch"jaeger-agent
1
2
3
4
5
6
7
8
9
10
11
12# /etc/supervisord.d/jaeger-agent.conf
[program:jaeger-agent]
command = /usr/local/jaeger/jaeger-agent --collector.host-port=192.168.1.180:14267
directory = /usr/local/jaeger/
user = root
startsecs = 3
stopwaitsecs= 5
redirect_stderr = true
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 10
stdout_logfile = /var/log/jaeger/jaeger-agent.logjaeger-query
1
2
3
4
5
6
7
8
9
10
11
12
13# /etc/supervisord.d/jaeger-query.conf
[program:jaeger-query]
command = /usr/local/jaeger/jaeger-query --es.server-urls "http://192.168.1.180:9200"
directory = /usr/local/jaeger/
user = root
startsecs = 3
stopwaitsecs= 5
redirect_stderr = true
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 10
stdout_logfile = /var/log/jaeger/jaeger-query.log
environment=SPAN_STORAGE_TYPE="elasticsearch"
Jaeger Golang Demo
初始化Jaeger
1 | func InitJaeger(service string) (opentracing.Tracer, io.Closer) { |
config.Configuration
结构体中最主要的是设置Sampler
和Reporter
。
Sampler
:采集设置,对应结构体为SamplerConfig
,提供4种采集策略(由Type
值指定采集策略):const
:全量采集(0/1)probabilistic
:概率采集(0~1, 0.1表示10个trace中只采集1个)rateLimiting
:限速采集,每秒采集设定的Span(e.g, 2表示每秒采集2个trace)remote
: 动态采集,根据当前访问量调节采集策略(adaptive sampler)
Reporter
:上报设置,对应结构体为ReporterConfig
,提供2种上报方式:LocalAgentHostPort
:上报至jaeger-agent,设置其IP:PortCollectorEndpoint
:上报至jaeger-collector,设置其IP:Port
内部函数追踪
内部函数的追踪通过context.Context
来实现span的传递
1 | func main() { |
调用StartSpan(oprationName string, opts ...StartSpanOption) Span
方法创建RootSpan
调用SetTag(key string, value interface{}) Span
方法设置tag
调用LogFields(fields ...log.Field)
方法设置log
使用opentracing.ContextWithSpan(ctx context.Context, span Span) context.Context
函数创建携带span的Context
,通过将Context
传递给被调函数来传递span
1 | func CallService1(ctx context.Context, request string) string { |
使用StartSpanFromContext(ctx context.Context, operationName string, opts ...StartSpanOption) (Span, context.Context)
函数创建span
如果ctx
中存在span,则创建的span为ctx
span的子span,否则创建的是RootSpan
ctxService1
是新创建span的Context,如果CallService1
需要调用其他函数创建子span,传递的Context应该为ctxService1
而非传入的ctx
参数
HTTP请求跟踪
HTTP请求的追踪是通过Carrier(HTTP Header)
传递span,client通过Inject()
函数将SpanContext注入到Carrier,再将Carrier写入到HTTP Header中,server通过Extract()
函数从HTTP Header中提取SpanContext,从而实现请求跟踪。
client(:2333/http) —(内部调用)—> CallHttpService1 —(远程调用)—> HttpService1(:2334/httpService1)
client
client的
http.HandleFunc("/http", func(w http.ResponseWriter, r *http.Request) {JaegerHttp(w, r, tracer)})
Handle函数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30// client handle function
func JaegerHttp(w http.ResponseWriter, r *http.Request, tracer opentracing.Tracer) {
// 服务端从Carrier(http header)中提取span
spanCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
// 创建span
// ext.RPCServerOption() 如果SpanContext中client==nil,则创建的span为RootSpan
span := tracer.StartSpan("jaegerHttpOpname", ext.RPCServerOption(spanCtx))
defer span.Finish()
// 设置tag
span.SetTag("httpKey", "httpValue")
// 设置baggage
span.SetBaggageItem("baggage", "123456789")
response := "Jaeger Http"
// 设置log
span.LogFields(
log.String("funcName", "JaegerHttp"),
log.String("response", response),
)
// 内部函数间跟踪,使用context.Context类型传输span
ctx := opentracing.ContextWithSpan(context.Background(), span)
CallHttpService1(ctx)
time.Sleep(1 * time.Second)
//CallHttpService2(ctx)
w.Write([]byte(response))
}CallHttpService1(client)
client
http.HandleFunc("/http", ...)
函数中调用CallHttpService1()
函数请求HttpService1
接口提供的服务1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39func CallHttpService1(ctx context.Context) {
// 创建子Span
span, _ := opentracing.StartSpanFromContext(ctx, "CallHttpService1Opname")
defer span.Finish()
// 设置baggage
span.SetBaggageItem("baggage1", "12345")
// 请求设置
httpService1Url := "http://localhost:2334/httpService1"
req, err := http.NewRequest("GET", httpService1Url, nil)
if err != nil {
panic(err.Error())
}
// 向span中增加tag
ext.SpanKindRPCClient.Set(span)
ext.HTTPUrl.Set(span, httpService1Url)
ext.HTTPMethod.Set(span, "GET")
// 向Carrier(http header)中添加span
span.Tracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header),
)
// 请求接口
httpClient := http.Client{}
resp, err := httpClient.Do(req)
if err != nil {
panic(err.Error())
}
body, err := ioutil.ReadAll(resp.Body)
// 设置log
span.LogFields(
log.String("body", string(body)),
)
}HttpService1(server)
server端注册
http.HandleFunc("/httpService1", ...)
路由,Handle处理函数JaegerHttpService1()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26func JaegerHttpService1(w http.ResponseWriter, r *http.Request, tracer opentracing.Tracer) {
// 服务端从Carrier(http header)中提取span
spanCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
// 创建span
// ext.RPCServerOption() 如果SpanContext中client==nil,则创建的span为RootSpan
span := tracer.StartSpan("jaegerHttpService1Opname", ext.RPCServerOption(spanCtx))
defer span.Finish()
// 获取baggage
baggage := span.BaggageItem("baggage")
baggage1 := span.BaggageItem("baggage1")
// 设置tag
span.SetTag("baggage", baggage)
span.SetTag("baggage1", baggage1)
response := "Jaeger Http Service1"
// 设置log
span.LogFields(
log.String("funcName", "JaegerHttpService1"),
)
time.Sleep(2 * time.Second)
w.Write([]byte(response))
}
gRPC请求跟踪
此例子基于gRPC helloworld 实现
gRPC同样需要通过Carrier(metadata)
来传递Span,client通过Inject()
函数将SpanContext注入到Carrier
中,再将Carrier
写入到metadata中,server通过Extract()
函数从metadata中提取SpanContext
protobuf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34// 协议版本
syntax = "proto3";
package protos;
// 定义服务及方法
service Greeter {
// 定义SayHello方法, 接收HelloRequest参数, 返回HelloReply
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// 定义传入参数
// protoc根据此生成HelloRequest结构体,包含元素Name
// type HelloRequest struct {
// Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
// XXX_NoUnkeyedLiteral struct{} `json:"-"`
// XXX_unrecognized []byte `json:"-"`
// XXX_sizecache int32 `json:"-"`
// }
message HelloRequest {
string name = 1;
}
// 定义返回参数
// protoc根据此生成HelloReply结构体,包含元素Msg
//type HelloReply struct {
// Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
// XXX_NoUnkeyedLiteral struct{} `json:"-"`
// XXX_unrecognized []byte `json:"-"`
// XXX_sizecache int32 `json:"-"`
//}
message HelloReply {
string msg = 1;
}client
client通过
Inject()
函数将SpanContext注入到Carrier(metadata)中,此操作在client调用grpc.Dial()
函数时设置拦截器完成grpc.Dial(address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(gRPCClientInterceptor(tracer)))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109const (
address = "localhost:50051"
defaultName = "World"
)
type mdWriter struct {
metadata.MD
}
// opentracing.TextMapWriter method
// 重写Set()方法,将carrier写入metadata(key/value写入到metadata)
func (mrw mdWriter) Set(key, value string) {
mrw.MD[key] = append(mrw.MD[key], value)
}
// 将Span注入到metadata
func InjectClientSpanToMetadata(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) (context.Context, error) {
// 从context中获取metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(nil)
} else {
// 不能对md直接进行修改,会产生竞争,需要拷贝然后对副本就行修改
md = md.Copy()
}
// 将SpanContext注入到carrier(metadata)中
carrier := mdWriter{md}
err := tracer.Inject(clientSpan.Context(), opentracing.TextMap, carrier)
if err != nil {
return ctx, err
}
return metadata.NewOutgoingContext(ctx, md), nil
}
// 重写client interceptor 以传输数据
func gRPCClientInterceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
// 尝试从context中获取父span
var parentCtx opentracing.SpanContext
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
parentCtx = parentSpan.Context()
}
// 创建client span
clientSpan := tracer.StartSpan(
method,
opentracing.ChildOf(parentCtx),
ext.SpanKindRPCClient,
opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
)
defer clientSpan.Finish()
// 设置baggage
clientSpan.SetBaggageItem("baggage", "gRPC-Baggage")
// 将SpanContext注入到metadata
newCtx, err := InjectClientSpanToMetadata(ctx, tracer, clientSpan)
if err != nil {
log.Printf("inject to metadata err %v", err)
}
err = invoker(newCtx, method, req, reply, cc, opts...)
if err != nil {
log.Printf("SpanContext inject to metadata failed: %v", err)
}
return err
}
}
func main() {
// 初始化并创建tracer
tracer, closer := tracing.InitJaeger("jaeger-gRPC-client")
defer closer.Close()
// opentracing.StartSpanFromContext()需要使用opentracing.SetGlobalTracer()创建新span
opentracing.SetGlobalTracer(tracer)
// 与server建立连接
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(gRPCClientInterceptor(tracer)))
if err != nil {
log.Fatalf("can't connect to the server: %v", address)
}
defer conn.Close()
// new client
client := pb.NewGreeterClient(conn)
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
// 连接server调用SayHello()函数
resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("client call SayHello() failed: %v", err)
}
log.Printf("server response: %v", resp.Msg)
}server
server通过
Extract()
函数从Carrier(metadata)中提取SpanContext,此操作在调用grpc.NewServer()
时设置拦截器实现grpc.NewServer(grpc.UnaryInterceptor(gRPCServerInterceptor(tracer)))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65type mdReader struct {
metadata.MD
}
// opentracing.TextMapReader method
// 读取metadata中的所有span
func (mdr mdReader) ForeachKey(handler func(key, value string) error) error {
for key, value := range mdr.MD {
for _, val := range value {
if err := handler(key, val); err != nil {
return err
}
}
}
return nil
}
// 从metadata提取SpanContext
func ExtractSpanFromMetadata(ctx context.Context, tracer opentracing.Tracer) (opentracing.SpanContext, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(nil)
}
carrier := mdReader{md}
spanContext, err := tracer.Extract(opentracing.TextMap, carrier)
return spanContext, err
}
// 重写server interceptor 以传输数据
func gRPCServerInterceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
// 从metadata中提取spanContext
spanContext, err := ExtractSpanFromMetadata(ctx, tracer)
if err != nil {
log.Printf("Extract from metadata failed: %v", err)
}
// 创建server span
serverSpan := tracer.StartSpan(
info.FullMethod,
ext.RPCServerOption(spanContext),
ext.SpanKindRPCServer,
opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
)
defer serverSpan.Finish()
// 获取baggage,设置tag
baggage := serverSpan.BaggageItem("baggage")
serverSpan.SetTag("baggage", baggage)
// 创建新context
newCtx := opentracing.ContextWithSpan(ctx, serverSpan)
return handler(newCtx, req)
}
}