Distributed Machine Learning (4) - Implement Your MapReduce

When people mention MapReduce, they naturally think of Hadoop MapReduce, but MapReduce is just a programming paradigm, and Hadoop MapReduce is only one well-known implementation of it. In fact, MapReduce can be implemented in many ways. This article shows how to implement a MapReduce program in Linux bash, in both single-machine and multi-machine versions. The original video is here (requires VPN).

Below, we use the classic WordCount example in MapReduce, first implementing the single-machine version, then the multi-machine version.

Single-Machine MapReduce

Below is a single-machine WordCount program implemented through the MapReduce paradigm in bash:

text=$(cat <<EOF
This is my Cup
It is not your cup
My cup is white
Your cup is blue
EOF
)

echo "$text" \
| awk '{for (i = 1; i <= NF; i++) print $i, 1}' \
| sort \
| awk '{if ($1 != prev) {if (NR > 1) print prev, c; c = 0; prev = $1} c += $2} END {print prev, c}'

There are some syntax details to understand in this bash script:

The cat <<EOF construct in bash is a here-document: the lines that follow, up to the line containing only EOF, are fed to the command as its standard input at the moment the command runs. It is commonly used for:

  1. Assigning a multi-line string to a variable
  2. Writing a multi-line string to a file
  3. Passing a multi-line string to a pipe command

The code above assigns a multi-line string to the variable text. Examples for these three tasks can be found here.
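
As a quick illustration, here are minimal examples of the three uses; the file name notes.txt and the strings are arbitrary placeholders:

# 1. assign a multi-line string to a variable
greeting=$(cat <<EOF
hello
world
EOF
)

# 2. write a multi-line string to a file
cat <<EOF > notes.txt
line one
line two
EOF

# 3. pass a multi-line string to a pipe command
cat <<EOF | wc -l
line one
line two
EOF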

awk is both a language and a common Linux tool for processing text, especially table-like data. awk reads its input line by line until the whole input stream is consumed (the input can be standard input or a file; the code above uses standard input). Each line is split by spaces or tabs into fields, numbered from 1: $1 is the first field, $2 the second, and so on, while $0 is the entire line (which is why the loop above starts at i = 1). NF is a built-in awk variable holding the number of fields in the current line, and the action awk applies to each line is the code inside {}.

Therefore, in the code above, the first awk implements the map process, sort implements the shuffling process, and the second awk implements the reduce process.
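
To see what the map stage produces, here is the first awk applied to just the first line of the text (the expected output is shown as comments):

echo "This is my Cup" \
| awk '{for (i = 1; i <= NF; i++) print $i, 1}'
# This 1
# is 1
# my 1
# Cup 1

After that, sort brings identical words onto adjacent lines, so the second awk can sum the 1s of each group in a single pass.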

Multi-Machine MapReduce

Above is a single-machine MapReduce, but MapReduce shows its power more on multiple machines. Multi-machine MapReduce first needs to consider communication between machines—here we use SSH communication.

Besides opening an interactive shell on a remote machine, SSH can also run a single specified command there, connecting that command's input and output streams back to the local machine. For example, running the following code will display hello world on the local machine:

echo "hello world" | ssh 192.168.1.10 'cat'

The communication works like this: the local ssh client connects to the remote machine and forwards hello world as the remote command's input stream. The sshd daemon on the remote machine starts a cat process, feeds it that input, and sends cat's output back over the connection, where the local ssh client prints it.

This distributed communication pattern is very common in various distributed systems (YARN, Mesos, K8s). Each node needs a daemon to communicate with other nodes and manage resources—here sshd acts as the daemon, just without resource management functionality, but the basic principle is the same.

Using this method, we can put the map process on another machine. The code below runs the map program awk '{for (i = 1; i <= NF; i++) print $i, 1}' on machine 192.168.1.10:

text=$(cat <<EOF
This is my Cup
It is not your cup
My cup is white
Your cup is blue
EOF
)

echo "$text" \
| ssh 192.168.1.10 "awk '{for (i = 1; i <= NF; i++) print \$i, 1}'" \
| sort \
| awk '{if ($1 != prev) {if (NR > 1) print prev, c; c = 0; prev = $1} c += $2} END {print prev, c}'

In practice we not only run the map step on other machines but also store its results there, because the input data is usually too large for a single machine and is already distributed across the workers. The specific code:

# the map program, shared by all workers
Map='{for (i = 1; i <= NF; i++) print $i, 1}'

# each worker maps its local input files and stores the result locally
ssh worker1 "awk '$Map' /input*.txt > /tmp/o1 && echo worker1 ok"
ssh worker2 "awk '$Map' /input*.txt > /tmp/o1 && echo worker2 ok"
ssh worker3 "awk '$Map' /input*.txt > /tmp/o1 && echo worker3 ok"

The code above has each of the three workers process its local input files, store the result locally, and report a message when done. Note that awk '$Map' /input*.txt > /tmp/o1 && echo worker1 ok must be enclosed in double quotes: the quotes turn the whole command into a single argument that executes on the remote machine, while still letting the local shell substitute the value of $Map before sending it.

When starting remote programs via SSH as above, you typically configure key-based access to avoid entering passwords each time.
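
A typical way to set this up, assuming the same user account on every worker (worker1 through worker3 are the example hostnames used above):

# generate a local key pair (defaults are fine; use an empty passphrase for automation)
ssh-keygen -t ed25519

# install the public key on each worker so ssh stops asking for a password
for w in worker1 worker2 worker3; do
  ssh-copy-id "$w"
done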

Besides the map process, the shuffling process also needs distributed execution because data cannot fit on one machine. Previously, shuffling was a sort operation on all data—this approach obviously won’t work now. Actually, one purpose of shuffling is to hand the same key to the same worker. We can use the following method for distributed shuffling:

Assume there are n reduce workers. Each map worker computes hash(key) % n for every (key, value) record it produces. If the result is i (0 <= i < n), the record is written to the i-th local file, so at most n files are generated on each map worker. These n files are then copied (via scp or similar) to the n reduce workers, which guarantees that all records with the same key end up at the same worker. Each reduce worker only needs to sort and reduce the files copied to its machine. The reduce results are likewise stored across the machines (or on a single machine, if the data after MapReduce is small enough to fit).
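
Below is a rough sketch of the map-worker side of this scheme, assuming n = 3 reduce workers reachable under the hypothetical hostnames reducer0, reducer1 and reducer2, and a hand-rolled string hash (awk has no built-in one):

n=3

# split the local map output /tmp/o1 into n files by hash(key) % n
awk -v n="$n" '
  # crude string hash over the characters of the key
  function hash(s,    i, h) {
    h = 0
    for (i = 1; i <= length(s); i++)
      h = (h * 31 + index("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", substr(s, i, 1))) % n
    return h
  }
  { print > ("/tmp/part-" hash($1)) }
' /tmp/o1

# ship each partition to the reduce worker responsible for it
for i in 0 1 2; do
  scp "/tmp/part-$i" "reducer$i:/tmp/from-$(hostname)"
done

Each reduce worker then only needs to sort the /tmp/from-* files it received and run the same reduce awk as before.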

Note two points in the above process:

  1. Map and shuffle can overlap, but map and reduce cannot: a key's final count can only be computed after every map worker has finished. There are many ways to overlap map and shuffle. One is: as each map worker fills its n partition files, it ships records to the reduce workers in batches once a certain count is reached, instead of sending everything at the end. Each reduce worker then merges every incoming batch into its already sorted sequence, as in insertion sort (see the sketch after this list).

  2. Some workers may be much slower than others, often because of load imbalance: too many records were assigned to them. Such records can be split further, as long as all records with the same key still end up at the same reduce worker.
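
On the reduce side, a convenient stand-in for the insertion step in point 1 is sort -m, which merges files that are each already sorted; the file names below are only illustrative:

# start with an empty running result
touch /tmp/merged

# for each batch that arrives: sort it, then merge it into the running result
sort /tmp/batch-new > /tmp/batch-new.sorted
sort -m /tmp/merged /tmp/batch-new.sorted > /tmp/merged.next
mv /tmp/merged.next /tmp/merged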

There’s a bashreduce project on GitHub that implements MapReduce in bash—the idea is similar to what we discussed, just with many other details considered.

The author also implemented a C++ version, mapreduce-lite, which does not deal with storage but is quite fast; feel free to check it out.

Also, the Hadoop project provides Hadoop Streaming, which lets users implement MapReduce jobs in any language simply by supplying a mapper and a reducer program.
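
For instance, if the mapper and reducer of this article were saved as mapper.sh and reducer.sh, a streaming job could be submitted roughly like this (the jar path and HDFS paths are illustrative and depend on the installation):

hadoop jar "$HADOOP_HOME"/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -files mapper.sh,reducer.sh \
  -input /wordcount/input \
  -output /wordcount/output \
  -mapper mapper.sh \
  -reducer reducer.sh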