Practice 6 - Joins in Hadoop MapReduce
In this practice session we take a look at how to join data from multiple sources when processing data with MapReduce. We will apply the Reduce-side and In-memory joins described in the lecture slides to process data from multiple dataset tables.
Dataset Description
The dataset that we will analyze using MapReduce is taken from the Kaggle dataset repository. It contains weekly sales data from 45 different shops, split across three relational tables.
- Name: Retail Data Analytics - Historical sales data from 45 stores
- Location: https://www.kaggle.com/manjeetsingh/retaildataset
- Alternative Download link: https://owncloud.ut.ee/owncloud/index.php/s/Fw4qDZHfegxT2pE
Dataset tables are:

| Features | Sales | Stores |
| --- | --- | --- |
| **Store**, **Date**, Temperature, Fuel_Price, MarkDown1-5, CPI, Unemployment, IsHoliday | **Store**, Dept, **Date**, Weekly_Sales, IsHoliday | **Store**, Type, Size |
We will use data only from the Features and Sales tables. Join keys are marked in bold.
NB! You should rename the input files so that they contain no spaces, as spaces can cause several issues when working with MapReduce.
Exercise 6.1 Reduce side join
Create a new MapReduce application which processes data from the Features and Sales tables. You can download the Lab6Join file as a MapReduce job skeleton for this exercise and modify it as needed.
It should perform the following data processing task:
- Compute the `min`, `average`, and `maximum` of the Weekly_Sales of departments when the Fuel_Price is larger than 3.0, for each Store and week Date.
- Map task should read and parse the comma-separated lines from the input files. Assign the fields on which you want the two tables to be joined as the map output Key. Assign the field on which the aggregation function should be applied as the map output Value.
- All Map output values with the same Key will be grouped (joined) together and processed inside the same reduce method call.
- Reduce task should loop over the incoming values, filter out the values that are not required, and perform the `min`, `average`, and `maximum` operations on the remaining values. (A sketch of this structure is shown below the list.)
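To make this structure more concrete, here is a minimal sketch of what the reduce-side join could look like. The class names, the "F"/"S" value-tagging scheme and the CSV column indices are illustrative assumptions based on the table layout above, not part of the Lab6Join skeleton; adapt them to your own code and input files.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class Lab6ReduceSideJoin {

    // Reads lines from both tables; the source table is detected from the input file name.
    public static class JoinMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] cols = value.toString().split(",");
            if (cols[0].equals("Store")) return; // skip the csv header line
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            if (fileName.startsWith("Features")) {
                // Assumed Features layout: Store,Date,Temperature,Fuel_Price,...
                context.write(new Text(cols[0] + "_" + cols[1]), new Text("F" + cols[3]));
            } else {
                // Assumed Sales layout: Store,Dept,Date,Weekly_Sales,...
                context.write(new Text(cols[0] + "_" + cols[2]), new Text("S" + cols[3]));
            }
        }
    }

    // Receives all tagged values of one (Store, Date) key, filters on fuel price
    // and aggregates the remaining Weekly_Sales values.
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double fuelPrice = 0.0;
            List<Double> sales = new ArrayList<>();
            for (Text val : values) {
                String v = val.toString();
                if (v.startsWith("F")) {
                    fuelPrice = Double.parseDouble(v.substring(1));
                } else {
                    sales.add(Double.parseDouble(v.substring(1)));
                }
            }
            if (fuelPrice <= 3.0 || sales.isEmpty()) {
                return; // keep only keys whose Fuel_Price is larger than 3.0
            }
            double min = Double.MAX_VALUE, max = -Double.MAX_VALUE, sum = 0.0;
            for (double s : sales) {
                min = Math.min(min, s);
                max = Math.max(max, s);
                sum += s;
            }
            context.write(key, new Text(min + "," + (sum / sales.size()) + "," + max));
        }
    }
}
```

The values are tagged because the order of values inside a reduce call is not guaranteed, so the reducer cannot assume that the Features value arrives before the Sales values.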
Save the current MapReduce code and output file as the first deliverable
Exercise 6.2 In-memory Map side join
Let's now use the In-memory join instead of the Reduce-side join. We will give the larger data table as input to MapReduce and read in the smaller table separately through the MapReduce Job Cache, which makes it available to the Map and Reduce tasks through the job context.
Create a new MapReduce program for this exercise.
- It should perform the same task as the 6.1 solution:
  - Compute the `min`, `average`, and `maximum` of the Weekly_Sales of departments when the Fuel_Price is larger than 3.0, for each Store and week Date.
- Input to the In-memory MapReduce join program should be only the Sales table.
- The Features table is provided separately as a MapReduce Job Cache file.
- You can add files to the job cache in the `main()` method when configuring the MapReduce `Job` object like this:

  ```java
  String file = "/home/pelle/insales/features.csv";
  URI uri = new URI(file);
  job.addCacheFile(uri);
  ```

  - NB! On Windows you have to use a Linux-like path. For example: `String file = "/Users/jakovits/data/insales/features.csv";`
  - Files added to the Job Cache are automatically distributed to all the Map and Reduce tasks and their list is available through the job `context` object.
- Create a global `features` variable in the Mapper class to contain the content of the Features table.
  - The suggested data structure for the `features` variable is `HashMap<String, Double>`, but you can use other data structures if you wish. The HashMap key should be the join key and the value should contain whatever values we need to extract from the Features table.
- Add a `public void setup(Context context) throws IOException {}` method to the Mapper class. In both the Mapper and Reducer classes, the setup method is executed once before any data is processed.
  - We will use it to read the Features table content into memory from the MapReduce Job Cache.
- You can access the list of available files using the following code:
  ```java
  if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
      URI[] localPaths = context.getCacheFiles();
      for (URI file : localPaths) {
          String filename = (new Path(file.getPath())).getName();
          File f = new File(filename);
          String fileloc = filename;
          // Fall back to the full path when the file is not available
          // in the task's working directory (e.g. when running locally).
          if (!f.exists())
              fileloc = file.getPath();
          BufferedReader reader = new BufferedReader(new FileReader(fileloc));
          String line;
          while ((line = reader.readLine()) != null) {
              // parse the csv file content one line at a time to populate the features HashMap
          }
          reader.close();
      }
  }
  ```
- The value of the `local` variable should be passed to the program from the command line, as should the `file` path to the features csv file.
- Map task should perform the join by extracting the necessary Features table data directly from the features HashMap, perform the (fuel price) filtering, and assign the correct fields to the `Key` and `Value` just like in the previous exercise.
- Reduce task should simply perform the `min`, `average` and `maximum` operations on the input values. (A sketch of the in-memory Mapper is shown after this list.)
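As with 6.1, here is a minimal sketch of how the in-memory map-side join could look. The class name, the "Store_Date" join-key format, the `DoubleWritable` output type and the CSV column indices are assumptions for illustration. Since the fuel-price filtering already happens in the Mapper, the Reducer only needs to compute the min, average and maximum of the incoming values.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Lab6InMemoryJoin {

    public static class InMemoryJoinMapper extends Mapper<Object, Text, Text, DoubleWritable> {

        // Join key ("Store_Date") -> Fuel_Price, loaded once per Map task from the Job Cache.
        private HashMap<String, Double> features = new HashMap<>();

        @Override
        public void setup(Context context) throws IOException {
            if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
                for (URI file : context.getCacheFiles()) {
                    String filename = (new Path(file.getPath())).getName();
                    // Fall back to the full path when the file is not in the task's working directory.
                    String fileloc = new File(filename).exists() ? filename : file.getPath();
                    BufferedReader reader = new BufferedReader(new FileReader(fileloc));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String[] cols = line.split(",");
                        if (cols[0].equals("Store")) continue; // skip the csv header line
                        // Assumed Features layout: Store,Date,Temperature,Fuel_Price,...
                        features.put(cols[0] + "_" + cols[1], Double.parseDouble(cols[3]));
                    }
                    reader.close();
                }
            }
        }

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] cols = value.toString().split(",");
            if (cols[0].equals("Store")) return; // skip the csv header line
            // Assumed Sales layout: Store,Dept,Date,Weekly_Sales,...
            String joinKey = cols[0] + "_" + cols[2];
            Double fuelPrice = features.get(joinKey); // the actual in-memory join
            if (fuelPrice != null && fuelPrice > 3.0) {
                context.write(new Text(joinKey), new DoubleWritable(Double.parseDouble(cols[3])));
            }
        }
    }
}
```

Because the whole Features table is kept in memory in every Map task, this approach only works while the Features file is small enough to fit into the task's memory; the bonus exercise below lets you compare how this trade-off plays out against the Reduce-side version.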
NB! If you get stuck with using the MapReduce Job Cache, you can check a full working example from the MapReduce tutorial: https://hadoop.apache.org/docs/r2.7.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html#Example:_WordCount_v2.0
Save the resulting MapReduce code and output file as the second deliverable
Exercise 6.3 Execute the created applications in the cluster
Execute both versions of the application in the cluster.
Hadoop Cluster links:
NB! To access local cloud resources you will have to be inside the university network. When working from home or a dormitory, you will need to set up a VPN connection to the university network.
Take the following screenshots to show you completed this task
- Screenshots of your Java job configurations.
- Screenshots that show your executed jobs in the Cluster Job Browser. Example screenshot.
Bonus home exercise (Additional lab credit points)
Your task is to investigate the performance differences between your Reduce-side and In-memory join applications in the cluster. Because our input dataset is rather small, you need to generate additional data, similar to the practice session 3 bonus task.
- Generate additional bonus data. It should be at least:
- 5GB - Sales table
- 50MB - Features table
- But feel free to generate more
- Execute both of your applications on the generated dataset and investigate the job statistics in the Cluster Job Browser (TIP: check the Counters tab under your job in the Job Browser)
- Analyse their differences
- Run time
- Intermediate data (Map output records, size of data read in, etc)
- What other differences do you notice?
- Which version of your application performs better in your opinion?
- Deliverables
- How much data did you generate?
- Describe the experiments you executed
- Describe the results
- Screenshots from the Job Browser counter Tab.
Be careful when generating additional data to make sure that the two data sets retain their relational structure.
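One way to keep the relational structure intact is to generate the extra data for both tables together, for example by replicating the original rows with shifted Store ids so that every new Sales row still has a matching Features row for the same Store and Date. A rough sketch (the file names, column positions and number of copies are arbitrary assumptions; the original dataset has 45 stores):

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;

// Replicates the rows of a csv file with shifted Store ids so that the generated
// Sales and Features files still share the same (Store, Date) join keys.
public class GenerateBonusData {

    public static void main(String[] args) throws IOException {
        int copies = 500; // adjust until the generated Sales file reaches at least 5GB
        replicate("sales.csv", "sales_big.csv", copies);
        replicate("features.csv", "features_big.csv", copies);
    }

    private static void replicate(String in, String out, int copies) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(in));
             PrintWriter writer = new PrintWriter(new FileWriter(out))) {
            reader.readLine(); // drop the csv header line
            List<String> lines = new ArrayList<>();
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
            for (int c = 0; c < copies; c++) {
                for (String l : lines) {
                    String[] cols = l.split(",", -1);
                    // Store is the first column in both Sales and Features;
                    // shifting it by 45 per copy creates new, but still matching, stores.
                    cols[0] = String.valueOf(Integer.parseInt(cols[0]) + c * 45);
                    writer.println(String.join(",", cols));
                }
            }
        }
    }
}
```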
Deliverables
Deliverables for this practice session are:
- MapReduce source code from exercises 6.1 and 6.2.
- Output of the applications (6.1 and 6.2)
- Screenshots from 6.3
Solutions to common issues and errors.
- If you get an error about the Job Cache file path being incorrect:
  - On Windows you have to use a Linux-like path. For example: `String file = "/Users/jakovits/data/insales/features.csv";`
  - Make sure you don't have any spaces in the file path.
- If you get a FileNotFoundException in the cluster about the cache file, check that you are reading the file correctly in the `setup` function and that the file exists in HDFS under the correct path.
- If you get an error about missing libraries in IntelliJ IDEA, don't forget to activate the `include dependencies with "provided" scope` option in the run configuration of your application.
- If you get the error `java.io.FileNotFoundException: Featuresdataset.csv`, then for Task 6.2 you can temporarily change the line (for Task 6.3, when running in the cluster, it should still be as stated in the lab manual)
  `BufferedReader reader = new BufferedReader(new FileReader(filename));`
  to
  `BufferedReader reader = new BufferedReader(new FileReader(localpath.toString()));`
NB! If you run into any issues not covered here then contact the lab supervisor.