部署kafka
为了防止收集的日志信息太多或者是服务器down机导致的信息丢失,我们这里引入kafka消息队列服务器,我们这里搭建单节点的kafka,生产环境中应该使用集群方式部署。
部署jdk
由于zookeeper依赖java环境,所以我们需要安装jdk,官网建议最低安装jdk 1.8版本
上传软件包
anaconda-ks.cfg apache-zookeeper-3.5.5-bin.tar.gz jdk-8u171-linux-x64.tar.gz kafka_2.12-2.2.0.tgz
解压jdk
tar -zxvf jdk-8u171-linux-x64.tar.gz -C /usr/local/
配置jdk环境变量
vim /etc/profile
在文件最后一行加入以下内容
JAVA_HOME=/usr/local/jdk1.8.0_171
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar
export PATH JAVA_HOME CLASSPATH
source /etc/profile #使环境变量生效
安装zookeeper
配置hosts文件
[root@server03 conf]# vim /etc/hosts
添加本机对应的IP和主机名
解压软件包
[root@server03 conf]# tar -zxf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local/
创建快照日志存放目录
mkdir -p /data/zk/data
创建事务日志存放目录
mkdir -p /data/zk/datalog
生成配置文件
cd /usr/local/apache-zookeeper-3.5.5-bin/conf/
cp zoo_sample.cfg zoo.cfg #复制一份zoo_sample.cfg文件并命名为zoo.cfg
修改主配置文件zoo.cfg
vim zoo.cfg
dataDir=/data/zk/data #修改这一行为我们创建的目录
dataLogDir=/data/zk/datalog #添加这一行
添加path环境变量
这里必须是修改配置文件添加path环境变量,不然启动报错
vim /etc/profile
export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.5.5-bin
export PATH=$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile
启动zookeeper
zkServer.sh start
添加开机自启动
将Zookeeper添加到开机自启服务
在/lib/systemd/system/文件夹下创建一个启动脚本zookeeper.service
vim /lib/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target[Service]
Type=forking
Environment=”JAVA_HOME=/usr/local/jdk1.8.0_171″
User=root
Group=root
ExecStart=/usr/local/apache-zookeeper-3.5.5-bin/bin/zkServer.sh start
ExecStop=/usr/local/apache-zookeeper-3.5.5-bin/bin/zkServer.sh stop[Install]
WantedBy=multi-user.target
systemctl daemon-reload
systemctl enable zookeeper
Kafka 单节点单Broker部署
这里部署的是kafka单节点,生产环境应该部署kafka多节点集群。
解压软件到指定目录
tar -zxvf kafka_2.12-2.2.0.tgz -C /usr/local/
修改配置文件
vim /usr/local/kafka_2.12-2.2.0/config/server.properties
# broker的全局唯一编号,不能重复
broker.id=0
# 监听
listeners=PLAINTEXT://:9092 #开启此项
# 日志目录
log.dirs=/data/kafka/log #修改日志目录
# 配置zookeeper的连接(如果不是本机,需要该为ip或主机名)
zookeeper.connect=localhost:2181
创建日志目录
mkdir -p /data/kafka/log
添加path环境变量
vim /etc/profile #文件最后天以下行
export KAFKA_HOME=/usr/local/kafka_2.12-2.2.0
export PATH=$KAFKA_HOME/bin:$PATH
source /etc/profile #生效配置文件
启动kafka
kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.2.0/config/server.properties
添加开机自启动
将kafka添加到开机自启服务
在/lib/systemd/system/文件夹下创建一个启动脚本kafka.service
vim /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service[Service]
Type=simple
Environment=”PATH=/usr/local/jdk1.8.0_171/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin”
User=root
Group=root
ExecStart=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-2.2.0/config/server.properties
ExecStop=/usr/local/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
Restart=on-failure[Install]
WantedBy=multi-user.target
systemctl daemon-reload
systemctl enable kafka
设置filebeat
修改filebeat.yml配置文件
我们这里修改filebeat配置文件,把filebeat收集到的nginx日志保存到kafka消息队列中。把output.elasticsearch和output.logstash都给注释掉,添加kafka项
vim /usr/local/filebeat/filebeat.yml
– type: log
enabled: true #开启此配置
paths:
– /usr/local/nginx/logs/*.log #添加收集nginx服务日志
#- /var/log/*.log #注释该行
#————————– Elasticsearch output ——————————
#output.elasticsearch: # Elasticsearch这部分全部注释掉
# Array of hosts to connect to.
#hosts: [“localhost:9200”]
#—————————– Logstash output ——————————–
#output.logstash: # logstash这部分全部注释掉
# The Logstash hosts
#hosts: [“192.168.1.11:5044”]
#在Logstash后面添加如下行
#—————————– KAFKA output ——————————–
output.kafka: #把日志发送给kafka
enabled: true #开启kafka模块
hosts: [“192.168.1.13:9092”] #填写kafka服务器地址
topic: nginx_logs #填写kafka的topic(主题),自定义的
#logging.level: warning #调整日志级别
把output.elasticsearch和output.logstash都给注释掉,然后在output.logstash结尾添加KAFKA output,把日志数据发送给kafka。需要注意的是kafka中如果不存在这个topic,则会自动创建。如果有多个kafka服务器,可用逗号分隔。
修改hosts文件
vim /etc/hosts
192.168.121.11 server02
192.168.121.12 server03
这里需要添加kafka的hosts解析,如果不添加会报如下错误
重启filebeat
ps -ef | grep filebeat | grep -v "grep"
kill 6588
cd /usr/local/filebeat/ && ./filebeat -e -c filebeat.yml &
查看kafka上所以的topic信息
在kafka服务器上查看filebeat保存的数据,topice为nginx_logs
kafka-topics.sh --list --zookeeper localhost:2181
nginx_logs
启动一个消费者获取信息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic nginx_logs --from-beginning
启动一个消费者去查看filebeat发送过来的消息,能看到消息说明我们的filebeat的output.kafka配置成功。接下来配置logstash去kafka消费数据
配置logstash
配置logstash去kafka拿取数据,进行数据格式化,然后把格式化的数据保存到Elasticsearch,通过kibana展示给用户。kibana是通过Elasticsearch进行日志搜索的。
filebeat中message要么是一段字符串,要么在日志生成的时候拼接成json,然后在filebeat中指定为json。但是大部分系统日志无法去修改日志格式,filebeat则无法通过正则去匹配出对应的field,这时需要结合logstash的grok来过滤。
Logstash filter 的使用
添加hosts解析
vim /etc/hosts
192.168.121.10 server01
192.168.121.11 server02
192.168.121.12 server03
修改logstash配置文件
vim /usr/local/logstash-7.3.0/config/http_logstash.conf
input{
kafka {
codec => plain{charset => “UTF-8”}
bootstrap_servers => “192.168.121.12:9092”
client_id => “httpd_logs” #这里设置client.id和group.id是为了做标识
group_id => “httpd_logs”
consumer_threads => 5 #设置消费kafka数据时开启的线程数,一个partition对应一个消费者消费,设置多了不影响,在kafka中一个进程对应一个线程
auto_offset_reset => “latest” #从最新的偏移量开始消费
decorate_events => true #此属性会将当前topic,offset,group,partition等信息>也带到message中
topics => “nginx_logs”
}
}
output {
stdout {
codec => “rubydebug”
}
elasticsearch {
hosts => [ “192.168.121.10:9200” ]
index => “nginx-logs-%{+YYYY.MM.dd}”
}}
可以使用相同的group_id方式运行多个Logstash实例,以跨物理机分布负载。主题中的消息将分发到具有相同的所有Logstash实例group_id
重启logstash
kill 9068
logstash -f /usr/local/logstash-7.3.0/config/http_logstash.conf
刷新nginx页面,产生数据
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/22229.html