ELK简介及架构分析

简介ELK 是 Elasticsearch、Logstash、Kibana 三大开源框架首字母大写简称。市面上也被...


之前已经介绍过了 ELK,并且普及了它的一些基本概念,也画了它的架构。现在来介绍介绍 Filebeat,至于为什么要介绍 Filebeat,相信看完你就知道了。

Filebeat简介

Filebeat 是本地文件的日志数据采集器,可监控日志目录或特定日志文件(tail file),并将它们转发给 Elasticsearch 或Logstatsh、kafka 等。带有内部模块(auditd,Apache,Nginx,System 和 MySQL),通过一个指定命令来简化通用日志格式的收集,解析和可视化。

日志采集器有很多,比如 Logstash,功能虽然强大,但是它依赖 java、在数据量大的时候,Logstash 进程会消耗过多的系统资源,这将严重影响业务系统的性能,而 Filebeat 就是一个完美的替代者,它基于 Go 语言没有任何依赖,配置文件简单,格式明了,同时,Filebeat 比 Logstash 更加轻量级,所以占用系统资源极少,非常适合安装在生产机器上。这就是推荐使用 Filebeat 来作为日志采集器的原因。Filebeat 可以直接(或者通过 Logstash)将数据发送到 Elasticsearch、Kafka 或者Redis,在那里可以进一步处理和增强数据,然后在 Kibana 中将其可视化,目前来说 Filebeat 是 ELK 日志系统在 Agent 上的第一选择。

工作原理

Filebeat原理

Filebeat 涉及两个组件:查找器 prospector 和采集器 harvester,来读取文件(tail file)并将事件数据发送到指定的输出。

Filebeat 的工作流程如下:当开启 Filebeat 程序的时候,它会启动一个或多个探测器去检测指定的日志目录或文件,对于探测器找出的每一个日志文件,Filebeat 会启动收集进程,每一个收集进程读取一个日志文件的内容,然后将这些日志数据发送到后台处理程序,后台处理程序会集合这些事件,最后发送集合的数据到 output 指定的目的地。
process

当发送数据到 Logstash 或 Elasticsearch 时,Filebeat 使用一个反压力敏感(backpressure-sensitive)的协议来缓解高负荷的数据量。当 Logstash 数据处理繁忙时,Filebeat 放慢它的读取速度。一旦压力解除,Filebeat 将恢复到原来的速度,继续传输数据。

采集器Harvester

Harvester 负责读取单个文件的内容。读取每个文件,并将内容发送到 the output,每个文件启动一个 harvester, harvester 负责打开和关闭文件,这意味着在运行时文件描述符保持打开状态。

如果文件在读取时被删除或重命名,Filebeat 将继续读取文件。这会有副作用,即在 harvester 关闭之前,磁盘上的空间被保留。默认情况下,Filebeat 将文件保持打开状态,直到达到 close_inactive 状态。

关闭harvester会产生以下结果:

  1. 如果在 harvester 仍在读取文件时文件被删除,则关闭文件句柄,释放底层资源
  2. 文件的采集只会在 scan_frequency 过后重新开始
  3. 如果在harvester关闭的情况下移动或移除文件,则不会继续处理文件

要控制收割机何时关闭,请使用 close_ * 配置选项。

查找器Prospector

Prospector 负责管理 harvester 并找到所有要读取的文件来源。如果输入类型为日志,则查找器将查找路径匹配的所有文件,并为每个文件启动一个 harvester。每个 prospector 都在自己的 Go 协程中运行。

Filebeat 目前支持两种 prospector 类型:log 和 stdin。每个 prospector 类型可以定义多次。日志 prospector 检查每个文件来查看 harvester 是否需要启动,是否已经运行,或者该文件是否可以被忽略(请参阅 ignore_older)。

只有在 harvester 关闭后文件的大小发生了变化,才会读取到新行。

Filebeat prospector 只能读取本地文件,没有功能可以连接到远程主机来读取存储的文件或日志。

Filebeat安装使用

Filebeat安装

Filebeat 基于 go 语言开发无其他依赖,它最大的特点是性能稳定、配置简单、占用系统资源很少,安装使用也非常简单,可访问 Elastic-Beats 官网获取各版本 Filebeat。因为 Filebeat 各版本之间的差异较大,这里推荐7以上的新版,首先进行下载解压:

tar -zxvf filebeat-7.tar.gz
mv filebeat-7 filebeat
cd filebeat

Filebeat启动停止指令

调试模式下采用:终端启动(退出终端或 ctrl+c 会退出运行)

./filebeat -e -c filebeat.yml

线上环境配合 error 级别使用:以后台守护进程启动启动 filebeat

nohup ./filebeat -e -c filebeat.yml &

零输出启动(不推荐):将所有标准输出及标准错误输出到 /dev/null 空设备,即没有任何输出信息。

nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &

停止运行 FileBeat 进程

ps -ef | grep filebeat
Kill -9 线程号

Filebeat配置文件

FileBeat 的配置文件定义了在读取文件的位置,输出流的位置以及相应的性能参数,本实例是以 Kafka 消息中间件作为缓冲,所有的日志收集器都向 Kafka 输送日志流,相应的配置项如下,并附配置说明:

$ vim /usr/local/filebeat/fileat.yml

###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

# json-api
- paths:
    - /xxx/xxx/xxx/app/*.log
document_type: json-app-api
input_type: log

- paths:
    - /xxx/xxx/xxx/web/*.log
document_type: json-web-api
input_type: log

- paths:
    - /xxx/xxx/xxx/error/*.log
document_type: json-error-api
input_type: log


#- input_type: log

# Paths that should be crawled and fetched. Glob based paths.
#  paths:
#    - /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*

# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ["^DBG"]

# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
#include_lines: ["^ERR", "^WARN"]

# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: [".gz$"]

# Optional additional fields. These field can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
#  level: debug
#  review: 1

### Multiline options

# Mutiline can be used for log messages spanning multiple lines. This is common
# for Java Stack Traces or C-Line Continuation

# The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
#multiline.pattern: ^\[

# Defines if the pattern set under pattern should be negated or not. Default is false.
#multiline.negate: false

# Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
# that was (not) matched before or after or as long as a pattern is not matched based on negate.
# Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
#multiline.match: after


#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
# Array of hosts to connect to.
#  hosts: ["localhost:9200"]

# Optional protocol and basic auth credentials.
#protocol: "https"
#username: "elastic"
#password: "changeme"

#----------------------------- Logstash output --------------------------------
#ioutput.logstash:
# The Logstash hosts
#  hosts: ["xxx.xxx.x.xxx:9011"]

output.kafka:
  hosts: ["kafka-1:9092","kafka-2:9092","kafka-3:9092"]
  topic: 'apilog'
  partition.round_robin:
  reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

配置参数说明

参数 说明
paths 指定要监控的日志,目前按照Go语言的glob函数处理,没有对配置目录做递归处理
encoding 指定被监控的文件的编码类型,使用plain和utf-8都是可以处理中文日志的
input_type 指定文件的输入类型log(默认)或者stdin
exclude_lines 在输入中排除符合正则表达式列表的那些行
include_lines 包含输入中符合正则表达式列表的那些行(默认包含所有行),include_lines执行完毕之后会执行exclude_lines
exclude_files 忽略掉符合正则表达式列表的文件(默认为每一个符合paths定义的文件都创建一个harvester)
fields 向输出的每一条日志添加额外的信息,比如"level:debug",方便后续对日志进行分组统计。默认情况下,会在输出信息的fields子目录下以指定的新增fields建立子目录
fields_under_root 如果该选项设置为true,则新增fields成为顶级目录,而不是将其放在fields目录下。自定义的field会覆盖filebeat默认的field
ignore_older 可以指定Filebeat忽略指定时间段以外修改的日志内容,比如2h(两个小时)或者5m(5分钟)
close_older 如果一个文件在某个时间段内没有发生过更新,则关闭监控的文件handle。默认1h
force_close_files Filebeat会在没有到达close_older之前一直保持文件的handle,如果在这个时间窗内删除文件会有问题,所以可以把force_close_files设置为true,只要filebeat检测到文件名字发生变化,就会关掉这个handle
scan_frequency Filebeat以多快的频率去prospector指定的目录下面检测文件更新(比如是否有新增文件),如果设置为0s,则Filebeat会尽可能快地感知更新(占用的CPU会变高)。默认是10s
document_type 设定Elasticsearch输出时的document的type字段,也可以用来给日志进行分类
harvester_buffer_size 每个harvester监控文件时,使用的buffer的大小
max_bytes 日志文件中增加一行算一个日志事件,max_bytes限制在一次日志事件中最多上传的字节数,多出的字节会被丢弃。默认是10MB
multiline 适用于日志中每一条日志占据多行的情况,比如各种语言的报错信息调用栈
multiline.pattern 多行日志开始的那一行匹配的pattern
multiline.negate 是否需要对pattern条件转置使用,不翻转设为true,反转设置为false
multiline.match 匹配pattern后,与前面(before)还是后面(after)的内容合并为一条日志
multiline.max_lines 合并的最多行数(包含匹配pattern的那一行),默认为500行
multiline.timeout 到了timeout之后,即使没有匹配一个新的pattern(发生一个新的事件),也把已经匹配的日志事件发送出去
multiline.tail_files 如果设置为true,Filebeat从文件尾开始监控文件新增内容,把新增的每一行文件作为一个事件依次发送,而不是从文件开始处重新发送所有内容
multiline.backoff Filebeat检测到某个文件到了EOF之后,每次等待多久再去检测文件是否有更新,默认为1s
multiline.config_dir 如果要在本配置文件中引入其他位置的配置文件,可以写在这里(需要写完整路径),但是只处理prospector的部分
multiline.publish_async 是否采用异步发送模式(实验功能)

异常堆栈的多行合并问题

在收集日志过程中还常常涉及到对于应用中异常堆栈日志的处理,此时有两种方案,一种是在采集时归并,一种是 Logstash 过滤时归并,建议在客户端agent上直接实现堆栈的合并,把合并操作的压力在输入源头上进行控制,filebeat 合并行的思路有两种,正向和逆向处理。由于 filebeat 在合并行的时候需要设置 negate 和 match 来决定合并动作,意义混淆,简直是一种糟糕的设计,直接附上配置源码和说明便于理解

符合条件才合并,容易有漏网之鱼

multiline: 
    pattern: '^[[:space:]]+(at|\.{3})\b|^Caused by:'
    negate:  false
    match:   after

negate 参数为 false,表示“否定参数=false”。multiline 多行参数负负得正,表示符合 pattern、match 条件的行会融入多行之中、成为一条完整日志的中间部分。如果 match=after,则以b开头的和前面一行将合并成一条完整日志;如果match=before,则以b开头的和后面一行将合并成一条完整日志。

不符合条件通通合并,需事先约定

multiline:
    pattern: '^\['
    negate:  true
    match:   after

negate参数为true,表示“否定参数=true”。multiline 多行参数为负,表示符合 match 条件的行是多行的开头,是一条完整日志的开始或结尾。如果 match=after,则以b开头的行是一条完整日志的开始,它和后面多个不以b开头的行组成一条完整日志;如果 match=before,则以b开头的行是一条完整日志的结束,和前面多个不以b开头的合并成一条完整日志。

ELK之Filebeat架构

上一篇文章里我画了ELK的两种架构,这次介绍的是用 Filebeat 替换了 Logstatsh 的一种高可用,低耦合进行平行扩展后的ELK架构,在有条件的生产级部署中,强烈推荐此种部署架构。架构图如下:
architecture
当然,kafka 可能不止两个节点,而是更多节点,Elasticsearch 也可以集群分布式部署更多节点。

Kibana效果如图:
kibana

最后修改:2023 年 09 月 11 日
如果觉得我的文章对你有用,请随意赞赏