ELK+filebeat+kafka部署—3(kafka部署)

导读:本篇文章讲解 ELK+filebeat+kafka部署—3(kafka部署),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

部署kafka

为了防止收集的日志信息太多或者是服务器down机导致的信息丢失,我们这里引入kafka消息队列服务器,我们这里搭建单节点的kafka,生产环境中应该使用集群方式部署。

部署jdk

由于zookeeper依赖java环境,所以我们需要安装jdk,官网建议最低安装jdk 1.8版本

上传软件包

anaconda-ks.cfg  apache-zookeeper-3.5.5-bin.tar.gz  jdk-8u171-linux-x64.tar.gz  kafka_2.12-2.2.0.tgz

解压jdk

tar -zxvf jdk-8u171-linux-x64.tar.gz -C /usr/local/

配置jdk环境变量

vim /etc/profile

在文件最后一行加入以下内容

 JAVA_HOME=/usr/local/jdk1.8.0_171 
 PATH=$JAVA_HOME/bin:$PATH
 CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar 
 export PATH JAVA_HOME CLASSPATH
source /etc/profile   #使环境变量生效

安装zookeeper

配置hosts文件

[root@server03 conf]# vim /etc/hosts

添加本机对应的IP和主机名

ELK+filebeat+kafka部署—3(kafka部署)

 解压软件包

[root@server03 conf]# tar -zxf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local/

创建快照日志存放目录

mkdir -p /data/zk/data

创建事务日志存放目录

mkdir -p /data/zk/datalog

生成配置文件

cd /usr/local/apache-zookeeper-3.5.5-bin/conf/
 cp zoo_sample.cfg  zoo.cfg  #复制一份zoo_sample.cfg文件并命名为zoo.cfg

修改主配置文件zoo.cfg

vim zoo.cfg

dataDir=/data/zk/data        #修改这一行为我们创建的目录
dataLogDir=/data/zk/datalog   #添加这一行

添加path环境变量

这里必须是修改配置文件添加path环境变量,不然启动报错

vim /etc/profile

    export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.5.5-bin
    export PATH=$ZOOKEEPER_HOME/bin:$PATH

source /etc/profile

启动zookeeper

zkServer.sh start

添加开机自启动

将Zookeeper添加到开机自启服务
 在/lib/systemd/system/文件夹下创建一个启动脚本zookeeper.service

vim /lib/systemd/system/zookeeper.service

[Unit]
Description=Zookeeper service
After=network.target

[Service]
Type=forking
Environment=”JAVA_HOME=/usr/local/jdk1.8.0_171″
User=root
Group=root
ExecStart=/usr/local/apache-zookeeper-3.5.5-bin/bin/zkServer.sh start
ExecStop=/usr/local/apache-zookeeper-3.5.5-bin/bin/zkServer.sh stop

[Install]
WantedBy=multi-user.target

 systemctl daemon-reload 
 systemctl enable zookeeper

Kafka 单节点单Broker部署

这里部署的是kafka单节点,生产环境应该部署kafka多节点集群。

解压软件到指定目录

tar -zxvf kafka_2.12-2.2.0.tgz -C /usr/local/

修改配置文件

vim /usr/local/kafka_2.12-2.2.0/config/server.properties

# broker的全局唯一编号,不能重复
broker.id=0
# 监听
listeners=PLAINTEXT://:9092  #开启此项
# 日志目录
log.dirs=/data/kafka/log      #修改日志目录
# 配置zookeeper的连接(如果不是本机,需要该为ip或主机名)
zookeeper.connect=localhost:2181

 创建日志目录

mkdir -p /data/kafka/log

添加path环境变量

vim /etc/profile #文件最后天以下行

export KAFKA_HOME=/usr/local/kafka_2.12-2.2.0
export PATH=$KAFKA_HOME/bin:$PATH

source /etc/profile   #生效配置文件

启动kafka

kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.2.0/config/server.properties

 添加开机自启动

 将kafka添加到开机自启服务
 在/lib/systemd/system/文件夹下创建一个启动脚本kafka.service

vim /lib/systemd/system/kafka.service

[Unit]
Description=Apache Kafka server (broker)
After=network.target  zookeeper.service

[Service]
Type=simple
Environment=”PATH=/usr/local/jdk1.8.0_171/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin”
User=root
Group=root
ExecStart=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-2.2.0/config/server.properties
ExecStop=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

 systemctl daemon-reload 
 systemctl enable kafka

设置filebeat

修改filebeat.yml配置文件

我们这里修改filebeat配置文件,把filebeat收集到的nginx日志保存到kafka消息队列中。把output.elasticsearch和output.logstash都给注释掉,添加kafka项

vim /usr/local/filebeat/filebeat.yml

– type: log
  enabled: true              #开启此配置
  paths:
    – /usr/local/nginx/logs/*.log     #添加收集nginx服务日志
    #- /var/log/*.log          #注释该行
#————————– Elasticsearch output ——————————
#output.elasticsearch:        # Elasticsearch这部分全部注释掉
  # Array of hosts to connect to.
  #hosts: [“localhost:9200”]
#—————————– Logstash output ——————————–
#output.logstash:      # logstash这部分全部注释掉
  # The Logstash hosts
  #hosts: [“192.168.1.11:5044”]
#在Logstash后面添加如下行
#—————————– KAFKA output ——————————–
output.kafka:            #把日志发送给kafka
  enabled: true          #开启kafka模块
  hosts: [“192.168.1.13:9092”]  #填写kafka服务器地址
  topic: nginx_logs        #填写kafka的topic(主题),自定义的
#logging.level: warning          #调整日志级别

 把output.elasticsearch和output.logstash都给注释掉,然后在output.logstash结尾添加KAFKA output把日志数据发送给kafka。需要注意的是kafka中如果不存在这个topic,则会自动创建。如果有多个kafka服务器,可用逗号分隔。

修改hosts文件

vim /etc/hosts

192.168.121.11 server02
192.168.121.12 server03

 这里需要添加kafka的hosts解析,如果不添加会报如下错误

ELK+filebeat+kafka部署—3(kafka部署)

重启filebeat

ps -ef | grep filebeat | grep -v "grep"
kill 6588
cd /usr/local/filebeat/ && ./filebeat -e -c filebeat.yml &

查看kafka上所以的topic信息

在kafka服务器上查看filebeat保存的数据,topice为nginx_logs

kafka-topics.sh --list --zookeeper localhost:2181

nginx_logs

启动一个消费者获取信息

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic nginx_logs --from-beginning

ELK+filebeat+kafka部署—3(kafka部署)

 启动一个消费者去查看filebeat发送过来的消息,能看到消息说明我们的filebeat的output.kafka配置成功。接下来配置logstash去kafka消费数据

配置logstash

 配置logstash去kafka拿取数据,进行数据格式化,然后把格式化的数据保存到Elasticsearch,通过kibana展示给用户。kibana是通过Elasticsearch进行日志搜索的。
 filebeat中message要么是一段字符串,要么在日志生成的时候拼接成json,然后在filebeat中指定为json。但是大部分系统日志无法去修改日志格式,filebeat则无法通过正则去匹配出对应的field,这时需要结合logstash的grok来过滤。
 Logstash filter 的使用

添加hosts解析

 vim /etc/hosts

192.168.121.10 server01

192.168.121.11 server02

192.168.121.12 server03

 修改logstash配置文件

vim /usr/local/logstash-7.3.0/config/http_logstash.conf

input{
    kafka {
        codec => plain{charset => “UTF-8”}
        bootstrap_servers => “192.168.121.12:9092”
        client_id => “httpd_logs”  #这里设置client.id和group.id是为了做标识
        group_id => “httpd_logs”
        consumer_threads => 5    #设置消费kafka数据时开启的线程数,一个partition对应一个消费者消费,设置多了不影响,在kafka中一个进程对应一个线程
        auto_offset_reset => “latest”  #从最新的偏移量开始消费
        decorate_events => true  #此属性会将当前topic,offset,group,partition等信息>也带到message中
        topics => “nginx_logs”
    }
}
output {
  stdout {
      codec => “rubydebug”
  }
  elasticsearch {
      hosts => [ “192.168.121.10:9200” ]
      index => “nginx-logs-%{+YYYY.MM.dd}”
  }

}

 可以使用相同的group_id方式运行多个Logstash实例,以跨物理机分布负载。主题中的消息将分发到具有相同的所有Logstash实例group_id

重启logstash

 kill 9068
 logstash  -f /usr/local/logstash-7.3.0/config/http_logstash.conf

刷新nginx页面,产生数据

ELK+filebeat+kafka部署—3(kafka部署)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/22229.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!