ELK in Depth: Practical Experience and Strategies, Advanced Use of Filebeat + Logstash

[root@cong12 ~]# tar -zxvf filebeat-7.3.0-linux-x86_64.tar.gz -C /usr/local/


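 [root@cong12 ~]# vim /usr/local/filebeat-7.3.0-linux-x86_64/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true              # enable this input
  paths:
    - /etc/httpd/logs/*      # collect the httpd service logs
    #- /var/log/*.log        # comment out the default path
#output.elasticsearch:       # comment out the Elasticsearch output section
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]
output.logstash:             # uncomment to ship the logs to Logstash
  # The Logstash hosts
  hosts: [""]                # uncomment and set the Logstash server IP and port
logging.level: info          # adjust the log level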


 [root@cong12 ~]# cd /usr/local/filebeat-7.3.0-linux-x86_64/
 [root@cong12 filebeat-7.3.0-linux-x86_64]# ./filebeat &
 Or, to log to stderr with an explicit configuration file:
 [root@cong12 filebeat-7.3.0-linux-x86_64]# ./filebeat -e -c filebeat.yml &
 -e: log to stderr and disable syslog/file output
 -c, --c FILE: the configuration file for Filebeat to use; if the -c flag is not given, the default filebeat.yml is used
 Start at boot:
 [root@cong12 ~]# echo "cd /usr/local/filebeat-7.3.0-linux-x86_64/ && ./filebeat -e -c filebeat.yml &" >> /etc/rc.local
 [root@cong12 ~]# chmod +x /etc/rc.local
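
Before relying on the startup commands above, it can help to confirm that the Logstash Beats port set under output.logstash is reachable from the Filebeat host. The following is a minimal Python sketch, not part of Filebeat itself; the function name port_is_open is hypothetical, and the host and port you pass in are whatever you configured (the Logstash input later in this article listens on 5044).

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False
```

Calling port_is_open with the address from output.logstash and port 5044 should return True once Logstash is running with a beats input. A False result points at a firewall, a wrong address, or a Logstash pipeline that has not started yet.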


3.9.1 Configuring the Logstash file

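 [root@cong11 ~]# vim /usr/local/logstash-7.3.0/config/http_logstash.conf
input {
    beats {
        codec => plain { charset => "UTF-8" }   # set the codec charset to UTF-8
        port => "5044"
    }
}
output {
  stdout {    # standard output: print collected logs in the current terminal, handy for testing connectivity
      codec => "rubydebug"    # use the rubydebug codec
  }
  elasticsearch {                # send the collected logs to Elasticsearch
      hosts => [ "" ]  # address of the Elasticsearch server
      index => "httpd-logs-%{+YYYY.MM.dd}" # index to create, one per day
  }
}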
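
The index setting above produces one index per day. As a rough Python sketch of how such a name is derived: Logstash formats the event's @timestamp field, which is kept in UTC, with the date pattern after the plus sign. The helper name daily_index is hypothetical, and the sample time below is only illustrative (in this pipeline @timestamp is the time the event was read, not the time inside the log line, because no date filter is configured).

```python
from datetime import datetime, timedelta, timezone


def daily_index(prefix: str, event_time: datetime) -> str:
    """Mimic 'prefix-%{+YYYY.MM.dd}' by formatting the event time in UTC.

    event_time must be timezone-aware; a naive datetime would be
    interpreted as local time by astimezone().
    """
    utc_time = event_time.astimezone(timezone.utc)
    return f"{prefix}-{utc_time:%Y.%m.%d}"


# An event stamped 01:59:37 at UTC+8 falls on the previous day in UTC.
sample = datetime(2019, 9, 11, 1, 59, 37, tzinfo=timezone(timedelta(hours=8)))
print(daily_index("httpd-logs", sample))  # httpd-logs-2019.09.10
```

The UTC conversion is the detail worth remembering: events written shortly after local midnight in a zone ahead of UTC land in the previous day's index.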

Start Logstash:
 [root@cong11 ~]# logstash -f /usr/local/logstash-7.3.0/config/http_logstash.conf

The messages sent by Filebeat appear in this terminal. If no new messages show up, refresh the httpd web page to generate fresh log entries.

Start at boot:
 [root@cong11 ~]# echo "nohup logstash -f /usr/local/logstash-7.3.0/config/http_logstash.conf &" >> /etc/rc.local


4.0.1 Using Logstash filters

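Logstash owes much of its power and popularity to its rich set of filter plugins. Filters do more than filter: they can apply complex logic to the raw data that enters them, and can even add entirely new events to the rest of the pipeline.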


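The grok syntax is %{SYNTAX:SEMANTIC}. SYNTAX is the name of the pattern to match: for example, the NUMBER pattern matches numbers and the IP pattern matches IP addresses. SEMANTIC is the field name under which the matched text is stored:
%{NUMBER:lasttime}%{IP:client}
By default every SEMANTIC value is stored as a string. You can also append the data type to convert to:
%{NUMBER:lasttime:int}%{IP:client}
Currently the only supported conversions are int and float.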
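
To make the %{SYNTAX:SEMANTIC:type} idea concrete, here is a small Python sketch that expands such tokens into a regular expression with named groups and applies the optional int or float conversion. It is an illustration only, not how Logstash is implemented: the two regexes in PATTERNS are simplified stand-ins for the real grok definitions, and the names compile_grok and grok_match are hypothetical.

```python
import re

# Simplified stand-ins for two grok patterns (assumption: the patterns
# shipped with Logstash are stricter than these).
PATTERNS = {
    "NUMBER": r"[+-]?\d+(?:\.\d+)?",
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
}
CASTS = {"int": int, "float": float}

# Matches %{SYNTAX:SEMANTIC} with an optional :type suffix.
GROK_TOKEN = re.compile(r"%\{(\w+):(\w+)(?::(\w+))?\}")


def compile_grok(expr):
    """Turn grok-style tokens into a compiled regex plus a cast table."""
    casts = {}

    def to_group(match):
        syntax, semantic, cast = match.groups()
        if cast:
            casts[semantic] = CASTS[cast]
        return f"(?P<{semantic}>{PATTERNS[syntax]})"

    return re.compile(GROK_TOKEN.sub(to_group, expr)), casts


def grok_match(expr, text):
    """Return the extracted fields as a dict, or None if nothing matches."""
    regex, casts = compile_grok(expr)
    match = regex.search(text)
    if match is None:
        return None
    # Fields without an explicit type stay strings, as in grok.
    return {name: casts.get(name, str)(value)
            for name, value in match.groupdict().items()}


print(grok_match("%{NUMBER:lasttime:int} %{IP:client}", "42 192.0.2.7"))
```

With the :int suffix, lasttime comes back as the integer 42; without it, the same field would be the string "42". The address 192.0.2.7 is a documentation placeholder.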

4.0.3 Parsing web logs with the Grok filter plugin

grok is a very powerful Logstash filter plugin. It can parse text in arbitrary formats and is currently the best way in Logstash to parse unstructured log data. Grok plugin reference: https://www.elastic.co/guide/en/logstash/7.3/plugins-filters-grok.html

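Filtering httpd logs with the grok filter

Because the grok filter plugin looks for patterns in the incoming log data, configuring it requires you to decide how to identify the patterns that matter for your use case. A representative line from the web server log sample looks like this:

 [root@cong11 ~]# vim httpd.log
- - [11/Sep/2019:01:59:37 +0800] "GET /noindex/css/fonts/ExtraBoldItalic/OpenSans-ExtraBoldItalic.ttf HTTP/1.1" 404 260 "" "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"

The IP address at the beginning of the line is easy to identify, as is the timestamp in brackets. To parse the data, you can use the %{COMBINEDAPACHELOG} grok pattern, which structures each line of the Apache log into named fields, in a filter like this: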

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}


 [root@cong11 ~]# vim /usr/local/logstash-7.3.0/config/test_logstash.conf
input {
  file {
        path => "/root/httpd.log"         # source to collect; here, the sample httpd log
        start_position => "beginning"   # where to start reading a new file; beginning reads from the start of the file (the default is end)
  }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
output {
  stdout {
      codec => "rubydebug"
  }
}
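
The start_position setting only decides where reading begins for a file Logstash has not seen before; for known files it resumes from the offset recorded in its sincedb. A loose Python sketch of that decision follows. The function names are hypothetical, and keying the recorded offsets by path is a simplification (the real sincedb identifies files by inode, not by name).

```python
import os


def initial_offset(path, start_position="end", sincedb=None):
    """Pick the byte offset at which to start reading a file."""
    sincedb = sincedb or {}
    if path in sincedb:
        # File seen before: resume where we stopped, ignoring start_position.
        return sincedb[path]
    if start_position == "beginning":
        return 0
    # "end": skip existing content, only lines appended later are read.
    return os.path.getsize(path)


def read_from(path, offset):
    """Read everything after offset; return the lines and the new offset."""
    with open(path, "rb") as handle:
        handle.seek(offset)
        data = handle.read()
    return data.decode("utf-8").splitlines(), offset + len(data)
```

This is why a test file such as /root/httpd.log yields no events with the default end setting until new lines are appended, and why rerunning a test may require clearing the recorded offset.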

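Test:
 [root@cong11 ~]# logstash -f /usr/local/logstash-7.3.0/config/test_logstash.conf --config.reload.automatic
The **--config.reload.automatic** option enables automatic config reloading, so you do not have to stop and restart Logstash every time you modify the configuration file.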

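Since Elasticsearch 6.3, the basic features that used to be part of X-Pack ship with the default distribution at no cost. Kibana 6.3 and later include a Grok Debugger, the last entry on the Dev Tools page, which you can use to test whether a grok expression matches your data.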

 Test data:
- - [11/Sep/2019:01:59:37 +0800] "GET /noindex/css/fonts/ExtraBoldItalic/OpenSans-ExtraBoldItalic.ttf HTTP/1.1" 404 260 "" "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"
{
  "request": "/noindex/css/fonts/ExtraBoldItalic/OpenSans-ExtraBoldItalic.ttf",
  "agent": "\"Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko\"",
  "auth": "-",
  "ident": "-",
  "verb": "GET",
  "referrer": "\"\"",
  "response": "404",
  "bytes": "260",
  "clientip": "",
  "httpversion": "1.1",
  "timestamp": "11/Sep/2019:01:59:37 +0800"
}

Alternatively, use the web-based grok debugger: http://grokdebug.herokuapp.com
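
If neither debugger is at hand, the field breakdown that COMBINEDAPACHELOG produces can be approximated offline. The Python sketch below uses a hand-written regular expression that is looser than the real grok pattern, so treat it as an aid for understanding the fields rather than a drop-in replacement. The client address 192.0.2.10 is a placeholder added for illustration, since the sample line in this article does not show one, and the name parse_combined is hypothetical.

```python
import re

# Rough approximation of the combined Apache log format (assumption:
# the real COMBINEDAPACHELOG pattern is stricter and handles more cases).
COMBINED = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\w+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-) '
    r'(?P<referrer>"[^"]*") (?P<agent>"[^"]*")'
)


def parse_combined(line):
    """Return the named fields of one log line, or None if it does not match."""
    match = COMBINED.match(line)
    return match.groupdict() if match else None


# Placeholder client IP; the rest is the test line from this article.
SAMPLE = (
    '192.0.2.10 - - [11/Sep/2019:01:59:37 +0800] '
    '"GET /noindex/css/fonts/ExtraBoldItalic/OpenSans-ExtraBoldItalic.ttf HTTP/1.1" '
    '404 260 "" '
    '"Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"'
)

for field, value in parse_combined(SAMPLE).items():
    print(f"{field}: {value}")
```

As in the grok output, every value is a string and the referrer and agent fields keep their surrounding double quotes.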



Plugin | Description | Github repository
--- | --- | ---
aggregate | Aggregates information from several events originating with a single task | logstash-filter-aggregate
alter | Performs general alterations to fields that the mutate filter does not handle | logstash-filter-alter
bytes | Parses string representations of computer storage sizes, such as "123 MB" or "5.6gb", into their numeric value in bytes | logstash-filter-bytes
cidr | Checks IP addresses against a list of network blocks | logstash-filter-cidr
cipher | Applies or removes a cipher to an event | logstash-filter-cipher
clone | Duplicates events | logstash-filter-clone
csv | Parses comma-separated value data into individual fields | logstash-filter-csv
date | Parses dates from fields to use as the Logstash timestamp for an event | logstash-filter-date
de_dot | Computationally expensive filter that removes dots from a field name | logstash-filter-de_dot
dissect | Extracts unstructured event data into fields using delimiters | logstash-filter-dissect
dns | Performs a standard or reverse DNS lookup | logstash-filter-dns
drop | Drops all events | logstash-filter-drop
elapsed | Calculates the elapsed time between a pair of events | logstash-filter-elapsed
elasticsearch | Copies fields from previous log events in Elasticsearch to current events | logstash-filter-elasticsearch
environment | Stores environment variables as metadata sub-fields | logstash-filter-environment
extractnumbers | Extracts numbers from a string | logstash-filter-extractnumbers
fingerprint | Fingerprints fields by replacing values with a consistent hash | logstash-filter-fingerprint
geoip | Adds geographical information about an IP address | logstash-filter-geoip
grok | Parses unstructured event data into fields | logstash-filter-grok
http | Provides integration with external web services/REST APIs | logstash-filter-http
i18n | Removes special characters from a field | logstash-filter-i18n
java_uuid | Generates a UUID and adds it to each processed event | core plugin
jdbc_static | Enriches events with data pre-loaded from a remote database | logstash-filter-jdbc_static
jdbc_streaming | Enrich events with your database data | logstash-filter-jdbc_streaming
json | Parses JSON events | logstash-filter-json
json_encode | Serializes a field to JSON | logstash-filter-json_encode
kv | Parses key-value pairs | logstash-filter-kv
memcached | Provides integration with external data in Memcached | logstash-filter-memcached
metricize | Takes complex events containing a number of metrics and splits these up into multiple events, each holding a single metric | logstash-filter-metricize
metrics | Aggregates metrics | logstash-filter-metrics
mutate | Performs mutations on fields | logstash-filter-mutate
prune | Prunes event data based on a list of fields to blacklist or whitelist | logstash-filter-prune
range | Checks that specified fields stay within given size or length limits | logstash-filter-range
ruby | Executes arbitrary Ruby code | logstash-filter-ruby
sleep | Sleeps for a specified time span | logstash-filter-sleep
split | Splits multi-line messages into distinct events | logstash-filter-split
syslog_pri | Parses the PRI (priority) field of a syslog message | logstash-filter-syslog_pri
threats_classifier | Enriches security logs with information about the attacker's intent | logstash-filter-threats_classifier
throttle | Throttles the number of events | logstash-filter-throttle
tld | Replaces the contents of the default message field with whatever you specify in the configuration | logstash-filter-tld
translate | Replaces field contents based on a hash or YAML file | logstash-filter-translate
truncate | Truncates fields longer than a given length | logstash-filter-truncate
urldecode | Decodes URL-encoded fields | logstash-filter-urldecode
useragent | Parses user agent strings into fields | logstash-filter-useragent
uuid | Adds a UUID to events | logstash-filter-uuid
xml | Parses XML into fields | logstash-filter-xml

