Data Collection Tools: Detailed Usage Examples of Common Flume Collection Methods



Flume

Flume Overview

Official site: https://flume.apache.org/

Documentation: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

Apache Flume is a top-level project of the Apache Software Foundation. The initial releases of Flume are collectively known as Flume OG (original generation) and belonged to Cloudera. The core components, core configuration, and code architecture were later rewritten; the rewritten versions are collectively known as Flume NG (next generation).

Apache Flume, originally provided by Cloudera, is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data. Flume lets you plug custom data senders into a logging system to collect data, and it can also perform simple processing on the data and write it to a variety of (customizable) data receivers.

Apache Flume is not limited to log aggregation. Because data sources are customizable, Flume can transport large amounts of event data, including but not limited to network traffic data, social-media data, email messages, and almost any other conceivable data source.

Flume Architecture

An external data source sends events to Flume in a specific format. When a source receives events, it stores them in one or more channels; a channel holds the events until they are consumed by a sink. The sink's main job is to read events from the channel and write them to an external storage system or forward them to the source of the next agent, and only after success does it remove the events from the channel.


The most central role in a distributed Flume deployment is the agent; a Flume collection system is formed by connecting agents together.

Core Components

The core of a running Flume deployment is the Agent; the agent is Flume's smallest independent unit of execution, and each agent is a single JVM. It acts as a data carrier, a standalone Flume process, and a complete data-collection tool containing three core components: source, channel, and sink. Through these components, events flow from one place to another.

Client

Client: produces the data; runs in its own thread.

Event

Event: the basic unit of data transfer, made up of a header and a body; the header is a key/value map and the body is an arbitrary byte array.

Agent

Agent: a data carrier, an independent (JVM) process containing the Source, Channel, and Sink components.

Source

The Source is the data-collection component. It collects data, formats it as needed, wraps it in events, and pushes the events into the Channel.

For details, see: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources

Channel

Channel: the transport component, a temporary store that buffers events (a data queue) handed over by the Source. It can be backed by memory or by a persistent file system:

Memory Channel: uses memory; fast, but data may be lost (for example, on a sudden crash)

File Channel: uses a persistent file system; guarantees no data loss, but is slower
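As a rough sketch (the directory paths below are illustrative, not from this article), a File Channel is configured by pointing it at a checkpoint directory and one or more data directories:

# File Channel sketch; checkpointDir and dataDirs paths are placeholders
a1.channels = c1
a1.channels.c1.type = file
# directory where the channel checkpoints its state
a1.channels.c1.checkpointDir = /usr/local/program/flume/checkpoint
# comma-separated list of directories where events are persisted
a1.channels.c1.dataDirs = /usr/local/program/flume/data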

For details, see: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-channels

Sink

Sink: the delivery component. It reads events from the Channel and removes them, passing the events on to the next Agent in the flow pipeline or writing them to the final storage system.

For details, see: https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sinks

Interceptor

When data needs to be filtered, instead of modifying the code of the Source, Channel, or Sink, Flume provides interceptors, which can also be chained. An interceptor sits between the Source and the Channel; once an interceptor is attached to a Source, it receives each event and decides, according to your rules, whether to keep or discard it. Discarded events never enter the Channel.
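As a sketch (not from the original article), the built-in regex_filter interceptor can keep only events whose body matches a regular expression and drop the rest; the pattern and component names here are illustrative:

# regex_filter interceptor sketch: keep events containing "ERROR", drop everything else
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = .*ERROR.*
# excludeEvents = false keeps matching events; true would drop the matches instead
a1.sources.r1.interceptors.i1.excludeEvents = false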


Commonly Used Channel, Sink, and Source Types

The full lists of Channel, Sink, and Source types supported by Flume are given in the tables of the Flume User Guide (see the reference links above).

Flume Architecture Patterns

Pattern 1:

Flume supports passing data across multiple Agents. This requires the Sink of the upstream Agent and the Source of the downstream Agent to both be of type Avro, with the Sink pointing at the hostname (or IP address) and port of the downstream Source.

Pattern 2:

Flume supports collecting logs with multiple Agents and then aggregating them through one or more Agents before writing them to the file system.

Pattern 3:

Flume supports fanning out from one Source to multiple Channels, and therefore to multiple Sinks. By default events are replicated to all Channels, so every Channel receives the same data. Flume also supports configuring a multiplexing selector on the Source to implement custom routing rules.
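A minimal multiplexing-selector sketch (the header name and values here are illustrative, not from this article): events whose header key "state" equals CZ go to channel c1, US goes to c2, and everything else falls back to c3:

a1.sources = r1
a1.channels = c1 c2 c3
# route events by the value of the "state" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c3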


Installing Flume

Download: http://archive.apache.org/dist/flume/

wget http://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

tar -zxvf apache-flume-1.9.0-bin.tar.gz

mv apache-flume-1.9.0-bin flume

Find the JDK installation path

 find / -name java

Edit the configuration file

cd flume/conf

cp flume-env.sh.template flume-env.sh

vim flume-env.sh

# export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export JAVA_HOME=/usr/local/jdk1.8/


# Recommendation: when the channel is memory-based, give Flume more memory
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

Set environment variables

vim /etc/profile

export FLUME_HOME=/usr/local/program/flume
export PATH=$FLUME_HOME/bin:$PATH

Reload the profile

source /etc/profile

Verify

flume-ng version

Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Basic Usage of Flume

Listening on a network port; reference: https://flume.apache.org/FlumeUserGuide.html#a-simple-example

Write the configuration file

Define the collection plan according to your data-collection requirements and describe it in a configuration file.

Create a new configuration file under Flume's conf directory

vim /usr/local/program/flume/conf/example.conf

Name the components of the Agent instance

# a1: the name of the Agent instance

# name of the sources
a1.sources  =  r1 
# name of the sinks
a1.sinks  =  k1 
# name of the channels
a1.channels  =  c1

Configure the Source

# source type: receive data from a network socket/port
a1.sources.r1.type  =  netcat 
# listen address
a1.sources.r1.bind  =  node001
# listen port
a1.sources.r1.port  =  44444

Configure the Channel

# where the data is staged: in memory
a1.channels.c1.type  =  memory 
# maximum number of events the channel can hold
a1.channels.c1.capacity  =  1000 
# maximum number of events taken from the source or given to the sink per transaction
a1.channels.c1.transactionCapacity  =  100

Configure the Sink

# where the data goes: the logging system
a1.sinks.k1.type  =  logger

Bind the source and sink to the channel

# channel used by the source
a1.sources.r1.channels  =  c1 
# channel used by the sink
a1.sinks.k1.channel  =  c1

Start the Agent instance

Point to the collection-plan configuration file and start the Flume agent on the corresponding node.

# -c conf               directory containing Flume's own configuration files
# -f conf/example.conf  the collection-plan configuration file
# -n a1                 the name of the agent
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console

Startup log

2022-03-08 20:43:58,557 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-08 20:43:58,558 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-08 20:43:58,560 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2022-03-08 20:43:58,581 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.22.4.21:44444]

Test

Install telnet

yum -y install telnet
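If telnet is not available, nc (netcat) can be used the same way for this test:

yum -y install nc
nc node001 44444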

Send test data

[root@administrator ~]# telnet node001 44444
Trying  172.22.4.21...
Connected to node01.
Escape character is '^]'.
Hello World
OK

Data received

2022-03-08 20:44:21,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 0D             Hello World. }

Collecting Directory Data to HDFS

Use the Spooling Directory Source to collect data from a directory; reference: https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source

Write the configuration file

vim /usr/local/program/flume/conf/spooldir.conf

1. Name the components of the Agent instance

a1.sources  =  r1 
a1.sinks  =  k1 
a1.channels  =  c1

2. Configure the Source

# Source type: watch a directory for new files
a1.sources.r1.type  =  spooldir
# path of the monitored directory
a1.sources.r1.spoolDir =  /usr/local/program/flume/tempData
# whether to add a header storing the absolute path of the source file
a1.sources.r1.fileHeader = true

3. Configure the Channel

# stage data in memory
a1.channels.c1.type  =  memory 
# maximum number of events the channel can hold (one event per line/record)
a1.channels.c1.capacity  =  1000
# number of events handed to the sink per transaction; events are taken from the channel into a queue and sent to the sink when the queue is full
a1.channels.c1.transactionCapacity  =  100

4. Configure the Sink

HDFS sink reference: https://flume.apache.org/FlumeUserGuide.html#hdfs-sink

# sink type: hdfs
a1.sinks.k1.type = hdfs
# HDFS path where the data is written
a1.sinks.k1.hdfs.path = hdfs://node01:9000/flume/spooldir/files/%y-%m-%d/%H%M/
# file name prefix
a1.sinks.k1.hdfs.filePrefix = events-
# round down the event timestamp used in the path (creates time-bucketed directories)
a1.sinks.k1.hdfs.round = true
# bucket granularity: a new directory every 10 minutes
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# roll to a new file every 3 seconds
a1.sinks.k1.hdfs.rollInterval = 3
# roll to a new file when the temporary HDFS file reaches 20 bytes
a1.sinks.k1.hdfs.rollSize = 20
# roll to a new file after 5 events
a1.sinks.k1.hdfs.rollCount = 5
# number of events written to HDFS per flush; batchSize < transactionCapacity < capacity
a1.sinks.k1.hdfs.batchSize = 1
# use the local timestamp instead of one from the event headers
a1.sinks.k1.hdfs.useLocalTimeStamp = true
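Note that with the configuration above the HDFS sink writes SequenceFiles, which is its default file type (the collection log further below shows "writeFormat = Writable"). If plain-text output is preferred, the same two properties used in the Exec Source example later can be added here as well:

# write plain text files instead of the default SequenceFile format
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text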

5. Bind the source and sink to the channel

# channel used by the source
a1.sources.r1.channels = c1
# channel used by the sink
a1.sinks.k1.channel = c1

Start the Agent instance

bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console

Startup log

2022-03-08 22:20:02,045 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-08 22:20:02,200 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-08 22:20:02,203 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-08 22:20:02,205 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:85)] SpoolDirectorySource source starting with directory: /usr/local/program/flume/tempData
2022-03-08 22:20:02,206 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-08 22:20:02,209 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-08 22:20:02,254 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-08 22:20:02,255 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started

Test

Create a new file in the monitored directory

cd /usr/local/program/flume/tempData

echo "hello world" >> spooldir.txt

Collection log

2022-03-08 22:22:37,622 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:384)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2022-03-08 22:22:37,624 (pool-5-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:497)] Preparing to move file /usr/local/program/flume/tempData/spooldir.txt to /usr/local/program/flume/tempData/spooldir.txt.COMPLETED
2022-03-08 22:22:40,262 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSSequenceFile.configure(HDFSSequenceFile.java:63)] writeFormat = Writable, UseRawLocalFileSystem = false
2022-03-08 22:22:40,573 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/spooldir/files/22-03-08/2220//events-.1646749360257.tmp
2022-03-08 22:22:46,605 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-08 22:22:46,605 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/spooldir/files/22-03-08/2220//events-.1646749360257.tmp
2022-03-08 22:22:46,664 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/spooldir/files/22-03-08/2220/events-.1646749360257.tmp to hdfs://node01:9000/flume/spooldir/files/22-03-08/2220/events-.1646749360257

Check whether the file was collected

Files that have been collected are automatically given the suffix .COMPLETED by the agent.

ls /usr/local/program/flume/tempData

 spooldir.txt.COMPLETED

Check HDFS
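The generated files can also be listed from the command line (assuming an HDFS client is configured on this node):

hdfs dfs -ls -R /flume/spooldir/files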

Collecting File Data to HDFS

Use the Exec Source to collect data from a file; reference: https://flume.apache.org/FlumeUserGuide.html#exec-source

Write the configuration file

vim /usr/local/program/flume/conf/exec.conf

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# exec source: monitor a file for appended content
agent1.sources.source1.type = exec
# command that tails the monitored file
agent1.sources.source1.command = tail -F /usr/local/program/flume/tempData/test.txt

# sink type: hdfs
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://node01:9000/flume/exec/files/%y-%m-%d/%H%M/
agent1.sinks.sink1.hdfs.filePrefix = access_log
# maximum number of open files
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
# file type: plain data stream (not SequenceFile)
agent1.sinks.sink1.hdfs.fileType = DataStream
# write format
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

agent1.channels.channel1.type = memory
# timeout (seconds) for adding an event to, or removing it from, the channel
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 5000
agent1.channels.channel1.transactionCapacity = 600

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

Start the Agent instance

bin/flume-ng agent -c conf -f conf/exec.conf -n agent1 -Dflume.root.logger=INFO,console

Startup log

2022-03-08 22:41:39,484 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink sink1
2022-03-08 22:41:39,485 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source source1
2022-03-08 22:41:39,487 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:170)] Exec source starting with command: tail -F /usr/local/program/flume/tempData/test.txt
2022-03-08 22:41:39,488 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2022-03-08 22:41:39,488 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: sink1 started
2022-03-08 22:41:39,489 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2022-03-08 22:41:39,489 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: source1 started

Write data to the monitored file

 echo "hello world" >> /usr/local/program/flume/tempData/test.txt

Check the agent log

2022-03-08 22:41:42,555 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2022-03-08 22:41:42,975 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/exec/files/22-03-08/2240//access_log.1646750502556.tmp
2022-03-08 22:42:16,054 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-08 22:42:16,455 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/exec/files/22-03-08/2240//access_log.1646750502556.tmp
2022-03-08 22:42:16,497 (hdfs-sink1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/exec/files/22-03-08/2240/access_log.1646750502556.tmp to hdfs://node01:9000/flume/exec/files/22-03-08/2240/access_log.1646750502556

Check HDFS

Collecting File Data to Kafka

Write the configuration file

vim /usr/local/program/flume/conf/exec2kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1


a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt


a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumeTopic
a1.sinks.k1.brokerList = node001:9092,node002:9092,node003:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 10
a1.sinks.k1.channel = c1


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
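The brokerList, topic, requiredAcks, and batchSize names used above are the older property aliases. In Flume 1.9 the Kafka sink documentation prefers the kafka.* property names, so the sink configuration above should be roughly equivalent to the following sketch:

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.sinks.k1.kafka.topic = flumeTopic
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 10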

Create the topic

kafka-topics.sh --zookeeper node001:2181 --create --replication-factor 3 --partitions 3 --topic flumeTopic

Start a consumer

kafka-console-consumer.sh --zookeeper node001:2181 --from-beginning --topic flumeTopic

Start Flume

flume-ng agent -n a1 -f  /usr/local/program/flume/conf/exec2kafka.conf -Dflume.root.logger=INFO,console
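To verify the pipeline end to end (a simple check, not shown in the original), append a line to the monitored file; it should appear in the console consumer started above:

echo "hello kafka" >> /usr/local/program/flume/tempData/test.txt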

Cascading Multiple Agents

Agent cascade flow:

1. The first agent (server 1) collects the data and sends it over the network to the second agent (server 2).

2. The second agent (server 2) receives the data sent by the first agent (server 1) and writes it to HDFS.

Configure Agent1

Configuration of the first Agent instance

agent1.sources = r1
agent1.sinks = k1
agent1.channels = c1


agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt
agent1.sources.r1.channels = c1

agent1.sinks = k1
# an avro sink acts as a data sender
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hostname = node01
agent1.sinks.k1.port = 4141
agent1.sinks.k1.batch-size = 10


agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100


agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1

Configure Agent2

Configuration of the second Agent instance. Because both agents run on the same server here, the second instance is named agent2; the three component names themselves could be the same as Agent1's.

agent2.sources = r2
agent2.sinks = k2
agent2.channels = c2


# an avro source acts as a receiving server
agent2.sources.r2.type = avro
agent2.sources.r2.channels = c2
agent2.sources.r2.bind = node01
agent2.sources.r2.port = 4141
# Describe the sink
agent2.sinks.k2.type = hdfs
agent2.sinks.k2.hdfs.path = hdfs://node01:9000/flume/avro/files/%y-%m-%d/%H%M/
agent2.sinks.k2.hdfs.filePrefix = events-
agent2.sinks.k2.hdfs.round = true
agent2.sinks.k2.hdfs.roundValue = 10
agent2.sinks.k2.hdfs.roundUnit = minute
agent2.sinks.k2.hdfs.rollInterval = 3
agent2.sinks.k2.hdfs.rollSize = 20
agent2.sinks.k2.hdfs.rollCount = 5
agent2.sinks.k2.hdfs.batchSize = 1
agent2.sinks.k2.hdfs.useLocalTimeStamp = true
agent2.sinks.k2.hdfs.fileType = DataStream


agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100


agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2

Start the Agent instances

Start the second Agent instance first, then the first one.

bin/flume-ng agent -c conf -f conf/avro2.conf -n agent2 -Dflume.root.logger=INFO,console

2022-03-11 21:32:36,296 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c2
2022-03-11 21:32:36,551 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c2: Successfully registered new MBean.
2022-03-11 21:32:36,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c2 started
2022-03-11 21:32:36,552 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k2
2022-03-11 21:32:36,555 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2022-03-11 21:32:36,555 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2022-03-11 21:32:36,560 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r2
2022-03-11 21:32:36,562 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r2: { bindAddress: 172.22.4.21, port: 4141 }...
2022-03-11 21:32:37,051 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2022-03-11 21:32:37,051 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r2 started
2022-03-11 21:32:37,056 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r2 started.

bin/flume-ng agent -c conf -f conf/avro1.conf -n agent1 -Dflume.root.logger=INFO,console

2022-03-11 21:33:42,883 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2022-03-11 21:33:43,056 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2022-03-11 21:33:43,057 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-11 21:33:43,058 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-11 21:33:43,062 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:294)] Starting RpcSink k1 { host: 172.22.4.21, port: 4141 }...
2022-03-11 21:33:43,063 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-11 21:33:43,063 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:212)] Rpc sink k1: Building RpcClient with hostname: 172.22.4.21, port: 4141
2022-03-11 21:33:43,064 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:170)] Exec source starting with command: tail -F /usr/local/program/flume/tempData/test.txt
2022-03-11 21:33:43,066 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-11 21:33:43,066 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2022-03-11 21:33:43,065 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:113)] Attempting to create Avro Rpc client.
2022-03-11 21:33:43,090 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:594)] Using default maxIOWorkers
2022-03-11 21:33:43,578 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:308)] Rpc sink k1 started.

Run the test

Write data to /usr/local/program/flume/tempData/test.txt

echo "Hello World" >> test.txt

Log from the second Agent instance

2022-03-11 21:38:26,609 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2022-03-11 21:38:26,865 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating hdfs://node01:9000/flume/avro/files/22-03-11/2130//events-.1647005906610.tmp
2022-03-11 21:38:32,515 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-03-11 21:38:32,515 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing hdfs://node01:9000/flume/avro/files/22-03-11/2130//events-.1647005906610.tmp
2022-03-11 21:38:32,551 (hdfs-k2-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming hdfs://node01:9000/flume/avro/files/22-03-11/2130/events-.1647005906610.tmp to hdfs://node01:9000/flume/avro/files/22-03-11/2130/events-.1647005906610

Check HDFS

Flume High Availability

Flume high-availability setup:

Agent1 (server 1) collects the data; Agent2 (server 2) and Agent3 (server 3) receive the data sent by Agent1 and write it to HDFS.

Configure Agent1

Configuration for the first server, Agent1

agent1.channels = c1
agent1.sources = r1
# two sinks
agent1.sinks = k1 k2

# put the two sinks into one sink group
agent1.sinkgroups = g1


agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test.txt

agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# sink1: send data to the specified server (node02)
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 4141


# sink2: send data to the specified server (node03)
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 5151

# sinks in the group
agent1.sinkgroups.g1.sinks = k1 k2

# sink-group processor settings; failover enables failover
agent1.sinkgroups.g1.processor.type = failover
# sink priorities: the sink with the higher priority receives data first
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000


agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1

Configure Agent2 and Agent3

The configurations of Agent2 and Agent3 are almost identical. Agent2 (agent name a1) on node02:


a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:9000/flume/cluster/files/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

Agent3 (agent name a2) on node03:

a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100


a2.sources.r1.type = avro
a2.sources.r1.bind = node03
a2.sources.r1.port = 5151
a2.sources.r1.channels = c1

a2.sinks.k1.type=hdfs
a2.sinks.k1.hdfs.path= hdfs://node01:9000/flume/cluster/files/%y-%m-%d/%H%M/
a2.sinks.k1.hdfs.fileType=DataStream
a2.sinks.k1.hdfs.writeFormat=TEXT
a2.sinks.k1.hdfs.rollInterval=10
a2.sinks.k1.channel=c1
a2.sinks.k1.hdfs.filePrefix=%Y-%m-%d

Start the Agents

Note the startup order:

Start the Agent2 and Agent3 instances first, then start Agent1.
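Assuming the three configurations are saved as ha-agent1.conf, ha-agent2.conf, and ha-agent3.conf (the file names are illustrative, not from this article), the startup commands would look like this:

# on node02 (Agent2, agent name a1)
bin/flume-ng agent -c conf -f conf/ha-agent2.conf -n a1 -Dflume.root.logger=INFO,console
# on node03 (Agent3, agent name a2)
bin/flume-ng agent -c conf -f conf/ha-agent3.conf -n a2 -Dflume.root.logger=INFO,console
# on node01 (Agent1), started last
bin/flume-ng agent -c conf -f conf/ha-agent1.conf -n agent1 -Dflume.root.logger=INFO,console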

An error appears after startup:

org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

Because the HDFS path uses timestamp escape sequences to separate directories, and the events received by Agent2 and Agent3 carry no timestamp in their headers, add the following to the Agent2 and Agent3 configurations (for the sink named k1 in the configurations above):
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.useLocalTimeStamp = true

Agent2

2022-03-11 22:48:39,568 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2022-03-11 22:48:39,572 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Waiting for channel: c1 to start. Sleeping for 500 ms
2022-03-11 22:48:39,814 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2022-03-11 22:48:39,815 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-03-11 22:48:40,073 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-03-11 22:48:40,076 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-03-11 22:48:40,076 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2022-03-11 22:48:40,077 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r1
2022-03-11 22:48:40,082 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r1: { bindAddress: 172.22.4.21, port: 4141 }...
2022-03-11 22:48:40,588 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2022-03-11 22:48:40,588 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started

# after Agent1 starts
2022-03-11 22:48:40,614 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r1 started.
2022-03-11 22:48:48,804 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] OPEN
2022-03-11 22:48:48,808 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] BOUND: /172.22.4.21:4141
2022-03-11 22:48:48,809 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x512e205c, /172.22.4.21:35448 => /172.22.4.21:4141] CONNECTED: /172.22.4.21:35448

Agent3

2022-03-11 22:46:08,010 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k2
2022-03-11 22:46:08,012 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:207)] Starting Source r2
2022-03-11 22:46:08,014 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:193)] Starting Avro source r2: { bindAddress: 172.22.4.21, port: 5151 }...
2022-03-11 22:46:08,018 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2022-03-11 22:46:08,018 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2022-03-11 22:46:08,689 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r2: Successfully registered new MBean.
2022-03-11 22:46:08,689 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r2 started
2022-03-11 22:46:08,699 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:219)] Avro source r2 started.

# after Agent1 starts
2022-03-11 22:48:49,210 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] OPEN
2022-03-11 22:48:49,215 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] BOUND: /172.22.4.21:5151
2022-03-11 22:48:49,215 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x05a177f3, /172.22.4.21:55952 => /172.22.4.21:5151] CONNECTED: /172.22.4.21:55952

Failover Test

Because node02 has a higher priority than node03, node02 receives the data and uploads it to HDFS first.

If node02 goes down, node03 takes over receiving the data and uploading it to HDFS.

When node02 recovers, it again takes priority for receiving the data and uploading it to HDFS.

Flume Load Balancing

Flume load balancing means an Agent acts as a routing node, spreading the events buffered in its Channel evenly across multiple Sinks, with each Sink connected to a separate downstream Agent.

To enable load balancing, start from the Agent1 configuration used in the high-availability setup and replace the failover sink-group processor settings

# sink-group processor settings; failover enables failover
agent1.sinkgroups.g1.processor.type = failover
# sink priorities: the sink with the higher priority receives data first
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000

with the load-balancing settings:

# load balancing
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
# round-robin selection
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000

Agent2 and Agent3 use the same configuration as in the high-availability setup. As a result, when Agent1 collects data, Agent2 and Agent3 receive it in turn (round-robin).

Flume Interceptors

Configure the data-collecting Agent1

vim interceptors1.conf

# three sources
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# first source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/program/flume/tempData/test1.txt
# interceptor name
a1.sources.r1.interceptors = i1
# interceptor type
a1.sources.r1.interceptors.i1.type = static
# the static interceptor inserts a user-defined key-value pair into the headers of every collected event
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = test1

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /usr/local/program/flume/tempData/test2.txt
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = test2

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /usr/local/program/flume/tempData/test3.txt
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = test3

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node01
a1.sinks.k1.port = 41414

a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

Configure the data-receiving Agent2

vim interceptors2.conf

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = node01
a2.sources.r1.port =41414

# add a timestamp interceptor
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

a2.channels.c1.type = memory
a2.channels.c1.capacity = 20000
a2.channels.c1.transactionCapacity = 10000


a2.sinks.k1.type = hdfs
# %{type} resolves to the value of the header key set in Agent1
a2.sinks.k1.hdfs.path=hdfs://node001:9000/flume/interceptors/files/%{type}/%y-%m-%d/%H%M/
a2.sinks.k1.hdfs.filePrefix =events
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.writeFormat = Text

# use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# do not roll files based on event count
a2.sinks.k1.hdfs.rollCount = 0
# roll files based on time (seconds)
a2.sinks.k1.hdfs.rollInterval = 30
# roll files based on size (bytes)
a2.sinks.k1.hdfs.rollSize = 10485760
# number of events written to HDFS per batch
a2.sinks.k1.hdfs.batchSize = 10000
# number of threads Flume uses for HDFS operations (open, write, etc.)
a2.sinks.k1.hdfs.threadsPoolSize=10
# timeout (ms) for HDFS operations
a2.sinks.k1.hdfs.callTimeout=30000

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Start the Agents

bin/flume-ng agent -c conf -f conf/interceptors2.conf -n a2 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf -f conf/interceptors1.conf -n a1 -Dflume.root.logger=INFO,console

Run the test

[root@administrator tempData]# echo "Hello world"  >> test1.txt
[root@administrator tempData]# echo "Hello world"  >> test2.txt
[root@administrator tempData]# echo "Hello world"  >> test3.txt


Flume Monitoring

Ganglia is large-scale cluster monitoring software, well suited to monitoring distributed or parallel clusters.

Official site: http://ganglia.info/

Install Ganglia

Either create a container from a Ganglia image or install the Ganglia packages directly, and use it to monitor Flume.

Pull a Ganglia image

docker pull jackchencoding/ganglia

Create the Ganglia container

docker run -itd --name ganglia -p 8080:80 -p 8081:8649  --privileged=true jackchencoding/ganglia:latest

Configure Flume

Edit flume-env.sh in the /usr/local/program/flume/conf directory and add the following:

JAVA_OPTS="-Dflume.monitoring.type=ganglia  -Dflume.monitoring.hosts=IP:8649 -Xms100m -Xmx200m"

Start a Flume job

bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
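Alternatively, the monitoring properties can be passed on the command line instead of through flume-env.sh (replace IP with the address of the Ganglia host):

bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=IP:8649 -Dflume.root.logger=INFO,console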

Run the test

Open IP:8080

Open IP:8080/ganglia, then send test data through Flume and watch the metrics in the monitoring UI

[root@administrator ~]# telnet node001 44444
Trying  172.22.4.21...
Connected to node01.
Escape character is '^]'.
Hello World
OK

