前言

本系列将对夜莺平台各个模块的主要逻辑代码进行介绍,方便大家进行二次开发,本篇是系列的第一篇,agent 和 transfer 模块代码解读。

首先贴下夜莺的项目地址和架构图,正在使用夜莺的读者欢迎给夜莺加一个star:

夜莺 v3.0 架构图

本节主要讲解监控系统部分的源代码主逻辑,本篇主要讲解 agenttransfer 模块。

Agent 模块解读

主要功能概述

首先看一下 agent 的 main 函数,可以看出来 agent 负责了三个主要的工作:

  1. 监控指标的采集
  2. 主机设备信息的收集
  3. 任务命令的执行
func main() {
    // ...
    
    if config.Config.Enable.Mon {
        monStart()
    }

    if config.Config.Enable.Job {
        jobStart()
    }

    if config.Config.Enable.Report {
        reportStart()
    }

    // ...
}

因为本节只讲监控,所以只介绍监控指标的采集能力实现,也就是 monStart() 部分,其他的放到其他章节来介绍。

monStart() 可以看出来,监控指标的采集主要分五部分:

  • 系统指标采集
  • 端口采集
  • 进程采集
  • 插件采集
  • 日志采集

下面逐个对这五个采集的实现进行介绍:

func monStart() {
    sys.Init(config.Config.Sys)
    stra.Init()

    // 系统指标采集
    funcs.BuildMappers()
    funcs.Collect()

    // 插件采集
    plugins.Detect()

    // 进程采集
    procs.Detect()

    // 端口采集
    ports.Detect()

    // 初始化缓存,用作保存 COUNTER 类型数据
    cache.Init()

    // 日志采集
    go worker.UpdateConfigsLoop()
    go worker.PusherStart()
    go worker.Zeroize()
}

1. 系统指标采集

首先看一下系统指标采集,如果想要增加系统指标采集项,可以从这里入手。

每一个 FuncsAndInterval 对象,都包含了一组采集函数和采集周期,我们可以自己实现一个采集系统指标函数,然后赋值给 FuncsAndInterval,再加入到 []FuncsAndInterval{} 中,即可扩充 agent 的基础指标采集能力

type FuncsAndInterval struct {
    Fs       []func() []*dataobj.MetricValue
    Interval int
}

2. 端口采集

夜莺 agent 要对机器上的那些端口进行探测采集是可以在 web 页面进行配置的,主要调度逻辑如下:

一个 for 循环,定期获取服务的采集配置,然后和本地的策略进行对比,没有的增加,多余的删除。agent 对获取到的采集策略在内存了一个备份,服务端不能正常工作,导致采集任务无法进行。

func Detect() {
    detect()
    go loopDetect()
}

func loopDetect() {
    for {
        time.Sleep(time.Second * 10)
        detect()
    }
}

func detect() {
    ps := stra.GetPortCollects()
    DelNoPortCollect(ps)
    AddNewPortCollect(ps)
}

// 端口采集主逻辑
func PortCollect(p *models.PortCollect) {
    value := 0
    if isListening(p.Port) {
        value = 1
    }

    item := core.GaugeValue("proc.port.listen", value, p.Tags)
    item.Step = int64(p.Step)
    item.Timestamp = time.Now().Unix()
    item.Endpoint = config.Endpoint
    core.Push([]*dataobj.MetricValue{item})
}

3. 进程采集

进程采集的调度逻辑和端口采集类似,不在赘述。

目前夜莺只采集了进程的个数、CPU、内存使用率,如果想增加进程的其他相关指标,可以修改下面的函数:

func ProcCollect(p *models.ProcCollect) {
    ps, err := process.Processes()
    if err != nil {
        logger.Error(err)
        return
    }
    
    var memUsedTotal uint64 = 0
    var memUtilTotal = 0.0
    var cpuUtilTotal = 0.0
    var items []*dataobj.MetricValue
    cnt := 0
    
    for _, procs := range ps {
        if isProc(procs, p.CollectMethod, p.Target) {
            cnt++
            procCache, exists := cache.ProcsCache.Get(procs.Pid)
            if !exists {
                cache.ProcsCache.Set(procs.Pid, procs)
                procCache = procs
            }
            
            mem, err := procCache.MemoryInfo()
            if err != nil {
                logger.Error(err)
                continue
            }
            memUsedTotal += mem.RSS
            
            memUtil, err := procCache.MemoryPercent()
            if err != nil {
                logger.Error(err)
                continue
            }
            memUtilTotal += float64(memUtil)
            
            cpuUtil, err := procCache.Percent(0)
            if err != nil {
                logger.Error(err)
                continue
            }
            cpuUtilTotal += cpuUtil
        }
    }
    
    // ...
}

4. 插件采集

插件采集的调度逻辑和端口采集也类似。

目前夜莺的插件是需要提前部署到执行插件的目标机器上的,如果想实现插件自动分发的逻辑的话,可以修改 ListPlugins() 的逻辑,从服务端提供一个下载插件的地址,然后 agent 从远端下载插件。

下面代码块是执行插件的实现,目前插件支持在 web 端配置命令行参数、环境变量、和 stdin,这里可以做一个功能升级,如果待执行的插件是一个脚本程序的话,可以改造 monapi 模块和下面的逻辑,支持在页面上直接编写脚本,然后 agent 直接执行,可以不需要再进行插件下发:

func PluginRun(plugin *Plugin) {
    // ...
    params := strings.Split(plugin.Params, " ")
    cmd := exec.Command(fpath, params...)
    cmd.Dir = filepath.Dir(fpath)
    var stdout bytes.Buffer

    cmd.Stdout = &stdout
    var stderr bytes.Buffer
    cmd.Stderr = &stderr

    if plugin.Stdin != "" {
        cmd.Stdin = bytes.NewReader([]byte(plugin.Stdin))
    }

    if plugin.Env != "" {
        envs := make(map[string]string)
        err := json.Unmarshal([]byte(plugin.Env), &envs)
        if err != nil {
            logger.Errorf("plugin:%+v %v", plugin, err)
            return
        }
        for k, v := range envs {
            cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
        }
    }

    err := cmd.Start()
    // ...
}

5. 日志采集

日志采集的调度逻辑和端口进程采集略有不同,因为代码是从滴滴开源的 falcon-log-agent 中集成过来的。

日志采集的实现思路是实时读取滚动的日志,使用正则表达式对日志进行匹配,然后把命中匹配的日志转换为时序数据,主要通过 worker 对象的 producer 方法来实现。

目前日志采集还遗留一个优化项,将最后匹配到的日志,放到监控数据的 Extra 字段中,这样如果触发的告警事件,最后一条错误日志可以很方便的可以看到,加快问题的定位。

采集策略下发逻辑

最后再补充下采集策略的下发逻辑,agent 要执行哪些采集策略,是从 monapi 获取的,用户在 web 界面添加了采集策略是和服务树节点相关的,并没有机器之间关联,所有 monapi 会对采集策略进行一次处理,根据服务树节点找到下面的机器,然后将采集策略和机器关联起来,放到一个 map 中,等待 agent 来获取。

如果想知道某台机器被分配了哪些采集策略的话,可以通过下面接口查询到:

curl http://monapi-ip/api/mon/collects/endpoint_ip

如果想知道某台机器的 agent 实际获取了哪些采集策略的话,可以通过 agent 提供的 http 接口查询到:

curl http://127.0.0.1:2080/api/collector/stra

agent 处理采集监控指标之后,还可以接受监控数据的上报,提供了监控数据上报的 http 接口,agent 获取的监控数据之后,下一步是把数据上报给服务端,即是下面我们要讲解的 transfer 模块。

Transfer 模块解读

主要功能概述

transfer 主要负责的工作有两个:

  1. 监控数据接收
  2. 监控数据查询

数据处理流程

transfer 接收到监控数据之后,会对监控数据进行有效性校验,函数是 CheckValidity()。如果想对监控数据进行改造的话,可以通过修改 CheckValidity() 函数来完成。

transfer 接收到数据之后,会把数据放到自己的内存队列中,然后由另一个任务实时的从队列消费数据再发给不同的后端:

// send to judge
backend.Push2JudgeQueue(items)

if aggr.AggrConfig.Enabled {
    go aggr.SendToAggr(items)
}

// send to push endpoints
pushEndpoints, err := backend.GetPushEndpoints()
if err != nil {
    logger.Errorf("could not find pushendpoint")
    return err
} else {
    for _, pushendpoint := range pushEndpoints {
        pushendpoint.Push2Queue(items)
    }
}
// ...

数据分发机制

夜莺的默认后端是 tsdb 模块,transfer 在将监控数据分发给 tsdb 的时候,使用了一致性哈希算法,保证了同一条曲线的监控数据,每次都可以转发到相同的后端,同时也负责了监控数据查询的工作,因为知道数据发给谁,所以也知道去哪里获取数据。

后端存储扩展

transfer 目前可以对接多种后端存储,如果已支持的存储不满足大家的需求的,可以根据定义好的 DataSource interface,开发支持自己的存储模块,只要实现以下方法即可:

type DataSource interface {
    PushEndpoint

    // query data for judge
    QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse
    // query data for ui
    QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse

    // query metrics & tags
    QueryMetrics(recv dataobj.EndpointsRecv) *dataobj.MetricResp
    QueryTagPairs(recv dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp
    QueryIndexByClude(recv []dataobj.CludeRecv) []dataobj.XcludeResp
    QueryIndexByFullTags(recv []dataobj.IndexByFullTagsRecv) []dataobj.IndexByFullTagsResp

    // tsdb instance
    GetInstance(metric, endpoint string, tags map[string]string) []string
}

总结

本篇讲解到此结束,下篇将为大家带来告警链路模块的代码解读。