分布式机器学习(4)-Implement Your MapReduce

提到 MapReduce,很自然想到的是 Hadoop MapReduce ,但是 MapReduce 只是一个编程范式,而 Hadoop MapReduce 则是这个编程范式的一个比较出名的实现。实际上,可以通过多种方式实现 MapReduce,本文要介绍的就是如何在 Linux 的 bash 下实现一个 MapReduce 程序,并且分别实现了单机版本和多机器版本。原视频见这里,需要自备梯子。

下面以 MapReduce 中的经典例子 WordCount 为例进行讲述, 先实现单机版本,再实现多机版本

单机版 MapReduce

下面是在 bash 下通过 MapReduce 范式实现的单机版本的 WordCount 程序

1
2
3
4
5
6
7
8
9
10
11
12
text=$(cat <<EOF
This is my Cup
It is not your cup
My cup is white
Your cup is blue
EOF
)

echo $text\
| awk '{for (i=0; i<=NF; i++) print $i, 1}' \
| sort \
| awk '{if ($1 != prev) {print prev, c; c=0; prev=$1} c+=$2}'

上面的 bash 脚本有几个需要了解的语法细节

cat <<EOF 命令在 bash 中主要用于处理与实时多行string相关的任务,实时指的是多行string要在命令执行的时候输入(遇到 EOF 结束),这个命令一般可用于以下几个任务

1. 将变量的值赋为多行 string 2. 将多行 string 写入文件 3. 将多行 string 传入管道命令

上面的代码中就是将多行的 string赋值给变量 text,这三个任务的例子可参考这里

awk 是一门语言,也是 Linux 下一个常用工具,用于处理文本相关的数据,尤其是表格类的数据。awk 会逐行处理文本直至遍历完整个输入流(可以是标准输入流,也可以是文件流,上面的代码是标准输入流),每一行默认根据空格或tab分割文本为若干的 fields,从下标 1 开始,$1 表示第一个 field 的值,其他同理;NF 则是 awk 中特殊变量,表示这一行共有几个 field。 awk 对每行的操作是包含在 {} 中的命令。

因此,上面的代码中第一个 awk 实现了 map 过程,sort 实现了 shuffling 的过程,而第二个 awk 实现了 reduce 过程。

多机器版本 MapReduce

上面是一个单机版本的 MapReduce,然而 MapReduce 在多台机器上更能显示其威力。多机器版本的 MapReduce 首先要考虑的是不同机器间的通信问题,这里采用的是 ssh 通信方式。

ssh 除了可以开一个远程机器的 shell 外,还可以通过命令直接在远程机器起一个进程来运行指定程序。如运行下面的代码会在本地机器上显示 hello world

echo "hello world" | ssh 192.168.1.10 'cat'

其通信过程首先是本机通过 ssh 连接到远程机器上,同时将 hello world 作为输入流传到远程机器,远程机器上的 sshd 进程截获了输入流,同时启动 cat 进程读取输入流,并将输出流返回给本地机器,本地机器的 sshd 进程同样会截获输出流,然后在本地机器输出。

这种分布式通信的模式在各个分布式系统中(yarn,mesos,k8s)都非常常见,每个节点都要有一个 deamon 与其他节点进行通信并进行资源管理,在这里 sshd 就相当于 daemon,只是没有资源管理功能,但是基本原理是一样的。

因此,利用这种方式,可将map过程放到其他机器上,如下代码所示就是将 awk '{for (i=0; i<=NF; i++) print $i, 1}' 这段程序放到了 192.168.1.10 这台机器上执行。

1
2
3
4
5
6
7
8
9
10
11
12
text=$(cat <<EOF
This is my Cup
It is not your cup
My cup is white
Your cup is blue
EOF
)

echo $text\
| ssh 192.168.1.10 'awk '{for (i=0; i<=NF; i++) print $i, 1}'' \
| sort \
| awk '{if ($1 != prev) {print prev, c; c=0; prev=$1} c+=$2}'

因此可将 map 过程放到其他机器上执行,并将结果存储在其他机器上,因为默认一台机器无法存储所有的数据,而输入的数据也是分布在各台机器上,这个过程具体代码如下所示

1
2
3
4
5
Map = '{for (i=0; i<=NF; i++) print $i, 1}'

ssh worker1 'awk $Map /input*.txt > /tmp/o1 && echo worker1 ok'
ssh worker2 'awk $Map /input*.txt > /tmp/o1 && echo worker2 ok'
ssh worker3 'awk $Map /input*.txt > /tmp/o1 && echo worker3 ok'

上面的代码分别令三个 worker 处理它们本地的文件并将处理后的文件存储在本地,在处理完后返回消息。注意 awk $Map /input*.txt > /tmp/o1 && echo worker1 ok 需要用分号括起来,表示整条命令都在远程机器执行。

上面通过 ssh 启动远程程序时,一般会配置密钥访问,从而避免每次都要输入密码。

除了 map 过程,shuffling 过程也需要分布式执行,原因是数据无法容纳在一台机器上。之前shuffling 操作是对所有的数据进行 sort 操作,现在这种方案显然行不通,实际上 shuffling 的一个目的是将相同的 key 交给相同的worker进行处理。因此可以采取下面的方法进行分布式 shuffling

假设有 n 个 reduce worker,则每个 map worker 对其所处理的数据的每条 key,value 记录进行 hash(key)%n 操作,记取模后的值为 i (0<=i < n),并将记录写入到本地第 i 个文件中,则最多会在本地生成 n 个文件,然后分别将这 n 个文件远程复制(scp 等)到 n 个reduce worker的机器上。这样就会令相同的 key 被同一个 worker 处理。reduce worker 只需要对复制到其机器上的若干个文件进行 sort 和 reduce 操作即可,reduce 后的结果也是存储在各台机器上的(也可以考虑存放在一台机器,如果经过 MapReduce 后的数据量能够存放在一台机器上)。

上面的过程需要注意以下两点

  1. map 和 shuffle 可以重叠,但是 map 和 reduce 不能重叠。map 和 shuffling 的重叠方法有很多,其中的一种是每个 map worker 通过上面的方法生成 n 个文件时,不是一次性将所有的 record 传送给 reduce worker,而是达到一定数量后就复制,reduce worker 端则通过插入排序进行 sort 操作,每次接收到 map worker 传过来的文件时,就在已排序的序列上进行插入排序

  2. 某个 worker 的可能会被比其他的要慢很多,可能原因 load balance 问题,也就是分到这个 worker 的 record 数量太多,可以对这些 record 进行进一步的切分,但是要保证同一个 key 需要被同一个 reduce worker 处理。

Github 上有个 bashreduce 的项目就是在 bash 上实现了 MapReduce,思路与我们前面讲的差不多,只是还考虑了很多其他细节。

作者本人也实现了一个 C++ 版本的 mapreduce-lite,没有考虑存储问题,速度较快,感兴趣可参考。

另外,Hadoop 项目中也有 Hadoop Streaming,允许用于用其他语言实现 MapReduce 操作,只要指定好 mapper 和 reducer 即可。