2016年7月14日 工程

创建一个你自己的 Beat

作者 Jongmin Kim (KR)Monica Sarbu

Beats 是一个用来构建轻量级数据汇集的开源平台,可用于将各种类型的数据发送至Elasticsearch 与 Logstash。其中,Packetbeat 用于监控局域网内服务器之间的网络流量信息,Filebeat 收集服务器上的日志信息,还有新推出的 Metricbeat 可以定期获取外部系统的监控指标信息。除此以外,你还可以非常方便的基于 libbeat 框架来构建你属于自己的专属 Beat,目前 Beats 社区已经有超过25个 Community Beats 了。

我们还提供一个 Beat generator(Beat 生成器)来帮你快速构建属于你自己的 Beat。通过这篇博客你将会看到如何通过 Beat 生成器来快速创建一个你自己的 Beat。今天我们创建的是一个叫做 lsbeat 的 Beat,lsbeat 非常类似 Unix 系统下的命令行 ls,我们用 lsbeat 来索引目录和文件信息。本篇文章环境基于 Unix 系统,如果你是 Windows 或是其它系统,相关操作可能需要根据实际情况进行调整。

第一步 - 配置 Golang 环境

Beats 是用 Golang写的,显然,要创建和开发一个 beat,Golang 环境必不可少。关于这方面的文章很多,建议查看这篇 Golang 的安装向导:install Golang。当前 Beats 需要的最低版本是 Golang 1.6。另外请确保正确设置了你的 $GOPATH 环境变量。

还有 Golang Glide 会被进行包用来依赖管理,所以也需要确保正确安装,最低版本是 Glide 0.10.0,安装说明点这里

让我们先来看看 lsbeat 将会用到的一段代码吧.这是一个简单的 golang 程序,通过命令行接收一个目录参数,然后列出该目录下的文件和子目录信息。

package main
import (
    "fmt"
    "io/ioutil"
    "os"
)
func main() {
    //apply run path "." without argument.
    if len(os.Args) == 1 {
        listDir(".")
    } else {
        listDir(os.Args[1])
    }
}
func listDir(dirFile string) {
    files, _ := ioutil.ReadDir(dirFile)
    for _, f := range files {
        t := f.ModTime()
        fmt.Println(f.Name(), dirFile+"/"+f.Name(), f.IsDir(), t, f.Size())
        if f.IsDir() {
            listDir(dirFile + "/" + f.Name())
        }
    }
}

后面我们将使用到这段代码和 listDir 函数。

第二步 - 生成项目

要生成一个你自己的 Beat,就要用到 beat-generator 了,首先你必须安装 cookiecutter。安装的详细说明看 这里。安装好 cookiecutter 之后,我们要给自己的 Beat 起一个好听的名字,最好是小写的英文字母,我们今天这个例子就叫 lsbeat 吧。

生成项目模板之前,我们需要下载 Beats generator 包文件,就在 beats 仓库。安装好 Golang之后,你就可以很方便的使用 go get 命令来下载 Beats generator 包文件了。 当你执行下面的这个命令后,所有的源码文件都会下载到 $GOPATH/src 目录。

$ go get github.com/elastic/beats

在 GOPATH 下创建一个以你自己github账号名称命名的目录,并切换过去,然后执行 cookiecutter 命令并指向 Beat Generator 源码路径。

$ cd $GOPATH/src/github.com/{user}
$ cookiecutter $GOPATH/src/github.com/elastic/beats/generate/beat

Cookiecutter 接下来会问你几个问题,比如项目名称,我们输入:lsbeat;你的 github 用户名,输入你自己的 github 账户;还有两个关于beat和beat_path应该会自动识别,默认回车就好;最后的问题,你可以输入你的姓和名。

project_name [Examplebeat]: lsbeat
github_name [your-github-name]: {username}
beat [lsbeat]:
beat_path [github.com/{github id}]:
full_name [Firstname Lastname]: {Full Name}

现在应该已经创建好了一个名为 lsbeat 的目录,并且里面应该会生成一些文件,让我们一起来看一下吧,结构如下:

$ cd lsbeat
$ tree
.
├── CONTRIBUTING.md
├── LICENSE
├── Makefile
├── README.md
├── beater
│   └── lsbeat.go
├── config
│   ├── config.go
│   └── config_test.go
├── dev-tools
│   └── packer
│       ├── Makefile
│       ├── beats
│       │   └── lsbeat.yml
│       └── version.yml
├── docs
│   └── index.asciidoc
├── etc
│   ├── beat.yml
│   └── fields.yml
├── glide.yaml
├── lsbeat.template.json
├── main.go
├── main_test.go
└── tests
    └── system
        ├── config
        │   └── lsbeat.yml.j2
        ├── lsbeat.py
        ├── requirements.txt
        └── test_base.py

我们刚刚已经生成好了一个原始的 Beat 模板了,但是你还需要获取相关的依赖和设置好 git 仓库。

首先,你需要拉取依赖的相关包信息。我们的这个例子是 lsbeat,我们先做一些基本的配置,回头再看看详细看看其它的模板和配置文件,只需要执行 make setup 就可以自动获取依赖。

$ make setup

当你创建好了自己的 Beat 之后,记得上传到 github 仓库,并和社区进行分享哦,如下:

Screenshot

要 push lsbeat 到你的 git 仓库,只需要执行如下命令:

$ git remote add origin git@github.com:{username}/lsbeat.git
$ git push -u origin master

恭喜你,现在你已经完成了一个 Beat ,并且发布了第一个版本到了 Github,不过里面还没有什么具体内容,现在让我们进一步看看里面的代码吧。

第四步 - 配置

执行过上面一系列命令之后,项目里将会自动创建名为 lsbeat.ymllsbeat.template.json 的配置文件。所有的基本配置项都已经生成在了里面。

lsbeat.yml:

lsbeat:
  # Defines how often an event is sent to the output
  period: 1s

Period 参数包含在每一个生成的 Beats 里面,它表示 lsbeat 将会每隔 1 秒钟轮询一次。这里我们修改 period 时间间隔为 10 秒,还可以在修改 etc/ 目录下面的 beat.yml 文件。所以这里新增一个 path 参数表示我们具体要监听哪个目录。

lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."

参数添加好了之后,我们只需要运行 make update 命令就能让这些修改应用到配置文件 lsbeat.yml。

$ make update
$ cat lsbeat.yml
################### Lsbeat Configuration Example #########################
############################# Lsbeat ######################################
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."
###############################################################################

修改完 yml 文件,记得修改 config/config.go 文件,添加一个 path 参数。

package config
import "time"
type Config struct {
    Period time.Duration `config:"period"`
    Path   string        `config:"path"`
}
var DefaultConfig = Config{
    Period: 10 * time.Second,
    Path:   ".",
}

同时我们修改 period 默认时间间隔为 10 秒,默认监听的是当前目录 (.) 。

第五步 - 添加代码

每一个 Beat 都需要实现Beater 接口,里面定义了 Run() 和 Stop() 函数。我们可以定义一个名为 Lsbeat 的结构体,然后用这个对象实现 Beater 接口,再添加字段 lastIndexTime 来保存最后运行的时间戳信息。

type Lsbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
    lastIndexTime time.Time
}

另外,每个 Beat 还需要实现 New() 方法来接收 Beat 配置信息和返回 Lsbeat 的具体实例。

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }
    ls := &Lsbeat{
        done:   make(chan struct{}),
        config: config,
    }
    return ls, nil
}

在我们的 lsbeat 例子中,我们要做的就是扩展默认的 Run() 函数来导出指定目录的文件和子目录信息。

在修改 Run() 函数之前,我们先在 lsbeat.go 增加 listDir() 函数。这就是我们前面最开始测试的那段代码,把收集目录和文件信息的简单例子稍微修改一下。另外我们还要生成以下字段信息:

  • "@timestamp": common.Time(time.Now())
  • "type": beatname
  • "modtime": common.Time(t)
  • "filename": f.Name()
  • "path": dirFile + "/" + f.Name()
  • "directory": f.IsDir()
  • "filesize": f.Size()

第一次运行的时候,我们将索引所有的文件和目录信息,然后我们定期检查是否有新文件被创建或者修改,再索引这些新创建的文件和目录。每次定期检查的时间戳都会保存在 lasIndexTime 变量,完整代码如下:

func (bt *Lsbeat) listDir(dirFile string, beatname string) {
    files, _ := ioutil.ReadDir(dirFile)
    for _, f := range files {
        t := f.ModTime()
        path := filepath.Join(dirFile, f.Name())
        if t.After(bt.lastIndexTime) {
            event := common.MapStr{
                "@timestamp": common.Time(time.Now()),
                "type":       beatname,
                "modtime":    common.Time(t),
                "filename":   f.Name(),
                "path":       path,
                "directory":  f.IsDir(),
                "filesize":   f.Size(),
            }
            bt.client.PublishEvent(event)
        }
        if f.IsDir() {
            bt.listDir(path, beatname)
        }
    }
}

记住在最开始需要导入 "io/ioutil" 包。

import (
    "fmt"
    "io/ioutil"
    "time"
)

现在,让我们看看如何在 Run() 函数里面调用 listDir() 函数,并且保存时间戳到 lasIndexTime 变量。

func (bt *Lsbeat) Run(b *beat.Beat) error {
    logp.Info("lsbeat is running! Hit CTRL-C to stop it.")
    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    for {
        now := time.Now()
        bt.listDir(bt.config.Path, b.Name) // call listDir
        bt.lastIndexTime = now             // mark Timestamp
        logp.Info("Event sent")
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }
    }
}

函数 Stop() 用来中断 run 的循环执行,保持默认生成的就行。

func (bt *Lsbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

到这里,编码部分基本就完成了。我们接下来添加新字段到 mapping 中,修改文件 etc/fields.yml。

- key: lsbeat
  title: LS Beat
  description: 
  fields:
    - name: counter
      type: integer
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added lsbeat
    - name: modtime
      type: date
    - name: filename
      type: text
    - name: path
    - name: directory
      type: boolean
    - name: filesize
      type: long

重新应用新的配置。

$ make update

字段 file_name 将使用 nGram 分词,我们还需要在文件 lsbeat.template.json 的 "settings" 节点添加一个自定义的 analyzer。

{
  "mappings": {
        ...
  },
  "order": 0,
  "settings": {
    "index.refresh_interval": "5s",
    "analysis": {
      "analyzer": {
        "ls_ngram_analyzer": {
          "tokenizer": "ls_ngram_tokenizer"
        }
      },
      "tokenizer": {
        "ls_ngram_tokenizer": {
          "type": "ngram",
          "min_gram": "2",
          "token_chars": [
            "letter",
            "digit"
          ]
        }
      }
    }
  },
  "template": "lsbeat-*"
}

第六步 - 编译和运行

现在我们可以进行编译和运行,只需要执行 make 命令就可以编译出可执行文件 lsbeat (lsbeat.exe on windows) 。

$ make

修改 lsbeat.yml 文件,设置需要监听的目录,如:"/Users/ec2-user/go",记住是全路径。

lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "/Users/ec2-user/go"

同时确保 Elasticsearch 和 Kibana 正常运行。现在就运行一下 lsbeat 命令:

$ ./lsbeat

打开Kibana,通过调用 _cat 接口检视索引是不是创建了。

cat api

可以看到创建了一个名为 lsbeat-2016.06.03 的索引,并且已经包含了一些文档。现在对 filename 字段查询一下,由于使用的是 nGram 分词,使支持模糊匹配,因此需要使用 lsbeat 关键字搜索。

query.png

大功告成! 恭喜你,你已经完成了第一个属于你自己的 beat。