Data Collection Tools: Detailed Usage Examples of Common Flume Collection Methods
Flume
Flume Overview
Official site: https://flume.apache.org/
Documentation: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
Apache Flume is a top-level project of the Apache Software Foundation. The initial Flume releases are collectively known as Flume OG (original generation) and belonged to Cloudera. The core components, core configuration, and code architecture were later refactored; the refactored versions are collectively known as Flume NG (next generation).
Apache Flume, originally from Cloudera, is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive amounts of log data. Flume lets you customize all kinds of data senders in a logging system to collect data; it can also perform simple processing on the data and write it to various (customizable) data receivers.
Apache Flume is not limited to log data aggregation. Because data sources are customizable, Flume can transport large volumes of event data, including but not limited to network traffic data, social-media data, email messages, and almost any other possible data source.
Flume Architecture
An external data source sends events to Flume in a specific format. When the source receives the events, it stores them in one or more channels; a channel keeps the events until a sink consumes them. The sink's main job is to read events from the channel and write them to an external storage system or forward them to the source of the next agent; only after success does it remove the events from the channel.
The most central role in a Flume deployment is the agent; a Flume collection system is built by connecting agents together.
Core Components
The core of a running Flume deployment is the Agent, Flume's smallest independent unit of execution; one agent is one JVM. An agent acts as a data carrier and runs as an independent Flume process. It is a complete data collection tool containing three core components: source, channel, and sink. Through these components, events can flow from one place to another.
Client
: produces the data; runs in an independent thread.
Event
: the basic unit of data transfer, composed of a message header and a message body; the former is a key/value map, the latter an arbitrary byte array.
Agent
: acts as a data carrier; an independent (JVM) process containing the Source, Channel, and Sink components.
Source
: the data collection component; it collects data, applies any required formatting, wraps the data into events, and pushes the events into the Channel.
For usage details, see:
https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources
Channel
: the transport component, a temporary store for events in transit; it can be viewed as a data buffer (queue) holding the events handed over by the Source. It can be backed by memory or a persistent file system:
Memory Channel: uses memory; fast, but data may be lost (e.g., on a sudden crash)
File Channel: uses a persistent file system; guarantees no data loss, but is slower (a minimal sketch follows the reference link below)
For usage details, see:
https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-channels
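A minimal File Channel sketch, assuming an agent named a1 and placeholder directories (checkpointDir and dataDirs are the standard File Channel properties):
a1.channels = c1
a1.channels.c1.type = file
# directory where the channel stores its checkpoint
a1.channels.c1.checkpointDir = /usr/local/program/flume/filechannel/checkpoint
# comma-separated list of directories for the channel's data files
a1.channels.c1.dataDirs = /usr/local/program/flume/filechannel/data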
Sink
: the sink component; it reads events from the Channel, removes them, and either passes them to the next Agent in the flow pipeline or writes the data to the final storage system.
For usage details, see:
https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sinks
Interceptor
When data needs to be filtered, besides modifying code in the Source, Channel, or Sink, Flume provides interceptors, which are also arranged as a chain. Interceptors sit between the Source and the Channel: once interceptors are configured on a Source, each event is handed to the chain, which decides whether to keep or discard it; discarded events never enter the Channel. A minimal sketch follows.
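As a minimal sketch (agent/source/interceptor names a1, r1, i1, i2 are placeholders), the built-in timestamp and host interceptors can be chained on a source like this:
a1.sources.r1.interceptors = i1 i2
# adds a timestamp header with the event's processing time
a1.sources.r1.interceptors.i1.type = timestamp
# adds a header with the host name or IP of the agent
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
A section using the static interceptor to tag events appears later in this article.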
Common Channel, Sink, and Source Types
Channel types supported
Sink types supported
Source types supported
Flume Architecture Patterns
Pattern 1:
Flume supports passing data across multiple Agents. This requires that the upstream Agent's Sink and the downstream Agent's Source both be of the Avro type, with the Sink pointing at the hostname (or IP address) and port of the downstream Source.
Pattern 2:
Flume supports collecting logs with multiple Agents separately and then aggregating them through one or more Agents before writing them to the file system.
Pattern 3:
Flume supports fanning out from one Source to multiple Channels, and therefore to multiple Sinks. By default the Event is replicated to every Channel, so all Channels receive the same data. Flume also supports configuring a multiplexing selector on the Source to implement custom routing rules, as sketched below.
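A hedged sketch of such a multiplexing selector (the header name "type" and the channel names are illustrative): events whose type header is test1 go to c1, events with test2 go to c2, and everything else falls back to the default channel.
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.test1 = c1
a1.sources.r1.selector.mapping.test2 = c2
a1.sources.r1.selector.default = c1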
Installing Flume
Download: http://archive.apache.org/dist/flume/
wget http://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume
Find the JDK installation path
find / -name java
Edit the configuration file
cd flume/conf
cp flume-env.sh.template flume-env.sh
vim flume-env.sh
# export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export JAVA_HOME=/usr/local/jdk1.8/
# Recommendation: when the channel is set to memory storage, allocate more memory to Flume
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
Set environment variables
vim /etc/profile
export FLUME_HOME=/usr/local/program/flume
export PATH=$FLUME_HOME/bin:$PATH
Reload the profile
source /etc/profile
Verify
flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
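Note (an assumption about the target environment, not part of the original steps): if this Flume 1.9.0 installation later writes to a Hadoop 3.x cluster via the HDFS sink, the guava-11.0.2.jar shipped in Flume's lib directory may conflict with Hadoop's newer Guava and cause a NoSuchMethodError; a common workaround is to remove Flume's copy:
# only needed if the HDFS sink fails with a Guava-related error
rm $FLUME_HOME/lib/guava-11.0.2.jar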
Basic Usage of Flume
For monitoring port data, see: https://flume.apache.org/FlumeUserGuide.html#a-simple-example
Write the configuration file
Configure the collection plan according to the data collection requirements and describe it in a configuration file.
Create a new configuration file under Flume's conf directory
vim /usr/local/program/flume/conf/example.conf
Configure the component names of the Agent instance
# a1: name of the Agent instance
# name of the sources
a1.sources = r1
# name of the sinks
a1.sinks = k1
# name of the channels
a1.channels = c1
Configure the Source
# source type: receive data from a network socket/port
a1.sources.r1.type = netcat
# listen address
a1.sources.r1.bind = node001
# listen port
a1.sources.r1.port = 44444
Configure the Channel
# data is stored in memory
a1.channels.c1.type = memory
# maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
# maximum number of events taken from the source or delivered to the sink per transaction
a1.channels.c1.transactionCapacity = 100
Configure the Sink
# data goes to the logging system (console log)
a1.sinks.k1.type = logger
Bind the source and sink to the channel
# channel used by the source
a1.sources.r1.channels = c1
# channel used by the sink
a1.sinks.k1.channel = c1
Start the Agent instance
Specify the collection-plan configuration file and start the Flume agent on the corresponding node.
-c conf              specifies the directory containing Flume's own configuration files
-f conf/example.conf specifies the collection-plan configuration file
-n a1                specifies the agent name
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
Startup log
2022-03-08 20:43:58,557 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-08 20:43:58,558 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-08 20:43:58,560 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2022-03-08 20:43:58,581 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.22.4.21:44444]
Test
Install telnet
yum -y install telnet
Simulate sending data
[root@administrator ~]# telnet node001 44444
Trying 172.22.4.21...
Connected to node01.
Escape character is '^]'.
Hello World
OK
Data received
2022-03-08 20:44:21,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 0D Hello World. }
Collecting Directory Data to HDFS
Directory data is collected with the Spooling Directory Source; see: https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
Write the configuration file
vim /usr/local/program/flume/conf/spooldir.conf
1. Configure the component names of the Agent instance
a1.sources = r1
a1.sinks = k1
a1.channels = c1
2. Configure the Source
# Source type: monitor a directory for new files
a1.sources.r1.type = spooldir
# path of the monitored directory
a1.sources.r1.spoolDir= /usr/local/program/flume/tempData
# whether to add a header with the absolute path of the source file
a1.sources.r1.fileHeader= true
3. Configure the Channel
# store data in memory
a1.channels.c1.type = memory
# capacity: the number of events (one per record/line) the channel can hold
a1.channels.c1.capacity = 1000
# number of events handled per transaction: events taken from the channel are batched and sent to the sink once the batch is full
a1.channels.c1.transactionCapacity = 100
4. Configure the Sink
HDFS sink; see: https://flume.apache.org/FlumeUserGuide.html#hdfs-sink
# sink type: hdfs
a1.sinks.k1.type = hdfs
# HDFS path where the data is stored
a1.sinks.k1.hdfs.path = hdfs://node01:9000/flume/spooldir/files/%y-%m-%d/%H%M/
# file name prefix
a1.sinks.k1.hdfs.filePrefix = events-
# whether to round down the timestamp used in the path escape sequences (directory bucketing)
a1.sinks.k1.hdfs.round = true
# rounding granularity for the bucket directory: a new directory every 10 minutes
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# roll to a new file every 3 seconds
a1.sinks.k1.hdfs.rollInterval = 3
# roll to a new file when the (temporary) HDFS file reaches 20 bytes
a1.sinks.k1.hdfs.rollSize = 20
# roll to a new file after 5 events have been written
a1.sinks.k1.hdfs.rollCount = 5
# number of events written to the file before flushing to HDFS; batchSize <= transactionCapacity <= capacity
a1.sinks.k1.hdfs.batchSize = 1
# whether to use the local timestamp (instead of a timestamp header)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
5. Bind the source and sink to the channel
# channel for the source
a1.sources.r1.channels = c1
# channel for the sink
a1.sinks.k1.channel = c1
Start the Agent instance
bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console
Startup log
2022-03-08 22:20:02,045 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-08 22:20:02,200 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-08 22:20:02,203 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-08 22:20:02,205 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:85)] SpoolDirectorySource source starting with directory: /usr/local/program/flume/tempData
2022-03-08 22:20:02,206 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-08 22:20:02,209 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-08 22:20:02,254 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-08 22:20:02,255 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
Test
Create a new file in the monitored directory
cd /usr/local/program/flume/tempData
echo "hello world" >> spooldir.txt
Collection log
2022-03-08 22:22:37,622 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:384)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2022-03-08 22:22:37,624 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:497)] Preparing to move file /usr/local/program/flume/tempData/spooldir.txt to /usr/local/program/flume/tempData/spooldir.txt.COMPLETED
2022-03-08 22:22:40,262 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSSequenceFile.configure(HDFSSequenceFile.java:63)] writeFormat = Writable, UseRawLocalFileSystem = false
2022-03-08 22:22:40,573 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/spooldir/files/22-03-08/2220//events-.1646749360257.tmp
2022-03-08 22:22:46,605 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-08 22:22:46,605 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/spooldir/files/22-03-08/2220//events-.1646749360257.tmp
2022-03-08 22:22:46,664 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/spooldir/files/22-03-08/2220/events-.1646749360257.tmp to hdfs://node01:9000/flume/spooldir/files/22-03-08/2220/events-.1646749360257
Check whether the file has been collected
Once a file has been fully collected, the agent automatically renames it with the suffix .COMPLETED.
ls /usr/local/program/flume/tempData
spooldir.txt.COMPLETED
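The suffix and cleanup behavior of the Spooling Directory Source are configurable; a short sketch (the values shown are the documented defaults):
# suffix appended to fully ingested files
a1.sources.r1.fileSuffix = .COMPLETED
# never keeps the renamed file; immediate deletes it after ingestion
a1.sources.r1.deletePolicy = never
# files whose names match this regex are ignored
a1.sources.r1.ignorePattern = ^$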
Check HDFS
Collecting File Data to HDFS
File data is collected with the Exec Source; see: https://flume.apache.org/FlumeUserGuide.html#exec-source
Write the configuration file
vim /usr/local/program/flume/conf/exec.conf
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# monitor file content updates (exec a command)
agent1.sources.source1.type = exec
# which file to monitor
agent1.sources.source1.command = tail -F /usr/local/program/flume/tempData/test.txt
# sink type: hdfs
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://node01:9000/flume/exec/files/%y-%m-%d/%H%M/
agent1.sinks.sink1.hdfs.filePrefix = access_log
# maximum number of open files
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
# file type: plain data stream
agent1.sinks.sink1.hdfs.fileType = DataStream
# write format
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.channels.channel1.type = memory
# allowed wait time (seconds) for adding events to or taking events from the channel
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 5000
agent1.channels.channel1.transactionCapacity = 600
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Start the Agent instance
bin/flume-ng agent -c conf -f conf/exec.conf -n agent1 -Dflume.root.logger=INFO,console
Startup log
2022-03-08 22:41:39,484 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink sink1
2022-03-08 22:41:39,485 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source source1
2022-03-08 22:41:39,487 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:170)] Exec source starting with command: tail -F /usr/local/program/flume/tempData/test.txt
2022-03-08 22:41:39,488 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2022-03-08 22:41:39,488 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: sink1 started
2022-03-08 22:41:39,489 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-03-08 22:41:39,489 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: source1 started
Write data to the monitored file
echo "hello world" >> /usr/local/program/flume/tempData/test.txt
Check the agent log
2022-03-08 22:41:42,555 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2022-03-08 22:41:42,975 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/exec/files/22-03-08/2240//access_log.1646750502556.tmp
2022-03-08 22:42:16,054 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-08 22:42:16,455 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/exec/files/22-03-08/2240//access_log.1646750502556.tmp
2022-03-08 22:42:16,497 (hdfs-sink1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/exec/files/22-03-08/2240/access_log.1646750502556.tmp to hdfs://node01:9000/flume/exec/files/22-03-08/2240/access_log.1646750502556
Check HDFS
Collecting File Data to Kafka
Write the configuration file
vim /usr/local/program/flume/conf/exec2kafka.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumeTopic
a1.sinks.k1.brokerList = node001:9092,node002:9092,node003:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 10
a1.sinks.k1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
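Note that topic, brokerList, requiredAcks, and batchSize are legacy property names kept for backward compatibility; the Flume 1.9 documentation uses the following equivalents, so the sink block above could also be written as (same values):
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumeTopic
a1.sinks.k1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 10
a1.sinks.k1.channel = c1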
Create the topic
kafka-topics.sh --zookeeper node001:2181 --create --replication-factor 3 --partitions 3 --topic flumeTopic
Start a consumer
kafka-console-consumer.sh --zookeeper node001:2181 --from-beginning --topic flumeTopic
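If the Kafka version in use no longer accepts the --zookeeper option (it was removed in newer Kafka releases), the consumer can be started against the brokers instead:
kafka-console-consumer.sh --bootstrap-server node001:9092 --from-beginning --topic flumeTopic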
Start Flume
flume-ng agent -n a1 -f /usr/local/program/flume/conf/exec2kafka.conf -Dflume.root.logger=INFO,console
Cascading Multiple Agents
Agent cascading process:
1. The first agent (server 1) collects data and sends it over the network to the second agent (server 2).
2. The second agent (server 2) receives the data sent by the first agent (server 1) and saves it to HDFS.
Configure Agent1
Configure the first Agent instance
agent1.sources = r1
agent1.sinks = k1
agent1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt
agent1.sources.r1.channels = c1
agent1.sinks = k1
# an avro sink acts as a data sender
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hostname = node01
agent1.sinks.k1.port = 4141
agent1.sinks.k1.batch-size = 10
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
Configure Agent2
Configure the second Agent instance. Since a single server is used in this example, the second instance is named agent2; otherwise the three component names could be the same as in Agent1.
agent2.sources = r2
agent2.sinks = k2
agent2.channels = c2
## an avro source acts as a receiving service
agent2.sources.r2.type = avro
agent2.sources.r2.channels = c2
agent2.sources.r2.bind = node01
agent2.sources.r2.port = 4141
# Describe the sink
agent2.sinks.k2.type = hdfs
agent2.sinks.k2.hdfs.path = hdfs://node01:9000/flume/avro/files/%y-%m-%d/%H%M/
agent2.sinks.k2.hdfs.filePrefix = events-
agent2.sinks.k2.hdfs.round = true
agent2.sinks.k2.hdfs.roundValue = 10
agent2.sinks.k2.hdfs.roundUnit = minute
agent2.sinks.k2.hdfs.rollInterval = 3
agent2.sinks.k2.hdfs.rollSize = 20
agent2.sinks.k2.hdfs.rollCount = 5
agent2.sinks.k2.hdfs.batchSize = 1
agent2.sinks.k2.hdfs.useLocalTimeStamp = true
agent2.sinks.k2.hdfs.fileType = DataStream
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2
Start the Agent instances
Start the second Agent instance first, then the first Agent instance.
bin/flume-ng agent -c conf -f conf/avro2.conf -n agent2 -Dflume.root.logger=INFO,console
2022-03-11 21:32:36,296 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c2
2022-03-11 21:32:36,551 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c2: Successfully registered new MBean.
2022-03-11 21:32:36,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c2 started
2022-03-11 21:32:36,552 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k2
2022-03-11 21:32:36,555 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2022-03-11 21:32:36,555 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2022-03-11 21:32:36,560 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r2
2022-03-11 21:32:36,562 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r2: { bindAddress: 172.22.4.21, port: 4141 }...
2022-03-11 21:32:37,051 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2022-03-11 21:32:37,051 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r2 started
2022-03-11 21:32:37,056 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r2 started.
bin/flume-ng agent -c conf -f conf/avro1.conf -n agent1 -Dflume.root.logger=INFO,console
2022-03-11 21:33:42,883 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2022-03-11 21:33:43,056 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2022-03-11 21:33:43,057 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-11 21:33:43,058 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-11 21:33:43,062 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:294)] Starting RpcSink k1 { host: 172.22.4.21, port: 4141 }...
2022-03-11 21:33:43,063 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-11 21:33:43,063 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:212)] Rpc sink k1: Building RpcClient with hostname: 172.22.4.21, port: 4141
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:170)] Exec source starting with command: tail -F /usr/local/program/flume/tempData/test.txt
2022-03-11 21:33:43,066 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-11 21:33:43,066 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2022-03-11 21:33:43,065 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:113)] Attempting to create Avro Rpc client.
2022-03-11 21:33:43,090 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:594)] Using default maxIOWorkers
2022-03-11 21:33:43,578 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:308)] Rpc sink k1 started.
Run the test
Write data to /usr/local/program/flume/tempData/test.txt
echo "Hello World" >> test.txt
Log of the second Agent instance
2022-03-11 21:38:26,609 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2022-03-11 21:38:26,865 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/avro/files/22-03-11/2130//events-.1647005906610.tmp
2022-03-11 21:38:32,515 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-11 21:38:32,515 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/avro/files/22-03-11/2130//events-.1647005906610.tmp
2022-03-11 21:38:32,551 (hdfs-k2-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/avro/files/22-03-11/2130/events-.1647005906610.tmp to hdfs://node01:9000/flume/avro/files/22-03-11/2130/events-.1647005906610
Check HDFS
Flume High Availability
Flume high-availability setup:
Agent1 (server 1) collects data; Agent2 (server 2) and Agent3 (server 3) receive the data sent by Agent1 and write it to HDFS.
Configure Agent1
Configure Agent1 on the first server
agent1.channels = c1
agent1.sources = r1
# declare 2 sinks
agent1.sinks = k1 k2
# put the 2 sinks into one group
agent1.sinkgroups = g1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
# configure sink1: send data to the specified server
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 4141
# configure sink2: send data to the specified server
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 5151
# assign the sinks to the group
agent1.sinkgroups.g1.sinks = k1 k2
# sink-group processor settings; failover: enable failover
agent1.sinkgroups.g1.processor.type = failover
# sink priorities: the sink with the higher priority receives data first
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
Configure Agent2 and Agent3
The configurations of Agent2 and Agent3 are almost identical. The first block below is Agent2 (agent name a1, avro source on node02:4141); the second is Agent3 (agent name a2, avro source on node03:5151).
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:9000/flume/cluster/files/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.type = avro
a2.sources.r1.bind = node03
a2.sources.r1.port = 5151
a2.sources.r1.channels = c1
a2.sinks.k1.type=hdfs
a2.sinks.k1.hdfs.path= hdfs://node01:9000/flume/cluster/files/%y-%m-%d/%H%M/
a2.sinks.k1.hdfs.fileType=DataStream
a2.sinks.k1.hdfs.writeFormat=TEXT
a2.sinks.k1.hdfs.rollInterval=10
a2.sinks.k1.channel=c1
a2.sinks.k1.hdfs.filePrefix=%Y-%m-%d
Start the Agents
Note the startup order: start the Agent2 and Agent3 instances first, then start the Agent1 instance.
An error is reported after startup:
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
Writing to HDFS uses a timestamp to partition the directory structure, but the events received by Agent2 and Agent3 carry no timestamp in their headers. Add the following to the Agent2 and Agent3 configurations:
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.useLocalTimeStamp = true
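An alternative sketch (not part of the original setup) is to leave useLocalTimeStamp off and instead stamp the events on the receiving agents' avro sources with the built-in timestamp interceptor:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = timestamp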
Agent2
2022-03-11 22:48:39,568 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2022-03-11 22:48:39,572 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms
2022-03-11 22:48:39,814 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2022-03-11 22:48:39,815 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-11 22:48:40,073 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-11 22:48:40,076 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-11 22:48:40,076 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-11 22:48:40,077 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-11 22:48:40,082 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r1: { bindAddress: 172.22.4.21, port: 4141 }...
2022-03-11 22:48:40,588 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-11 22:48:40,588 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
# after Agent1 starts
2022-03-11 22:48:40,614 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r1 started.
2022-03-11 22:48:48,804 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] OPEN
2022-03-11 22:48:48,808 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] BOUND: /172.22.4.21:4141
2022-03-11 22:48:48,809 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] CONNECTED: /172.22.4.21:35448
Agent3
2022-03-11 22:46:08,010 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k2
2022-03-11 22:46:08,012 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r2
2022-03-11 22:46:08,014 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r2: { bindAddress: 172.22.4.21, port: 5151 }...
2022-03-11 22:46:08,018 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2022-03-11 22:46:08,018 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2022-03-11 22:46:08,689 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2022-03-11 22:46:08,689 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r2 started
2022-03-11 22:46:08,699 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r2 started.
# after Agent1 starts
2022-03-11 22:48:49,210 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] OPEN
2022-03-11 22:48:49,215 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] BOUND: /172.22.4.21:5151
2022-03-11 22:48:49,215 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] CONNECTED: /172.22.4.21:55952
Failover Test
Since node02 has a higher priority than node03, node02 receives the collected data and uploads it to HDFS first.
When node02 goes down, node03 takes over receiving the collected data and uploading it to HDFS.
When the node02 service recovers, node02 again takes priority for receiving data and uploading it to HDFS.
Flume Load Balancing
Flume load balancing means that a routing Agent distributes the events buffered in its Channel evenly across multiple Sink components, each of which connects to a separate downstream Agent.
To enable load balancing, start from the Agent1 configuration of the high-availability setup and replace the failover sink-group settings with the load-balancing settings:
# Sink-group settings used for failover (remove these):
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
# Sink-group settings for load balancing (use these instead):
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
# round-robin selection
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
The Agent2 and Agent3 configurations are the same as in the high-availability setup. In the end, when Agent1 collects data, Agent2 and Agent3 receive it in round-robin fashion.
Flume Interceptors
Configure the data-collecting Agent1
vim interceptors1.conf
# declare 3 sources
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# configuration of the first source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test1.txt
# name of the interceptor
a1.sources.r1.interceptors = i1
# interceptor type
a1.sources.r1.interceptors.i1.type = static
## the static interceptor inserts a user-defined key-value pair into the header of each collected event
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = test1
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /usr/local/program/flume/tempData/test2.txt
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = test2
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /usr/local/program/flume/tempData/test3.txt
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = test3
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node01
a1.sinks.k1.port = 41414
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
Configure the data-receiving Agent2
vim interceptors2.conf
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = node01
a2.sources.r1.port =41414
# add a timestamp interceptor
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a2.channels.c1.type = memory
a2.channels.c1.capacity = 20000
a2.channels.c1.transactionCapacity = 10000
a2.sinks.k1.type = hdfs
# %{type} retrieves the value of the key-value pair set in Agent1
a2.sinks.k1.hdfs.path = hdfs://node001:9000/flume/interceptors/files/%{type}/%y-%m-%d/%H%M/
a2.sinks.k1.hdfs.filePrefix =events
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.writeFormat = Text
# use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# do not roll files based on event count
a2.sinks.k1.hdfs.rollCount = 0
# roll files based on time (seconds)
a2.sinks.k1.hdfs.rollInterval = 30
# roll files based on size (bytes)
a2.sinks.k1.hdfs.rollSize = 10485760
# number of events written to HDFS per batch
a2.sinks.k1.hdfs.batchSize = 10000
# number of threads Flume uses for HDFS operations (open, write, etc.)
a2.sinks.k1.hdfs.threadsPoolSize = 10
# timeout (ms) for HDFS operations
a2.sinks.k1.hdfs.callTimeout = 30000
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
Start the Agents
bin/flume-ng agent -c conf -f conf/interceptors2.conf -n a2 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf -f conf/interceptors1.conf -n a1 -Dflume.root.logger=INFO,console
Run the test
[root@administrator tempData]# echo "Hello world" >> test1.txt
[root@administrator tempData]# echo "Hello world" >> test2.txt
[root@administrator tempData]# echo "Hello world" >> test3.txt
Monitoring Flume
Ganglia is large-scale cluster monitoring software, suitable for monitoring distributed or parallel clusters.
Official site: http://ganglia.info/
Install Ganglia
Either create a container from a Ganglia image or install the Ganglia packages directly, then use it to monitor Flume.
Pull a Ganglia image
docker pull jackchencoding/ganglia
Create the Ganglia container
docker run -itd --name ganglia -p 8080:80 -p 8081:8649 --privileged=true jackchencoding/ganglia:latest
Configure Flume
Edit flume-env.sh in the /usr/local/program/flume/conf directory and add the following:
JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=IP:8649 -Xms100m -Xmx200m"
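Flume also ships a built-in JSON reporting server that needs no external system; as an alternative sketch (the port 34545 is arbitrary), the monitoring options above could be replaced with the HTTP type and then polled with curl while an agent is running:
JAVA_OPTS="-Dflume.monitoring.type=http -Dflume.monitoring.port=34545"
curl http://IP:34545/metrics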
Start the Flume job
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
Run the test
Visit IP:8080 or IP:8080/ganglia, then simulate sending data and operate Flume to exercise the monitoring:
[root@administrator ~]# telnet node001 44444
Trying 172.22.4.21...
Connected to node01.
Escape character is '^]'.
Hello World
OK