基于上一节搭建的 Spark 环境,这一节我们继续搭建 Flume 相关的环境。在开始之前我们先介绍一个什么是 Flume?引用官网对于 Flume 的阐述:
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
简而言之:Flume 是一个 分布式的
、可靠的
和 可用的
服务,用于 收集
、聚合
和 移动
大规模的数据
本节搭建的 Flume 版本为 1.7.0
搭建过程
JDK
Java 的安装可以参考《可能是坑最少的Spark环境搭建教程2》一节,安装好以后记得检查一下 Java 的版本,确保为 1.8
1 | java -version |
出现以下表示 JDK 的配置没问题
解压、安装
在系统的 /opt 目录下找到之前存放好的安装文件:apache-flume-1.7.0-bin.tar.gz
执行以下命令,将 flume 安装包解压到 /root 下
1 | tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /root/ |
配置环境变量
1 | vim /etc/profile |
1 | export FLUME_HOME=/root/apache-flume-1.7.0-bin |
修改完成后使之生效
1 | source /etc/profile |
配置 Flume
首先进入 flume 的配置文件目录
1 | cd /root/apache-flume-1.7.0-bin/conf |
修改 flume-env.sh
1 | vim flume-env.sh |
配置 JAVA_HOME
路径
至此就基本配置好了,是不是很简单?这里我们可以通过查看 Flume 的版本来对其进行简单的检查
1 | flume-ng version |
这里可以看到输出了 1.7.0,表示 flume 基本配置没问题
flume 的安装配置较为基础,接下来通过几个小案例来简单说明 flume 的使用。在这之前我们要了解 flume 的组成,flume 以 agent 为基本单位,每个 agent 由三个组件构成,它们是
- source
- channel
- sink
从名字也可以看出,source 指定了数据来源,channel 是数据的通道,最后 sink 将收集的数据送到目的地址。这是一个 agent 的示意图。在下图中,数据源为 Web Server 产生的日志,首先通过 source 组件对日志进行收集,载通过 channel 组件送到 sink 中,可以看出这是一个 HDFS 类型的 sink,最终会把收集好的日志数据持续地保存到 HDFS 中,便于进行下一步的分析处理。
使用案例
这一节通过一个比较简单的案例来讲解如何开发 flume,实际上 flume 的开发就是写配置文件。从以上知道 flume 要配置 agent,根据组件构成我们要做的工作包含:
- 定义一个 agent
- 具体声明 agent 中的三大组件:source、channel 和 sink
- 最后将三个组件串起来
这个案例的需求很简单:
- 从指定网络端口采集数据输出到控制台
第一步我们定义一个 agent,并且定义好此 agent 中包含的三个组件
下面这个文件以 a1 开头,a1 就是这个 agent 的名称,这三行分别指定了组件的名称为 r1、k1 和 c1
1 | a1.sources = r1 |
定义 source,这是一个 netcat 类型的 source,它监听本地的 44444 端口
1 | a1.sources.r1.type = netcat |
定义 sink,这里比较简单,将收集的数据直接打印在控制台上就好了,这是一个 logger 类型的 sink
1 | a1.sinks.k1.type = logger |
最后定义 channel,这里使用内存类型的
1 | a1.channels.c1.type = memory |
最后一步,将三个组件的关系串起来
1 | a1.sources.r1.channels = c1 |
观察最后一步的书写,一个 source 是可以将收集的数据传送给多个 channel 的,所以 r1 后面的 channel 是复数,而一个 sink 只能从一个 channel 中取数据,所以 channel 是单数
将以上配置内容存放在一个文件中 example.conf
1 | a1.sources = r1 |
执行脚本
1 | flume-ng agent \ |
这里简单解释下:name 指定的是 agent 的名字,conf 需要指定配置文件,默认就是 flume 目录下的 conf 文件夹,conf-file 需要指定我们自己编写的 conf 文件,最后指定本次运行的 flume 程序的 log 级别为 INFO,形式为控制台
看到标红处,表示已经成功监听了本地端口
再打开一个终端,使用 telnet 命令向本地 44444 端口发送数据
1 | telnet localhost 44444 |
每发送一次信息就看一下 flume 的日志输出,正常情况下 flume 就会捕获到用户的输入,由于 sink 指定的是控制台,所以我们会在控制台上看到在 telnet 处的输入
到这里 flume 的一个入门级的案例就完成了
拓展
实际上 flume agent 的组织形式多种多样,这里简单列举几个
- agent 连接到另一个 agent 上
- 一个 agent 收集多个 agent 的数据来源,这种情况下对于实际使用场景是很有帮助的
flume 灵活的组织结构和对多种数据来源、数据去向(source 和 sink 的类型)的支持,使得它的用途较为广泛。在生产环境中往往将 sink 输出到 kafka 中缓冲,进一步用 Spark Streaming 进行实时的流计算
请读者思考,如何利用 flume 实现对一个文件的增量监控?