目录
  1. 1. Agent源码目录结构
  2. 2. main函数
    1. 2.1. 命令行参数解析
      1. 2.1.1. funcs.CheckCollector()
    2. 2.2. 配置文件解析
      1. 2.2.1. g.ParseConfig(*cfg)
      2. 2.2.2. g初始化
    3. 2.3. 监控项采集mapping
      1. 2.3.1. funcs.BuildMappers()
    4. 2.4. 定时任务
      1. 2.4.1. cron.InitDataHistory()
      2. 2.4.2. cron.ReportAgentStatus()
      3. 2.4.3. cron.SyncMinePlugins()
      4. 2.4.4. cron.SyncBuiltinMetrics()
      5. 2.4.5. cron.SyncTrustableIps()
      6. 2.4.6. cron.Collect()
    5. 2.5. API & Dashboard

Agent源码目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
.
├── cfg.example.json // 配置文件示例
├── control
├── cron // 定时任务
├── funcs // 各种采集指标函数
├── g // 配置文件、日志、rpc等全局设置及初始化
├── http // HTTP API
├── LICENSE
├── main.go // 入口函数
├── NOTICE
├── plugins // 插件
├── public // html、css、js文件
└── README.md

main函数

命令行参数解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 命令行参数解析
cfg := flag.String("c", "cfg.json", "configuration file")
version := flag.Bool("v", false, "show version")
check := flag.Bool("check", false, "check collector")

flag.Parse()

// 是否打印agent版本信息
if *version {
fmt.Println(g.VERSION) // g/const.go VERSION
os.Exit(0)
}

// 是否检查采集器
if *check {
funcs.CheckCollector() // funcs/checker.go
os.Exit(0)
}

funcs.CheckCollector()

函数主要功能为检测各个采集模块功能是否正常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func CheckCollector() {

// 创建并初始化存放检测结果mapping
output := make(map[string]bool)

// toolkits/nux/cpustat.go
// 检测CPU状态,/proc/stat
_, procStatErr := nux.CurrentProcStat()
// toolkits/nux/iostat.go
// 检测磁盘状态,/proc/diskstats
_, listDiskErr := nux.ListDiskStats()
// toolkits/nux/prostat.go
// 获取系统所有TCP监听端口,ss -t -n -l
ports, listeningPortsErr := nux.ListeningPorts()
// toolkits/nux/proc.go
// 获取所有进程,/proc/<pid>/status,/proc/<pid>cmdline
procs, psErr := nux.AllProcs()

_, duErr := sys.CmdOut("du", "--help")

// 大部分调用funcs包下模块对各采集项进行检测
output["kernel "] = len(KernelMetrics()) > 0
output["df.bytes"] = DeviceMetricsCheck()
output["net.if "] = len(CoreNetMetrics([]string{})) > 0
output["loadavg "] = len(LoadAvgMetrics()) > 0
output["cpustat "] = procStatErr == nil
output["disk.io "] = listDiskErr == nil
output["memory "] = len(MemMetrics()) > 0
output["netstat "] = len(NetstatMetrics()) > 0
output["ss -s "] = len(SocketStatSummaryMetrics()) > 0
output["ss -tln "] = listeningPortsErr == nil && len(ports) > 0
output["ps aux "] = psErr == nil && len(procs) > 0
output["du -bs "] = duErr == nil

// 打印检测结果
for k, v := range output {
status := "fail"
if v {
status = "ok"
}
fmt.Println(k, "...", status)
}
}

配置文件解析

g.ParseConfig(*cfg)

解析通过命令行-c参数传入的配置文件

通过调用g.Config()函数获取package级变量config *GlobalConfig

1
`g.Config()` ---> `config`(`config *GobalConfig`) ---> `config = &c` ---> `json.Unmarshal([]byte(configContent), &c)` ---> `configContent, err := file.ToTrimString(cfg)`

通过g.Config().XXX即可获取-c指定配置文件的具体配置项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// g/cfg.go
func ParseConfig(cfg string) {
...
// toolkits/file/reader.go
// ToTrimString()/strings.TrimSpace() ---> ToString() ---> ioutil.ReadFile()
configContent, err := file.ToTrimString(cfg)
if err != nil {
log.Fatalln("read config file:", cfg, "fail:", err)
}

// agent配置文件结构体
// GlobalConfig嵌套了Plugin、Heartbeat、Transfer、Http和Collector结构体,结合cfg.example.json模板配置文件看
// json ---> struct
var c GlobalConfig
err = json.Unmarshal([]byte(configContent), &c)
if err != nil {
log.Fatalln("parse config file:", cfg, "fail:", err)
}

// package级变量
// lock = new(sync.RWMutex)
lock.Lock()
defer lock.Unlock()

// package级变量
// config *GlobalConfig
config = &c
}
1
2
3
4
5
6
7
// g/cfg.go
// 通过g.Config()函数获取到存放配置文件内容的*GlobalConfig指针结构体变量config
func Config() *GlobalConfig {
lock.RLock()
defer lock.RUnlock()
return config
}

g初始化

1
2
3
4
5
6
// 使用os.Getwd()获取当前目录路径
g.InitRootDir()
// 通过net包向Heartbeat server服务发起TCP连接获取本地agent机器IP
g.InitLocalIp()
// 构造SingleConnRpcClient结构体对象,设置Heartbeat server服务端IP:Port及连接超时时间
g.InitRpcClients()
1
2
3
4
5
6
type SingleConnRpcClient struct {
sync.Mutex
rpcClient *rpc.Client
RpcServer string
Timeout time.Duration
}

监控项采集mapping

funcs.BuildMappers()

BuildMappers()函数的作用是构建采集各种指标的函数,而这些函数的返回值统一都是[]*model.MetricValuemodel.MetricValue结构体为:

1
2
3
4
5
6
7
8
9
type MetricValue struct {
Endpoint string `json:"endpoint"`
Metric string `json:"metric"` // 监控项名称
Value interface{} `json:"value"` // 监控项值
Step int64 `json:"step"` // 采集间隔
Type string `json:"counterType"` //RRD数据类型: Counter、Derive、Absolute、Gauge、Compute
Tags string `json:"tags"` // 监控项标签
Timestamp int64 `json:"timestamp"`
}

其中Type是指RRD中的DST(data source type),共计5种:

  • Counter: 递增类型(必须增加) (Open-Falcon常用类型)
  • Derive: 可增可减类型
  • Absolute: 每次都假定前一个interval的值是0,再计算平均值
  • Gauge: 直接将值存入RRD,不做任何处理 (Open-Falcon常用类型)
  • Compute: 不接受输入,直接定义表达式,引用其他RRD中的DS自动计算出某个值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// funcs/funcs.go
type FuncsAndInterval struct {
Fs []func() []*model.MetricValue // 一组生成MetricValue的函数
Interval int // 上传数据到Transfer的时间间隔
}

var Mappers []FuncsAndInterval

func BuildMappers() {
interval := g.Config().Transfer.Interval // 获取配置文件中上传数据到Transfer的时间间隔
Mappers = []FuncsAndInterval{
{
Fs: []func() []*model.MetricValue{
AgentMetrics,
CpuMetrics,
NetMetrics,
KernelMetrics,
LoadAvgMetrics,
MemMetrics,
DiskIOMetrics,
IOStatsMetrics,
NetstatMetrics,
ProcMetrics,
UdpMetrics,
},
Interval: interval,
},
...

LoadAvgMetrics为例

1
获取指标值 ---> GaugeValue()|CounterValue() ---> NewMetricValue() 创建*model.MetricValue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func LoadAvgMetrics() []*model.MetricValue {
load, err := nux.LoadAvg() // 获取指标值
if err != nil {
log.Println(err)
return nil
}

return []*model.MetricValue{
GaugeValue("load.1min", load.Avg1min), // 调用GaugeValue()函数,实际构建*medel.MetricValue数据的是NewMetricValue()函数
GaugeValue("load.5min", load.Avg5min),
GaugeValue("load.15min", load.Avg15min),
}

}
1
2
3
func GaugeValue(metric string, val interface{}, tags ...string) *model.MetricValue {
return NewMetricValue(metric, val, "GAUGE", tags...) // 调用NewMetricValue()构建*model.MetricValue数据
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func NewMetricValue(metric string, val interface{}, dataType string, tags ...string) *model.MetricValue {
// 根据传入参数构建model.MetricValue数据
mv := model.MetricValue{
Metric: metric, // 监控项名称,load.1min/load.5min/load.15min
Value: val, // 监控项值
Type: dataType, // 数据类型,Gauge/Counter,Open-Falcon Agent只用到这2种DST
}

size := len(tags)

if size > 0 {
mv.Tags = strings.Join(tags, ",") // tag有值则加入
}

return &mv
}

最终LoadAvgMetrics()函数返回的数据样例如下:

1
[<Endpoint:, Metric:load.1min, Type:GAUGE, Tags:, Step:0, Time:0, Value:0.32> <Endpoint:, Metric:load.5min, Type:GAUGE, Tags:, Step:0, Time:0, Value:0.26> <Endpoint:, Metric:load.15min, Type:GAUGE, Tags:, Step:0, Time:0, Value:0.28>]

定时任务

cron.InitDataHistory()

cron.InitDataHistory()作用是周期性更新CPU和Disk状态信息,保留上次采集及此次更新的数据,总共会保留2份数据

1
2
3
4
5
6
7
func InitDataHistory() {
for {
funcs.UpdateCpuStat() // 更新CPU状态
funcs.UpdateDiskStats() // 更新磁盘状态
time.Sleep(g.COLLECT_INTERVAL) // 默认1秒
}
}

cron.ReportAgentStatus()

cron.ReportAgentStatus()作用是每隔Heartbeat.Interval(配置文件中设定)秒通过RPC方式向HBS上报agent信息,上报数据为model.AgentReportRequest

上报逻辑为

1
cron.ReportAgentStatus() ---> go reportAgentStatus() ---> g.HbsClient.Call() ---> serverConn() & rcpClient.Call()
1
2
3
4
5
6
7
// open-falcon/falcon-plus/common/model/agent.go
type AgentReportRequest struct {
Hostname string
IP string
AgentVersion string
PluginVersion string
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// cron/reporter.go
// 上报信息
req := model.AgentReportRequest{
Hostname: hostname,
IP: g.IP(),
AgentVersion: g.VERSION,
PluginVersion: g.GetCurrPluginVersion(),
}

// RPC请求返回值
var resp model.SimpleRpcResponse
// RPC方式请求上报agent信息
// g.HbsClient 为之前g.InitRpcClient()中初始化变量,var HbsClient *SingleConnRpcClient
// Call()为结构体SingleConnRpcClient方法
// func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {...}
err = g.HbsClient.Call("Agent.ReportStatus", req, &resp)
if err != nil || resp.Code != 0 {
log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp)
}

cron.SyncMinePlugins()

每隔Heartbeat.Interval秒以RPC方式向HBS请求并同步plugin信息,执行删除无用plugin,添加新plugin等同步操作。所有plugin信息都存储在packae级变量Plugins(map[string]*Plugin)中

请求体为model.AgentHeartbeatRequest结构体,响应体为model.AgentPluginsResponse结构体

1
2
3
4
5
6
// open-falcon/falcon-plus/common/model/agent.go
// 请求体数据结构
type AgentHeartbeatRequest struct {
Hostname string // agent主机名
Checksum string
}
1
2
3
4
5
// 响应体数据结构
type AgentPluginsResponse struct {
Plugins []string
Timestamp int64
}
1
2
3
4
5
6
// 每个Plugin数据结构
type Plugin struct {
FilePath string
MTime int64
Cycle int
}

cron.SyncBuiltinMetrics()

每隔Heartbeat.Interval秒以RPC方式g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)周期性从HBS同步监控端口、URL、进程及du路径信息到agent

cron.SyncTrustableIps()

每隔Heartbeat.Interval秒以RPC方式g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips)周期性从HBS同步信任IP列表到agent

cron.Collect()

每隔Transfer.Interval秒间隔执行由funcs.BuildMappers()构建Mappers变量中的所有采集函数Fs,先获得监控项目的值,并为每个监控项数据(MetricValue)加上StepEndpoint(hostname)和Timestamp(time.Now().Unix())。最后调用g.SendToTransfer()将所有监控数据上传到Transfer

1
2
3
4
5
6
7
func Collect() {
...

for _, v := range funcs.Mappers {
go collect(int64(v.Interval), v.Fs) // 创建一个goroutine执行指标采集函数(`funcs.BuildMappers()`)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func collect(sec int64, fns []func() []*model.MetricValue) {
t := time.NewTicker(time.Second * time.Duration(sec)) // 每隔sec秒(v.Interval)执行采集函数并上报transfer
defer t.Stop()
for {
<-t.C

hostname, err := g.Hostname()
if err != nil {
continue
}

mvs := []*model.MetricValue{} // 最终存储metric指标的slice
ignoreMetrics := g.Config().IgnoreMetrics

for _, fn := range fns { // 迭代所有采集函数,汇总所有指标到mvs
items := fn() // 执行指标采集函数,获得指标
if items == nil {
continue
}

if len(items) == 0 {
continue
}

for _, mv := range items { //检测指标,若没被忽略,则汇总到mvs中
if b, ok := ignoreMetrics[mv.Metric]; ok && b {
continue
} else {
mvs = append(mvs, mv)
}
}
}

now := time.Now().Unix()
for j := 0; j < len(mvs); j++ { // 为每个metric设置Step、Endpoint和Timestamp
mvs[j].Step = sec
mvs[j].Endpoint = hostname
mvs[j].Timestamp = now
}

g.SendToTransfer(mvs) // 将所有指标上报到Transfer

}
}

API & Dashboard

1
go http.Start()

监听端口、启动API服务和Dashboard服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// http/http.go

// 加载路由函数
func init() {
configAdminRoutes()
configCpuRoutes()
configDfRoutes()
configHealthRoutes()
configIoStatRoutes()
configKernelRoutes()
configMemoryRoutes()
configPageRoutes()
configPluginRoutes()
configPushRoutes()
configRunRoutes()
configSystemRoutes()
}

func Start() {
if !g.Config().Http.Enabled {
return
}

addr := g.Config().Http.Listen
if addr == "" {
return
}

s := &http.Server{ // 设置监听端口
Addr: addr,
MaxHeaderBytes: 1 << 30,
}

log.Println("listening", addr)
log.Fatalln(s.ListenAndServe()) // 启动http服务
}

Powered: Hexo, Theme: Nadya remastered from NadyMain