엔지니어링

나만의 Beat 만들기

Beats 는 다양한 종류의 데이터들을 Elasticsearch로 전송하기 위한 가벼운 오픈소스 플랫폼(lightweight data shipper)입니다. 저희는 서버의 네트워크 트래픽을 모니터링 하는 Packetbeat, 로그를 가져오기 위한 Filebeat 그리고 최근 새롭게 출시된 원격 시스템의 다양한 상태값을 가져오기 위한 Metricbeat을 제공하고 있습니다. 만약에 또 다른 종류의 사용자 정의 데이터를 수집해야 한다면, 여러분은 libbeat 프레임워크 기반으로 나만의 Beat 를 만들 수 있습니다. 이미 Beats 커뮤니티로부터 25개 이상의  Community Beats 들이 만들어져 있습니다.

Beats는 개발자들이 자신이 필요로 하는 Beat을 만들 수 있도록 libbeat 이라고 하는 라이브러리와 Beat Generator 라는 패키지를 제공하고 있습니다. 이번 포스트에서는 Beat Generator를 사용해서 직접 나만의 Beat 를 만드는 법을 알아보도록 하겠습니다. 오늘 우리가 만들 Beat은 유닉스 계열에서  ls 명령을 실행하면 나오는 파일, 디렉토리 리스트 정보를 elasticsearch 에 색인하는 lsbeat 입니다. 오늘 예제는 Unix 시스템 기반으로 진행이 되기 때문에 Windows 또는 다른 계열의 OS 에서는 추가적으로 필요한 정보들을 확인 하시고 그에 맞게 설치 및 진행을 하시기 바랍니다.

Step 1 - Go 언어 환경 설정

Beats는 Go 언어(Golang)로 개발이 되어 있습니다. Beats를 만들기 위해서는 시스템에 Go 언어가 설치되어 있어야 합니다.  Golang 설치 가이드 를 따라 Go 언어를 설치하세요. 현재 Beats는 최소한 Golang 1.6 버전을 필요로 합니다. 또한  $GOPATH 환경변수를 정확하게 설정했는지도 확인하세요.

여기서 잠깐 우리가 개발 할 Lsbeat 에서 사용할 코드를 살펴보겠습니다. 이 간단한 Golang 프로그램은 커맨드 라인 매개변수에 지정된 경로의 모든 파일과 디렉토리 그리고 하위 디렉토리의 목록을 조회합니다.

package main
import (
    "fmt"
    "io/ioutil"
    "os"
)
func main() {
    //apply run path "." without argument.
    if len(os.Args) == 1 {
        listDir(".")
    } else {
        listDir(os.Args[1])
    }
}
func listDir(dirFile string) {
    files, _ := ioutil.ReadDir(dirFile)
    for _, f := range files {
        t := f.ModTime()
        fmt.Println(f.Name(), dirFile+"/"+f.Name(), f.IsDir(), t, f.Size())
        if f.IsDir() {
            listDir(dirFile + "/" + f.Name())
        }
    }
}

우리는 위 프로그램에서 listDir 함수의 코드를 재사용 할 것입니다. 

Step 2 - 생성

Beat를 만들기 위해 우리는 Beat Generator 를 사용할 것입니다. 먼저 우리는 cookiecutter 를 설치해야 합니다.  이 링크에서 설치 가이드를 확인하세요. cookiecutter 설치가 끝난 다음에, 우리는 우리가 만들 Beat의 이름을 지정해야 합니다. 이름은 반드시 한 단어이며 영어 소문자이어야 합니다. 오늘 예제에서 사용할 이름은  lsbeat 입니다.

Beat skeleton 을 생성하기 위해 우리는 beats 리파지토리에 있는 Beats generator 패키지를 내려받아야 합니다. Golang 을 설치하고 나면 go get 명령을 이용해서 Beats generator 를 내려받을 수 있습니다. 명령을 실행하고 나면 모든 소스 파일들이 $GOPATH/src 경로 아래로 내려받아지게 됩니다.

$ go get github.com/elastic/beats

먼저 올바른 브랜치에서 동작하도록 체크아웃을 해 줘야 합니다.

$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

이제 GOPATH 아래에 있는 여러분의 리파지토리 아래로 이동을 해서 Beat Generator 를 지정한 cookiecutter를 실행합니다.

$ cd $GOPATH/src/github.com/{user}
$ cookiecutter $GOPATH/src/github.com/elastic/beats/generate/beat

Cookiecutter 가 몇가지 질문을 합니다. project_name에는 lsbeat를 입력하고, github_user 에는 여러분의 github id를 입력합니다. 다음의 두 질문 beat 그리고 beat_path는 이미 자동으로 완성 되어 있을 것입니다. 마지막 질문에는 이름(Firstname)과 성(Lastname) 을 입력합니다.

project_name [Examplebeat]: lsbeat
github_name [your-github-name]: {username}
beat [lsbeat]:
beat_path [github.com/{github id}]:
full_name [Firstname Lastname]: {Full Name}

이제 자동으로 lsbeat 디렉토리와 몇개의 파일들이 생성됩니다. 생성된 디렉토리로 가서 어떤 파일들이 생성되었는지 확인을 해 보겠습니다.

$ cd lsbeat
$ tree
.
├── CONTRIBUTING.md
├── LICENSE
├── Makefile
├── README.md
├── beater
│   └── lsbeat.go
├── config
│   ├── config.go
│   └── config_test.go
├── dev-tools
│   └── packer
│       ├── Makefile
│       ├── beats
│       │   └── lsbeat.yml
│       └── version.yml
├── docs
│   └── index.asciidoc
├── etc
│   ├── beat.yml
│   └── fields.yml
├── glide.yaml
├── lsbeat.template.json
├── main.go
├── main_test.go
└── tests
    └── system
        ├── config
        │   └── lsbeat.yml.j2
        ├── lsbeat.py
        ├── requirements.txt
        └── test_base.py

이제 우리는 Beat의 raw template을 생성했지만 의존 파일들(dependencies)을 내려받고 git 리파지토리 설정을 해야 합니다.

우리에게 필요한 의존 패키지는 현재 libbeat과 기본적인 설정 파일 그리고 템플릿 파일입니다. 설정 파일과 템플릿 파일은 좀 더 나중에 다시 살펴보도록 하겠습니다.

$ make setup

나만의 Beat을 생성한 다음에는 Github 리파지토리에 업로드를 해서 커뮤니티와 함께 공유를 합시다.

Screen Shot 2016-07-13 at 10.53.58 AM.png

lsbeat을 Github 리파지토리에 push 하기 위해 다음 명령어를 실행합니다.

$ git remote add origin git@github.com:{username}/lsbeat.git
$ git push -u origin master

이제 우리의 첫 버전 Beat을 Github에 push 했습니다. 빌드 그리고 실행을 해 보고 나서 코드로 더 깊이 들어가보도록 하겠습니다.

Step 4 - 환경 설정

앞의 과정을 진행하고 나면 자동적으로 lsbeat.yml 그리고 lsbeat.template.json 파일들이 생성됩니다. 모든 기본 설정들은 이미 이 파일들 안에 입력이 되어 있습니다.

lsbeat.yml:

lsbeat:
  # Defines how often an event is sent to the output
  period: 1s

period 는 모든 Beat의 generator에 포함되어 있는 파라메터입니다. 현재는 Lsbeat 가 매 1초마다 프로세스를 반복하도록 설정 되어 있습니다. 이 period 값을 1에서 10으로 변경하고 프로그램이 스캔할 top 디렉토리를 가리키는 새 파라메터 path 를 추가 해 보겠습니다. 우리는 새 파라메터를 etc/ 디렉토리 아래 있는 beat.yml 파일에서 추가할 수 있습니다.

lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."

새 파라메터들을 추가 한 뒤 make update 명령을 통해 변경된 설정들을 lsbeat.yml 설정 파일에 반영합니다. etc/beat.yml 에 설정한 파라메터들이 이제 lsbeat.yml에서 사용할 수 있음을 확인할 수 있습니다.

$ make update
$ cat lsbeat.yml
################### Lsbeat Configuration Example #########################
############################# Lsbeat ######################################
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."
###############################################################################

환경설정 파일이 업데이트 된 후에는 path 파라메터의 사용이 가능하도록 config/config.go 파일을 수정해줘야 합니다.

package config
import "time"
type Config struct {
    Period time.Duration `config:"period"`
    Path   string        `config:"path"`
}
var DefaultConfig = Config{
    Period: 10 * time.Second,
    Path:   ".",
}

period의 디폴트 값을 10초로 설정하고 디폴트 디렉토리는 현재 디렉토리(.) 로 설정합니다.

Step 5 - 코드 추가

모든 Beat는 Run() 그리고 Stop() 함수가 정의된 Beater 인터페이스를 상속받아야 합니다. Beater에 대한 더 자세한 가이드는 여기에서 확인하세요.

우리는 Beater 인터페이스를 상속받은 Lsbeat 객체를 정의하고 있는 구조체(struct)  Lsbeat 를 선언해야 합니다. 그리고 여기에 마지막 타임스탬프 값을 저장할 lastIndexTime 변수를 추가합니다.

type Lsbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client
    lastIndexTime time.Time
}

추가적으로, 모든 Beat는 환경설정 값을 받아 Lsbeat 객체 타입으로 리턴하는 New() 함수를 상속합니다.

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    if err := cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }
    ls := &Lsbeat{
        done:   make(chan struct{}),
        config: config,
    }
    return ls, nil
}

Lsbeat가 파일과 디렉토리 정보를 출력(export)할 수 있도록 우리는 Run() 함수에 기능을 추가합니다.

Run() 함수를 수정하기 전에, 먼저 lsbeat.go 파일 맨 아래에 파일과 디렉토리 정보를 수집하는 listDir() 함수를 추가합니다. 이 함수는 다음의 정보들을 포함한 이벤트들을 생성합니다.

  • "@timestamp":  common.Time(time.Now())
  • "type":        beatname
  • "modtime":     common.Time(t)
  • "filename":    f.Name()
  • "path":    dirFile + "/" + f.Name()
  • "directory": f.IsDir()
  • "filesize":    f.Size()

이 함수는 첫 루틴에 모든 파일과 디렉토리를 색인하지만, 두번째 루틴 이후 부터는 첫번째 루틴 이후에 생성 또는 수정된 파일과 디렉토리들만 체크해서 새로운 파일과 디렉토리 정보만 색인 할 것입니다. 가장 최근의 타임스탬프 정보는 lasIndexTime 변수에 저장됩니다.

func (bt *Lsbeat) listDir(dirFile string, beatname string, init bool) {
    files, _ := ioutil.ReadDir(dirFile)
    for _, f := range files {
        t := f.ModTime()
        event := common.MapStr{
            "@timestamp": common.Time(time.Now()),
            "type":       beatname,
            "modtime":    common.Time(t),
            "filename":   f.Name(),
            "path":       dirFile + "/" + f.Name(),
            "directory":  f.IsDir(),
            "filesize":   f.Size(),
        }
        if init {
            // index all files and directories on init
            bt.client.PublishEvent(event) 
        } else {
            // Index only changed files since last run.
            if t.After(bt.lastIndexTime) {
                bt.client.PublishEvent(event)
            }
        }
        if f.IsDir() {
            bt.listDir(dirFile+"/"+f.Name(), beatname, init)
        }
    }
}

라이브러리에 io/ioutil 패키지를 import 하는것을 빼놓으면 안됩니다.

import (
    "fmt"
    "io/ioutil"
    "time"
)

이제 Run() 함수에 listDir() 함수를 호출하고 타임스탬프를 lasIndexTime 에 저장하는 로직을 추가합니다.

func (bt *Lsbeat) Run(b *beat.Beat) error {
    logp.Info("lsbeat is running! Hit CTRL-C to stop it.")
    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    counter := 1
    for {
        select {
        case <-bt.done:
            return nil
        case <-ticker.C:
        }
        bt.listDir(bt.config.Path, b.Name, true)   // call lsDir
        bt.lastIndexTime = time.Now()               // mark Timestamp
        logp.Info("Event sent")
        counter++
    }
}

Stop() 함수는 실행중인 루프를 멈추는 동작이며 이 함수는 처음 생성된 그대로 둡니다.

func (bt *Lsbeat) Stop() {
    bt.client.Close()
    close(bt.done)
}

코딩이 거의 끝나갑니다. 우리는 매핑 정보에 새로 추가된 필드들을 추가해야 합니다. etc/fields.yml 파일에 새 필드들의 정보를 추가합니다.

- key: lsbeat
  title: LS Beat
  description: 
  fields:
    - name: counter
      type: integer
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added lsbeat
    - name: modtime
      type: date
    - name: filename
      type: text
    - name: path
    - name: directory
      type: boolean
    - name: filesize
      type: long

그리고 업데이트를 반영합니다.

$ make update

filename 필드는 nGram 토크나이저로 분석을 할 것입니다.  lsbeat.template.json 의 "settings" 항목에 사용자 정의 애널라이저를 추가 해 줍니다.

{
  "mappings": {
        ...
  },
  "order": 0,
  "settings": {
    "index.refresh_interval": "5s",
    "analysis": {
      "analyzer": {
        "ls_ngram_analyzer": {
          "tokenizer": "ls_ngram_tokenizer"
        }
      },
      "tokenizer": {
        "ls_ngram_tokenizer": {
          "type": "ngram",
          "min_gram": "2",
          "token_chars": [
            "letter",
            "digit"
          ]
        }
      }
    }
  },
  "template": "lsbeat-*"
}

Step 6 - 빌드와 실행

이제 우리는 빌드와 실행을 할 수 있습니다. 간단히 make 명령을 실행하면 코드들이 컴파일되고 lsbeat (윈도우에서는 lsbeat.exe) 바이너리 실행 파일이 생성됩니다.

$ make

lsbeat.yml 파일에 루트 디렉토리를 설정합니다. 이 예제에서는 $GOPATH 인 /Users/ec2-user/go 를 설정 해 주었습니다. 반드시 full path로 설정을 해야 합니다.

lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "/Users/ec2-user/go"

그리고 Elasticsearch 와 Kibana도 실행중인지 반드시 확인하세요. 이제 Lsbeat를 실행 해 봅니다.

$ ./lsbeat

_cat API를 이용해서 인덱스가 제대로 생성되었고 데이터가 색인되고 있는지 확인합니다.cat.png

lsbeat-2016.06.03 인덱스와 도큐먼트 숫자들이 보입니다. 이제 nGram tokenizer가 적용된 filename 필드에서 lsbe 라는 키워드로 쿼리를 실행 해 봅시다.

query.png

정상적으로 실행이 되었습니다! 축하합니다, 우리의 첫번째 beat가 완성이 되었습니다.