Data Collection Tools: Detailed Examples of Common Flume Collection Methods
Flume
Flume Overview
Official site: https://flume.apache.org/
Documentation: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
Apache Flume is a top-level project of the Apache Software Foundation. Flume's initial releases are collectively known as Flume OG (original generation) and belonged to Cloudera. The core components, core configuration, and code architecture were later refactored; the refactored releases are collectively known as Flume NG (next generation).
Apache Flume, originally from Cloudera, is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive amounts of log data. Flume supports plugging custom data senders into a logging system to collect data, and it can apply simple processing to the data and write it to a variety of (customizable) data receivers.
Apache Flume is not limited to log aggregation. Because data sources are customizable, Flume can transport large volumes of event data, including but not limited to network traffic data, social-media-generated data, email messages, and nearly any conceivable data source.
Flume Architecture
An external data source sends events to Flume in a specific format. When a source receives events, it stores them in one or more channels; a channel holds the events until they are consumed by a sink. A sink's main job is to read events from the channel and write them to an external storage system or forward them to the next agent's source; only after success are the events removed from the channel.
The most central role in a distributed Flume system is the agent: a Flume collection system is formed by connecting individual agents together.
Core Components
The core of a running Flume deployment is the agent, Flume's smallest independent unit of execution. One agent is one JVM - an independent Flume process that acts as a data courier. It is a complete data collection tool with three core components: source, channel, and sink. Through these components, events can flow from one place to another.
Client: produces data; runs in an independent thread.
Event: the basic unit of data transfer, consisting of a header and a body; the former is a key/value map, the latter an arbitrary byte array.
Agent: a data courier - an independent (JVM) process containing the Source, Channel, and Sink components.
Source: the data collection component. It collects data, applies any required formatting, wraps the data into events, and pushes the events into a channel.
For usage details see: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources
Channel: the transport component - temporary storage for events in transit, best thought of as a data buffer (queue) holding the events handed over by the source. It can be backed by memory or by a persistent file system:
Memory Channel: uses memory; fast, but data can be lost (e.g., on a sudden crash)
File Channel: uses a persistent file system; guarantees no data loss, but is slower
For usage details see: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-channels
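Since every example in this article uses a memory channel, here is a minimal hedged sketch of a File Channel for durability-sensitive flows; the checkpoint and data directory paths are illustrative assumptions, not paths used elsewhere in this article:
# File Channel sketch; paths below are illustrative
a1.channels.c1.type = file
# directory where the channel keeps its checkpoint
a1.channels.c1.checkpointDir = /usr/local/program/flume/checkpoint
# comma-separated directories where event data files are written
a1.channels.c1.dataDirs = /usr/local/program/flume/data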
Sink: the delivery component. It reads and removes events from the channel and passes them to the next agent in the flow pipeline or to the final storage system.
For usage details see: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sinks
When data needs to be filtered, besides modifying code in the Source, Channel, or Sink, Flume provides interceptors, which are also chained. Interceptors sit between the Source and the Channel; once interceptors are assigned to a source, each event passes through them, and depending on your requirements an event can be kept or discarded - discarded events never enter the channel.
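As a small hedged illustration of discarding events without writing code, Flume's built-in regex_filter interceptor drops events whose body matches a regular expression; the pattern here is an assumed example:
# attach an interceptor chain to the source
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
# true = discard matching events; false = keep only matching events
a1.sources.r1.interceptors.i1.excludeEvents = true
# drop any event whose body starts with DEBUG (illustrative pattern)
a1.sources.r1.interceptors.i1.regex = ^DEBUG.*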
Common Channel, Sink, and Source Types
Supported channel types
Supported sink types
Supported source types
Flume Architecture Patterns
Pattern 1: Flume supports passing data across multiple agents. This requires that the upstream agent's sink and the downstream agent's source both be of the Avro type, with the sink pointing at the hostname (or IP address) and port of the source.
Pattern 2: Flume supports collecting logs with several agents separately and then aggregating them through one or more agents before storing them in the file system.
Pattern 3: Flume supports fanning out from one source to multiple channels, and therefore to multiple sinks. By default events are replicated to all channels, so every channel receives identical data. Flume also supports a custom multiplexing selector on the source to implement custom routing rules, as sketched below.
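A hedged sketch of such a multiplexing selector, adapted from the Flume user guide; it assumes an upstream component has set a header named state, and the channel names and header values are illustrative:
a1.sources.r1.selector.type = multiplexing
# route on the value of the "state" header
a1.sources.r1.selector.header = state
# events with state=CZ go to channel c1, state=US to c2
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
# anything else goes to the default channel
a1.sources.r1.selector.default = c1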
Installing Flume
Download: http://archive.apache.org/dist/flume/
wget http://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume
Find the JDK installation path
find / -name java
Edit the configuration file
cd flume/conf
cp flume-env.sh.template flume-env.sh
vim flume-env.sh
# export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export JAVA_HOME=/usr/local/jdk1.8/
# Recommendation: when the channel is memory-backed, give Flume more memory
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
Edit the environment variables
vim /etc/profile
export FLUME_HOME=/usr/local/program/flume
export PATH=$FLUME_HOME/bin:$PATH
Reload the profile
source /etc/profile
Verify
flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
Basic Flume Usage
For monitoring port data, see: https://flume.apache.org/FlumeUserGuide.html#a-simple-example
Write the configuration file
Describe the collection plan, driven by your data collection requirements, in a configuration file.
Create a new configuration file under Flume's conf directory
vim /usr/local/program/flume/conf/example.conf
Name the components of the agent instance
# a1: name of the agent instance
# name of the sources
a1.sources = r1
# name of the sinks
a1.sinks = k1
# name of the channels
a1.channels = c1
Configure the Source
# source type: receive data from a network socket/port
a1.sources.r1.type = netcat
# listen address
a1.sources.r1.bind = node001
# listen port
a1.sources.r1.port = 44444
Configure the Channel
# where data is staged: in memory
a1.channels.c1.type = memory
# maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
# maximum number of events taken from the source or given to the sink per transaction
a1.channels.c1.transactionCapacity = 100
Configure the Sink
# where data goes: the logging system (console)
a1.sinks.k1.type = logger
Bind the source and sink to the channel
# channel used by the source
a1.sources.r1.channels = c1
# channel used by the sink
a1.sinks.k1.channel = c1
Start the Agent Instance
Point Flume at the collection-plan configuration file and start the agent on the appropriate node:
-c conf: directory containing Flume's own configuration files
-f conf/example.conf: the collection plan to use
-n a1: name of the agent
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
Startup log
2022-03-08 20:43:58,557 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-08 20:43:58,558 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-08 20:43:58,560 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2022-03-08 20:43:58,581 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.22.4.21:44444]
Test
Install telnet
yum -y install telnet
Send some test data
[root@administrator ~]# telnet node001 44444
Trying 172.22.4.21...
Connected to node01.
Escape character is '^]'.
Hello World
OK
Data received
2022-03-08 20:44:21,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 0D Hello World. }
Collecting Directory Data to HDFS
Collecting data from a directory uses the Spooling Directory Source; see: https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
Write the configuration file
vim /usr/local/program/flume/conf/spooldir.conf
1. Name the components of the agent instance
a1.sources = r1
a1.sinks = k1
a1.channels = c1
2. Configure the Source
# source type: watch a directory for new files
a1.sources.r1.type = spooldir
# path of the directory to watch
a1.sources.r1.spoolDir = /usr/local/program/flume/tempData
# whether to add a header storing the absolute path of the source file
a1.sources.r1.fileHeader = true
3. Configure the Channel
# stage data in memory
a1.channels.c1.type = memory
# maximum number of events the channel can hold (one event per record/line)
a1.channels.c1.capacity = 1000
# maximum number of events handed to the sink (or taken from the source) per transaction
a1.channels.c1.transactionCapacity = 100
4. Configure the Sink
HDFS sink; see: https://flume.apache.org/FlumeUserGuide.html#hdfs-sink
# sink type: hdfs
a1.sinks.k1.type = hdfs
# where to store the data on HDFS
a1.sinks.k1.hdfs.path = hdfs://node01:9000/flume/spooldir/files/%y-%m-%d/%H%M/
# file name prefix
a1.sinks.k1.hdfs.filePrefix = events-
# whether to round down the timestamp used in the escaped directory path
a1.sinks.k1.hdfs.round = true
# rounding granularity: a new directory every 10 minutes
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# roll to a new file every 3 seconds
a1.sinks.k1.hdfs.rollInterval = 3
# roll when the (temporary) HDFS file reaches 20 bytes
a1.sinks.k1.hdfs.rollSize = 20
# roll after the file holds 5 events
a1.sinks.k1.hdfs.rollCount = 5
# number of events written to HDFS per batch; keep batchSize < transactionCapacity < capacity
a1.sinks.k1.hdfs.batchSize = 1
# whether to use the local timestamp (instead of one from the event headers)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
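The roll thresholds above (3 seconds, 20 bytes, 5 events) are deliberately tiny so the demo produces files quickly; in production they would flood HDFS with small files. A hedged sketch of more typical settings, with illustrative values rather than recommendations:
# roll every 10 minutes or at ~128 MB, whichever comes first; disable count-based rolling
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0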
5. Bind the source and sink to the channel
# channel used by the source
a1.sources.r1.channels = c1
# channel used by the sink
a1.sinks.k1.channel = c1
Start the Agent Instance
bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console
Startup log
2022-03-08 22:20:02,045 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-08 22:20:02,200 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-08 22:20:02,203 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-08 22:20:02,205 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:85)] SpoolDirectorySource source starting with directory: /usr/local/program/flume/tempData
2022-03-08 22:20:02,206 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-08 22:20:02,209 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-08 22:20:02,254 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-08 22:20:02,255 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
Test
Create a file in the watched directory
cd /usr/local/program/flume/tempData
echo "hello world" >> spooldir.txt
Collection log
2022-03-08 22:22:37,622 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:384)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2022-03-08 22:22:37,624 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:497)] Preparing to move file /usr/local/program/flume/tempData/spooldir.txt to /usr/local/program/flume/tempData/spooldir.txt.COMPLETED
2022-03-08 22:22:40,262 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSSequenceFile.configure(HDFSSequenceFile.java:63)] writeFormat = Writable, UseRawLocalFileSystem = false
2022-03-08 22:22:40,573 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/spooldir/files/22-03-08/2220//events-.1646749360257.tmp
2022-03-08 22:22:46,605 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-08 22:22:46,605 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/spooldir/files/22-03-08/2220//events-.1646749360257.tmp
2022-03-08 22:22:46,664 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/spooldir/files/22-03-08/2220/events-.1646749360257.tmp to hdfs://node01:9000/flume/spooldir/files/22-03-08/2220/events-.1646749360257
Check that the file was collected
Once a file has been fully collected, the agent automatically renames it with the suffix .COMPLETED
ls /usr/local/program/flume/tempData
spooldir.txt.COMPLETED
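If renaming to .COMPLETED does not suit your workflow, the Spooling Directory Source exposes a few related knobs; a hedged sketch with illustrative values:
# suffix appended to fully ingested files (default: .COMPLETED)
a1.sources.r1.fileSuffix = .DONE
# delete ingested files instead of renaming them: never | immediate
a1.sources.r1.deletePolicy = never
# regex of file names the source should ignore
a1.sources.r1.ignorePattern = ^.*\.tmp$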
Collecting File Data to HDFS
Collecting data from a file uses the Exec Source; see: https://flume.apache.org/FlumeUserGuide.html#exec-source
Write the configuration file
vim /usr/local/program/flume/conf/exec.conf
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# watch a file for appended content
agent1.sources.source1.type = exec
# which file to watch
agent1.sources.source1.command = tail -F /usr/local/program/flume/tempData/test.txt
# sink type: hdfs
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://node01:9000/flume/exec/files/%y-%m-%d/%H%M/
agent1.sinks.sink1.hdfs.filePrefix = access_log
# maximum number of files the sink keeps open at once
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
# file type: plain data stream
agent1.sinks.sink1.hdfs.fileType = DataStream
# output format
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.channels.channel1.type = memory
# how long (seconds) a put/take may wait on a full/empty channel
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 5000
agent1.channels.channel1.transactionCapacity = 600
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Start the Agent Instance
bin/flume-ng agent -c conf -f conf/exec.conf -n agent1 -Dflume.root.logger=INFO,console
Startup log
2022-03-08 22:41:39,484 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink sink1
2022-03-08 22:41:39,485 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source source1
2022-03-08 22:41:39,487 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:170)] Exec source starting with command: tail -F /usr/local/program/flume/tempData/test.txt
2022-03-08 22:41:39,488 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2022-03-08 22:41:39,488 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: sink1 started
2022-03-08 22:41:39,489 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-03-08 22:41:39,489 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: source1 started
Write data to the watched file
echo "hello world" >> /usr/local/program/flume/tempData/test.txt
Watch the log
2022-03-08 22:41:42,555 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2022-03-08 22:41:42,975 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/exec/files/22-03-08/2240//access_log.1646750502556.tmp
2022-03-08 22:42:16,054 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-08 22:42:16,455 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/exec/files/22-03-08/2240//access_log.1646750502556.tmp
2022-03-08 22:42:16,497 (hdfs-sink1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/exec/files/22-03-08/2240/access_log.1646750502556.tmp to hdfs://node01:9000/flume/exec/files/22-03-08/2240/access_log.1646750502556
Collecting File Data to Kafka
Write the configuration file
vim /usr/local/program/flume/conf/exec2kafka.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumeTopic
a1.sinks.k1.brokerList = node001:9092,node002:9092,node003:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 10
a1.sinks.k1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create the topic
kafka-topics.sh --zookeeper node001:2181 --create --replication-factor 3 --partitions 3 --topic flumeTopic
Start a consumer
kafka-console-consumer.sh --zookeeper node001:2181 --from-beginning --topic flumeTopic
Start Flume
flume-ng agent -n a1 -f /usr/local/program/flume/conf/exec2kafka.conf -Dflume.root.logger=INFO,console
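The topic, brokerList, requiredAcks, and batchSize names in the configuration above are the legacy Kafka sink property names, which Flume 1.9 still honors for backward compatibility; the equivalents documented for the 1.9 Kafka sink look like this:
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.sinks.k1.kafka.topic = flumeTopic
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1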
Cascading Multiple Agents
The cascade works as follows:
1. The first agent (server 1) collects data and sends it over the network to the second agent (server 2).
2. The second agent (server 2) receives the data sent by the first agent (server 1) and saves it to HDFS.
Configure Agent1
Configuration for the first agent instance (conf/avro1.conf):
agent1.sources = r1
agent1.sinks = k1
agent1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt
# an avro sink acts as a data sender
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node01
agent1.sinks.k1.port = 4141
agent1.sinks.k1.batch-size = 10
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
Configure Agent2
Configuration for the second agent instance (conf/avro2.conf). Because both agents run on one server here, the second instance is named agent2; its three component names could otherwise be the same as Agent1's.
agent2.sources = r2
agent2.sinks = k2
agent2.channels = c2
# an avro source acts as a receiving server
agent2.sources.r2.type = avro
agent2.sources.r2.channels = c2
agent2.sources.r2.bind = node01
agent2.sources.r2.port = 4141
# Describe the sink
agent2.sinks.k2.type = hdfs
agent2.sinks.k2.hdfs.path = hdfs://node01:9000/flume/avro/files/%y-%m-%d/%H%M/
agent2.sinks.k2.hdfs.filePrefix = events-
agent2.sinks.k2.hdfs.round = true
agent2.sinks.k2.hdfs.roundValue = 10
agent2.sinks.k2.hdfs.roundUnit = minute
agent2.sinks.k2.hdfs.rollInterval = 3
agent2.sinks.k2.hdfs.rollSize = 20
agent2.sinks.k2.hdfs.rollCount = 5
agent2.sinks.k2.hdfs.batchSize = 1
agent2.sinks.k2.hdfs.useLocalTimeStamp = true
agent2.sinks.k2.hdfs.fileType = DataStream
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2
Start the Agent Instances
Start the second agent instance first, then the first one.
bin/flume-ng agent -c conf -f conf/avro2.conf -n agent2 -Dflume.root.logger=INFO,console
2022-03-11 21:32:36,296 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c2
2022-03-11 21:32:36,551 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c2: Successfully registered new MBean.
2022-03-11 21:32:36,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c2 started
2022-03-11 21:32:36,552 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k2
2022-03-11 21:32:36,555 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2022-03-11 21:32:36,555 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2022-03-11 21:32:36,560 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r2
2022-03-11 21:32:36,562 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r2: { bindAddress: 172.22.4.21, port: 4141 }...
2022-03-11 21:32:37,051 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2022-03-11 21:32:37,051 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r2 started
2022-03-11 21:32:37,056 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r2 started.
bin/flume-ng agent -c conf -f conf/avro1.conf -n agent1 -Dflume.root.logger=INFO,console
2022-03-11 21:33:42,883 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2022-03-11 21:33:43,056 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2022-03-11 21:33:43,057 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-11 21:33:43,058 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-11 21:33:43,062 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:294)] Starting RpcSink k1 { host: 172.22.4.21, port: 4141 }...
2022-03-11 21:33:43,063 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-11 21:33:43,063 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:212)] Rpc sink k1: Building RpcClient with hostname: 172.22.4.21, port: 4141
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:170)] Exec source starting with command: tail -F /usr/local/program/flume/tempData/test.txt
2022-03-11 21:33:43,066 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-11 21:33:43,066 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2022-03-11 21:33:43,065 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:113)] Attempting to create Avro Rpc client.
2022-03-11 21:33:43,090 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:594)] Using default maxIOWorkers
2022-03-11 21:33:43,578 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:308)] Rpc sink k1 started.
Run the test
Write data to /usr/local/program/flume/tempData/test.txt
echo "Hello World" >> test.txt
Log on the second agent instance
2022-03-11 21:38:26,609 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2022-03-11 21:38:26,865 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/avro/files/22-03-11/2130//events-.1647005906610.tmp
2022-03-11 21:38:32,515 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-11 21:38:32,515 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/avro/files/22-03-11/2130//events-.1647005906610.tmp
2022-03-11 21:38:32,551 (hdfs-k2-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/avro/files/22-03-11/2130/events-.1647005906610.tmp to hdfs://node01:9000/flume/avro/files/22-03-11/2130/events-.1647005906610
Flume High Availability
The high-availability setup works as follows:
Agent1 (server 1) collects data; Agent2 (server 2) and Agent3 (server 3) receive the data sent by Agent1 and write it to HDFS.
Configure Agent1
Configuration for the first server, Agent1:
agent1.channels = c1
agent1.sources = r1
# two sinks
agent1.sinks = k1 k2
# put the two sinks in one group
agent1.sinkgroups = g1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
# configure sink1: send data to the designated server
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 4141
# configure sink2: send data to the designated server
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 5151
# sink group membership
agent1.sinkgroups.g1.sinks = k1 k2
# sink group processor; failover: enable failover
agent1.sinkgroups.g1.processor.type = failover
# sink priorities; the higher-priority sink receives data first
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
Configure Agent2 and Agent3
The configurations for Agent2 and Agent3 are almost identical; note the different agent names, bind hosts, and ports.
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:9000/flume/cluster/files/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.type = avro
a2.sources.r1.bind = node03
a2.sources.r1.port = 5151
a2.sources.r1.channels = c1
a2.sinks.k1.type=hdfs
a2.sinks.k1.hdfs.path= hdfs://node01:9000/flume/cluster/files/%y-%m-%d/%H%M/
a2.sinks.k1.hdfs.fileType=DataStream
a2.sinks.k1.hdfs.writeFormat=TEXT
a2.sinks.k1.hdfs.rollInterval=10
a2.sinks.k1.channel=c1
a2.sinks.k1.hdfs.filePrefix=%Y-%m-%d
Start the Agents
Note the startup order: start the Agent2 and Agent3 instances first, then the Agent1 instance.
If startup then reports this error:
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
it is because the HDFS path uses timestamp escapes to partition directories, but the events arriving at Agent2 and Agent3 carry no timestamp in their headers. Add the following to the Agent2 and Agent3 configurations:
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.useLocalTimeStamp = true
Agent2
2022-03-11 22:48:39,568 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2022-03-11 22:48:39,572 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms
2022-03-11 22:48:39,814 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2022-03-11 22:48:39,815 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-11 22:48:40,073 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-11 22:48:40,076 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-11 22:48:40,076 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-11 22:48:40,077 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-11 22:48:40,082 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r1: { bindAddress: 172.22.4.21, port: 4141 }...
2022-03-11 22:48:40,588 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-11 22:48:40,588 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
# After Agent1 starts
2022-03-11 22:48:40,614 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r1 started.
2022-03-11 22:48:48,804 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] OPEN
2022-03-11 22:48:48,808 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] BOUND: /172.22.4.21:4141
2022-03-11 22:48:48,809 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] CONNECTED: /172.22.4.21:35448
Agent3
2022-03-11 22:46:08,010 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k2
2022-03-11 22:46:08,012 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r2
2022-03-11 22:46:08,014 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r2: { bindAddress: 172.22.4.21, port: 5151 }...
2022-03-11 22:46:08,018 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2022-03-11 22:46:08,018 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2022-03-11 22:46:08,689 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2022-03-11 22:46:08,689 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r2 started
2022-03-11 22:46:08,699 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r2 started.
# After Agent1 starts
2022-03-11 22:48:49,210 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] OPEN
2022-03-11 22:48:49,215 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] BOUND: /172.22.4.21:5151
2022-03-11 22:48:49,215 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] CONNECTED: /172.22.4.21:55952
Failover Test
Because node02's priority is higher than node03's, node02 receives the collected data and uploads it to HDFS first.
When node02 goes down, node03 takes over receiving the data and uploading it to HDFS.
When the node02 service recovers, node02 again takes precedence, as exercised in the sketch below.
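A hedged sketch of one way to drive the failover test; the process-lookup steps assume a Linux shell, and the kill target must be found by hand:
# on node01: keep generating events
while true; do echo "failover test $(date)" >> /usr/local/program/flume/tempData/test.txt; sleep 1; done
# on node02: find the Flume agent JVM (its main class is Application) and kill it to simulate a crash
jps | grep Application
kill -9 <pid-from-jps>
# watch the node03 console: it should begin creating files under /flume/cluster/files/
# restart the node02 agent; new data should flow through node02 again (higher priority)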
Flume Load Balancing
Flume load balancing means one agent acts as a routing node: it distributes the events staged in its channel evenly across multiple sink components, each of which connects to a separate downstream agent.
To enable load balancing, start from the high-availability Agent1 configuration and replace the failover sink-group settings
# sink group parameters; failover: failover mode
agent1.sinkgroups.g1.processor.type = failover
# sink priorities; the higher-priority sink receives data first
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
with the load-balancing settings:
# load balancing
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
# round-robin selection
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
Configure Agent2 and Agent3 as in the high-availability section. With this in place, when Agent1 collects data, Agent2 and Agent3 receive it in round-robin fashion.
Flume Interceptors
Configure the collecting agent, Agent1
vim interceptors1.conf
# three sources
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# first source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test1.txt
# interceptor name
a1.sources.r1.interceptors = i1
# interceptor type
a1.sources.r1.interceptors.i1.type = static
# the static interceptor inserts a user-defined key-value pair into each collected event's headers
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = test1
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /usr/local/program/flume/tempData/test2.txt
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = test2
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /usr/local/program/flume/tempData/test3.txt
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = test3
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node01
a1.sinks.k1.port = 41414
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
Configure the receiving agent, Agent2
vim interceptors2.conf
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = node01
a2.sources.r1.port =41414
# add a timestamp interceptor
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a2.channels.c1.type = memory
a2.channels.c1.capacity = 20000
a2.channels.c1.transactionCapacity = 10000
a2.sinks.k1.type = hdfs
# %{type} resolves to the value of the key-value pair set by Agent1's static interceptors
a2.sinks.k1.hdfs.path = hdfs://node001:9000/flume/interceptors/files/%{type}/%y-%m-%d/%H%M/
a2.sinks.k1.hdfs.filePrefix = events
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.writeFormat = Text
# use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# do not roll files by event count
a2.sinks.k1.hdfs.rollCount = 0
# roll files by time (seconds)
a2.sinks.k1.hdfs.rollInterval = 30
# roll files by size (bytes)
a2.sinks.k1.hdfs.rollSize = 10485760
# number of events written to HDFS per batch
a2.sinks.k1.hdfs.batchSize = 10000
# number of threads Flume uses for HDFS operations (open, write, etc.)
a2.sinks.k1.hdfs.threadsPoolSize = 10
# timeout (ms) for HDFS operations
a2.sinks.k1.hdfs.callTimeout = 30000
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
Start the agents
bin/flume-ng agent -c conf -f conf/interceptors2.conf -n a2 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf -f conf/interceptors1.conf -n a1 -Dflume.root.logger=INFO,console
Run the test
[root@administrator tempData]# echo "Hello world" >> test1.txt
[root@administrator tempData]# echo "Hello world" >> test2.txt
[root@administrator tempData]# echo "Hello world" >> test3.txt
Flume Monitoring
Ganglia is large-scale cluster monitoring software, well suited to monitoring distributed or parallel clusters.
Official site: http://ganglia.info/
Install Ganglia
You can either create a container from a Ganglia image or install the Ganglia packages directly to monitor Flume.
Pull the Ganglia image
docker pull jackchencoding/ganglia
Create the Ganglia container
docker run -itd --name ganglia -p 8080:80 -p 8081:8649 --privileged=true jackchencoding/ganglia:latest
Configure Flume
Edit flume-env.sh under /usr/local/program/flume/conf and add the following (replace IP with the Ganglia host's address):
JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=IP:8649 -Xms100m -Xmx200m"
Start the Flume job
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
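If standing up Ganglia is too heavy for a quick look, Flume also ships a built-in HTTP reporter that exposes the same counters as JSON; a hedged sketch, with an arbitrary port:
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
# then inspect the metrics with: curl http://localhost:34545/metrics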
Run the test
Visit IP:8080/ganglia, then send test data and operate Flume to exercise the monitoring:
[root@administrator ~]# telnet node001 44444
Trying 172.22.4.21...
Connected to node01.
Escape character is '^]'.
Hello World
OK