创建一个你自己的 Beat
Beats 是一个用来构建轻量级数据汇集的开源平台,可用于将各种类型的数据发送至Elasticsearch 与 Logstash。其中,Packetbeat 用于监控局域网内服务器之间的网络流量信息,Filebeat 收集服务器上的日志信息,还有新推出的 Metricbeat 可以定期获取外部系统的监控指标信息。除此以外,你还可以非常方便的基于 libbeat 框架来构建你属于自己的专属 Beat,目前 Beats 社区已经有超过25个 Community Beats 了。
我们还提供一个 Beat generator(Beat 生成器)来帮你快速构建属于你自己的 Beat。通过这篇博客你将会看到如何通过 Beat 生成器来快速创建一个你自己的 Beat。今天我们创建的是一个叫做 lsbeat 的 Beat,lsbeat 非常类似 Unix 系统下的命令行 ls,我们用 lsbeat 来索引目录和文件信息。本篇文章环境基于 Unix 系统,如果你是 Windows 或是其它系统,相关操作可能需要根据实际情况进行调整。
第一步 - 配置 Golang 环境
Beats 是用 Golang写的,显然,要创建和开发一个 beat,Golang 环境必不可少。关于这方面的文章很多,建议查看这篇 Golang 的安装向导:install Golang。当前 Beats 需要的最低版本是 Golang 1.6。另外请确保正确设置了你的 $GOPATH 环境变量。
还有 Golang Glide 会被进行包用来依赖管理,所以也需要确保正确安装,最低版本是 Glide 0.10.0,安装说明点这里。
让我们先来看看 lsbeat 将会用到的一段代码吧.这是一个简单的 golang 程序,通过命令行接收一个目录参数,然后列出该目录下的文件和子目录信息。
package main
import (
"fmt"
"io/ioutil"
"os"
)
func main() {
//apply run path "." without argument.
if len(os.Args) == 1 {
listDir(".")
} else {
listDir(os.Args[1])
}
}
func listDir(dirFile string) {
files, _ := ioutil.ReadDir(dirFile)
for _, f := range files {
t := f.ModTime()
fmt.Println(f.Name(), dirFile+"/"+f.Name(), f.IsDir(), t, f.Size())
if f.IsDir() {
listDir(dirFile + "/" + f.Name())
}
}
}
后面我们将使用到这段代码和 listDir 函数。
第二步 - 生成项目
要生成一个你自己的 Beat,就要用到 beat-generator 了,首先你必须安装 cookiecutter。安装的详细说明看 这里。安装好 cookiecutter 之后,我们要给自己的 Beat 起一个好听的名字,最好是小写的英文字母,我们今天这个例子就叫 lsbeat 吧。
生成项目模板之前,我们需要下载 Beats generator 包文件,就在 beats 仓库。安装好 Golang之后,你就可以很方便的使用 go get 命令来下载 Beats generator 包文件了。 当你执行下面的这个命令后,所有的源码文件都会下载到 $GOPATH/src 目录。
$ go get github.com/elastic/beats
在 GOPATH 下创建一个以你自己github账号名称命名的目录,并切换过去,然后执行 cookiecutter 命令并指向 Beat Generator 源码路径。
$ cd $GOPATH/src/github.com/{user}
$ cookiecutter $GOPATH/src/github.com/elastic/beats/generate/beat
Cookiecutter 接下来会问你几个问题,比如项目名称,我们输入:lsbeat;你的 github 用户名,输入你自己的 github 账户;还有两个关于beat和beat_path应该会自动识别,默认回车就好;最后的问题,你可以输入你的姓和名。
project_name [Examplebeat]: lsbeat
github_name [your-github-name]: {username}
beat [lsbeat]:
beat_path [github.com/{github id}]:
full_name [Firstname Lastname]: {Full Name}
现在应该已经创建好了一个名为 lsbeat 的目录,并且里面应该会生成一些文件,让我们一起来看一下吧,结构如下:
$ cd lsbeat
$ tree
.
├── CONTRIBUTING.md
├── LICENSE
├── Makefile
├── README.md
├── beater
│ └── lsbeat.go
├── config
│ ├── config.go
│ └── config_test.go
├── dev-tools
│ └── packer
│ ├── Makefile
│ ├── beats
│ │ └── lsbeat.yml
│ └── version.yml
├── docs
│ └── index.asciidoc
├── etc
│ ├── beat.yml
│ └── fields.yml
├── glide.yaml
├── lsbeat.template.json
├── main.go
├── main_test.go
└── tests
└── system
├── config
│ └── lsbeat.yml.j2
├── lsbeat.py
├── requirements.txt
└── test_base.py
我们刚刚已经生成好了一个原始的 Beat 模板了,但是你还需要获取相关的依赖和设置好 git 仓库。
首先,你需要拉取依赖的相关包信息。我们的这个例子是 lsbeat,我们先做一些基本的配置,回头再看看详细看看其它的模板和配置文件,只需要执行 make setup 就可以自动获取依赖。
$ make setup
当你创建好了自己的 Beat 之后,记得上传到 github 仓库,并和社区进行分享哦,如下:
要 push lsbeat 到你的 git 仓库,只需要执行如下命令:
$ git remote add origin git@github.com:{username}/lsbeat.git
$ git push -u origin master
恭喜你,现在你已经完成了一个 Beat ,并且发布了第一个版本到了 Github,不过里面还没有什么具体内容,现在让我们进一步看看里面的代码吧。
第四步 - 配置
执行过上面一系列命令之后,项目里将会自动创建名为 lsbeat.yml 和 lsbeat.template.json 的配置文件。所有的基本配置项都已经生成在了里面。
lsbeat.yml:
lsbeat: # Defines how often an event is sent to the output period: 1s
Period 参数包含在每一个生成的 Beats 里面,它表示 lsbeat 将会每隔 1 秒钟轮询一次。这里我们修改 period 时间间隔为 10 秒,还可以在修改 etc/ 目录下面的 beat.yml 文件。所以这里新增一个 path 参数表示我们具体要监听哪个目录。
lsbeat: # Defines how often an event is sent to the output period: 10s path: "."
参数添加好了之后,我们只需要运行 make update 命令就能让这些修改应用到配置文件 lsbeat.yml。
$ make update $ cat lsbeat.yml
################### Lsbeat Configuration Example ######################### ############################# Lsbeat ###################################### lsbeat: # Defines how often an event is sent to the output period: 10s path: "." ###############################################################################
修改完 yml 文件,记得修改 config/config.go 文件,添加一个 path 参数。
package config
import "time"
type Config struct {
Period time.Duration `config:"period"`
Path string `config:"path"`
}
var DefaultConfig = Config{
Period: 10 * time.Second,
Path: ".",
}
同时我们修改 period 默认时间间隔为 10 秒,默认监听的是当前目录 (.) 。
第五步 - 添加代码
每一个 Beat 都需要实现Beater 接口,里面定义了 Run() 和 Stop() 函数。我们可以定义一个名为 Lsbeat 的结构体,然后用这个对象实现 Beater 接口,再添加字段 lastIndexTime 来保存最后运行的时间戳信息。
type Lsbeat struct {
done chan struct{}
config config.Config
client publisher.Client
lastIndexTime time.Time
}
另外,每个 Beat 还需要实现 New() 方法来接收 Beat 配置信息和返回 Lsbeat 的具体实例。
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}
ls := &Lsbeat{
done: make(chan struct{}),
config: config,
}
return ls, nil
}
在我们的 lsbeat 例子中,我们要做的就是扩展默认的 Run() 函数来导出指定目录的文件和子目录信息。
在修改 Run() 函数之前,我们先在 lsbeat.go 增加 listDir() 函数。这就是我们前面最开始测试的那段代码,把收集目录和文件信息的简单例子稍微修改一下。另外我们还要生成以下字段信息:
"@timestamp": common.Time(time.Now())"type": beatname"modtime": common.Time(t)"filename": f.Name()"path": dirFile + "/" + f.Name()"directory": f.IsDir()"filesize": f.Size()
第一次运行的时候,我们将索引所有的文件和目录信息,然后我们定期检查是否有新文件被创建或者修改,再索引这些新创建的文件和目录。每次定期检查的时间戳都会保存在 lasIndexTime 变量,完整代码如下:
func (bt *Lsbeat) listDir(dirFile string, beatname string) {
files, _ := ioutil.ReadDir(dirFile)
for _, f := range files {
t := f.ModTime()
path := filepath.Join(dirFile, f.Name())
if t.After(bt.lastIndexTime) {
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": beatname,
"modtime": common.Time(t),
"filename": f.Name(),
"path": path,
"directory": f.IsDir(),
"filesize": f.Size(),
}
bt.client.PublishEvent(event)
}
if f.IsDir() {
bt.listDir(path, beatname)
}
}
}
记住在最开始需要导入 "io/ioutil" 包。
import (
"fmt"
"io/ioutil"
"time"
)
现在,让我们看看如何在 Run() 函数里面调用 listDir() 函数,并且保存时间戳到 lasIndexTime 变量。
func (bt *Lsbeat) Run(b *beat.Beat) error {
logp.Info("lsbeat is running! Hit CTRL-C to stop it.")
bt.client = b.Publisher.Connect()
ticker := time.NewTicker(bt.config.Period)
for {
now := time.Now()
bt.listDir(bt.config.Path, b.Name) // call listDir
bt.lastIndexTime = now // mark Timestamp
logp.Info("Event sent")
select {
case <-bt.done:
return nil
case <-ticker.C:
}
}
}
函数 Stop() 用来中断 run 的循环执行,保持默认生成的就行。
func (bt *Lsbeat) Stop() {
bt.client.Close()
close(bt.done)
}
到这里,编码部分基本就完成了。我们接下来添加新字段到 mapping 中,修改文件 etc/fields.yml。
- key: lsbeat
title: LS Beat
description:
fields:
- name: counter
type: integer
required: true
description: >
PLEASE UPDATE DOCUMENTATION
#new fiels added lsbeat
- name: modtime
type: date
- name: filename
type: text
- name: path
- name: directory
type: boolean
- name: filesize
type: long
重新应用新的配置。
$ make update
字段 file_name 将使用 nGram 分词,我们还需要在文件 lsbeat.template.json 的 "settings" 节点添加一个自定义的 analyzer。
{
"mappings": {
...
},
"order": 0,
"settings": {
"index.refresh_interval": "5s",
"analysis": {
"analyzer": {
"ls_ngram_analyzer": {
"tokenizer": "ls_ngram_tokenizer"
}
},
"tokenizer": {
"ls_ngram_tokenizer": {
"type": "ngram",
"min_gram": "2",
"token_chars": [
"letter",
"digit"
]
}
}
}
},
"template": "lsbeat-*"
}
第六步 - 编译和运行
现在我们可以进行编译和运行,只需要执行 make 命令就可以编译出可执行文件 lsbeat (lsbeat.exe on windows) 。
$ make
修改 lsbeat.yml 文件,设置需要监听的目录,如:"/Users/ec2-user/go",记住是全路径。
lsbeat: # Defines how often an event is sent to the output period: 10s path: "/Users/ec2-user/go"
同时确保 Elasticsearch 和 Kibana 正常运行。现在就运行一下 lsbeat 命令:
$ ./lsbeat
打开Kibana,通过调用 _cat 接口检视索引是不是创建了。
可以看到创建了一个名为 lsbeat-2016.06.03 的索引,并且已经包含了一些文档。现在对 filename 字段查询一下,由于使用的是 nGram 分词,使支持模糊匹配,因此需要使用 lsbeat 关键字搜索。
大功告成! 恭喜你,你已经完成了第一个属于你自己的 beat。