Hadoop 中 MapReduce 快速入门
因为研究生的方向是数据挖掘,所以免不了要接触到 Hadoop,Hadoop 是一个用 Java 语言实现开源软件框架,通过大量计算机组成的集群对海量数据进行分布式计算。
Hadoop 中两个重要组成部分为 HDFS 和 MapReduce。其中 HDFS 用于存储海量的数据,MapRudece 则负责处理这些数据,从中获取所需的信息。
HDFS 简单介绍
HDFS(Hadoop Distributed File System)翻译过来就是 "Hadoop 分布式文件系统 ",用于存储海量的数据。从 “分布式文件系统” 的名字可以知道这个文件系统运行在集群上。对于一个文件,Hadoop 会将其先分成若干个 block(每个 block 的大小默认为 64M, 当然也可以自己指定 block 的大小),然后再将 block 存储到集群上。为了保证数据的冗余性,HDFS 会为每个 block 创建 2 个副本,然后将这三个相同的 block 分别存储在不同的机器上。
例如下图就是将 data1 分成了 1、2、3 共三个 block,为每个 block 创建副本后再存储在不同的机器上;同理将 data2 分成了 4、5 共两个 block
MapReduce 介绍
有了数据就可以对其进行处理,从中提取出我们所需的信息。在 Hadoop 中是通过 MapReduce 来实现的。
MapReduce 任务过程被分为两个阶段:Map 阶段和 Reduce 阶段。每个阶段都用 key/value 作为输入和输出;每个阶段都需要定义函数,也就是 map 函数和 reduce 函数;可以简单认为 map 函数是对原始数据提出出有用的部分,而 reduce 函数则是对提取出来的数据进行处理。
所以实际编写程序时需要编写三个函数:Map 函数,Reduce 函数和调用他们执行任务的主函数,在编写程序时必须要有这个整体的概念。
下面会以 Hadoop 官方文档中的 WordCount 任务为例阐述 MapReduce,WordCount 的任务很简单,就是计算出一个文本中每个单词出现了多少次。下面分别来分析这几个函数:
需要注意的而是在编写这三个函数时均需要用到 Hadoop 本身提供的 jar 包
下面的实例是 Hadoop 1.2.1 版本提供的 jar 包
Map 函数
在本例中 map 函数的主要作用就是以 k-v 形式记录所有出现过的词,代码如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114/*
*WordCount的map程序
*/
package com.lc.hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
//引入Hadoop本身提供的jar包
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*继承Mapper类,<Object,Text,Text,IntWritable>表示输入输出的key-value 类型*/
public class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable> {
IntWritable one=new IntWritable(1);
Text text=new Text();
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
/*key为输入的key,value为输入的value,因为用不上输入的key的类型,所以直接定义为Object类型,而Context是定义在Mapper类内部的,用于存储key-value键值对*/
StringTokenizer tokenizer=new StringTokenizer(value.toString());
while(tokenizer.hasMoreTokens()){
text.set(tokenizer.nextToken());
context.write(text,one);
}
}
}
```
关于程序的几点解释:
- StringTokenizer类的作用是根据某一分隔符将String分隔开,默认是采用空格。
- IntWritable 类表示的是一个整数,是一个以类表示的**可序列化的整数**
- Text 类代表的是**可序列化的String类型**
- Mapper 类将输入键值对映射到输出键值对,也就是 MapReduce 里的 Map 过程
经过map过程后,文章被分割成大量的k-v对,k为实际的单词,v均为1,下一步就是要将相同的单词合并在一起。
### Reduce函数
Reduce函数的作用就是将相同的单词出现的次数合并在一起,代码如下:
```java
/*
*WordCount的reduce程序
*/
package com.lc.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CountReducer extends Reducer<Text ,IntWritable,Text,IntWritable>{
IntWritable result=new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
int sum=0;
for(IntWritable iw : values){
sum+=iw.get();
}
result.set(sum);
context.write(key,result);
}
}
```
Reduce与Map函数有很多地方比较相似,均是继承了hadoop提供的jar包中的类,只是map函数继承了Mapper类,而reduce函数继承了Reducer类,输入输出的类型均是k-v键值对。而且reduce函数的输入就是map函数的输出。
### 主函数
主函数的任务就是要创建一个任务,并且把map和reduce类都引进来,代码如下:
```java
/*
*WordCount的主程序
*/
package com.lc.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount{
public static void main(String[] args) throws Exception{
Configuration conf=new Configuration();//从hadoop配置文件中读取参数
//从命令行读取参数
String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length!=2){
System.out.println("Usage:wordcount <in> <out>");
System.exit(2);
}
Job job=new Job(conf,"WordCount");
job.setJarByClass(WordCount.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(CountReducer.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit( (job.waitForCompletion(true)?0:1));
}
}
关于程序有几点需要注意的地方:
Configuration 类用于读写和保存各种配置资源
Path 类保存文件或者目录的路径字符串
Job 类:在 hadoop 中每个需要执行的任务是一个 Job,这个 Job 负责很多事情,包括参数配置,设置
MapReduce 细节,提交到 Hadoop 集群,执行控制,查询执行状态,等等FileInputFormat 和 FileOutputFormat 用于处理文件的输入和输入(针对 MapReduce 而言)
GenericOptionsParser 类负责解析 hadoop 的命令行参数
执行任务
编写好源程序后,需要在 hadoop 上执行我们在源程序中写好的代码,大致的过程如下:编译->打包->执行
,下面分别介绍。为了程序的规范性,首先建立一个 wordcount 的文件夹,下面再建两个子文件夹 src 和 classes,分别放置源程序文件和编译好后的 class 文件。且默认是在 Linux 上执行这些操作的。
编译
首先将上面写好的三个源文件放到 wordcount 的 src 目录下,同时拷贝安装 hadoop 后提供的两个 jar 包 hadoop-core-1.2.1.jar 和 commons-cli-1.2.jar。进入 wordcount 目录,采用下面命令进行编译
javac -classpath hadoop-core-1.2.1.jar:commons-cli-1.2.jar -d ./classes ./src/*
这条命令的作用是将 src 目录下的所有文件进行编译,生成的 class 文件放到 classes 目录下,编译过程中需要引入的 hadoop-core-1.2.1.jar 和 commons-cli-1.2.jar 两个包,里面包含了上面源文件中导入的 hadoop 的类。
编译完成后,可以在 classes 目录下发现以下子目录的结构 classes->com->lc->hadoop
, 最后在目录 hadoop 下会有三个 class 文件,分别对应上面的的三个源文件。
打包
打包需要用到 jar 命令,jar 命令是 JDK 的打包命令行工具,跟 tar 非常像。
先切换到 WordCount 目录,再执行下面的命令:
jar -cvf WordCount.jar -C ./classes/* .
在命令里,-C 是指需要打包的 class 文件的路径,。打包结果是 wordcount.jar 文件,放在当前目录下。
执行
执行 hadoop 任务需要在 HDFS 上进行,所以文件的输入输出路径也就是在 HDFS 上的路径
首先需要将待处理的文件放入到 HDFS 中,可以按顺序输入以下命令:
hadoop fs -mkdir in
// 在 HDFS 中创建一个名为 in 的文件夹
hadoop fs -put Readme.txt readme.txt
// 将 Linux 当前目录下的 Readme.txt 文件放置到 HDFS 中的 in 目录
hadoop jar WordCount.jar com.lc.hadoop WordCount in/readme.txt out
// 执行 Linux 当前目录下的 WordCount
jar 包里面的 WordCount 类,输入文件是 HDFS 中 in 目录下的 readme.txt 文件,输出文件放到 HDFS 中的 out 目录
hadoop fs -cat out/part-r-0000
// 查看得到的结果
需要注意的是 HDFS 中的文件路径不能够在 Linux 下直接通过 cd
或 ls
进行切换或查看,而必须要通过 hadoop fs
进行操作。
以上就是 Hadoop 中 MapReduce 的流程,针对不同的应用会有不同的变化,但是总体上的流程是一致的,就是先编写好三个函数(map 函数,reduce 函数和主函数),然后要经历编译->打包->执行
的流程。再查看得到的结果即可。
参考资料:最短路径系列之一从零开始学习 Hadoop