OpenTracing
微服务诊断分析系统
- 集中式日志系统(logging):ELK
- 集中式度量系统(metrics):Prometheus
- 分布式全链路追踪系统(tracing):Jaeger- 完全实现OpenTracing
- Golang编写
- UDP传输
 
分布式全链路追踪系统核心步骤
- 代码埋点
- 数据存储
- 查询展示
OpenTracing 术语
为了解决不同的分布式追踪系统 API 不兼容的问题,诞生了 OpenTracing规范
一条 Trace(调用链)可以被认为是一个由多个 Span 组成的有向无环图(DAG图),Span 与 Span 的关系被命名为 References
Span,可以被翻译为跨度,可以被理解为一次方法调用, 一个程序块的调用, 或者一次RPC/数据库访问.只要是一个具有完整时间周期的程序访问,都可以被认为是一个 Span
一个Span包含以下状态:
- An operation name: 操作名称
- A start timestamp: 起始时间
- A finish timestamp: 结束时间
- Span Tag: 一组键值对构成的 Span 标签集合。键值对中,键必须为 string,值可以是字符串,布尔,或者数字类型。Tag不会被子Span继承
- Span Log: 一组span的日志集合。 每次log操作包含一个键值对,以及一个时间戳。 键值对中,键必须为string,值可以是任意类型。 但是需要注意,不是所有的支持OpenTracing的Tracer,都需要支持所有的值类型。
- SpanContext: Span 上下文对象。SpanContext代表跨越进程边界,传递到下级span的状态,并用于封装Baggage。
- References: Span间关系,相关的零个或者多个 Span(Span 间通过 SpanContext 建立这种关系)- 一个Span可以与一个或者多个Span存在因果关系。OpenTracing目前定义了两种关系:ChildOf和FollowsFrom
 
- 一个Span可以与一个或者多个Span存在因果关系。OpenTracing目前定义了两种关系:
其他术语:
- Baggage: 存储在SpanContext中的键值对,在整个trace链路中全局传输,会被子Span继承。(全局传输代价大,过多baggage会影响性能)
- Inject & Extract: SpanContexts通过Inject/Extract实现跨进程通讯数据。SpanContexts通过Inject操作向Carrier(例如:HTTP header)增加数据,通过EXtract从Carrier中获取数据。
- Carrier: 用于在进场间携带SpanContext,对Carrier可进行Inject及Extract操作。OpenTracing支持2种Carrier格式(允许自定义Carrier格式):- text map,平台惯用的map格式,基于unicode编码的字符串对字符串键值对
- binary,不透明的二进制数组
 
- text map,平台惯用的map格式,基于unicode编码的
Jaeger
Jaeger 核心组件
- jaeger-client: 为不同语言实现了符合 OpenTracing 标准的 SDK 
- jaeger-agent: 监听在 UDP 端口上接收 span 数据的网络守护进程,将数据批量发送给 jaeger-collector 
| 端口号 | 协议 | 功能 | 
|---|---|---|
| 5775 | UDP | 通过兼容性 thrift 协议,接收 zipkin thrift 类型的数据 | 
| 6831 | UDP | 通过兼容性 thrift 协议,接收 jaeger thrift 类型的数据 | 
| 6832 | UDP | 通过二进制 thrift 协议,接收 jaeger thrift 类型的数据 | 
| 5778 | HTTP | 可用于配置采样策略 | 
- jaeger-collector: 接收 jaeger-agent 发送来的数据,然后将数据写入后端存储。无状态
| 端口号 | 协议 | 功能 | 
|---|---|---|
| 14267 | TChannel | 用于接收 jaeger-agent 发送来的 jaeger.thrift 格式的 span | 
| 14268 | HTTP | 能直接接收来自客户端的 jaeger.thrift 格式的 span | 
| 14269 | HTTP | 健康检测/ | 
| 14250 | gRPC | 用于接收 jaeger-agent 发送来的 model.proto 格式的 span | 
| 9411 | HTTP | 能通过 JSON 或 Thrift 接收 Zipkin spans,默认关闭 | 
- Data Store: Cassandra、ElasticSearch 
- jaeger-query: 接收查询请求,然后从后端存储系统中检索 trace 并通过 UI 进行展示。无状态 
| 端口号 | 协议 | 功能 | 
|---|---|---|
| 16686 | HTTP | 1. /api/* - API 端口路径 2. / - Jaeger UI 路径 | 
| 16687 | HTTP | 健康检测/ | 
- jaeger-ui: 查询结果展示
jaeger-agent 和 jaeger-collecor 之间使用的是Uber自研的 TChannel协议
可以支持 Thrift 和 HTTP+JSON 等, 该协议的设计目标之一是将分布式追踪能力融入协议中, 为了实现这一目标,TChannel协议规范将追踪字段直接定义到了二进制格式中, 形如:
| 1 | spanid:8 parentid:8 traceid:8 traceflags:1 | 
各字段含义如下:
| 字段 | 类型 | 描述 | 
|---|---|---|
| spanid | int64 | Span 标识 | 
| parentid | int64 | 父Span标识 | 
| traceid | int64 | 负责分配的原始操作方 | 
| traceflags | uint8 | 标志位 | 
Jaeger 二进制部署
- Supervisor - 使用Supervisor管理,主配置文件 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22- # /etc/supervisord.conf 
 [unix_http_server]
 file=/tmp/supervisor.sock ; the path to the socket file
 [supervisord]
 logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
 logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB
 logfile_backups=10 ; # of main logfile backups; 0 means none, default 10
 loglevel=info ; log level; default info; others: debug,warn,trace
 pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
 nodaemon=false ; start in foreground if true; default false
 minfds=655350 ; min. avail startup file descriptors; default 1024
 minprocs=655350 ; min. avail process descriptors;default 200
 [rpcinterface:supervisor]
 supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
 [supervisorctl]
 serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket
 [include]
 files = /etc/supervisord.d/*.conf
- jaeger-collect - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13- # /etc/supervisord.d/jaeger-collect.conf 
 [program:jaeger-collect]
 command = /usr/local/jaeger/jaeger-collector --es.server-urls "http://192.168.1.180:9200" --es.timeout 30s --es.num-replicas 1 --es.num-shards 5
 directory = /usr/local/jaeger/
 user = root
 startsecs = 3
 stopwaitsecs= 5
 redirect_stderr = true
 stdout_logfile_maxbytes = 50MB
 stdout_logfile_backups = 10
 stdout_logfile = /var/log/jaeger/jaeger-collect.log
 environment=SPAN_STORAGE_TYPE="elasticsearch"
- jaeger-agent - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12- # /etc/supervisord.d/jaeger-agent.conf 
 [program:jaeger-agent]
 command = /usr/local/jaeger/jaeger-agent --collector.host-port=192.168.1.180:14267
 directory = /usr/local/jaeger/
 user = root
 startsecs = 3
 stopwaitsecs= 5
 redirect_stderr = true
 stdout_logfile_maxbytes = 50MB
 stdout_logfile_backups = 10
 stdout_logfile = /var/log/jaeger/jaeger-agent.log
- jaeger-query - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13- # /etc/supervisord.d/jaeger-query.conf 
 [program:jaeger-query]
 command = /usr/local/jaeger/jaeger-query --es.server-urls "http://192.168.1.180:9200"
 directory = /usr/local/jaeger/
 user = root
 startsecs = 3
 stopwaitsecs= 5
 redirect_stderr = true
 stdout_logfile_maxbytes = 50MB
 stdout_logfile_backups = 10
 stdout_logfile = /var/log/jaeger/jaeger-query.log
 environment=SPAN_STORAGE_TYPE="elasticsearch"
Jaeger Golang Demo
初始化Jaeger
| 1 | func InitJaeger(service string) (opentracing.Tracer, io.Closer) { | 
config.Configuration结构体中最主要的是设置Sampler和Reporter。
- Sampler:采集设置,对应结构体为- SamplerConfig,提供4种采集策略(由- Type值指定采集策略):- const:全量采集(0/1)
- probabilistic:概率采集(0~1, 0.1表示10个trace中只采集1个)
- rateLimiting:限速采集,每秒采集设定的Span(e.g, 2表示每秒采集2个trace)
- remote: 动态采集,根据当前访问量调节采集策略(adaptive sampler)
 
- Reporter:上报设置,对应结构体为- ReporterConfig,提供2种上报方式:- LocalAgentHostPort:上报至jaeger-agent,设置其IP:Port
- CollectorEndpoint:上报至jaeger-collector,设置其IP:Port
 
内部函数追踪
内部函数的追踪通过context.Context来实现span的传递
| 1 | func main() { | 
调用StartSpan(oprationName string, opts ...StartSpanOption) Span方法创建RootSpan
调用SetTag(key string, value interface{}) Span方法设置tag
调用LogFields(fields ...log.Field)方法设置log
使用opentracing.ContextWithSpan(ctx context.Context, span Span) context.Context函数创建携带span的Context,通过将Context传递给被调函数来传递span
| 1 | func CallService1(ctx context.Context, request string) string { | 
使用StartSpanFromContext(ctx context.Context, operationName string, opts ...StartSpanOption) (Span, context.Context)函数创建span
如果ctx中存在span,则创建的span为ctxspan的子span,否则创建的是RootSpan
ctxService1是新创建span的Context,如果CallService1需要调用其他函数创建子span,传递的Context应该为ctxService1而非传入的ctx参数

HTTP请求跟踪
HTTP请求的追踪是通过Carrier(HTTP Header)传递span,client通过Inject()函数将SpanContext注入到Carrier,再将Carrier写入到HTTP Header中,server通过Extract()函数从HTTP Header中提取SpanContext,从而实现请求跟踪。
client(:2333/http) —(内部调用)—> CallHttpService1 —(远程调用)—> HttpService1(:2334/httpService1)
- client - client的 - http.HandleFunc("/http", func(w http.ResponseWriter, r *http.Request) {JaegerHttp(w, r, tracer)})Handle函数- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30- // client handle function 
 func JaegerHttp(w http.ResponseWriter, r *http.Request, tracer opentracing.Tracer) {
 // 服务端从Carrier(http header)中提取span
 spanCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
 // 创建span
 // ext.RPCServerOption() 如果SpanContext中client==nil,则创建的span为RootSpan
 span := tracer.StartSpan("jaegerHttpOpname", ext.RPCServerOption(spanCtx))
 defer span.Finish()
 // 设置tag
 span.SetTag("httpKey", "httpValue")
 // 设置baggage
 span.SetBaggageItem("baggage", "123456789")
 response := "Jaeger Http"
 // 设置log
 span.LogFields(
 log.String("funcName", "JaegerHttp"),
 log.String("response", response),
 )
 // 内部函数间跟踪,使用context.Context类型传输span
 ctx := opentracing.ContextWithSpan(context.Background(), span)
 CallHttpService1(ctx)
 time.Sleep(1 * time.Second)
 //CallHttpService2(ctx)
 w.Write([]byte(response))
 }
- CallHttpService1(client) - client - http.HandleFunc("/http", ...)函数中调用- CallHttpService1()函数请求- HttpService1接口提供的服务- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39- func CallHttpService1(ctx context.Context) { 
 // 创建子Span
 span, _ := opentracing.StartSpanFromContext(ctx, "CallHttpService1Opname")
 defer span.Finish()
 // 设置baggage
 span.SetBaggageItem("baggage1", "12345")
 // 请求设置
 httpService1Url := "http://localhost:2334/httpService1"
 req, err := http.NewRequest("GET", httpService1Url, nil)
 if err != nil {
 panic(err.Error())
 }
 // 向span中增加tag
 ext.SpanKindRPCClient.Set(span)
 ext.HTTPUrl.Set(span, httpService1Url)
 ext.HTTPMethod.Set(span, "GET")
 // 向Carrier(http header)中添加span
 span.Tracer().Inject(
 span.Context(),
 opentracing.HTTPHeaders,
 opentracing.HTTPHeadersCarrier(req.Header),
 )
 // 请求接口
 httpClient := http.Client{}
 resp, err := httpClient.Do(req)
 if err != nil {
 panic(err.Error())
 }
 body, err := ioutil.ReadAll(resp.Body)
 // 设置log
 span.LogFields(
 log.String("body", string(body)),
 )
 }
- HttpService1(server) - server端注册 - http.HandleFunc("/httpService1", ...)路由,Handle处理函数- JaegerHttpService1()- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26- func JaegerHttpService1(w http.ResponseWriter, r *http.Request, tracer opentracing.Tracer) { 
 // 服务端从Carrier(http header)中提取span
 spanCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
 // 创建span
 // ext.RPCServerOption() 如果SpanContext中client==nil,则创建的span为RootSpan
 span := tracer.StartSpan("jaegerHttpService1Opname", ext.RPCServerOption(spanCtx))
 defer span.Finish()
 // 获取baggage
 baggage := span.BaggageItem("baggage")
 baggage1 := span.BaggageItem("baggage1")
 // 设置tag
 span.SetTag("baggage", baggage)
 span.SetTag("baggage1", baggage1)
 response := "Jaeger Http Service1"
 // 设置log
 span.LogFields(
 log.String("funcName", "JaegerHttpService1"),
 )
 time.Sleep(2 * time.Second)
 w.Write([]byte(response))
 }


gRPC请求跟踪
此例子基于gRPC helloworld 实现
gRPC同样需要通过Carrier(metadata)来传递Span,client通过Inject()函数将SpanContext注入到Carrier中,再将Carrier写入到metadata中,server通过Extract()函数从metadata中提取SpanContext
- protobuf - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34- // 协议版本 
 syntax = "proto3";
 package protos;
 // 定义服务及方法
 service Greeter {
 // 定义SayHello方法, 接收HelloRequest参数, 返回HelloReply
 rpc SayHello (HelloRequest) returns (HelloReply) {}
 }
 // 定义传入参数
 // protoc根据此生成HelloRequest结构体,包含元素Name
 // type HelloRequest struct {
 // Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
 // XXX_NoUnkeyedLiteral struct{} `json:"-"`
 // XXX_unrecognized []byte `json:"-"`
 // XXX_sizecache int32 `json:"-"`
 // }
 message HelloRequest {
 string name = 1;
 }
 // 定义返回参数
 // protoc根据此生成HelloReply结构体,包含元素Msg
 //type HelloReply struct {
 // Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
 // XXX_NoUnkeyedLiteral struct{} `json:"-"`
 // XXX_unrecognized []byte `json:"-"`
 // XXX_sizecache int32 `json:"-"`
 //}
 message HelloReply {
 string msg = 1;
 }
- client - client通过 - Inject()函数将SpanContext注入到Carrier(metadata)中,此操作在client调用- grpc.Dial()函数时设置拦截器完成- grpc.Dial(address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(gRPCClientInterceptor(tracer)))- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109- const ( 
 address = "localhost:50051"
 defaultName = "World"
 )
 type mdWriter struct {
 metadata.MD
 }
 // opentracing.TextMapWriter method
 // 重写Set()方法,将carrier写入metadata(key/value写入到metadata)
 func (mrw mdWriter) Set(key, value string) {
 mrw.MD[key] = append(mrw.MD[key], value)
 }
 // 将Span注入到metadata
 func InjectClientSpanToMetadata(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) (context.Context, error) {
 // 从context中获取metadata
 md, ok := metadata.FromIncomingContext(ctx)
 if !ok {
 md = metadata.New(nil)
 } else {
 // 不能对md直接进行修改,会产生竞争,需要拷贝然后对副本就行修改
 md = md.Copy()
 }
 // 将SpanContext注入到carrier(metadata)中
 carrier := mdWriter{md}
 err := tracer.Inject(clientSpan.Context(), opentracing.TextMap, carrier)
 if err != nil {
 return ctx, err
 }
 return metadata.NewOutgoingContext(ctx, md), nil
 }
 // 重写client interceptor 以传输数据
 func gRPCClientInterceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor {
 return func(
 ctx context.Context,
 method string,
 req, reply interface{},
 cc *grpc.ClientConn,
 invoker grpc.UnaryInvoker,
 opts ...grpc.CallOption,
 ) error {
 // 尝试从context中获取父span
 var parentCtx opentracing.SpanContext
 if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
 parentCtx = parentSpan.Context()
 }
 // 创建client span
 clientSpan := tracer.StartSpan(
 method,
 opentracing.ChildOf(parentCtx),
 ext.SpanKindRPCClient,
 opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
 )
 defer clientSpan.Finish()
 // 设置baggage
 clientSpan.SetBaggageItem("baggage", "gRPC-Baggage")
 // 将SpanContext注入到metadata
 newCtx, err := InjectClientSpanToMetadata(ctx, tracer, clientSpan)
 if err != nil {
 log.Printf("inject to metadata err %v", err)
 }
 err = invoker(newCtx, method, req, reply, cc, opts...)
 if err != nil {
 log.Printf("SpanContext inject to metadata failed: %v", err)
 }
 return err
 }
 }
 func main() {
 // 初始化并创建tracer
 tracer, closer := tracing.InitJaeger("jaeger-gRPC-client")
 defer closer.Close()
 // opentracing.StartSpanFromContext()需要使用opentracing.SetGlobalTracer()创建新span
 opentracing.SetGlobalTracer(tracer)
 // 与server建立连接
 conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(gRPCClientInterceptor(tracer)))
 if err != nil {
 log.Fatalf("can't connect to the server: %v", address)
 }
 defer conn.Close()
 // new client
 client := pb.NewGreeterClient(conn)
 name := defaultName
 if len(os.Args) > 1 {
 name = os.Args[1]
 }
 // 连接server调用SayHello()函数
 resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: name})
 if err != nil {
 log.Fatalf("client call SayHello() failed: %v", err)
 }
 log.Printf("server response: %v", resp.Msg)
 }
- server - server通过 - Extract()函数从Carrier(metadata)中提取SpanContext,此操作在调用- grpc.NewServer()时设置拦截器实现- grpc.NewServer(grpc.UnaryInterceptor(gRPCServerInterceptor(tracer)))- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65- type mdReader struct { 
 metadata.MD
 }
 // opentracing.TextMapReader method
 // 读取metadata中的所有span
 func (mdr mdReader) ForeachKey(handler func(key, value string) error) error {
 for key, value := range mdr.MD {
 for _, val := range value {
 if err := handler(key, val); err != nil {
 return err
 }
 }
 }
 return nil
 }
 // 从metadata提取SpanContext
 func ExtractSpanFromMetadata(ctx context.Context, tracer opentracing.Tracer) (opentracing.SpanContext, error) {
 md, ok := metadata.FromIncomingContext(ctx)
 if !ok {
 md = metadata.New(nil)
 }
 carrier := mdReader{md}
 spanContext, err := tracer.Extract(opentracing.TextMap, carrier)
 return spanContext, err
 }
 // 重写server interceptor 以传输数据
 func gRPCServerInterceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor {
 return func(
 ctx context.Context,
 req interface{},
 info *grpc.UnaryServerInfo,
 handler grpc.UnaryHandler,
 ) (resp interface{}, err error) {
 // 从metadata中提取spanContext
 spanContext, err := ExtractSpanFromMetadata(ctx, tracer)
 if err != nil {
 log.Printf("Extract from metadata failed: %v", err)
 }
 // 创建server span
 serverSpan := tracer.StartSpan(
 info.FullMethod,
 ext.RPCServerOption(spanContext),
 ext.SpanKindRPCServer,
 opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
 )
 defer serverSpan.Finish()
 // 获取baggage,设置tag
 baggage := serverSpan.BaggageItem("baggage")
 serverSpan.SetTag("baggage", baggage)
 // 创建新context
 newCtx := opentracing.ContextWithSpan(ctx, serverSpan)
 return handler(newCtx, req)
 }
 }