Flume 搭建教程

基于上一节搭建的 Spark 环境,这一节我们继续搭建 Flume 相关的环境。在开始之前我们先介绍一个什么是 Flume?引用官网对于 Flume 的阐述:

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

简而言之:Flume 是一个 分布式的可靠的可用的 服务,用于 收集聚合移动 大规模的数据

本节搭建的 Flume 版本为 1.7.0

搭建过程

JDK

Java 的安装可以参考《可能是坑最少的Spark环境搭建教程2》一节,安装好以后记得检查一下 Java 的版本,确保为 1.8

1
java -version

出现以下表示 JDK 的配置没问题

解压、安装

在系统的 /opt 目录下找到之前存放好的安装文件:apache-flume-1.7.0-bin.tar.gz

执行以下命令,将 flume 安装包解压到 /root 下

1
tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /root/

配置环境变量

1
vim /etc/profile
1
export FLUME_HOME=/root/apache-flume-1.7.0-bin

修改完成后使之生效

1
source /etc/profile
配置 Flume

首先进入 flume 的配置文件目录

1
2
cd /root/apache-flume-1.7.0-bin/conf
cp flume-env.sh.template flume-env.sh

修改 flume-env.sh

1
vim flume-env.sh

配置 JAVA_HOME 路径

至此就基本配置好了,是不是很简单?这里我们可以通过查看 Flume 的版本来对其进行简单的检查

1
flume-ng version

这里可以看到输出了 1.7.0,表示 flume 基本配置没问题

flume 的安装配置较为基础,接下来通过几个小案例来简单说明 flume 的使用。在这之前我们要了解 flume 的组成,flume 以 agent 为基本单位,每个 agent 由三个组件构成,它们是

  • source
  • channel
  • sink

从名字也可以看出,source 指定了数据来源,channel 是数据的通道,最后 sink 将收集的数据送到目的地址。这是一个 agent 的示意图。在下图中,数据源为 Web Server 产生的日志,首先通过 source 组件对日志进行收集,载通过 channel 组件送到 sink 中,可以看出这是一个 HDFS 类型的 sink,最终会把收集好的日志数据持续地保存到 HDFS 中,便于进行下一步的分析处理。

使用案例

这一节通过一个比较简单的案例来讲解如何开发 flume,实际上 flume 的开发就是写配置文件。从以上知道 flume 要配置 agent,根据组件构成我们要做的工作包含:

  • 定义一个 agent
  • 具体声明 agent 中的三大组件:source、channel 和 sink
  • 最后将三个组件串起来

这个案例的需求很简单:

  • 从指定网络端口采集数据输出到控制台

第一步我们定义一个 agent,并且定义好此 agent 中包含的三个组件

下面这个文件以 a1 开头,a1 就是这个 agent 的名称,这三行分别指定了组件的名称为 r1、k1 和 c1

1
2
3
a1.sources = r1
a1.sinks = k1
a1.channels = c1

定义 source,这是一个 netcat 类型的 source,它监听本地的 44444 端口

1
2
3
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

定义 sink,这里比较简单,将收集的数据直接打印在控制台上就好了,这是一个 logger 类型的 sink

1
a1.sinks.k1.type = logger

最后定义 channel,这里使用内存类型的

1
a1.channels.c1.type = memory

最后一步,将三个组件的关系串起来

1
2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

观察最后一步的书写,一个 source 是可以将收集的数据传送给多个 channel 的,所以 r1 后面的 channel 是复数,而一个 sink 只能从一个 channel 中取数据,所以 channel 是单数

将以上配置内容存放在一个文件中 example.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行脚本

1
2
3
4
5
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file example.conf \
-Dflume.root.logger=INFO,console

这里简单解释下:name 指定的是 agent 的名字,conf 需要指定配置文件,默认就是 flume 目录下的 conf 文件夹,conf-file 需要指定我们自己编写的 conf 文件,最后指定本次运行的 flume 程序的 log 级别为 INFO,形式为控制台

看到标红处,表示已经成功监听了本地端口

再打开一个终端,使用 telnet 命令向本地 44444 端口发送数据

1
telnet localhost 44444

每发送一次信息就看一下 flume 的日志输出,正常情况下 flume 就会捕获到用户的输入,由于 sink 指定的是控制台,所以我们会在控制台上看到在 telnet 处的输入

到这里 flume 的一个入门级的案例就完成了

拓展

实际上 flume agent 的组织形式多种多样,这里简单列举几个

  • agent 连接到另一个 agent 上

  • 一个 agent 收集多个 agent 的数据来源,这种情况下对于实际使用场景是很有帮助的

flume 灵活的组织结构和对多种数据来源、数据去向(source 和 sink 的类型)的支持,使得它的用途较为广泛。在生产环境中往往将 sink 输出到 kafka 中缓冲,进一步用 Spark Streaming 进行实时的流计算

请读者思考,如何利用 flume 实现对一个文件的增量监控?