Flume定义
- Flume最初是 Cloudera 公司推出的一个高可用、高可靠的,分布式的海量日志采集、聚合和传输的系统,于2009年被捐赠给了Apche基金会,成为Hadoop相关组件之一
- Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时Flume提供对数据进行简单处理,并写到各种数据接收方(可定制)的能力
- Fluem基于流式架构,灵活简单
Flume基本架构
Flume的基本组件包括 Event 和 Agent
Event
- Event 是Flume中具有有效负载的字节数据流和可选的字符串属性集,是Flume传送数据的基本单位
- Event由 Header 和 Body 组成。
- Header是一个Map<String, Stirng> ,存储字符串属性集,为K-V结构
- Body是一个字节数组,存储字节数据
简单来说,Event其实就是Flume框架作者写的一个类似于类的东西
Agent
Flume运行的核心是 Agent。Flume 已 Agent 为最小的独立运行单位,一个Agent 就是一个JVM(java虚拟机,Java Virtual Machine),它是一个完整的数据采集工具,包含三个核心组件,分别是
- Source(数据源)
- Channel(数据通道)
- Sink(数据槽)
Agent主要功能:以事件的形式将数据从源头送至目的地
Source
-
Source 是负责接收数据到Flume Agent 的组件,即 Source是数据的接收端,将数据捕获后进行特殊的格式化,将数据封装到 Event 里,然后将Event推入到 Channel。
-
Source组件可以处理各种类型、各种格式的日志数据,如下
类型 | 简介 |
---|---|
Netcat Source | 监控某个端口,读取流经端口的每一个文本行数据 |
Exec Source | Source启动的时候会运行一个设置好的Linux命令,该命令不断往标准输出(stdout)中输出数据,这些数据被打包成Event进行处理**(该source不支持断点续传)** |
Spooling Directoy Source | 监听指定目录,当该目录有新文件出现时,把文件的数据打包成 Event进行处理**(该source支持断点续传,但是时效性不太好)** |
Syslog Source | 读取 Sylog数据,产生Event,支持UDP和TCP两种协议 |
Stress Source | 用于可以配置要发送的事件总数以及要传递的最大事件数,多用于负载测试 |
HTTP Source | 基于 HTTP POST或 GET方式的数据源,支持JSON、BLOB表示形式 |
Avro Source | 支持Avro RPC协议,提供了一个 Avro的接口,往设置的地址和端口发送Avro消息,Source就能接收到,例如,Log4j Appender通过Avro Source 将消息发送到Agent |
Taildir Source | 监听实时追加内容的文件 |
Thrift Souce | 支持 Thrift 协议,提供一个 Thrift接口,类似Avro |
JMS Source | 从Java消息服务读取数据 |
Kafka Source | 从Kafka消息队列中读取中数据,官方描述:Kafka Source 其实就是一个 Kafka Consumer |
- 每个Source 可以发送Event到多个Channel中
Sink
- Sink 是不断地轮询 Channel 中事件(Event)并且批量地移除它们,并将这些事件(Event)批量写入到存储或索引系统、或者被发送到另一个 Flume Agent
- Sink常见类型如下
类型 | 简介 |
---|---|
HDFS Sink | 将数据写入HDFS,默认格式为 SequenceFile |
Logger Sink | 将数据写入日志文件 |
Hive Sink | 将数据写入Hive |
File Roll Sink | 将数据存储到本地文件系统,多用作数据收集 |
HBase Sink | 将数据写入到HBase |
Thrift Sink | 将数据转换到Thrift Event后,发送到配置的RPC端口上 |
Avro Sink | 将数据转换到Avro Event后,发送到配置的RPC端口上 |
Null Sink | 丢弃所有的数据 |
ElasticSearch Sink | 将数据发送到 ElasticSearch 集群上 |
Kite Dataset Sink | 写数据到 Kite Dataset,试验性质 |
Kafka Sink | 官方描述:Kafka Sink 能向Kafka 的topic 写入数据(Kafka Sink其实就是Kafka Producer 的实现) |
- 每个Sink只能从一个Channel中获取数据
Channel
-
Channel 是位于 Source 和 Sink 之间的组件,可以看做是数据的缓冲区(数据队列),它可以将事件暂存到内存中,也可以将事件持久化到本地磁盘上,直到 Sink处理完该事件。
-
Channel 允许 Source 和 Sink运作在不同的速率上。
-
Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作
-
Channel的常见类型如下 (其中 Memory、File是Flume自带的)
类型 | 简介 |
---|---|
Memory Channel | 数据存储到内存的队列中,可以实现高速的数据吞吐,Flume出现故障时,数据会丢失 |
File Channel | 数据存储到磁盘文件中,可以持久化所有的Event,Flume出现故障时,数据不会丢失**(该File在内存中有索引机制,加快读取速率,并且索引会在磁盘做两次备份)** |
JDBC Channel | 数据持久化到数据库中 |
Kafka Channel | 数据存储到Kafka集群中 |
Custom Channel | 自定义Channel |
小结
Flume 提供了大量内置的 Source、Channel、Sink 类型。不同类型的 Source、Channel、Sink 可以自由组合。组合方式基于用户设置的配置文件,非常灵活。例如,Channel可以把Event(事件)暂存在内存里,也可以将Event(事件)持久化到本地磁盘上;Channel 可以把日志写入HDFS、HBase,甚至另外一个 Source。
Flume 安装部署
1)上次压缩包
将 apache-flume-1.9.0-bin.tar.gz 压缩包上传至Linux 的 /opt/software目录下
2)解压压缩包
将 apache-flume-1.9.0-bin.tar.gz 解压到 /opt/software目录下
[root@kk01 software]# pwd
/opt/software
[root@kk01 software]# tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/software/
3)将apache-flume-1.9.0-bin 重命名为 flume
因为flume只是一个工具,它的版本没有那么重要,因此我们可以改名
[root@kk01 software]# mv apache-flume-1.9.0-bin flume
4)将 lib 目录下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.2.2
[root@kk01 software]# rm /opt/software/flume/lib/guava-11.0.2.jar
注意:删除guava-11.0.2.jar的服务器节点,一定要配置hadoop环境变量。否则会报如下异常。
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 1 more
5)修改 conf 目录下的 log4j.properties 确定日志打印的位置
[root@kk01 software]# cd /opt/software/flume/conf/
[root@kk01 conf]# vim log4j.properties
# 修改内容如下
# console表示同时将日志输出到控制台
flume.root.logger=INFO,LOGFILE,console
# 固定日志输出的位置
flume.log.dir=/opt/software/flume/logs
# 日志文件的名称
flume.log.file=flume.log
6)配置flume环境变量
[root@kk01 conf]# vim /etc/profile
# 在文件末尾加上如下内容
# flume环境变量
export FLUME_HOME=/opt/software/flume
export PATH=$PATH:$FLUME_HOME/bin
# 使环境变量生效
[root@kk01 conf]# source /etc/profile
7)将flume-env.ps1.template 重命名为 flume-env.ps1
在flume安装目录下的conf目录下有个 flume-env.ps1.template文件,需要将它重命名为flume-env.ps1,有两种做法:
(保守做法)将flume-env.ps1.template 文件 复制出 flume-env.ps1
(常规做法)将flume-env.ps1.template 文件 重命名为 flume-env.ps1
# 下面我们采取保守做法
/opt/software/flume/conf
[root@kk01 conf]# cp flume-env.ps1.template flume-env.ps1
[root@kk01 conf]# ll
total 20
-rw-r--r--. 1 nhk nhk 1661 Nov 16 2017 flume-conf.properties.template
-rw-r--r--. 1 root root 1455 May 11 23:10 flume-env.ps1 # 我们复制出来的文件
-rw-r--r--. 1 nhk nhk 1455 Nov 16 2017 flume-env.ps1.template
-rw-r--r--. 1 nhk nhk 1568 Aug 30 2018 flume-env.sh.template
-rw-rw-r--. 1 nhk nhk 3237 May 11 22:57 log4j.properties
8)将 flume-env.sh.template 重命名为 flume-env.sh
在flume安装目录下的conf目录下有个 flume-env.sh.template文件,需要将它重命名为flume-env.sh,有两种做法:
(保守做法)将flume-env.sh.template 文件 复制出 flume-env.sh
(常规做法)将flume-env.sh.template 文件 重命名为 flume-env.sh
# 下面我们采取保守做法
[root@kk01 conf]# cp flume-env.sh.template flume-env.sh
[root@kk01 conf]# ll
total 24
-rw-r--r--. 1 nhk nhk 1661 Nov 16 2017 flume-conf.properties.template
-rw-r--r--. 1 root root 1455 May 11 23:10 flume-env.ps1
-rw-r--r--. 1 nhk nhk 1455 Nov 16 2017 flume-env.ps1.template
-rw-r--r--. 1 root root 1568 May 11 23:12 flume-env.sh
-rw-r--r--. 1 nhk nhk 1568 Aug 30 2018 flume-env.sh.template
-rw-rw-r--. 1 nhk nhk 3237 May 11 22:57 log4j.properties
9)修改 flume-env.sh
[root@kk01 conf]# vim flume-env.sh
# 在文件内添加如下内容
export JAVA_HOME=/opt/software/jdk1.8.0_152
10)查看Flume版本信息
[root@kk01 conf]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
至此,Flume部署完成
Flume 数据流模型
Flume 常见的数据流模型有 单Agent数据流模型、多Agent串行数据流模型、多Agent汇聚数据流模型、单Agent多路数据流模型
1) 单Agent数据流模型
单Agent数据流模型,一个Agent由一个Source、一个Channel、一个Sink组成
2)多Agent串行数据流模型
假设有两个Agent,Agent1、Agent2,为了使数据在多个Agent(我们以两个为例)中流通,Agent1中的Sink 和 Agent2 中的 Source 需要是Avro类型,Agent1中的Sink 指向 Agent2 这的 Source的主机名(或IP地址)和端口
3)多Agent汇聚数据流模型
多Agent汇聚数据流模型是采集大量日志时常用的数据流模型。例如,将从数百个web服务器采集的日志数据发送给写入HDFS集群中的十几个Agent,此时就可以采用此模型。
4)单Agent多路数据流模型
单Agent多路数据流模型,一个Agent由一个Source、多个Channel、一个Sink组成。
一个Source接收Event,将 Event 发送到 多个Channel中,Channel对应的Sink处理各自Channel 内的 Event,然后将数据分别存储到指定的位置。
Source 将 Event 发送到 Channel 中可以采取两种不同的策略:
-
Replicating(复制通道选择器):即Source将每个Event发送到每个与它连接的 Channel 中,也就是将 Event复制多份发送到不同的 Channel 中
-
Multiplexing(多路复用通道选择器):即Source根据Hader中的一个键决定将 Event 发送到哪个Channel中
Flume 案例
官方文档 https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
演示 Netcat Source
1)安装netcat
[root@kk01 ~]# yum install -y nc
2)测试netcat
打开两个终端,第一个用于向9999端口发送消息,开启后终端会卡住,表示进入了等待输入消息的状态。
[root@kk01 ~]# nc -lk 9999
查看端口9999是否被占用(可选)
[root@kk01 ~]# netstat -nlp | grep 9999
第二个用于接收9999端口收到的消息
[root@kk01 ~]# nc localhost 9999
第一个终端发送消息
[root@kk01 ~]# nc -lk 9999
hello
world
第二个终端收到了消息
[root@kk01 ~]# nc localhost 9999
hello
world
此致,就验证了nc的可用性,说明它可以进行双向通信
按 ctrl+z 退出
3)创建 Flume Agent 文件 nc-flume-log.conf
为方便之后的执行,该配置文件需要放在flume根目录的job(自己创建)的文件夹下,其他位置也行,后面命令相应变更即可。文章来源:https://www.toymoban.com/news/detail-516904.html
[root@kk01 flume]# pwd
/opt/software/flume
[root@kk01 flume]# mkdir job
[root@kk01 flume]# cd job/
[root@kk01 job]# vim nc-flume-log.conf
nc-flume-log.conf 文件内容如下文章来源地址https://www.toymoban.com/news/detail-516904.html
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 9999
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to
到了这里,关于flume快速上手的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!