DataFrames in Apache Spark
We will take a look at how to process data using Spark DataFrames and continue using the Yelp dataset. Some of the tasks are similar to last week's tasks to give you an opportunity to directly compare SQL and the DataFrame API in Spark. In addition, you will learn how to execute Python Spark applications in the Hadoop cluster through the Hue user interface.
References
- Spark DataFrame/SQL programming guide: https://spark.apache.org/docs/2.3.2/sql-programming-guide.html
- Spark DataFrame/SQL Functions - https://spark.apache.org/docs/2.3.2/api/sql/index.html
- Spark Lecture slides: https://courses.cs.ut.ee/LTAT.06.005/2018_fall/uploads/Main/L10_2018.pdf
Dataset Description
The dataset that we will analyze is the Yelp Dataset. It contains Yelp businesses, reviews, and user data for use in personal, educational, and academic purposes. It is freely available for teaching about databases and NLP. The dataset consists of 6 JSON files: businesses, reviews, users, checkins, tips, and photos.
NB! Using this dataset requires that you read and agree with the Dataset Challenge Dataset Agreement.
- Name: Yelp Dataset
- Location: https://www.yelp.com/dataset
- Download link for a smaller sample of the dataset for testing in the lab: https://owncloud.ut.ee/owncloud/index.php/s/HWqreZxGRA6Q45d (Ask for password from the lab supervisor)
- Documentation: https://www.yelp.com/dataset/documentation/main
- Location of dataset in the cluster: /tmp/yelp
Dataset tables are:
- Review
- User
- Business
- Checkin
- Tip
- Photo
As we are dealing with JSON files, the order of the attributes may differ from the documentation, and some attributes may contain nested structures. Check the dataset documentation or the output of Spark's df.printSchema() command for the complete list of (nested) attributes and for the join keys between the tables.
Preparation
We will continue using the PyCharm Python IDE to simplify working with DataFrames in Spark. You will download and run an example Spark Python DataFrame script to check that everything is working. You can use this script as a template in the following exercises.
- Either create a new Python Spark project in PyCharm (just like in the previous lab) or reuse your previous project.
- Download the following Python Spark DataFrame dataframe_example.py file and move it inside your PySpark project.
- Download the smaller sample of the dataset for development and testing in the lab from here: https://owncloud.ut.ee/owncloud/index.php/s/HWqreZxGRA6Q45d (Ask for password from the lab supervisor)
- Create a new folder for input files (Does not have to be inside your Spark Python project) and move the Yelp dataset JSON files there.
- Run the dataframe_example.py Python script to check that everything is working as needed.
- Its argument should be: input_folder
- When you first try the example script, use the location of the Yelp small_review folder as the input_folder (a minimal skeleton of such a script is sketched below).
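For reference, a minimal skeleton of such a script might look like the following. This is only a rough sketch, not the provided dataframe_example.py; the application name is made up and you should adapt the reading/printing steps to your needs.

```python
import sys

from pyspark.sql import SparkSession

# The input folder is passed as the first command line argument (input_folder).
input_folder = sys.argv[1]

spark = SparkSession.builder \
    .appName("yelp_dataframe_example") \
    .getOrCreate()

# Read the Yelp JSON files from the input folder and inspect the inferred schema.
review = spark.read.json(input_folder)
review.printSchema()
review.show(5)

spark.stop()
```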
NB! You must solve the data processing tasks of the following exercises by using Spark DataFrame API and avoid using Spark RDD commands and SQL.
Exercise 10.1. Processing data using Spark DataFrames
Use the Spark DataFrame API to solve the following data processing tasks. The first two tasks are the same as in last week's SQL practical session.
Keep in mind that while the approach is very similar to the previous practice session, the order of data processing operations differs from SQL statements. For example, groupBy() always has to be performed before applying aggregation functions, while in SQL statements GROUP BY is usually located at the end of the statement.
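As a small illustration of this ordering difference (assuming a DataFrame named review has been loaded from the review JSON files):

```python
# SQL (previous practice session):
#   SELECT user_id, COUNT(*) FROM review GROUP BY user_id
# DataFrame API: the grouping comes first, the aggregation is applied after it.
review.groupBy("user_id").count().show()
```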
- Find the top 5 users who gave out the most 5-star reviews.
- Write a select statement that queries the review table
- Group the review table by user_id and review stars S (S = 1, 2, 3, 4 or 5) and compute the count of rows inside each group as starcount (starcount = the number of times the user gave S stars).
- Filter the result to rows where the stars value equals 5 and order by starcount to get the top 5 users (example DataFrame sketches for these tasks are given after this exercise's task list).
- Find the average amount of stars for Businesses that offer takeout and have a picture of the menu available.
- You will need to join 3 tables: photo, business and review
- Takeout availability can be found in the nested attributes field of the business table. Check the full schema of the business table using business.printSchema()
- Create a Pivot table over cities and review date month and year, with average stars as the aggregation.
- You will need to join review table (stars, date) with Business (city) before applying pivot.
- If you have a conflict between identically labelled columns from the two tables, you can remove columns (before joining) by calling df.drop("column")
- Check the Lecture slides for a Pivot example.
- This will give an overview of how the review scores of businesses in different cities have changed over time.
- Pivot table
- Rows: City
- Columns: Month & Year ("2009-07" for example)
- Cell value: Average stars
- Example (limited) output:
+---------------+-------+-------+-------+-------+-------+-------+
|           city|2018-02|2018-03|2018-04|2018-05|2018-06|2018-07|
+---------------+-------+-------+-------+-------+-------+-------+
|          Tempe|    5.0|   null|    4.0|    4.0|   null|   null|
|North Las Vegas|   null|   null|   null|    5.0|   null|   null|
|        Phoenix|    3.0|    4.4|    4.0|   3.63|   3.67|   null|
|          Savoy|   null|   null|   null|   null|   null|   null|
|      Wickliffe|   null|   null|   null|   null|   null|   null|
|         Monroe|   null|   null|   null|   null|   null|   null|
+---------------+-------+-------+-------+-------+-------+-------+
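The following sketches show one possible way to approach the tasks above with the DataFrame API. They assume DataFrames named review, business and photo have already been loaded with spark.read.json(); the column and attribute names are taken from the dataset documentation and should be verified with printSchema() before use.

```python
from pyspark.sql import functions as F

# Task 1: top 5 users who gave out the most 5-star reviews.
top_users = (review
             .groupBy("user_id", "stars")
             .agg(F.count("*").alias("starcount"))
             .filter(F.col("stars") == 5)
             .orderBy(F.col("starcount").desc())
             .limit(5))
top_users.show()

# Task 2: average stars of businesses that offer takeout and have a menu photo.
# "attributes.RestaurantsTakeOut" and the "menu" label are assumptions -- check
# business.printSchema() and the photo documentation for the exact names/values.
takeout = (business
           .filter(F.col("attributes.RestaurantsTakeOut") == "True")
           .select("business_id"))
menu_photos = (photo
               .filter(F.col("label") == "menu")
               .select("business_id")
               .distinct())
avg_stars = (review
             .select("business_id", "stars")
             .join(takeout, "business_id")
             .join(menu_photos, "business_id")
             .agg(F.avg("stars").alias("avg_stars")))
avg_stars.show()

# Task 3: pivot table of average stars per city and review year-month.
joined = (review
          .select("business_id", "stars", "date")
          .join(business.select("business_id", "city"), "business_id"))
pivoted = (joined
           .withColumn("month", F.date_format("date", "yyyy-MM"))
           .groupBy("city")
           .pivot("month")
           .agg(F.avg("stars")))
pivoted.show()
```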
Exercise 10.2. Using User Defined Functions in Python DataFrame API
Create a Python function for computing the average and median length of words in the review text field and call it as a User Defined Function (UDF) inside DataFrame statements to solve the following task:
- Find the correlation between the usefulness rating of a review and the average or median length of words in the review text.
- Create a UDF which takes a block of text, splits it into words, computes both the average and median word length, and outputs a tuple of (average, median)
- Compute the correlation between:
- Average word length of review and useful column
- Median word length of review and useful column
- You can use the corr(col1, col2) function from the pyspark.sql.functions library (a sketch of the UDF approach is given below).
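A minimal sketch of the UDF approach, assuming the review DataFrame is already loaded; the struct field names avg_len and median_len are made up for this example:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, DoubleType

def word_length_stats(text):
    # Split the review text into words and compute (average, median) word length.
    words = text.split() if text else []
    lengths = sorted(len(w) for w in words)
    if not lengths:
        return (0.0, 0.0)
    avg = float(sum(lengths)) / len(lengths)
    mid = len(lengths) // 2
    if len(lengths) % 2 == 1:
        median = float(lengths[mid])
    else:
        median = (lengths[mid - 1] + lengths[mid]) / 2.0
    return (avg, median)

stats_schema = StructType([
    StructField("avg_len", DoubleType()),
    StructField("median_len", DoubleType()),
])
word_stats_udf = F.udf(word_length_stats, stats_schema)

# Attach the (average, median) struct to each review and correlate with "useful".
with_stats = review.withColumn("stats", word_stats_udf(F.col("text")))
with_stats.select(
    F.corr("stats.avg_len", "useful").alias("avg_vs_useful"),
    F.corr("stats.median_len", "useful").alias("median_vs_useful"),
).show()
```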
Exercise 10.3. Running Spark applications in the Hadoop cluster
Your task is to apply the Exercise 10.2 Spark script to the whole Yelp dataset in the cluster. (PS! It is also OK to use the Exercise 10.1 script instead.)
NB! The cluster is running Python 2.7! If you wrote your script for Python 3.x, you may have to modify it to be compatible with 2.7.
- New Hadoop Cluster is available here: http://172.17.64.225:8889/
- NB! Each student has a new Cloudera cluster account.
- You can access your new Cloudera credentials from here if you are logged in to the courses.cs.ut.ee website
- Contact lab supervisor if there are any issues with your credentials or accessing the cluster.
Tasks in the cluster:
- Log into the new cluster
- Upload your python script into the HDFS using Hue File Browser: http://172.17.64.225:8889/hue/filebrowser/view
- Upload small Yelp dataset (for initial testing) into your folder in HDFS
- Create a new Spark application at Query -> Editor -> Spark
- Add the location of your python script as Library entry
- Add each argument to your Python script as a separate Arguments entry
- Run the script and check if output folder is created properly or correct information is printed out.
- NB! If you see an error about the date format and "XXX", then you should modify the dataframe.read.json() and dataframe.write.save() commands to add an option that specifies the correct timestamp format, like this:
counts.write.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").format("csv").save(output_folder)
spark.read.option("inferSchema", True).option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").json(input_folder)
- NB! In some cases you may see errors even when your application worked successfully, such as:
javax.jdo.JDODataStoreException: Error executing SQL query "select "DB_ID" from "DBS""
- You should also ignore WARN messages, such as: unable to return groups for user ...
- Check that everything is working with the small dataset
- Execute your script on the whole dataset (/tmp/yelp) in the cluster.
- Each Yelp dataset (User, Review, etc.) has its own subfolder.
- NB! Make sure to verify the correct names for the sub folders. Sub folders DO NOT contain the string "small_".
Take screenshots of the result as the deliverable.
You can check Spark specific information about the running and finished jobs at the Spark History Server, which is available here: http://172.17.64.221:18088/
Bonus Exercise
NB! As a result of the cluster and a bonus task not being available last week, this week's bonus task is worth 2 times more bonus points and its deadline is extended by 1 week. However, this does not affect the deadline of the rest of the exercises from this practical session!
- Create a UDF to compute the sentiment score of Yelp reviews (a rough sketch is given after this list).
- You can find a list of word-to-sentiment datasets/corpora at:
- https://github.com/xiamx/awesome-sentiment-analysis
- In particular, this source appears to have a relatively simple-to-use dataset: http://www2.imm.dtu.dk/pubdb/views/publication_details.php?id=6010
- Find the top 10 Yelp businesses with the lowest average review sentiment that have at least 10 reviews (on the small dataset, lower the threshold to 3 instead of 10).
- Also find, and write out to a separate file, the most negative (lowest sentiment score) review for each of these businesses.
- Deliverables of the bonus task
- Spark Python script(s)
- Output files
- Any additional files that are required (such as the sentiment dataset you used)
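A rough starting point for the sentiment UDF, assuming the AFINN word list (one word<TAB>score entry per line) from the DTU link above has been saved locally as AFINN-111.txt; the file name, format, and the simple sum-of-word-scores approach are assumptions, not requirements:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

def load_sentiment_scores(path):
    # Each line of the word list is assumed to be "word<TAB>score".
    scores = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            word, score = line.rsplit("\t", 1)
            scores[word] = int(score)
    return scores

# Broadcast the dictionary so every executor gets one read-only copy.
sentiment_scores = spark.sparkContext.broadcast(load_sentiment_scores("AFINN-111.txt"))

def review_sentiment(text):
    words = text.lower().split() if text else []
    return float(sum(sentiment_scores.value.get(w, 0) for w in words))

sentiment_udf = F.udf(review_sentiment, DoubleType())

scored = review.withColumn("sentiment", sentiment_udf(F.col("text")))
worst_businesses = (scored
                    .groupBy("business_id")
                    .agg(F.avg("sentiment").alias("avg_sentiment"),
                         F.count("*").alias("review_count"))
                    .filter(F.col("review_count") >= 10)  # use 3 on the small dataset
                    .orderBy("avg_sentiment")
                    .limit(10))
worst_businesses.show()
```

Writing out the most negative review per business is left as part of the task; joining worst_businesses back to scored and taking the per-business minimum sentiment is one possible approach.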
Deadline of the bonus task: 4th December
Deliverables:
- Spark Python scripts from Exercises 10.1, 10.2
- Output of the scripts from Exercises 10.1, 10.2
- The output should contain the resulting DataFrame content.
- Output should be provided either as:
- JSON output file(s)
- or the respective df.show() command's output in a text file.
- Screenshots from the cluster web interface which show that you have successfully finished Exercise 10.3
Potential issues and solutions
- If you get an error about UnicodeEncodeError: 'ascii' codec can't encode character
- Add the following line at the start of your script:
# -*- coding: utf-8 -*-
- The cluster has Python version 2.7! When executing your Python Spark scripts in the cluster, make sure they are compatible with 2.7.
- NB! If you see an error about the date format and "XXX", then you should modify the dataframe.read.json() and dataframe.write.save() commands to add an option that specifies the correct timestamp format, like this:
counts.write.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").format("csv").save(output_folder)
spark.read.option("inferSchema", True).option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").json(input_folder)
- NB! In some cases you may see errors even when your application worked successfully, such as:
javax.jdo.JDODataStoreException: Error executing SQL query "select "DB_ID" from "DBS""
- You should also ignore WARN messages, such as: unable to return groups for user ...
- "The system cannot find the path specified" when executing your Spark application for the first time.
- Check that your SPARK_HOME environment variable value is correct
- You can also set SPARK_HOME location programmatically inside your script in the following way:
os.environ['SPARK_HOME'] = "/home/pelle/soft/spark-2.3.2"
- It is suggested to use Java 8 as the default Java in your computer.
- NB! Java 9 and 10 will not work without issues!
- Be careful with the code indentation in Python script. Python is very strict when it comes to mixing tabs and spaces.
- Check indentation manual for Python: https://docs.python.org/2.0/ref/indentation.html
- Python in worker has different version 2.7 than that in driver 3.6
- Check that the default Python version in your computer is the same version you use inside your PyCharm project.