这是另外一个服务,负责从kafka中读取数据,并传到es引擎中。
主干逻辑
package main
import (
"logtransfer/kafka"
"logtransfer/es"
"logtransfer/model"
"github.com/go-ini/ini"
"github.com/sirupsen/logrus"
)
func main(){
// 1. 加载配置文件
var cfg = new(model.Config)
err:=ini.MapTo(cfg, "./config/config.ini")
if err!=nil {
logrus.Error("配置参数加载错误", err)
}
logrus.Info("配置参数加载成功")
// 连接ES
err=es.InitES(cfg.ESConf)
if err!=nil {
logrus.Error("连接ES或者发送数据失败")
}
logrus.Info("初始化es成功")
// 连接kafka
err=kafka.InitKafka(cfg.KafkaConf)
if err!=nil {
logrus.Error("初始化kafka失败")
}
logrus.Info("初始化kafka成功")
select{}
}
先加载配置参数,包含这些
[kafka]
address=127.0.0.1:9092
topic=log1
[es]
address=127.0.0.1:9200
index=web
max_chan_size=10000
goroutine_num=16
定义这些model来接收配置参数
package model
type Config struct {
KafkaConf `ini:"kafka"`
ESConf `ini:"es"`
}
type KafkaConf struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
}
type ESConf struct {
Address string `ini:"address"`
Index string `ini:"index"`
MaxSize int `ini:"max_chan_size"`
GoNum int `ini:"goroutine_num"`
}
之后连接ES,并从channel中读取数据传输到es
之后连接kafka,从kafka消费数据,并向channel中写入数据,涉及到channel的操作大多是异步的。
连接ES
package es
import (
"context"
// "encoding/json"
"fmt"
"logtransfer/model"
"github.com/olivere/elastic/v7"
"github.com/sirupsen/logrus"
)
type ESClient struct {
client *elastic.Client
index string
logDataChan chan interface{}
}
var esClient *ESClient
func InitES(esConf model.ESConf) (err error){
client, err:=elastic.NewClient(elastic.SetURL("http://"+esConf.Address))
// client, err:=elastic.NewClient(elastic.SetURL(esConf.Address))
if err!=nil {
logrus.Error("es连接错误")
}
logrus.Info("es连接成功")
esClient=&ESClient{
client: client,
index: esConf.Index,
logDataChan: make(chan interface{}, esConf.MaxSize),
}
for i:=0;i<esConf.GoNum;i++ {
go sendToES()
}
return
}
func sendToES() {
fmt.Println(esClient.index)
for msg:=range esClient.logDataChan {
fmt.Println("esClient.logDataChan", msg)
put1, err:=esClient.client.Index().Index(esClient.index).BodyJson(msg).Do(context.Background())
if err!=nil {
logrus.Error("向es发送数据失败", err)
}
logrus.Infof("向es发送数据成功 user %s to index %s type %s\n", put1.Id, put1.Index, put1.Type)
}
}
// 通过一个首字母大写的函数从包外接收msg,发送到chan中
func PutLogData(msg interface{}){
esClient.logDataChan<-msg
}
InitES主要是初始化ES的连接
然后开启一些向es传输数据的协程 sendToES
sendToES是传输数据的逻辑,这里只实现了map格式数据的传输,即msg是要是map数据格式的,其它类型的数据会报错。
连接Kafka并消费
package kafka
import (
"logtransfer/es"
"logtransfer/model"
"encoding/json"
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
)
func InitKafka(kconfig model.KafkaConf) (err error){
// 创建消费者
cons, err:=sarama.NewConsumer([]string{kconfig.Address}, nil)
if err!=nil {
logrus.Error("获取消费者失败")
}
// 获取指定topic下的所有分区列表
partitionList,err:=cons.Partitions(kconfig.Topic)
if err!=nil {
logrus.Error("获取分区列表失败")
}
for partition:=range partitionList {
// 获取每个分区的消费者
var consP sarama.PartitionConsumer
consP, err=cons.ConsumePartition(kconfig.Topic, int32(partition), sarama.OffsetNewest)
if err!=nil {
logrus.Error("获取分区消费者失败")
}
go func(sarama.PartitionConsumer) {
for msg:=range consP.Messages() {
logrus.Info(msg.Topic, string(msg.Value))
// var res map[string]interface{}
var res map[string]interface{}
err=json.Unmarshal(msg.Value, &res)
if err!=nil {
logrus.Error("kafka json解析失败", err)
}
logrus.Info("kafka json解析成功", res)
es.PutLogData(res)
}
}(consP)
}
return
}
InitKafka初始化kafka连接,并建立消费者,读取的数据要求是son格式的,能解析成map对象的,就要求logagent收集的日志数据是map格式的了。这一部分之后可以完善下支持一般的日志内容类型。
es.PutLogData(res) 将kafka读取的内容通过channel发送给es传输前的协程。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/133467.html