Hadoop 中 MapReduce 快速入门

因为研究生的方向是数据挖掘,所以免不了要接触到 Hadoop,Hadoop 是一个用 Java 语言实现开源软件框架,通过大量计算机组成的集群对海量数据进行分布式计算。

Hadoop 中两个重要组成部分为 HDFS 和 MapReduce。其中 HDFS 用于存储海量的数据,MapRudece 则负责处理这些数据,从中获取所需的信息。

HDFS 简单介绍

HDFS(Hadoop Distributed File System)翻译过来就是 "Hadoop 分布式文件系统 ",用于存储海量的数据。从 “分布式文件系统” 的名字可以知道这个文件系统运行在集群上。对于一个文件,Hadoop 会将其先分成若干个 block(每个 block 的大小默认为 64M, 当然也可以自己指定 block 的大小),然后再将 block 存储到集群上。为了保证数据的冗余性,HDFS 会为每个 block 创建 2 个副本,然后将这三个相同的 block 分别存储在不同的机器上。

例如下图就是将 data1 分成了 1、2、3 共三个 block,为每个 block 创建副本后再存储在不同的机器上;同理将 data2 分成了 4、5 共两个 block

MapReduce 介绍

有了数据就可以对其进行处理,从中提取出我们所需的信息。在 Hadoop 中是通过 MapReduce 来实现的。

MapReduce 任务过程被分为两个阶段:Map 阶段和 Reduce 阶段。每个阶段都用 key/value 作为输入和输出;每个阶段都需要定义函数,也就是 map 函数和 reduce 函数;可以简单认为 map 函数是对原始数据提出出有用的部分,而 reduce 函数则是对提取出来的数据进行处理。

所以实际编写程序时需要编写三个函数:Map 函数,Reduce 函数和调用他们执行任务的主函数,在编写程序时必须要有这个整体的概念

下面会以 Hadoop 官方文档中的 WordCount 任务为例阐述 MapReduce,WordCount 的任务很简单,就是计算出一个文本中每个单词出现了多少次。下面分别来分析这几个函数:

需要注意的而是在编写这三个函数时均需要用到 Hadoop 本身提供的 jar 包
下面的实例是 Hadoop 1.2.1 版本提供的 jar 包

Map 函数

在本例中 map 函数的主要作用就是以 k-v 形式记录所有出现过的词,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/*  
*WordCount的map程序
*/

package com.lc.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

//引入Hadoop本身提供的jar包
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*继承Mapper类,<Object,Text,Text,IntWritable>表示输入输出的key-value 类型*/
public class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable> {
IntWritable one=new IntWritable(1);
Text text=new Text();

public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
/*key为输入的key,value为输入的value,因为用不上输入的key的类型,所以直接定义为Object类型,而Context是定义在Mapper类内部的,用于存储key-value键值对*/
StringTokenizer tokenizer=new StringTokenizer(value.toString());
while(tokenizer.hasMoreTokens()){
text.set(tokenizer.nextToken());
context.write(text,one);
}
}
}
```
关于程序的几点解释:

- StringTokenizer类的作用是根据某一分隔符将String分隔开,默认是采用空格。
- IntWritable 类表示的是一个整数,是一个以类表示的**可序列化的整数**
- Text 类代表的是**可序列化的String类型**
- Mapper 类将输入键值对映射到输出键值对,也就是 MapReduce 里的 Map 过程

经过map过程后,文章被分割成大量的k-v对,k为实际的单词,v均为1,下一步就是要将相同的单词合并在一起。

### Reduce函数
Reduce函数的作用就是将相同的单词出现的次数合并在一起,代码如下:

```java
/*
*WordCount的reduce程序
*/

package com.lc.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CountReducer extends Reducer<Text ,IntWritable,Text,IntWritable>{

IntWritable result=new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
int sum=0;
for(IntWritable iw : values){
sum+=iw.get();
}
result.set(sum);
context.write(key,result);
}
}
```
Reduce与Map函数有很多地方比较相似,均是继承了hadoop提供的jar包中的类,只是map函数继承了Mapper类,而reduce函数继承了Reducer类,输入输出的类型均是k-v键值对。而且reduce函数的输入就是map函数的输出。

### 主函数
主函数的任务就是要创建一个任务,并且把map和reduce类都引进来,代码如下:

```java
/*
*WordCount的主程序
*/

package com.lc.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount{

public static void main(String[] args) throws Exception{
Configuration conf=new Configuration();//从hadoop配置文件中读取参数
//从命令行读取参数
String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();

if(otherArgs.length!=2){
System.out.println("Usage:wordcount <in> <out>");
System.exit(2);
}

Job job=new Job(conf,"WordCount");
job.setJarByClass(WordCount.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(CountReducer.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit( (job.waitForCompletion(true)?0:1));

}
}

关于程序有几点需要注意的地方:

  • Configuration 类用于读写和保存各种配置资源

  • Path 类保存文件或者目录的路径字符串

  • Job 类:在 hadoop 中每个需要执行的任务是一个 Job,这个 Job 负责很多事情,包括参数配置,设置
    MapReduce 细节,提交到 Hadoop 集群,执行控制,查询执行状态,等等

  • FileInputFormat 和 FileOutputFormat 用于处理文件的输入和输入(针对 MapReduce 而言)

  • GenericOptionsParser 类负责解析 hadoop 的命令行参数

执行任务

编写好源程序后,需要在 hadoop 上执行我们在源程序中写好的代码,大致的过程如下:编译->打包->执行,下面分别介绍。为了程序的规范性,首先建立一个 wordcount 的文件夹,下面再建两个子文件夹 src 和 classes,分别放置源程序文件和编译好后的 class 文件。且默认是在 Linux 上执行这些操作的。

编译

首先将上面写好的三个源文件放到 wordcount 的 src 目录下,同时拷贝安装 hadoop 后提供的两个 jar 包 hadoop-core-1.2.1.jar 和 commons-cli-1.2.jar。进入 wordcount 目录,采用下面命令进行编译

javac -classpath hadoop-core-1.2.1.jar:commons-cli-1.2.jar -d ./classes ./src/*

这条命令的作用是将 src 目录下的所有文件进行编译,生成的 class 文件放到 classes 目录下,编译过程中需要引入的 hadoop-core-1.2.1.jar 和 commons-cli-1.2.jar 两个包,里面包含了上面源文件中导入的 hadoop 的类。
编译完成后,可以在 classes 目录下发现以下子目录的结构 classes->com->lc->hadoop, 最后在目录 hadoop 下会有三个 class 文件,分别对应上面的的三个源文件。

打包

打包需要用到 jar 命令,jar 命令是 JDK 的打包命令行工具,跟 tar 非常像。

先切换到 WordCount 目录,再执行下面的命令:

jar -cvf WordCount.jar -C ./classes/* .

在命令里,-C 是指需要打包的 class 文件的路径,。打包结果是 wordcount.jar 文件,放在当前目录下。

执行

执行 hadoop 任务需要在 HDFS 上进行,所以文件的输入输出路径也就是在 HDFS 上的路径

首先需要将待处理的文件放入到 HDFS 中,可以按顺序输入以下命令:
hadoop fs -mkdir in // 在 HDFS 中创建一个名为 in 的文件夹

hadoop fs -put Readme.txt readme.txt // 将 Linux 当前目录下的 Readme.txt 文件放置到 HDFS 中的 in 目录

hadoop jar WordCount.jar com.lc.hadoop WordCount in/readme.txt out// 执行 Linux 当前目录下的 WordCount
jar 包里面的 WordCount 类,输入文件是 HDFS 中 in 目录下的 readme.txt 文件,输出文件放到 HDFS 中的 out 目录

hadoop fs -cat out/part-r-0000 // 查看得到的结果

需要注意的是 HDFS 中的文件路径不能够在 Linux 下直接通过 cdls 进行切换或查看,而必须要通过 hadoop fs 进行操作。

以上就是 Hadoop 中 MapReduce 的流程,针对不同的应用会有不同的变化,但是总体上的流程是一致的,就是先编写好三个函数(map 函数,reduce 函数和主函数),然后要经历编译->打包->执行的流程。再查看得到的结果即可。

参考资料:最短路径系列之一从零开始学习 Hadoop