Hadoop中MapReduce快速入门

因为研究生的方向是数据挖掘,所以免不了要接触到Hadoop,Hadoop是一个用Java语言实现开源软件框架,通过大量计算机组成的集群对海量数据进行分布式计算。

Hadoop中两个重要组成部分为HDFS和MapReduce。其中HDFS用于存储海量的数据,MapRudece则负责处理这些数据,从中获取所需的信息。

HDFS简单介绍

HDFS(Hadoop Distributed File System)翻译过来就是"Hadoop 分布式文件系统",用于存储海量的数据。从“分布式文件系统”的名字可以知道这个文件系统运行在集群上。对于一个文件,Hadoop会将其先分成若干个block(每个block的大小默认为64M,当然也可以自己指定block的大小),然后再将block存储到集群上。为了保证数据的冗余性,HDFS会为每个block创建2个副本,然后将这三个相同的block分别存储在不同的机器上。

例如下图就是将data1分成了1、2、3共三个block,为每个block创建副本后再存储在不同的机器上;同理将data2分成了4、5共两个block

MapReduce介绍

有了数据就可以对其进行处理,从中提取出我们所需的信息。在Hadoop中是通过MapReduce来实现的。

MapReduce任务过程被分为两个阶段:Map阶段和Reduce阶段。每个阶段都用key/value作为输入和输出;每个阶段都需要定义函数,也就是map函数和reduce函数;可以简单认为map函数是对原始数据提出出有用的部分,而reduce函数则是对提取出来的数据进行处理。

所以实际编写程序时需要编写三个函数:Map函数,Reduce函数和调用他们执行任务的主函数,在编写程序时必须要有这个整体的概念

下面会以Hadoop官方文档中的WordCount任务为例阐述MapReduce,WordCount的任务很简单,就是计算出一个文本中每个单词出现了多少次。下面分别来分析这几个函数:

需要注意的而是在编写这三个函数时均需要用到Hadoop本身提供的jar包 下面的实例是Hadoop 1.2.1 版本提供的jar包

Map函数

在本例中map函数的主要作用就是以k-v形式记录所有出现过的词,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
*WordCount的map程序
*/

package com.lc.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

//引入Hadoop本身提供的jar包
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*继承Mapper类,<Object,Text,Text,IntWritable>表示输入输出的key-value 类型*/
public class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable> {
IntWritable one=new IntWritable(1);
Text text=new Text();

public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
/*key为输入的key,value为输入的value,因为用不上输入的key的类型,所以直接定义为Object类型,而Context是定义在Mapper类内部的,用于存储key-value键值对*/
StringTokenizer tokenizer=new StringTokenizer(value.toString());
while(tokenizer.hasMoreTokens()){
text.set(tokenizer.nextToken());
context.write(text,one);
}
}
}

关于程序的几点解释:

  • StringTokenizer类的作用是根据某一分隔符将String分隔开,默认是采用空格。
  • IntWritable 类表示的是一个整数,是一个以类表示的可序列化的整数
  • Text 类代表的是可序列化的String类型
  • Mapper 类将输入键值对映射到输出键值对,也就是 MapReduce 里的 Map 过程

经过map过程后,文章被分割成大量的k-v对,k为实际的单词,v均为1,下一步就是要将相同的单词合并在一起。

Reduce函数

Reduce函数的作用就是将相同的单词出现的次数合并在一起,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
*WordCount的reduce程序
*/

package com.lc.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CountReducer extends Reducer<Text ,IntWritable,Text,IntWritable>{

IntWritable result=new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
int sum=0;
for(IntWritable iw : values){
sum+=iw.get();
}
result.set(sum);
context.write(key,result);
}
}

Reduce与Map函数有很多地方比较相似,均是继承了hadoop提供的jar包中的类,只是map函数继承了Mapper类,而reduce函数继承了Reducer类,输入输出的类型均是k-v键值对。而且reduce函数的输入就是map函数的输出。

主函数

主函数的任务就是要创建一个任务,并且把map和reduce类都引进来,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/*
*WordCount的主程序
*/

package com.lc.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount{

public static void main(String[] args) throws Exception{
Configuration conf=new Configuration();//从hadoop配置文件中读取参数
//从命令行读取参数
String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();

if(otherArgs.length!=2){
System.out.println("Usage:wordcount <in> <out>");
System.exit(2);
}

Job job=new Job(conf,"WordCount");
job.setJarByClass(WordCount.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(CountReducer.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit( (job.waitForCompletion(true)?0:1));

}
}

关于程序有几点需要注意的地方:

  • Configuration 类用于读写和保存各种配置资源
  • Path 类保存文件或者目录的路径字符串
  • Job 类:在hadoop中每个需要执行的任务是一个 Job,这个 Job 负责很多事情,包括参数配置,设置 MapReduce 细节,提交到 Hadoop 集群,执行控制,查询执行状态,等等
  • FileInputFormat和FileOutputFormat用于处理文件的输入和输入(针对MapReduce而言)
  • GenericOptionsParser 类负责解析 hadoop 的命令行参数

执行任务

编写好源程序后,需要在hadoop上执行我们在源程序中写好的代码,大致的过程如下:编译->打包->执行,下面分别介绍。为了程序的规范性,首先建立一个wordcount的文件夹,下面再建两个子文件夹src和classes,分别放置源程序文件和编译好后的class文件。且默认是在Linux上执行这些操作的。

编译

首先将上面写好的三个源文件放到wordcount的src目录下,同时拷贝安装hadoop后提供的两个jar包hadoop-core-1.2.1.jar和commons-cli-1.2.jar。进入wordcount目录,采用下面命令进行编译

javac -classpath hadoop-core-1.2.1.jar:commons-cli-1.2.jar -d ./classes ./src/*

这条命令的作用是将src目录下的所有文件进行编译,生成的class文件放到classes目录下,编译过程中需要引入的hadoop-core-1.2.1.jar和commons-cli-1.2.jar两个包,里面包含了上面源文件中导入的hadoop的类。 编译完成后,可以在classes目录下发现以下子目录的结构classes->com->lc->hadoop,最后在目录hadoop下会有三个class文件,分别对应上面的的三个源文件。

打包

打包需要用到 jar 命令,jar 命令是 JDK 的打包命令行工具,跟 tar 非常像。

先切换到WordCount目录,再执行下面的命令:

jar -cvf WordCount.jar -C ./classes/* .

在命令里,-C 是指需要打包的class文件的路径,。打包结果是 wordcount.jar 文件,放在当前目录下。

执行

执行hadoop任务需要在HDFS上进行,所以文件的输入输出路径也就是在HDFS上的路径

首先需要将待处理的文件放入到HDFS中,可以按顺序输入以下命令: hadoop fs -mkdir in //在HDFS中创建一个名为in的文件夹

hadoop fs -put Readme.txt readme.txt //将Linux当前目录下的Readme.txt文件放置到HDFS中的in目录

hadoop jar WordCount.jar com.lc.hadoop WordCount in/readme.txt out//执行Linux当前目录下的WordCount jar包里面的WordCount类,输入文件是HDFS中in目录下的readme.txt文件,输出文件放到HDFS中的out目录

hadoop fs -cat out/part-r-0000 //查看得到的结果

需要注意的是HDFS中的文件路径不能够在Linux下直接通过cdls进行切换或查看,而必须要通过hadoop fs进行操作。

以上就是Hadoop中MapReduce的流程,针对不同的应用会有不同的变化,但是总体上的流程是一致的,就是先编写好三个函数(map函数,reduce函数和主函数),然后要经历编译->打包->执行的流程。再查看得到的结果即可。

参考资料:最短路径系列之一从零开始学习Hadoop