Java 版本的 MapReduce 入门作业:WordCount

上一节我们搭好了 Hadoop 2.7.4 环境,其中包括分布式存储系统 HDFS资源管理平台 YARN,这一节在此基础上我们开发一个 MapReduce 作业:WordCount

什么是 WordCount ?

我们首先提一个实际的需求,比如要分析今年十九大报告中的热词,什么是热词?就是那些出现频率高的词汇。首先将整篇报告分词,然后统计每个词语的词频,最后再进行排序,这样就可以得到热词了。可以看到这个过程中统计词频是很关键的一步。WordCount 就是这样的一个小应用,我们以英语文本为例,英文是不需要分词的,我们只需要统计每个单词出现的次数即可。

比如『hello world hello java』,那么输出结果该是

hello 2

world 1

java 1

首先创建一个 maven 工程,导入以下依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
</dependencies>

hadoop-common 包含了基础组件,client-core 包含了 MR 计算模型,lombok 是一个 Java 开发神器,可以简化代码等等一系列神器的功能

少说废话直接上代码 WordCountApp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package lab.zlren.hadoop;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* 使用MapReduce开发WordCount应用
*
* @author zlren
* @date 2017-11-17
*/
@Slf4j
public class WordCountApp {

public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

LongWritable one = new LongWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
// 通过context将map作业的结果输出
context.write(new Text(word), one);
}
}
}

/**
* 归并操作
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

/**
* @param key
* @param values shuffle之后,这里是个集合
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
// 将次数累加
sum += value.get();
}

// 结果交给上下文
context.write(key, new LongWritable(sum));
}
}


public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "wordcount");

job.setJarByClass(WordCountApp.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

log.info("哈哈");
}
}

MR 计算模型有着严格的规定,必须是 Map 操作 + Reduce 操作,定义的两个类 MyMap 和 MyReducer 分别继承了 Mapper 和 Reducer,注意我们使用的是 Hadoop v2,包的路径是 org.apache.hadoop.mapreduce.*,这一点要注意,不要导入错误了。

我们先来看 Mapper,分析其源码可知有 setup、map、cleanup 和 run 四个方法。其中 setup 一般是用来进行一些 map 前的准备工作,map 承担主要的处理工作,cleanup 是收尾工作如关闭文件或者执行 map 后的键值对分发等。run 方法提供了 setup->map->cleanup 的执行模板。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Called once at the beginning of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}

/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}

/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}

/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
*
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}

对于 MyMapper,输入的 key 是偏移量(LongWritable),value 是这一行的文本内容(Text),我们要做的是对这一行文本根据空格进行切分,然后将切分出来的每个单词映射成(key,1)的形式。读者可以思考下为什么要这样做

对于 MyReducer,输入的 key 是每个单词,value 是固定值 1,我们要做的是将这些 1 累加起来,这样就计算出来每个单词出现的频率。到这里你可能会问,为什么可以直接累加?你怎么知道这些 key 都一样?

这就要提到 map 和 reduce 阶段中间的 shuffle 过程,shuffle 保证 key 相同的键值对会被分发给同一个 reducer,所以我们才可以直接累加。所以写 MR 应用的时候,头脑中要时刻结合物理执行流程进行思考

最后我们打包应用,由于我们是 maven 工程,可以直接用下面的命令打包

1
mvn clean package

在项目的 target 文件夹下会出现打包好的 jar 文件,将其上传到 Hadoop 服务器,运行如下命令

1
hadoop jar hadoop-learn-1.0.jar lab.zlren.hadoop.WordCountApp hdfs://data:9000/hello.txt hdfs://data:9000/wc/

在程序中我们将输入和输出设置成可配置的,所以最后的两个参数就是输入文件和输出文件夹的路径,需要提前将一个文本文件上传到 HDFS 上

运行完成后结果如下

查看结果文件