Institute of Computer Science, University of Tartu
Hajusandmetöötlus pilves (Distributed Data Processing in the Cloud, LTAT.06.005), 2018/19 autumn

Practice 6 - Joins in Hadoop MapReduce

In this practice session we look at how to join data from multiple sources when processing it with MapReduce. We will apply the Reduce side and In-memory joins described in the lecture slides to process data from multiple dataset tables.

Dataset Description

The dataset that we will analyze using MapReduce is taken from the Kaggle dataset repository. It contains weekly sales data from 45 different shops in three relational tables.

  • Name: Retail Data Analytics - Historical sales data from 45 stores
  • Location: https://www.kaggle.com/manjeetsingh/retaildataset
  • Alternative Download link: https://owncloud.ut.ee/owncloud/index.php/s/Fw4qDZHfegxT2pE

Dataset tables are:

Features

  1. Store
  2. Date
  3. Temperature
  4. Fuel_Price
  5. MarkDown1
  6. MarkDown2
  7. MarkDown3
  8. MarkDown4
  9. MarkDown5
  10. CPI
  11. Unemployment
  12. IsHoliday

Sales

  1. Store
  2. Dept
  3. Date
  4. Weekly_Sales
  5. IsHoliday

Stores

  1. Store
  2. Type
  3. Size

We will use data only from the Features and Sales tables. The join keys are Store and Date, which appear in both tables.

NB! Rename the input files so that their names contain no spaces, because spaces can cause several issues when working with MapReduce.

Exercise 6.1 Reduce side join

Create a new MapReduce application which processes data from the Features and Sales tables. You can download the Lab6Join file as a MapReduce job skeleton for this exercise and modify it as needed.

It should perform the following data processing task:

  • Compute the minimum, average, and maximum of Weekly_Sales of departments for each Store and week Date when the Fuel_Price is larger than 3.0.
  • The Map task should read and parse the comma-separated lines from the input files. Assign the fields by which the two tables should be joined as the map output Key. Assign the field to which the aggregation functions should be applied as the map output Value. For Features rows, the Value needs to carry the Fuel_Price so that the Reduce task can filter on it.
  • All Map output values with the same Key will be grouped (joined) together and processed inside the same reduce method call.
  • The Reduce task should loop over the incoming values, filter out the values that are not required, and perform the minimum, average, and maximum operations on the remaining values.
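
The steps above can be sketched in plain Java without any Hadoop dependency. This is only a minimal illustration, not the Lab6Join skeleton: the class name ReduceSideJoinSketch, the "F"/"S" value tags, and the idea of telling the two tables apart by column count (12 for Features, 5 for Sales) are assumptions made for this example. In a real job, map would write Text keys and values to the context and reduce would receive an Iterable<Text>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ReduceSideJoinSketch {

    // Map step: returns {joinKey, taggedValue}, or null for header and malformed lines.
    static String[] map(String line) {
        String[] f = line.split(",");
        if (f[0].equals("Store")) {
            return null; // header row of either table
        }
        if (f.length == 12) {
            // Features row: Store, Date, Temperature, Fuel_Price, ...
            return new String[] { f[0] + "," + f[1], "F" + f[3] };
        }
        if (f.length == 5) {
            // Sales row: Store, Dept, Date, Weekly_Sales, IsHoliday
            return new String[] { f[0] + "," + f[2], "S" + f[3] };
        }
        return null;
    }

    // Reduce step: receives all tagged values of one (Store, Date) key.
    // Returns {min, average, max}, or null when the key is filtered out.
    static double[] reduce(List<String> values) {
        double fuelPrice = Double.NaN;
        List<Double> sales = new ArrayList<>();
        for (String v : values) {
            double number = Double.parseDouble(v.substring(1));
            if (v.startsWith("F")) {
                fuelPrice = number;
            } else {
                sales.add(number);
            }
        }
        // NaN > 3.0 is false, so keys without a Features row are dropped too.
        if (!(fuelPrice > 3.0) || sales.isEmpty()) {
            return null;
        }
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double sum = 0;
        for (double s : sales) {
            min = Math.min(min, s);
            max = Math.max(max, s);
            sum += s;
        }
        return new double[] { min, sum / sales.size(), max };
    }

    // Stand-in for the shuffle phase: groups map output by key, then reduces each group.
    static Map<String, double[]> run(List<String> lines) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String line : lines) {
            String[] kv = map(line);
            if (kv != null) {
                groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
            }
        }
        Map<String, double[]> out = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            double[] result = reduce(e.getValue());
            if (result != null) {
                out.put(e.getKey(), result);
            }
        }
        return out;
    }
}
```

Tagging each value with its source table is what lets the reduce step tell the Fuel_Price apart from the Weekly_Sales values that arrive under the same key.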

Save the current MapReduce code and output file as the first deliverable.

Exercise 6.2 In-memory Map side join

Let's now use the In-memory join instead of the Reduce side join. We will give the larger data table as input to MapReduce and read in the smaller table separately through the MapReduce Job Cache, which makes it available to Map and Reduce tasks through the job context.

Create a new MapReduce program for this exercise.

  • It should perform the same task as the 6.1 solution:
    1. Compute the minimum, average, and maximum of Weekly_Sales of departments for each Store and week Date when the Fuel_Price is larger than 3.0.
  • Input to the In-memory MapReduce join program should be only the Sales table.
  • The Features table is provided separately as a MapReduce Job Cache File.
    • You can add files to the job cache in the main() method when configuring the MapReduce Job object like this:
      • String file = "/home/pelle/insales/features.csv";
        URI uri = new URI(file);
        job.addCacheFile(uri);
      • NB! On Windows you have to use a Linux-like path. For example: String file = "/Users/jakovits/data/insales/features.csv";
      • Files added to the Job Cache are automatically distributed to all the Map and Reduce tasks, and their list is available through the job context object.
  • Create a global features variable in the Mapper class to contain the content of the Features table.
    • The suggested data structure for the features variable is HashMap<String,Double>, but you can use other data structures if you wish. The HashMap key should be built from the join keys and the value should contain whatever values we need to extract from the Features table.
  • Add a public void setup(Context context) throws IOException {} method to the Mapper class. In both Mapper and Reducer classes, the setup method is executed once before any data is processed.
    • We will use it to read in the Features table content into the memory from the MapReduce Job Cache.
    • You can access the list of available files using the following code:
      • URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles != null && cacheFiles.length > 0) {
           for (URI file : cacheFiles) {
              // in the cluster the cached file is expected in the task's working directory under its base name
              String filename = (new Path(file.getPath())).getName();
              File f = new File(filename);
              String fileloc = filename;
              // when running locally, fall back to the original path
              if (!f.exists())
                 fileloc = file.getPath();
              // try-with-resources closes the reader even if parsing fails
              try (BufferedReader reader = new BufferedReader(new FileReader(fileloc))) {
                 String line;
                 while ((line = reader.readLine()) != null) {
                    // parse the csv file content one line at a time to populate the features HashMap
                 }
              }
           }
        }
    • The value of the local variable should be passed to the program from the command line, and so should the file path to the features csv file.
  • The Map task should perform the join by extracting the necessary Features table data directly from the features HashMap, perform the (fuel price) filtering, and assign the correct fields for Key and Value just like in the previous exercise.
  • The Reduce task should simply perform the minimum, average, and maximum operations on the input values.
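
The map-side part of these steps can be sketched in plain Java as follows. It is a minimal illustration under stated assumptions, not the expected solution: the class InMemoryJoinSketch and its method names are hypothetical, and loadFeatures takes an already-read list of lines, whereas in the real Mapper the lines would come from the BufferedReader over the cache file inside setup.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class InMemoryJoinSketch {

    // Builds the lookup table: key "Store,Date" -> Fuel_Price.
    // Mirrors the suggested HashMap<String,Double> data structure.
    static Map<String, Double> loadFeatures(List<String> featureLines) {
        Map<String, Double> features = new HashMap<>();
        for (String line : featureLines) {
            String[] f = line.split(",");
            if (f.length < 4 || f[0].equals("Store")) {
                continue; // skip header and malformed rows
            }
            // Features columns: Store, Date, Temperature, Fuel_Price, ...
            features.put(f[0] + "," + f[1], Double.parseDouble(f[3]));
        }
        return features;
    }

    // Map step for one Sales row: joins against the in-memory table and filters.
    // Returns {key, Weekly_Sales}, or null when the row should be dropped.
    static String[] map(String salesLine, Map<String, Double> features) {
        String[] f = salesLine.split(",");
        if (f.length < 4 || f[0].equals("Store")) {
            return null; // skip header and malformed rows
        }
        // Sales columns: Store, Dept, Date, Weekly_Sales, IsHoliday
        String key = f[0] + "," + f[2];
        Double fuelPrice = features.get(key);
        if (fuelPrice == null || fuelPrice <= 3.0) {
            return null; // no matching Features row, or fuel price too low
        }
        return new String[] { key, f[3] };
    }
}
```

Because the filtering happens in the map step, only rows that pass the Fuel_Price condition are shuffled to the reducers, which is the main difference from the Reduce side join.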

NB! If you get stuck using the MapReduce Job Cache, you can check a full working example in the MapReduce tutorial: https://hadoop.apache.org/docs/r2.7.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html#Example:_WordCount_v2.0

Save the resulting MapReduce code and output file as the second deliverable.

Exercise 6.3 Execute created application in the cluster

Execute both versions of the application in the cluster.

Hadoop Cluster links:

  • Java job configuration editor
  • Distributed file system explorer
  • Cluster Job Browser

NB! To access local cloud resources you have to be inside the university network. When working from home or a dormitory, you will need to set up a VPN connection to the university network.

Take the following screenshots to show that you completed this task:

  1. Screenshots of your Java job configurations.
  2. Screenshots that show your executed jobs in the Cluster Job Browser. Example screenshot.

Bonus home exercise (Additional lab credit points)

Your task is to investigate the performance differences between your Reduce-side and In-memory join applications in the cluster. Because our input dataset is rather small, you need to generate additional data, similarly to the practice session 3 Bonus task.

  1. Generate additional bonus data. It should be at least:
    • 5GB - Sales table
    • 50MB - Features table
    • But feel free to generate more
  2. Execute both of your applications on the generated dataset and investigate the job statistics in the Cluster Job Browser (TIP: check counters tab under your job in the Job Browser)
  3. Analyse their differences
    1. Run time
    2. Intermediate data (Map output records, size of data read in, etc)
    3. What other differences do you notice?
  4. Which version of your application performs better in your opinion?
  • Deliverables
    • How much data did you generate?
    • Describe the experiments you executed
    • Describe the results
    • Screenshots from the Job Browser counter Tab.

Be careful when generating additional data to make sure that the two data sets retain their relational structure.
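
One possible way to keep the relational structure, sketched here as an assumption rather than the method from practice session 3, is to replicate both tables and shift the Store id by the same offset in each copy, so that every generated Sales row still has a matching Features row. The class JoinDataGeneratorSketch and its methods are hypothetical; the constant 45 is the store count from the dataset description.

```java
import java.util.ArrayList;
import java.util.List;

final class JoinDataGeneratorSketch {

    // Number of stores in the original dataset (from the dataset description).
    static final int STORE_COUNT = 45;

    // Rewrites the first column (Store) so that copy number `copy`
    // uses store ids that do not collide with any other copy.
    static String shiftStore(String row, int copy) {
        int comma = row.indexOf(',');
        int store = Integer.parseInt(row.substring(0, comma));
        return (store + copy * STORE_COUNT) + row.substring(comma);
    }

    // Produces `copies` shifted replicas of the given data rows (header excluded).
    // Apply it with the same `copies` value to both Sales and Features rows.
    static List<String> replicate(List<String> rows, int copies) {
        List<String> out = new ArrayList<>();
        for (int copy = 0; copy < copies; copy++) {
            for (String row : rows) {
                out.add(shiftStore(row, copy));
            }
        }
        return out;
    }
}
```

For multi-gigabyte output you would write each shifted row straight to a file instead of collecting the rows in a list; the list is used here only to keep the sketch short.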

Deliverables

Deliverables for this practice session are:

  1. MapReduce source code from exercises 6.1 and 6.2.
  2. Output of the applications (6.1 and 6.2)
  3. Screenshots from 6.3

Solutions to common issues and errors

  • If you get an error about the Job Cache file Path being incorrect:
    • On Windows you have to use a Linux-like path. For example: String file = "/Users/jakovits/data/insales/features.csv";
    • Make sure you don't have any spaces in the file path.
  • If you get a FileNotFoundException in the cluster about the cache file, check that you are reading the file correctly in the setup function and that the file exists in HDFS at the correct path.
  • If you get an error about missing libraries in IntelliJ IDEA, don't forget to activate the option to include dependencies with "provided" scope in the run configuration of your application.
  • If you get the error java.io.FileNotFoundException: Featuresdataset.csv, you can temporarily change the following line for Task 6.2 (for Task 6.3, running in the cluster, it should remain as stated in the lab manual):
  BufferedReader reader = new BufferedReader(new FileReader(filename));  

to

  BufferedReader reader = new BufferedReader(new FileReader(localpath.toString())); 

NB! If you run into any issues not covered here then contact the lab supervisor.
