索引:
第一部分 运行机制
1.1 构架简介
1.2 事件生命周期
1.3 queue 简介
1.4 线程简介
1.5 配置简介
1.6 多实例运行
1.7 Pipeline配置
第二部分 插件详解
2.1 input及glob
2.2 codec
2.3 filter插件date
2.4 filter 插件grok
2.5 filter 插件dissect
2.6 filter插件mutate
2.7 filter插件json
2.8 filter插件geoip及ruby
2.9 output插件
第一部分 运行机制
1.1 构架简介
Logstash
- 是一个数据收集处理引擎
- 可以理解成是一个ETL工具
如图
- 左边是各种数据源,log日志、excel、http server、数据库
- 右边是功能:基本指标分析、归档、输出到es、监控、报警
Logstash分为三个阶段:
- Input – 数据采集
- Filter – 数据解析/转换
- Output – 数据输出
下图展示了一个相对完备的Logstash处理流:
术语解释:
Pipeline
- 是指input-Filter-output的3阶段处理流程
- 队列管理
- 插件生命周期管理(如果自己写插件的话需要处理)
Logstash Event(是一个java object有一些api)
- 内部流转的数据表现形式
- 原始数据在input被转换为Event,在output event被转换为目标格式数据
- 在配置文件中可以对Event中的属性进行增删改查
构架简介:
一个简单的配置文件:
这个例子的执行流程(之前我们说过分为三步):
- 输入Input:
- 过滤Filter:注意这个例子中没有filter
- 输出:
执行这个例子:
首先到官网去下载Logstash,下载好了解压后就可以直接使用了,不需要安装,但是在使用之前需要自己写一个conf文件,也就是定义我们上面的三个执行流程(实际上只有两个因为filter这里我们没有设置,但是至少要有输入和输出吧。)配置文件是:
codec.conf
1 2 3 4 5 6 7 8 9 10 11 |
input{ stdin{ codec => line } } output { stdout{ codec => json } } |
然后直接在命令行执行:
1 |
echo "foo\nbar"|bin/logstash -f codec.conf |
可以看到拿到了预期json格式的输出:
1.2 事件生命周期Life_of_an_Event
构架6.x:
实际上input是可以有多个,每个input有一个codec,queue负责把流入的数据分发到不同的Pipeline中,每个Pipeline有filter和output和batcher,如图中有3个Pipeline,batcher的作用是从queue中批量的取数据。batcher当event的数目或者时间达到了一定的条件就会到filter中,然后再到output,到了output之后会有发送一个ACK给Queue来告诉Queue这些Logstash Event已经处理完了。注意这里的Worker threads是同步进行的,也就是说线程越多,那么处理数据的能力就越强。
1.3 Queue分类
在Logstash中有两类Queue:
1.In Memory
- 无法处理进程Crash、机器宕机等情况,会导致数据丢失
2.Persistent Queue In Disk
- 可处理进程Crash等情况,保证数据不丢失
- 保证数据至少消费一次
- 充当缓冲区,可以替代Kafka等消息队列的作用
持久队列Persistent queue实现容灾的能力的原理
- Input 把数据交给持久队列,持久队列把数据写入disk中后告诉input已经写入了(Input必须要支持response机制,一般都是支持的)
- 之后持久队列再把数据就给filter/output,处理完了之后filter/output告诉持久队列现在已经处理完了
- 持久队列收到filter/output处理完数据的消息之后就数据从disk中删除
持久队列和内存队列的性能分析:
可以看到两种队列方式的性能基本上差不多,因此在平时的使用的时候建议打开PQ(默认的是memory)
PQ的主要配置
- queue.type:persisted(默认是memory)
- queue.max_bytes:4gb(队列的最大数据量,默认是1GB,建议开大一些)
- 还有其他的参数详细参照文档
1.4线程
线程主要分为两类一类是每个Input都会有一个线程,另一类似每个Pipeline(Batcher -> Filter -> Output)都会有一个线程。一般情况先我们调整的是Pipeline的线程数
Pipeline线程相关配置:
- pipeline.workers | -w :Pipeline线程数,及filter_output的处理线程数,默认是CPU的核数
- pipeline.batch.size | -b :Batcher一次批量获取的待处理文档数,默认125(建议向es输出的时候10-20Mb之间,可以计算一下文档数),可以根据输出进行调整,越大会占用越多的heap空间,可以通过jvm.options调整
- pipeline.batch.delay | -u :Batcher的等待时长,单位为ms
使用visualVM查看Logstash的线程
vistualVM是一个JVM的性能分析软件,可以从程序运行时获得实时数据,从而进行动态的性能分析。首先我们正常的启动logstash
1 |
bin/logstash -f codec.conf |
可以在visualVM中看到:
- <代表输入,这里是stdin标准输入
- >代表输出
- main才是Logstash的线程
- 可以看到开启了8个线程因为我的电脑是八核的
- 也就是说不设置的话默认是几核的电脑就开启几个线程
接下来我们把线程设置一下,设置成1看看效果,可以在visualVM中看到字开启了一个线程:
1 |
bin/logstash -f codec.conf -w 1 |
1.5 Logstash配置文件
配置文件主要有两个:
1.logstash设置相关的配置文件(conf文件夹中,setting files)
logstash.yml 是Logstash的相关配置,比如
- node.name:节点名,便于识别,建议定义成有实际意义的名字
- path.data:持续化存储数据的文件夹,默认是在Logstash home目录下的data,一个path.data目录只能够被一个Logstash实例使用,如果设置了PQ,那么持久化队列的内容就会存在这里。
- pipeline.config:设定Pipeline配置文件的目录
- queue.type:设置pipeline日志文件
- pipeline.workers:设定pipeline的线程数(Filter+output),优化的常用项
- pipeline.batch.size/delay:设定批量处理数据的数目和延迟
- queue.type:设定队列类型,默认是memory
- queue.max_bytes:队列容量,默认是1g
- 这些配置会在Logstash启动的时候进行加载,并且这其中的配置可以被命令行中的相关参数覆盖,支持yaml语法支持如下两种形式(层级结构和扁平结构):
命令行配置选项:
jvm.options修改jvm的相关参数,比如修改heap size等等
2.Pipeline配置文件
- 定义数据处理流程的文件,以.conf结尾
1.6 多实例运行
本节主要说明在一台机器上运行多个Logstash实例应该如何实现,主要是通过配置中的path.setting来实现:
- bin/logstash –path.settings instance1
- bin/logstash –path.settings instance2
- 不同instance中修改Logstash.yml,自定义path.data,确保其不相同即可。
实际操作:
将config/logstash.yml赋值一份叫做instance2
1 |
cp -r config instance2 |
对instance2的中的logstash.yml的配置进行修改,
instance2 -> logstash.yml(注意冒号后面必须跟一个空格)
1 2 3 |
node.name: bing path.data: instance2 path.config: test.conf |
对于默认配置的logstash.yml只需要修改path.config即可,其他的保持默认配置:
config -> path.config
1 |
path.config: codec.conf |
其中test.conf和codec.conf都是之前提到的配置文件:
1 2 3 4 5 6 7 8 9 10 11 |
input{ stdin{ codec => line } } output { stdout{ codec => json } } |
然后启动默认的Logstash实例:
1 |
bin/logstash |
启动instance2的Logstash实例:
1 |
bin/logstash --path.settings instance2 |
可以看到这样就有优雅的在一台机器上运行了两个Logstash实例:
1.7 pipeline配置
用于配置input、filter和output插件,框架基本结构是(在大括号里面写一下需要用到的插件):
1 2 3 |
input{} filter{} output{} |
pipeline配置语法:
- 布尔类型 Boolean:isFailed => true
- 数值类型 Number:port => 33
- 字符串类型 String: name => “”Hello World”
- 数组类型:users => [{id=>1, name=>bbb}, {id => 2, name => jane}] 或者 path => [“/var/log/messages”,”/var/log/*.log”]
- 哈希类型:match => {“field” => “value1” “field”=>”value2”}
- 备注:#
在配置中可以引用Logstash Event的属性(字段),主要由下面两种方式:
- 直接引用字段值 – 直接使用[]即可,嵌套字段的话直接写多层即可
- 在字符串中以 sprintf 方式引用 – 使用%{}来实现
配置文件支持条件判断语法,增强了配置的多样性,主要格式如下:
1 2 3 4 5 6 7 |
if EXPRESSION{ ... } else if EXPRESSION { ... } else { ... } |
表达式主要包含如下的操作符:
- 比较:== != < > <= >=
- 正则是否匹配:=~ =
- 包含(字符串或者数组):in, not in
- 布尔操作符:and or nand xor !
- 分组操作符(条件非常复杂的时候,其实就相当于括号):()
一个例子:
第二部分 插件详解
2.1 input及glob
input插件指定数据的输入源,一个pipeline可以有多个插件,都大同小异主要讲解一下三个input插件:
1.stdin
最简单的输入,从标准输入读取数据(没有自己特别的配置),通用配置(所有的input都可以使用)为:
- codec 类型为codec,Logstash会检查输入的字符串是否存在codec,如果不存在就会认为这个输入是有问题的
- type 类型为string,自定义该事件的类型,可用于之后的判断
- tags类型为array,自定义该事件的tag,可用于后续的判断
- add_field类型为hash,为该事件添加字段
study -> input-stdin.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
input{ stdin{ codec => "plain" tags => ["test"] type => "std" add_field => {"key"=>"value"} } } output{ stdout{ codec=>"rubydebug" } } |
运行方式:和之前的一样存在一个*.conf文件中,然后-f指定配置运行
1 |
echo "test" | bin/logstash -f input-stdin.conf |
运行结果:
2.file
从文件读取数据,比如Nginx的日志MySQL的log都需要用到这个插件。文件的读取通常要解决几个问题:
- 文件内容如何只被读取一次?即重启LS时,从上次读取的位置继续:sincedb
- 如何及时读取到文件的新内容?定时检查文件是否有更新
- 如何发现新文件并进行读取?可以定时检查新文件
- 如果文件发生了归档(rotation)操作,是否会影响当前的内容读取? 不影响,被归档的文件内容可以继续被读取
关键配置:
- path类型为数组,指明读取文件路径,基于glob语法 path => [“var/log/**/*.log”,”/var/log/message”]
- exclue类型为数组,排除不想监听的文件规则,基于glob语法 exclude => “*.gz”
- sincedb_path类型为字符串,记录sincedb的文件路径
- start_position类型为字符串,设置为beginning或end,是否从头读取文件
- start_interval类型为数值,单位为秒,定时检查文件是否有更新,默认为1秒
- discover_interval类型为数值,单位秒,定时检查是否有新文件待读取,默认15秒
- ignore_older类型为数值,单位秒,扫描文件列表时,如果该文件上次修改的时间超过定时的时长,则不做处理,但依然会监控是否有新的内容,默认是关闭的
- close_older 类型为数值,单位秒,如果监听的文件在超过该设定时间内没有更新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新的内容,默认为3600秒,即1个小时。
glob匹配语法主要包含以下几种匹配符号:
- *匹配任意字符,但不以.开头的隐藏文件,匹配这类文件时要用.*来匹配
- **递归匹配子目录
- ?匹配单一字符
- []匹配多个字符比如[a-z]、[^a-z]
- {}匹配多个单词,比如{foo,bar,hello}
- \转义符号
几个glob匹配的例子:
实际使用:
调试时候的小技巧:
3.kafka
kafka是最流行的消息队列,也是Elastic Stack架构中常用的,使用相对简单:
2.2 codec 插件
Codec Plugin作用于input和output plugin,负责将数据在原始与Logstash Event之间转换,常见的codec有:
- plain 读取原始内容
- dots将内容简化为点进行输出,当不想看到详细的输出,只是想看到正常运行比如说压测的时候看到屏幕上不同的在打点
- rubydebug将Logstash Event按照ruby格式输出,方便调试
- line处理带有换行符的内容
- json处理json格式的内容
- multiline处理多行数据的内容
测试:
当一个Event的message由多行组成时,需要使用该codec,常见的情况是堆栈日志信息的处理,如下图所示:
主要设置参数如下:
- pattern 设置行匹配的正则表达式,可以使用grok
- what previous|next,如果匹配成功,那么匹配行是归属上一个事件还是下一个事件
- negate true or false是否对pattern的结果取反
匹配上述堆栈错误信息:
匹配以[时间戳]开头的日志信息:
2.3 filter插件date – 日期解析
作用:将日期的字符串解析为日期类型,然后替换@timestampo字段或者指定的其他字段
执行效果:
参数:
- match 类型为数组,用于指定日期匹配的格式,可以一个指定多种日期格式
- target:类型为字符串,用于指定赋值的字段名,默认是@timestamp
- timezone:类型为字符串,用于指定时区
2.4 filter 插件grok – 正则匹配解析
有一条非结构化日志:
希望把字段解析到不同的变量中,这个时候需要用正则表达式,但是这样的正则表达式很长基本上是不能够维护的。这个时候就可以使用Grok:
其实就是一些已经写好的有名字的正则表达式可以直接使用,比如说:
Grok提供的pattern有很多,如果能够灵活使用可以做到快速的进行匹配:
Logstash启动的速度很慢,可以在后面加上-r进行热加载。
自定义 grok pattern
- pattern_definitions参数,以键值对的方式定义pattern名称和内容
- pattern_dir参数,以文件的形式被读取
- mathc匹配多种样式:
- overwrite
2.5 filter 插件dissect – 分隔符解析
基于分隔符原理解析数据,解决grok解析时候消耗过多CPU资源的问题
例子:用dissect来解析
dissect的几个语法:
注意dissect和grok分割后都是字符串
2.6 filter插件mutate – 对字段作处理
是使用最频繁的,可以对字段进行各种操作,比如重命名、删除、替换、更新等等,主要操作如下:
- convert 类型转换:实现字段类型转换,类型为hash,支持integer、float、string和boolean
- gsub字符串转换:对字段进行替换,类型为数组,每3项为一个替换配置
- split/join/merge字符串切割、数组合并为字符串、数组合并为
- rename 字段重命名
- update/replace字段内容更新或替换:更新字段内容,区别在于update只在字段存在的时候生效,而replace在字段不存在时会执行新增的字段操作
- remove_field删除字段
2.7 filter插件json – 按照json解析字段内容到指定字段中
将字段内容为json格式的数据进行解析
2.8 filter插件geoip及ruby – 增加地理位置数据 & 利用ruby代码来动态修改Logstash Event
ruby是最灵活的插件,可以以ruby语言来随心所欲的修改Logstash Event对象
2.9 output插件 利用ruby代码来动态修改Logstash Event
负责将Logstash Event输出,常见的插件如下:
- stdout
- file
- elasticsearch
输出到标准输出,多用于调试:
输出到文件,实现将分散在多地的文件统一到一处的需求,比如将所有web机器的web日志收集到一个文件中,从而方便查阅:
输出到es,是最常用的插件,基于http协议的实现: