Agent源码目录结构
1 2 3 4 5 6 7 8 9 10 11 12 13
| . ├── cfg.example.json // 配置文件示例 ├── control ├── cron // 定时任务 ├── funcs // 各种采集指标函数 ├── g // 配置文件、日志、rpc等全局设置及初始化 ├── http // HTTP API ├── LICENSE ├── main.go // 入口函数 ├── NOTICE ├── plugins // 插件 ├── public // html、css、js文件 └── README.md
|
main函数
命令行参数解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| cfg := flag.String("c", "cfg.json", "configuration file") version := flag.Bool("v", false, "show version") check := flag.Bool("check", false, "check collector")
flag.Parse()
if *version { fmt.Println(g.VERSION) os.Exit(0) }
if *check { funcs.CheckCollector() os.Exit(0) }
|
funcs.CheckCollector()
函数主要功能为检测各个采集模块功能是否正常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| func CheckCollector() {
output := make(map[string]bool)
_, procStatErr := nux.CurrentProcStat() _, listDiskErr := nux.ListDiskStats() ports, listeningPortsErr := nux.ListeningPorts() procs, psErr := nux.AllProcs()
_, duErr := sys.CmdOut("du", "--help")
output["kernel "] = len(KernelMetrics()) > 0 output["df.bytes"] = DeviceMetricsCheck() output["net.if "] = len(CoreNetMetrics([]string{})) > 0 output["loadavg "] = len(LoadAvgMetrics()) > 0 output["cpustat "] = procStatErr == nil output["disk.io "] = listDiskErr == nil output["memory "] = len(MemMetrics()) > 0 output["netstat "] = len(NetstatMetrics()) > 0 output["ss -s "] = len(SocketStatSummaryMetrics()) > 0 output["ss -tln "] = listeningPortsErr == nil && len(ports) > 0 output["ps aux "] = psErr == nil && len(procs) > 0 output["du -bs "] = duErr == nil
for k, v := range output { status := "fail" if v { status = "ok" } fmt.Println(k, "...", status) } }
|
配置文件解析
g.ParseConfig(*cfg)
解析通过命令行-c
参数传入的配置文件
通过调用g.Config()
函数获取package级变量config *GlobalConfig
1
| `g.Config()` ---> `config`(`config *GlobalConfig`) ---> `config = &c` ---> `json.Unmarshal([]byte(configContent), &c)` ---> `configContent, err := file.ToTrimString(cfg)`
|
通过g.Config().XXX
即可获取-c
指定配置文件的具体配置项目
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func ParseConfig(cfg string) { ... configContent, err := file.ToTrimString(cfg) if err != nil { log.Fatalln("read config file:", cfg, "fail:", err) }
var c GlobalConfig err = json.Unmarshal([]byte(configContent), &c) if err != nil { log.Fatalln("parse config file:", cfg, "fail:", err) }
lock.Lock() defer lock.Unlock()
config = &c }
|
1 2 3 4 5 6 7
|
func Config() *GlobalConfig { lock.RLock() defer lock.RUnlock() return config }
|
g初始化
1 2 3 4 5 6
| g.InitRootDir()
g.InitLocalIp()
g.InitRpcClients()
|
1 2 3 4 5 6
// SingleConnRpcClient bundles a single *rpc.Client with the server address and
// timeout that go with it. The embedded sync.Mutex exposes Lock/Unlock directly
// on the value.
type SingleConnRpcClient struct {
	sync.Mutex

	// rpcClient is the underlying net/rpc client.
	rpcClient *rpc.Client

	// RpcServer identifies the remote RPC server.
	// NOTE(review): presumably a "host:port" address — confirm against the config.
	RpcServer string

	// Timeout is the time limit associated with this client.
	// NOTE(review): presumably applied to connecting and to each call — confirm.
	Timeout time.Duration
}
|
监控项采集mapping
funcs.BuildMappers()
BuildMappers()
函数的作用是构建采集各种指标的函数,而这些函数的返回值统一都是[]*model.MetricValue
,model.MetricValue
结构体为:
1 2 3 4 5 6 7 8 9
// MetricValue is a single collected data point. Every collector function
// returns a slice of pointers to this type.
type MetricValue struct {
	// Endpoint is the reporting host; collect fills it with the hostname.
	Endpoint string `json:"endpoint"`
	// Metric is the metric name, e.g. "load.1min".
	Metric string `json:"metric"`
	// Value is the sampled value.
	Value interface{} `json:"value"`
	// Step is the collection interval in seconds; collect fills it in.
	Step int64 `json:"step"`
	// Type is the data source type, e.g. "GAUGE"; serialized as "counterType".
	Type string `json:"counterType"`
	// Tags is a comma-separated tag list.
	Tags string `json:"tags"`
	// Timestamp is the Unix time of the sample; collect fills it in.
	Timestamp int64 `json:"timestamp"`
}
|
其中Type
是指RRD中的DST(data source type),共计5种:
Counter
: 递增类型(必须增加) (Open-Falcon常用类型)
Derive
: 可增可减类型
Absolute
: 每次都假定前一个interval的值是0,再计算平均值
Gauge
: 直接将值存入RRD,不做任何处理 (Open-Falcon常用类型)
Compute
: 不接受输入,直接定义表达式,引用其他RRD中的DS自动计算出某个值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| type FuncsAndInterval struct { Fs []func() []*model.MetricValue Interval int }
var Mappers []FuncsAndInterval
func BuildMappers() { interval := g.Config().Transfer.Interval Mappers = []FuncsAndInterval{ { Fs: []func() []*model.MetricValue{ AgentMetrics, CpuMetrics, NetMetrics, KernelMetrics, LoadAvgMetrics, MemMetrics, DiskIOMetrics, IOStatsMetrics, NetstatMetrics, ProcMetrics, UdpMetrics, }, Interval: interval, }, ...
|
以LoadAvgMetrics
为例
1
| 获取指标值 ---> GaugeValue()|CounterValue() ---> NewMetricValue() 创建*model.MetricValue
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func LoadAvgMetrics() []*model.MetricValue { load, err := nux.LoadAvg() if err != nil { log.Println(err) return nil }
return []*model.MetricValue{ GaugeValue("load.1min", load.Avg1min), GaugeValue("load.5min", load.Avg5min), GaugeValue("load.15min", load.Avg15min), }
}
|
1 2 3
| func GaugeValue(metric string, val interface{}, tags ...string) *model.MetricValue { return NewMetricValue(metric, val, "GAUGE", tags...) }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func NewMetricValue(metric string, val interface{}, dataType string, tags ...string) *model.MetricValue { mv := model.MetricValue{ Metric: metric, Value: val, Type: dataType, }
size := len(tags)
if size > 0 { mv.Tags = strings.Join(tags, ",") }
return &mv }
|
最终LoadAvgMetrics()
函数返回的数据样例如下:
1
| [<Endpoint:, Metric:load.1min, Type:GAUGE, Tags:, Step:0, Time:0, Value:0.32> <Endpoint:, Metric:load.5min, Type:GAUGE, Tags:, Step:0, Time:0, Value:0.26> <Endpoint:, Metric:load.15min, Type:GAUGE, Tags:, Step:0, Time:0, Value:0.28>]
|
定时任务
cron.InitDataHistory()
cron.InitDataHistory()
作用是周期性更新CPU和Disk状态信息,保留上次采集及此次更新的数据,总共会保留2份数据
1 2 3 4 5 6 7
| func InitDataHistory() { for { funcs.UpdateCpuStat() funcs.UpdateDiskStats() time.Sleep(g.COLLECT_INTERVAL) } }
|
cron.ReportAgentStatus()
cron.ReportAgentStatus()
作用是每隔Heartbeat.Interval(配置文件中设定)
秒通过RPC方式向HBS上报agent信息,上报数据为model.AgentReportRequest
上报逻辑为
1
| cron.ReportAgentStatus() ---> go reportAgentStatus() ---> g.HbsClient.Call() ---> serverConn() & rpcClient.Call()
|
1 2 3 4 5 6 7
// AgentReportRequest is the payload the agent sends to HBS through the
// "Agent.ReportStatus" RPC call.
type AgentReportRequest struct {
	// Hostname is the agent's hostname.
	Hostname string
	// IP is the agent's IP address, taken from g.IP().
	IP string
	// AgentVersion is the agent version, taken from g.VERSION.
	AgentVersion string
	// PluginVersion is the current plugin version, taken from g.GetCurrPluginVersion().
	PluginVersion string
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| req := model.AgentReportRequest{ Hostname: hostname, IP: g.IP(), AgentVersion: g.VERSION, PluginVersion: g.GetCurrPluginVersion(), }
var resp model.SimpleRpcResponse err = g.HbsClient.Call("Agent.ReportStatus", req, &resp) if err != nil || resp.Code != 0 { log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp) }
|
cron.SyncMinePlugins()
每隔Heartbeat.Interval
秒以RPC方式向HBS请求并同步plugin信息,执行删除无用plugin,添加新plugin等同步操作。所有plugin信息都存储在package级变量Plugins
(map[string]*Plugin
)中
请求体为model.AgentHeartbeatRequest
结构体,响应体为model.AgentPluginsResponse
结构体
1 2 3 4 5 6
|
// AgentHeartbeatRequest is the request body the agent sends to HBS when it
// synchronizes plugin information.
type AgentHeartbeatRequest struct {
	// Hostname is the agent's hostname.
	Hostname string
	// Checksum accompanies the request.
	// NOTE(review): presumably a digest of the agent's current state so HBS can skip unchanged data — confirm.
	Checksum string
}
|
1 2 3 4 5
// AgentPluginsResponse is the response body HBS returns when the agent
// synchronizes plugin information.
type AgentPluginsResponse struct {
	// Plugins lists the plugins reported by HBS.
	// NOTE(review): presumably plugin directory paths — confirm against the HBS side.
	Plugins []string
	// Timestamp is the time value attached to the response.
	// NOTE(review): presumably Unix seconds — confirm.
	Timestamp int64
}
|
1 2 3 4 5 6
// Plugin describes one plugin known to the agent. All plugins are kept in the
// package-level Plugins map (map[string]*Plugin).
type Plugin struct {
	// FilePath is the path of the plugin file.
	FilePath string
	// MTime is a time value recorded for the plugin.
	// NOTE(review): presumably the file's modification time in Unix seconds — confirm.
	MTime int64
	// Cycle is the plugin's cycle value.
	// NOTE(review): presumably the run interval in seconds — confirm.
	Cycle int
}
|
cron.SyncBuiltinMetrics()
每隔Heartbeat.Interval
秒以RPC方式g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)
周期性从HBS同步监控端口、URL、进程及du路径信息到agent
cron.SyncTrustableIps()
每隔Heartbeat.Interval
秒以RPC方式g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips)
周期性从HBS同步信任IP列表到agent
cron.Collect()
每隔Transfer.Interval
秒执行由funcs.BuildMappers()
构建的Mappers
变量中的所有采集函数Fs
,先获得监控项目的值,并为每个监控项数据(MetricValue
)加上Step
、Endpoint
(hostname
)和Timestamp
(time.Now().Unix()
)。最后调用g.SendToTransfer()
将所有监控数据上传到Transfer
1 2 3 4 5 6 7
| func Collect() { ... for _, v := range funcs.Mappers { go collect(int64(v.Interval), v.Fs) } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| func collect(sec int64, fns []func() []*model.MetricValue) { t := time.NewTicker(time.Second * time.Duration(sec)) defer t.Stop() for { <-t.C
hostname, err := g.Hostname() if err != nil { continue }
mvs := []*model.MetricValue{} ignoreMetrics := g.Config().IgnoreMetrics
for _, fn := range fns { items := fn() if items == nil { continue }
if len(items) == 0 { continue }
for _, mv := range items { if b, ok := ignoreMetrics[mv.Metric]; ok && b { continue } else { mvs = append(mvs, mv) } } }
now := time.Now().Unix() for j := 0; j < len(mvs); j++ { mvs[j].Step = sec mvs[j].Endpoint = hostname mvs[j].Timestamp = now }
g.SendToTransfer(mvs)
} }
|
API & Dashboard
监听端口、启动API服务和Dashboard服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
func init() { configAdminRoutes() configCpuRoutes() configDfRoutes() configHealthRoutes() configIoStatRoutes() configKernelRoutes() configMemoryRoutes() configPageRoutes() configPluginRoutes() configPushRoutes() configRunRoutes() configSystemRoutes() }
func Start() { if !g.Config().Http.Enabled { return }
addr := g.Config().Http.Listen if addr == "" { return }
s := &http.Server{ Addr: addr, MaxHeaderBytes: 1 << 30, }
log.Println("listening", addr) log.Fatalln(s.ListenAndServe()) }
|