目录
一、传统日志收集的弊端
我们知道我们大多数是通过日志,然后判断程序哪里报错了,这样针对日志我们才能对症下一剂猛药。如果在集群环境中,成百上千的服务器,如果报错了,我们如何查找日志呢,一个一个日志文件这样排查么?那可就为难死我们了。
二、ELK收集系统过程
基于Elasticsearch、Logstash、Kibana可以实现分布式日志收集系统,再加上Kibana的可视化系统,对数据进行分析,嗯真香。
在请求过程中创建AOP,拦截请求,然后在Aop方法中开启异步线程,将消息发送到Kafka(单机或者集群),logstash接收kafka的日志,经过消息过滤,然后发送到ElasticSearch系统,然后经过Kibana可视化界面,对日志进行搜索分析等。
三、搭建ELK系统
Zookeeper搭建:搭建ZooKeeper3.7.0集群(传统方式&Docker方式)_熟透的蜗牛的博客-CSDN博客
Kafka搭建:Kafka集群的安装(传统方式&Docker方式)&Springboot整合Kafka_熟透的蜗牛的博客-CSDN博客
ElasticSearch搭建:
Elasticsearch的安装(传统方式&docker方式)&整合Springboot_熟透的蜗牛的博客-CSDN博客
Kibana搭建:
Kibana的安装&整合ElasticSearch_熟透的蜗牛的博客-CSDN博客
Logstash搭建
LogStash的安装(传统方式&Docker)与使用_熟透的蜗牛的博客-CSDN博客
本文演示基于Docker-compose,所有的均为单机,如需集群请参考上述搭建方式
1、搭建docker-compose
#下载docker-compose文件
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
#授权
sudo chmod +x /usr/local/bin/docker-compose
2、创建目录
mkdir -p /usr/local/docker-compose/elk
3、在上面目录创建docker-compose.yml文件
version: '2'
services:
zookeeper:
image: zookeeper:latest
container_name: zookeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
container_name: kafka
volumes:
- /etc/localtime:/etc/localtime
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.139.160
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_PORT: 9092
KAFKA_LOG_RETENTION_HOURS: 120
KAFKA_MESSAGE_MAX_BYTES: 10000000
KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
KAFKA_NUM_PARTITIONS: 3
KAFKA_DELETE_RETENTION_MS: 1000
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
restart: always
container_name: elasticsearch
environment:
- discovery.type=single-node #单点启动,实际生产不允许
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- 9200:9200
kibana:
image: docker.elastic.co/kibana/kibana:7.15.2
restart: always
container_name: kibana
ports:
- 5601:5601
environment:
- elasticsearch_url=http://192.168.139.160:9200
depends_on:
- elasticsearch
logstash:
image: docker.elastic.co/logstash/logstash:7.15.2
volumes:
- /data/logstash/pipeline/:/usr/share/logstash/pipeline/
- /data/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
- /data/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml
restart: always
container_name: logstash
ports:
- 9600:9600
depends_on:
- elasticsearch
4、启动.
#进入docker-compose所在的目录执行
[root@localhost elk]# docker-compose up
四、代码
切面类
package com.xiaojie.elk.aop;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.elk.pojo.RequestPojo;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
/**
* @author xiaojie
* @version 1.0
* @description: 日志切面类
* @date 2021/12/5 16:51
*/
@Aspect
@Component
public class AopLogAspect {
@Value("${server.port}")
private String serverPort;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 申明一个切点 里面是 execution表达式
@Pointcut("execution(* com.xiaojie.elk.service.*.*(..))")
private void serviceAspect() {
}
@Autowired
private LogContainer logContainer;
// 请求method前打印内容
@Before(value = "serviceAspect()")
public void methodBefore(JoinPoint joinPoint) {
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
RequestPojo requestPojo = new RequestPojo();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
requestPojo.setRequestTime(df.format(new Date()));
requestPojo.setUrl(request.getRequestURL().toString());
requestPojo.setMethod(request.getMethod());
requestPojo.setSignature(joinPoint.getSignature().toString());
requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
// IP地址信息
requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
// 将日志信息投递到kafka中
String log = JSONObject.toJSONString(requestPojo);
logContainer.put(log);
}
// 在方法执行完结后打印返回内容
/* @AfterReturning(returning = "o", pointcut = "serviceAspect()")
public void methodAfterReturing(Object o) {
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
JSONObject respJSONObject = new JSONObject();
JSONObject jsonObject = new JSONObject();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
jsonObject.put("response_time", df.format(new Date()));
jsonObject.put("response_content", JSONObject.toJSONString(o));
// IP地址信息
jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
respJSONObject.put("response", jsonObject);
logContainer.put(respJSONObject.toJSONString());
}*/
/**
* 异常通知
*
* @param point
*/
@AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
public void serviceAspect(JoinPoint joinPoint, Exception e) {
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
RequestPojo requestPojo = new RequestPojo();
requestPojo.setRequestTime(df.format(new Date()));
requestPojo.setUrl(request.getRequestURL().toString());
requestPojo.setMethod(request.getMethod());
requestPojo.setSignature(joinPoint.getSignature().toString());
requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
// IP地址信息
requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
requestPojo.setError(e.toString());
// 将日志信息投递到kafka中
String log = JSONObject.toJSONString(requestPojo);
logContainer.put(log);
}
public static String getIpAddr(HttpServletRequest request) {
//X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。
String ipAddress = request.getHeader("x-forwarded-for");
if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
ipAddress = request.getHeader("Proxy-Client-IP");
}
if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
ipAddress = request.getHeader("WL-Proxy-Client-IP");
}
if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
ipAddress = request.getRemoteAddr();
if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
//根据网卡取本机配置的IP
InetAddress inet = null;
try {
inet = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
ipAddress = inet.getHostAddress();
}
}
//对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15
if (ipAddress.indexOf(",") > 0) {
ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
}
}
return ipAddress;
}
}
异步线程
package com.xiaojie.elk.aop;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author xiaojie
* @version 1.0
* @description: 开启异步线程发送日志
* @date 2021/12/5 16:50
*/
@Component
public class LogContainer {
private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public LogContainer() {
// 初始化
new LogThreadKafka().start();
}
/**
* 存入日志
*
* @param log
*/
public void put(String log) {
logDeque.offer(log);
}
class LogThreadKafka extends Thread {
@Override
public void run() {
while (true) {
String log = logDeque.poll();
if (!StringUtils.isEmpty(log)) {
// 将消息投递kafka中
kafkaTemplate.send("kafka-log", log);
}
}
}
}
}
五、验证效果
完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码 elk模块
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/18489.html