上一节我们搭好了 Hadoop 2.7.4 环境,其中包括分布式存储系统 HDFS 和资源管理平台 YARN ,这一节在此基础上我们开发一个 MapReduce 作业:WordCount
什么是 WordCount ?
我们首先提一个实际的需求,比如要分析今年十九大报告中的热词,什么是热词?就是那些出现频率高的词汇。首先将整篇报告分词,然后统计每个词语的词频,最后再进行排序,这样就可以得到热词了。可以看到这个过程中统计词频是很关键的一步。WordCount 就是这样的一个小应用,我们以英语文本为例,英文是不需要分词的,我们只需要统计每个单词出现的次数即可。
比如『hello world hello java』,那么输出结果该是
hello 2
world 1
java 1
首先创建一个 maven 工程,导入以下依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <dependencies > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-common</artifactId > <version > 2.7.4</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-mapreduce-client-core</artifactId > <version > 2.7.4</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > 1.16.14</version > </dependency > </dependencies >
hadoop-common 包含了基础组件,client-core 包含了 MR 计算模型,lombok 是一个 Java 开发神器,可以简化代码等等一系列神器的功能
少说废话直接上代码 WordCountApp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 package lab.zlren.hadoop;import lombok.extern.slf4j.Slf4j;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;@Slf 4jpublic class WordCountApp { public static class MyMapper extends Mapper <LongWritable , Text , Text , LongWritable > { LongWritable one = new LongWritable(1 ); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" " ); for (String word : words) { context.write(new Text(word), one); } } } public static class MyReducer extends Reducer <Text , LongWritable , Text , LongWritable > { @Override protected void reduce (Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0 ; for (LongWritable value : values) { sum += value.get(); } context.write(key, new LongWritable(sum)); } } public static void main (String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration, "wordcount" ); job.setJarByClass(WordCountApp.class); FileInputFormat.setInputPaths(job, new Path(args[0 ])); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[1 ])); System.exit(job.waitForCompletion(true ) ? 0 : 1 ); log.info("哈哈" ); } }
MR 计算模型有着严格的规定,必须是 Map 操作 + Reduce 操作,定义的两个类 MyMap 和 MyReducer 分别继承了 Mapper 和 Reducer,注意我们使用的是 Hadoop v2,包的路径是 org.apache.hadoop.mapreduce.*
,这一点要注意,不要导入错误了。
我们先来看 Mapper,分析其源码可知有 setup、map、cleanup 和 run 四个方法。其中 setup 一般是用来进行一些 map 前的准备工作,map 承担主要的处理工作,cleanup 是收尾工作如关闭文件或者执行 map 后的键值对分发等。run 方法提供了 setup->map->cleanup 的执行模板。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 protected void setup (Context context ) throws IOException, InterruptedException { } @SuppressWarnings ("unchecked" )protected void map (KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } protected void cleanup (Context context ) throws IOException, InterruptedException { } public void run (Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
对于 MyMapper,输入的 key 是偏移量(LongWritable),value 是这一行的文本内容(Text),我们要做的是对这一行文本根据空格进行切分,然后将切分出来的每个单词映射成(key,1)的形式。读者可以思考下为什么要这样做
对于 MyReducer,输入的 key 是每个单词,value 是固定值 1,我们要做的是将这些 1 累加起来,这样就计算出来每个单词出现的频率。到这里你可能会问,为什么可以直接累加?你怎么知道这些 key 都一样?
这就要提到 map 和 reduce 阶段中间的 shuffle 过程,shuffle 保证 key 相同的键值对会被分发给同一个 reducer,所以我们才可以直接累加。所以写 MR 应用的时候,头脑中要时刻结合物理执行流程进行思考
最后我们打包应用,由于我们是 maven 工程,可以直接用下面的命令打包
在项目的 target 文件夹下会出现打包好的 jar 文件,将其上传到 Hadoop 服务器,运行如下命令
1 hadoop jar hadoop-learn-1.0.jar lab.zlren.hadoop.WordCountApp hdfs://data:9000/hello.txt hdfs://data:9000/wc/
在程序中我们将输入和输出设置成可配置的,所以最后的两个参数就是输入文件和输出文件夹的路径,需要提前将一个文本文件上传到 HDFS 上
运行完成后结果如下
查看结果文件