Practice 5 - Graph Processing in MapReduce
In this session we will take a look at how the MapReduce model can be used for processing graph data. We will create a new MapReduce application for finding shortest paths using the Breadth-First Search (BFS) algorithm, which was discussed in the lecture.
Graph example for BFS shortest path algorithm
Here is an example graph from the lecture slides that you can use to test your solution. A larger input file is available here: graphAdjacency.txt
Raw graph data in adjacency list format for the example:
0	1 2 3
1	2 7
2	4 5 6
3	4
4	9
5	6 8 9
6	1
7	
8	7
9	5
Every line contains a node id and a list of the nodes reachable through its outgoing edges (its neighbors). There is one line for each node in the graph.
The goal of the BFS shortest path program is to compute the length of the shortest path from the starting node (node 0 in the example) to every node in the graph.
Example shortest path output, when the starting node is node 0:
0	0;1 2 3
1	1;2 7
2	1;4 5 6
3	1;4
4	2;9
5	2;6 8 9
6	2;1
7	2;
8	3;7
9	3;5
Every line in the output contains the node id, the length of the shortest path from the source node, and a list of the node's neighbors (the targets of its outgoing edges).
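To cross-check the expected distances on small graphs, a plain sequential BFS can serve as a reference. The following is a minimal sketch in plain Java, not part of the lab skeleton: the class name `BfsReference` and its methods are illustrative assumptions, and unreachable nodes get -1 to mirror the convention used later in this session.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Lab5Depth): sequential BFS used only
// to verify MapReduce results on small graphs.
final class BfsReference {

    // graph maps every node id to the list of its direct neighbors.
    // Returns the hop count from start for every node; -1 means unreachable.
    static Map<Integer, Integer> shortestDistances(Map<Integer, List<Integer>> graph, int start) {
        Map<Integer, Integer> dist = new TreeMap<>();
        for (Integer node : graph.keySet()) {
            dist.put(node, -1);
        }
        dist.put(start, 0);
        Deque<Integer> queue = new ArrayDeque<>();
        queue.add(start);
        while (!queue.isEmpty()) {
            int node = queue.remove();
            for (int neighbor : graph.getOrDefault(node, List.of())) {
                // A node is visited the first time it is reached, which is
                // also its shortest distance in an unweighted graph.
                if (dist.getOrDefault(neighbor, -1) == -1) {
                    dist.put(neighbor, dist.get(node) + 1);
                    queue.add(neighbor);
                }
            }
        }
        return dist;
    }

    // The ten-node example graph from this section.
    static Map<Integer, List<Integer>> exampleGraph() {
        Map<Integer, List<Integer>> g = new HashMap<>();
        g.put(0, List.of(1, 2, 3));
        g.put(1, List.of(2, 7));
        g.put(2, List.of(4, 5, 6));
        g.put(3, List.of(4));
        g.put(4, List.of(9));
        g.put(5, List.of(6, 8, 9));
        g.put(6, List.of(1));
        g.put(7, List.of());
        g.put(8, List.of(7));
        g.put(9, List.of(5));
        return g;
    }

    public static void main(String[] args) {
        System.out.println(shortestDistances(exampleGraph(), 0));
    }
}
```

For the example graph and starting node 0, the distances returned here should agree with the D values in the expected output listing.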
Breadth-First Search Shortest Path in MapReduce
Input to the MapReduce application is (Node_id, Adjacency list) as (key, value).
You can download the Lab5Depth file as a MapReduce job skeleton for this exercise and modify it as needed. You can use the small example graph initially as input, but a larger input file example is available here: graphAdjacency.txt
Breadth-First Search is an iterative algorithm, so we need to run it multiple times in sequence, once for each depth step. We will create an iterative MapReduce program in which the Map and Reduce tasks are repeated N times, where N is the number of BFS steps and is a user-configurable value.
At every iteration, our MapReduce program will scan through the graph data twice: once in Map and once in Reduce. The Map method processes a single node at a time (in parallel), checks whether the node is active, and sends distance updates to its neighbors. The Reduce method also processes a single node at a time (in parallel) and computes the minimum of the previously known shortest path length and all distance updates that were sent from other nodes.
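One way to picture the iteration is as a chain of jobs in which each job reads what the previous one wrote. The sketch below only plans the folders for such a chain in plain Java; the class `IterationPlan` and the `depth_<i>` folder naming are assumptions made for illustration, not something the skeleton prescribes. In a real driver, each planned step would correspond to configuring one Hadoop job and waiting for it to finish before starting the next.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: plans the input/output folders of N chained jobs.
final class IterationPlan {

    // One planned job: where it reads from and where it writes to.
    static final class Step {
        final String input;
        final String output;

        Step(String input, String output) {
            this.input = input;
            this.output = output;
        }
    }

    static List<Step> plan(String inputFolder, String outputFolder, int depth) {
        List<Step> steps = new ArrayList<>();
        String currentInput = inputFolder;
        for (int i = 1; i <= depth; i++) {
            // Assumed naming scheme: one sub-folder per iteration.
            String currentOutput = outputFolder + "/depth_" + i;
            steps.add(new Step(currentInput, currentOutput));
            // The next job continues from this job's result.
            currentInput = currentOutput;
        }
        return steps;
    }

    public static void main(String[] args) {
        for (Step s : plan("input", "output", 3)) {
            System.out.println(s.input + " -> " + s.output);
        }
    }
}
```

Writing each iteration to its own folder also keeps one sample output per job, which is convenient when intermediate results need to be inspected.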
Things to keep in mind before you start
- Until now, our Map and Reduce tasks have had only one type of input and output key-value pair. For BFS, the key-value pairs come in two different forms, so the content of the value varies.
- Our Map method will perform a slightly different task in the first iteration.
- Input to our application is a set of files, where each line consists of a Graph Node ID and an adjacency list of that node. A different program is usually needed to convert other Graph data file formats into this format.
- We will use KeyValueTextInputFormat, which uses TAB (\t) to separate the key and value fields in input and output. Thus, there should be a TAB between Node_ID and adjacency_list in the input file.
- Be wary of input file entries where the adjacency_list is empty, meaning that the node has no outgoing edges.
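The TAB separator and the empty adjacency list are the two input details that most often cause parsing errors. As a hedged illustration in plain Java (the class `InputLineSketch` and its method names are invented for this example), the sketch below splits a raw line at the first TAB and parses the neighbor list so that a node without outgoing edges yields an empty list instead of an exception.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for this example only.
final class InputLineSketch {

    // Splits a raw line at the first TAB into {key, value}.
    // A line without any TAB becomes {line, ""}.
    static String[] splitKeyValue(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, tab), line.substring(tab + 1) };
    }

    // Parses a space-separated adjacency list.
    // Empty or blank input gives an empty list.
    static List<String> parseNeighbors(String adjacency) {
        List<String> neighbors = new ArrayList<>();
        for (String token : adjacency.trim().split("\\s+")) {
            // Splitting an empty string yields one empty token; skip it.
            if (!token.isEmpty()) {
                neighbors.add(token);
            }
        }
        return neighbors;
    }

    public static void main(String[] args) {
        String[] kv = splitKeyValue("0\t1 2 3");
        System.out.println(kv[0] + " has neighbors " + parseNeighbors(kv[1]));
        String[] lonely = splitKeyValue("7");
        System.out.println(lonely[0] + " has neighbors " + parseNeighbors(lonely[1]));
    }
}
```

Inside the Map method the key and value already arrive separated, so only the neighbor parsing applies there; the splitting helper is mainly useful for checking input files by hand.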
Exercise 5.1 Map Function
The main goal of the Map function is to generate distance update messages from active nodes to their neighbors. For simplicity (see the bonus task for a more advanced case), let's consider a node active if it has a non-negative (>= 0) distance. In this context, neighbors are the nodes in the adjacency list (nodes directly reachable through outgoing edges).
Until now, our Map and Reduce tasks have had only one type of input and output key-value pair. This time, the Map task will write out two different types of key-value pairs:
- Node information messages: (Node_id, D;Adjacency_list)
- Distance update messages: (Neighbor_id, DISTANCE;D)
Node information messages are used to pass along the graph node data and to remember the length of the current shortest path to that node. Distance update messages are used to pass along distance updates to neighboring nodes. DISTANCE is a string tag that marks the message as a distance update message, and D is the actual distance value.
The first time the Map function is executed, it needs to perform extra work to set up the initial distance D for each of the nodes (the initial D value is 0 for the BFS starting node and -1 for all others).
The MapReduce job Map function should be the following:
- Input:
  - In the first MR job: (Node_id, Adjacency list) as (key, value)
  - From the second MR job onward: (Node_id, D;Adjacency list)
- Map function:
  - Set the initial shortest path distances
    - If the node_id is equal to the starting node, set distance D to 0. In all other cases, set distance D to -1.
    - Should be executed only once, in the first MapReduce job
      - One way to check whether we are dealing with the first MapReduce job is to check whether the input value contains ; or not.
  - Send distance update messages
    - If the current distance of the node is not -1, then for each neighbor in the adjacency list, write out a distance message (Neighbor_id, DISTANCE;D+1)
      - D+1 is the distance of the path through the current node.
      - DISTANCE is a string tag to mark that we are dealing with a distance message.
  - Send node information message
    - We also need to pass along the node data itself so that it is available for the Reduce task.
    - Write out (Node_id, D;Adjacency list)
- Output:
  - Node information message: (Node_id, D;Adjacency list)
  - Distance update messages: (Neighbor_id, DISTANCE;D)
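The Map steps listed above can be sketched in plain Java as follows. This is an illustration under stated assumptions rather than the skeleton's actual code: the class `BfsMapSketch` is hypothetical, it returns the emitted pairs as a list instead of writing them to a Hadoop context, and it receives the starting node id as an ordinary parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Map method; each emitted record is a
// {key, value} pair.
final class BfsMapSketch {

    static List<String[]> map(String nodeId, String value, String startNodeId) {
        List<String[]> out = new ArrayList<>();
        int distance;
        String adjacency;
        int sep = value.indexOf(';');
        if (sep < 0) {
            // First job: the value is the bare adjacency list, so set up D.
            distance = nodeId.equals(startNodeId) ? 0 : -1;
            adjacency = value;
        } else {
            // Later jobs: the value has the form "D;Adjacency list".
            distance = Integer.parseInt(value.substring(0, sep).trim());
            adjacency = value.substring(sep + 1);
        }
        if (distance != -1) {
            // Active node: send D+1 to every neighbor.
            for (String neighbor : adjacency.trim().split("\\s+")) {
                if (!neighbor.isEmpty()) { // skips the empty adjacency list case
                    out.add(new String[] { neighbor, "DISTANCE;" + (distance + 1) });
                }
            }
        }
        // Node information message, always emitted so Reduce sees the node data.
        out.add(new String[] { nodeId, distance + ";" + adjacency });
        return out;
    }

    public static void main(String[] args) {
        for (String[] kv : map("0", "1 2 3", "0")) {
            System.out.println(kv[0] + "\t" + kv[1]);
        }
    }
}
```

In the actual Mapper, the same logic would write each pair to the job context, and the starting node id would come from the job configuration instead of a method parameter.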
Exercise 5.2 Reduce Function
All the node information messages and distance update messages will be grouped by node_id (as it is the Map output key), and the Reduce function processes one such group (that is, one node) at a time. The goal of the Reduce function is to update the length of the shortest path for each node in the graph.
Input consists of two types of values: one node message (Node_id, D;Adjacency list) and a set of incoming distance messages (Node_id, DISTANCE;D). The Reduce function should compute the minimum of the currently known distance and all the distances in the distance messages. If the node distance is currently -1, it should be replaced with the minimum of the received distances (if there are any).
The Reduce function should be the following:
- Input:
  - Key: Node_id
  - Values contain:
    - One node message: D;Adjacency list
    - Zero or more distance update messages: DISTANCE;D
- Reduce function:
  - Compute the minimum distance value of all distance messages and compare it to the currently known shortest path distance D.
    - If D equals -1, then it should be replaced with the minimum value of the distance messages (unless there are none). Otherwise use the smaller of the two values as the new shortest path distance D.
- Output: (Node_id, D;Adjacency list)
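The Reduce logic described above could look roughly like the following plain Java sketch. The class `BfsReduceSketch` is hypothetical, it takes the grouped values of one node as a list and returns the output value as a string, and it assumes that every node message contains a `;` (as produced by the Map step).

```java
import java.util.List;

// Hypothetical stand-in for the Reduce method of one node group.
final class BfsReduceSketch {

    private static final String TAG = "DISTANCE;";

    // Returns the output value "D;Adjacency list" for one node.
    static String reduce(List<String> values) {
        int current = -1;      // distance from the node message
        String adjacency = ""; // adjacency list from the node message
        int best = -1;         // smallest received distance, -1 = none received
        for (String value : values) {
            if (value.startsWith(TAG)) {
                int d = Integer.parseInt(value.substring(TAG.length()).trim());
                if (best == -1 || d < best) {
                    best = d;
                }
            } else {
                int sep = value.indexOf(';');
                current = Integer.parseInt(value.substring(0, sep).trim());
                adjacency = value.substring(sep + 1);
            }
        }
        // -1 means "unknown", so it must not win a plain numeric minimum.
        if (best != -1 && (current == -1 || best < current)) {
            current = best;
        }
        return current + ";" + adjacency;
    }

    public static void main(String[] args) {
        System.out.println(reduce(List.of("-1;2 7", "DISTANCE;1")));
    }
}
```

The special handling of -1 is the main design point: taking `Math.min` directly over all values would always keep -1 and the node would never become reachable.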
Exercise 5.3 Passing additional parameters to MapReduce BFS
- Configure the starting node_id value to be passed to the program as the third argument (Lab5Depth <input_folder> <output_folder> start_node_id) and use the MapReduce job configuration to pass it along to the Map method.
- Also configure the BFS depth (or the number of MapReduce jobs to be executed) to be read as the fourth argument (Lab5Depth <input_folder> <output_folder> start_node_id depth)
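As a rough illustration of the argument handling, the plain Java sketch below validates the four arguments and stores them as string settings. The class `BfsArgsSketch` and the property names `bfs.start.node` and `bfs.depth` are assumptions made for this example, and a `Map<String, String>` stands in for the job configuration. In Hadoop, the driver would typically store such a value with `Configuration.set(name, value)` and the Mapper would read it back with `context.getConfiguration().get(name)`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for the Hadoop job configuration.
final class BfsArgsSketch {

    // Assumed property names, chosen for this example only.
    static final String START_NODE_KEY = "bfs.start.node";
    static final String DEPTH_KEY = "bfs.depth";

    static Map<String, String> toSettings(String[] args) {
        if (args.length != 4) {
            throw new IllegalArgumentException(
                "Usage: Lab5Depth <input_folder> <output_folder> start_node_id depth");
        }
        // Fail early on a depth that is not a positive integer.
        int depth = Integer.parseInt(args[3]);
        if (depth < 1) {
            throw new IllegalArgumentException("depth must be at least 1");
        }
        Map<String, String> settings = new HashMap<>();
        settings.put("input", args[0]);
        settings.put("output", args[1]);
        settings.put(START_NODE_KEY, args[2]);
        settings.put(DEPTH_KEY, String.valueOf(depth));
        return settings;
    }

    public static void main(String[] args) {
        System.out.println(toSettings(new String[] { "in", "out", "0", "5" }));
    }
}
```

Keeping the starting node id as a string avoids a conversion, since the Map method compares it with the text key anyway.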
Bonus home exercise (Additional lab credit points)
The bonus task is to optimize your MapReduce shortest path application in two ways (each is worth half of the bonus points available for this lab):
- Modify the application to write out new distance updates only after the distance of the current node has changed. The goal is to reduce the amount of duplicate work our program needs to perform and also the amount of transferred data.
- Currently our application considers all edges in the graph to be the same length (1 step). Add positive weights to each edge and modify the application to take edge weights into account when finding shortest paths.
  - Input to the application should be (Node_id, Adjacency_list), where each Adjacency_list element contains a neighbor_id and the distance to that neighbor, separated by a colon. (Example of an Adjacency_list with weights: 1:9 2:2 3:1)
  - Example input file:
    0	1:9 2:2 3:1
    1	2:1 7:4
    2	4:1 5:1 6:2
    3	4:2
    4	9:3
    5	6:2 8:5 9:2
    6	1:2
    7	
    8	7:3
    9	5:2
- Deliverables:
- Modified MapReduce source code (You do not need to submit non-bonus source code separately)
- High level description of the modifications that needed to be done.
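For the weighted variant, the main change in the Map step is that each neighbor receives D plus the edge weight instead of D+1. The sketch below shows only that part in plain Java; the class `WeightedUpdatesSketch` is hypothetical, and it assumes integer weights, as in the example input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of distance updates with "neighbor:weight" entries.
final class WeightedUpdatesSketch {

    // distance is the known distance D (>= 0) of the current node.
    // Returns {neighbor_id, "DISTANCE;<D + weight>"} pairs.
    static List<String[]> distanceUpdates(int distance, String weightedAdjacency) {
        List<String[]> out = new ArrayList<>();
        for (String entry : weightedAdjacency.trim().split("\\s+")) {
            if (entry.isEmpty()) {
                continue; // node without outgoing edges
            }
            int colon = entry.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected neighbor:weight, got " + entry);
            }
            String neighbor = entry.substring(0, colon);
            int weight = Integer.parseInt(entry.substring(colon + 1)); // assumes integer weights
            out.add(new String[] { neighbor, "DISTANCE;" + (distance + weight) });
        }
        return out;
    }

    public static void main(String[] args) {
        for (String[] kv : distanceUpdates(0, "1:9 2:2 3:1")) {
            System.out.println(kv[0] + "\t" + kv[1]);
        }
    }
}
```

With weights, the first distance a node receives is not necessarily its shortest one, so a node's distance may keep improving in later iterations and the minimum comparison in Reduce remains essential.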
Deliverables
This time we will not execute our program in the cluster, but we will be using it again in the future.
Deliverables for this practice session are:
- MapReduce application source code.
- Sample output files of your application (1 for each job, but keep the number of iterations at 5-10) when running on the larger input file, which is available from here: graphAdjacency.txt
Solutions to common issues and errors.
- If you have an issue with the input file, check that there are actually TABs between the key and value fields and not multiple spaces. When you try to insert a tab manually, IDEs or file editors might insert multiple spaces instead.
- If you get an error about missing libraries in IntelliJ IDEA, don't forget to activate the include dependencies with "provided" scope option in the run configuration of your application.
NB! If you run into any issues not covered here then contact the lab supervisor.