Big Data Lab: Apache Spark Dataframes and SQL
In this lab we will look at how to process data in parallel using Spark DataFrames and the SQL language, working with the Yelp dataset. A smaller sample of the Yelp dataset has been prepared for developing and testing data processing applications in the lab.
References
- Spark DataFrame
  - Spark DataFrame Python API: https://spark.apache.org/docs/2.4.8/api/python/pyspark.sql.html#pyspark.sql.DataFrame
  - Spark DataFrame/SQL programming guide: https://spark.apache.org/docs/2.4.8/sql-programming-guide.html
  - Spark DataFrame lecture slides
- Spark SQL
  - Spark DataFrame/SQL programming guide: https://spark.apache.org/docs/latest/sql-getting-started.html
  - Spark SQL functions: https://spark.apache.org/docs/latest/api/sql/index.html
  - Spark (Python) SQL examples: https://github.com/apache/spark/blob/master/examples/src/main/python/sql/
Dataset Description
The dataset that we will analyze is the Yelp Dataset. It contains Yelp businesses, reviews, and user data for use in personal, educational, and academic purposes. It is freely available for teaching about databases and NLP. The dataset consists of 6 JSON files: Businesses, reviews, users, checkins, tips, and photos.
NB! Using this dataset requires that you read and agree with the Dataset Challenge Dataset Agreement.
- Name: Yelp Dataset
- Location: https://www.yelp.com/dataset
- Download links for the new smaller samples of the dataset for testing in the lab:
  - A: General sample: https://owncloud.ut.ee/owncloud/s/ZEJbHL8NYAT5JKy
    - Ask for the password from the lab supervisor
- Documentation: https://www.yelp.com/dataset/documentation/main
Dataset tables are: Review, User, Business, Checkin, Tip, and Photo.
The tables are joined on their shared key columns (such as business_id and user_id). As we are dealing with JSON files, the order of the attributes may differ from the documentation, and some attributes in turn contain nested structures. Check the dataset documentation or Spark's df.printSchema() command output for the complete list of (nested) attributes.
Preparing Spark
This section contains the steps needed to use Spark on your computer.
Option 1: Linux or Mac
- Use Visual Studio Code or PyCharm IDE
- Create a new Python project and install the pyspark Python library
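For example, from the project's Python environment (the exact command depends on how your environment is set up):
pip install pyspark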
Option 2: Docker
- Use Visual Studio Code or PyCharm IDE
- Install the pyspark Python library
  - Do not run the code directly in the IDE.
  - We will instead run the code inside the Docker container.
- Preparing a Docker container with Spark Python:
docker run -it -d --rm -v "$(pwd)"/docker_directory:/mnt --name spark apache/spark-py /opt/spark/bin/spark-shell
- This will mount docker_directory into the /mnt folder inside the container (make sure everything needed, such as the Python script and input files, is inside that folder)
- On Windows you may need to use the full path instead of "$(pwd)", or change "$(pwd)"/docker_directory to another path/directory on your computer
- An alternative command for Windows, if the previous does not work:
docker run -it -d --rm -v ${PWD}:/mnt --name spark apache/spark-py /opt/spark/bin/spark-shell
- Running a Python script inside the container:
docker exec -it spark /opt/spark/bin/spark-submit /mnt/spark_run.py /mnt/input
- Notice that both the Python script and the input folder are now accessed through the /mnt folder we mounted previously. The output folder should also be created inside /mnt
- If some additional libraries are needed, then they should be installed or prepared inside the container.
Option 3: Manual setup in Windows
- NB! This approach is not recommended.
- It is significantly easier to use Docker instead, or Linux if you have access to a Linux machine or virtual machine.
- Follow the "setting up Spark in PyCharm IDE" part of this lab tutorial from the Cloud Computing course: https://courses.cs.ut.ee/2023/cloud/spring/Main/Practice7
- Only do Task 7.1
Exercise I: Running the first Spark script
You will download and run an example Spark Python DataFrame script to check that everything works. You can use this script as a template in the following exercises.
- Download the following Python Spark DataFrame example dataframe_example.py file and move it inside your PySpark project.
- Download the smaller sample of the dataset for development and testing in the lab from here: https://owncloud.ut.ee/owncloud/s/ZEJbHL8NYAT5JKy (Ask for the password from the lab supervisor)
- Create a new folder for input files (It does not have to be inside your Spark Python project) and move the Yelp dataset JSON files there.
- Run the dataframe_example.py Python script to check that everything is working as needed.
  - Its argument should be: input_folder
  - When you first try the example script, use the location of the Yelp small_review folder as the input_folder
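For example, a command-line invocation could look like this (the path to the small_review folder is only an illustration):
python dataframe_example.py /path/to/small_review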
Familiarize yourself with the content of the example Spark DataFrame script.
- Dataset files are loaded directly as a Spark DataFrame using the spark.read command.
  - The read command supports many different formats, such as CSV, XML, JSON, or plain text.
- We can automatically detect data types for columns by using the “inferSchema” option
- In addition, when the CSV file contains a header line, which specifies the names of the columns, we can use the “header” option to automatically read the column names from there
dataset = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(input_folder)
- This means we do not need to worry about removing the header lines from input files.
- To print out the first 10 rows of the DataFrame without truncating any values (the second argument is set to False), the script uses the command:
dataset.show(10, False)
- To show DataFrame schema (structure), consisting of the names and types of all columns, the script uses:
dataset.printSchema()
- At the end of the script the Spark session is stopped with the spark.stop() command.
- NB! When modifying the script later, make sure you add any new code after the spark.read, but before the spark.stop() code lines.
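For orientation, here is a minimal sketch of the overall structure such a script follows (this is not the exact content of dataframe_example.py, only an outline of the same steps):

import sys
from pyspark.sql import SparkSession

# Input folder is given as the first command-line argument
input_folder = sys.argv[1]

# Create (or reuse) a Spark session
spark = SparkSession.builder.appName("dataframe_example").getOrCreate()

# Load the dataset; inferSchema detects column types, header reads column names
dataset = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(input_folder)

dataset.printSchema()    # structure (names and types) of all columns
dataset.show(10, False)  # first 10 rows without truncating values

# --- add any new processing code here, before spark.stop() ---

spark.stop()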
Exercise II: Processing data using Spark DataFrames
Use the Spark DataFrame API to solve the following data processing tasks.
- Extend the previous example to load in all Yelp tables as different DataFrames
- Meaning not only the review DataFrame
- Something like:
review = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(small_review)
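A hedged sketch of loading further tables in the same way; the folder variables (small_user, small_business, etc.) are assumptions and should point to the corresponding sample folders on your machine:

user = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(small_user)

business = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(small_business)

# ... and similarly for the checkin, tip and photo tables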
- Find the top 5 users who gave out the most 5-star reviews.
- Write a select statement that queries the review table
- Group the review table by user_id and review stars S (S = 1, 2, 3, 4 or 5) and compute the count of rows inside the group as starcount (starcount = the number of times the user gave S stars).
- This grouped table should look something like this:
+----------------------+-----+---------+
|user_id               |stars|starcount|
+----------------------+-----+---------+
|Zckxoi9_9btUFld8X10x-g|5    |1        |
|ZcVOYdRznoCpQx62Sl0mAA|2    |1        |
+----------------------+-----+---------+
- Filter the result to rows where the stars value equals 5 and order by the count of review stars (starcount) in descending order.
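One possible DataFrame API sketch for this task (column names are taken from the review table; the exact approach may differ):

from pyspark.sql import functions as F

# Count reviews per (user_id, stars) pair
star_counts = review.groupBy("user_id", "stars") \
    .agg(F.count("*").alias("starcount"))

# Keep only 5-star rows and order by the count, largest first
top5 = star_counts.filter(F.col("stars") == 5) \
    .orderBy(F.col("starcount").desc()) \
    .limit(5)

top5.show(5, False)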
- Find the average number of stars for businesses that offer takeout and have a picture of the menu available.
- You will need to join 3 tables: photo, business and review
- Takeout availability can be found in the nested attributes field of the business table. Check the full schema of the business table using business.printSchema()
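A hedged sketch, assuming the photo table has a label column with the value "menu" and the business table has a nested attributes.RestaurantsTakeOut field stored as the string "True"; verify the actual field names and value types with printSchema() before relying on them:

from pyspark.sql import functions as F

# Businesses that have at least one photo labeled as a menu picture
menu_businesses = photo.filter(F.col("label") == "menu") \
    .select("business_id").distinct()

# Businesses whose nested attributes say they offer takeout
takeout_businesses = business.filter(F.col("attributes.RestaurantsTakeOut") == "True") \
    .select("business_id")

# Average review stars over businesses that satisfy both conditions
avg_stars = review.join(takeout_businesses, "business_id") \
    .join(menu_businesses, "business_id") \
    .agg(F.avg("stars").alias("avg_stars"))

avg_stars.show()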
- Create a Pivot table over cities and review date month and year, with average stars as the aggregation.
- You will need to join review table (stars, date) with Business (city) before applying pivot.
- If you have a conflict between identically named columns from the two tables, you can remove columns (before joining) by calling df.drop("column")
- Check the Lecture slides for a Pivot example.
- This will allow us to get an overview of how the review scores of businesses in different cities have changed over time.
- Pivot table
- Rows: City
- Columns: Month & Year ("2009--7" for example)
- Cell value: Average stars
- Example (limited) output:
+---------------+-------+-------+-------+-------+-------+-------+
|           city|2018-02|2018-03|2018-04|2018-05|2018-06|2018-07|
+---------------+-------+-------+-------+-------+-------+-------+
|          Tempe|    5.0|   null|    4.0|    4.0|   null|   null|
|North Las Vegas|   null|   null|   null|    5.0|   null|   null|
|        Phoenix|    3.0|    4.4|    4.0|   3.63|   3.67|   null|
|          Savoy|   null|   null|   null|   null|   null|   null|
|      Wickliffe|   null|   null|   null|   null|   null|   null|
|         Monroe|   null|   null|   null|   null|   null|   null|
+---------------+-------+-------+-------+-------+-------+-------+
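A sketch of one way to build the pivot; the year-month formatting via date_format and the assumption that the review date column can be interpreted as a timestamp are both things to verify against your sample data:

from pyspark.sql import functions as F

# Attach the business city to each review and derive a "yyyy-MM" column from the review date
joined = review.join(business.select("business_id", "city"), "business_id") \
    .withColumn("year_month", F.date_format(F.col("date"), "yyyy-MM"))

# Rows: city, columns: year-month, cell value: average stars
pivoted = joined.groupBy("city") \
    .pivot("year_month") \
    .agg(F.avg("stars"))

pivoted.show(10, False)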
Exercise III: Processing data using Spark SQL
We will download and run an example Spark Python SQL script to check that everything is working. You can use this script as a template in the following tasks.
- Either create a new Python Spark project in IDE or reuse your previous project.
- Download the following Python Spark SQL example sql_example.py file and move it inside your PySpark project.
- Run the sql_example.py Python script to check that everything is working as needed.
  - Its arguments should be: input_folder output_folder
  - When you first try the example script, use the location of the Yelp small_review folder as the input_folder
NB! You must solve the following data processing tasks by using SQL statements, and you should avoid using DataFrame API and Spark RDD commands. However, you can still use the DataFrame read/write commands and other commands which provide information about the structure and content of the tables, such as:
- df.printSchema()
- df.show(nrOfRows)
- df.registerTempTable("table_name")
- spark.read & df.write
Tasks:
- Find the top 5 users who gave out the most 5-star reviews.
- Write a select statement that queries the review table
- Group the review table by user_id and review stars S (S = 1, 2, 3, 4 or 5) and compute the count of rows inside the group as starcount (starcount = the number of times the user gave S stars).
- This grouped table should look something like this:
+----------------------+-----+---------+
|user_id               |stars|starcount|
+----------------------+-----+---------+
|Zckxoi9_9btUFld8X10x-g|5    |1        |
|ZcVOYdRznoCpQx62Sl0mAA|2    |1        |
+----------------------+-----+---------+
- Filter the result to rows where the stars value equals 5 and order by the count of review stars (starcount) in descending order.
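A hedged SQL sketch of these steps, using registerTempTable as allowed above (the view names are up to you):

# Make the review DataFrame queryable through SQL
review.registerTempTable("review")

# Count reviews per (user_id, stars) pair
star_counts = spark.sql("""
    SELECT user_id, stars, COUNT(*) AS starcount
    FROM review
    GROUP BY user_id, stars
""")
star_counts.registerTempTable("star_counts")

# Keep only 5-star rows and take the 5 largest counts
top5 = spark.sql("""
    SELECT user_id, stars, starcount
    FROM star_counts
    WHERE stars = 5
    ORDER BY starcount DESC
    LIMIT 5
""")
top5.show(5, False)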
- For each user and review star value S (S = 1, 2, 3, 4 or 5), compute the percentage of all S-star reviews that this person gave.
- You can reuse the intermediate (user_id, stars, starcount) table from the previous task, but remove the ordering and filtering.
- Compute the total sum of the review star counts for each possible stars value (S = 1, 2, 3, 4 or 5) as a new column total.
- You can use the ... OVER (PARTITION BY ...) ... statement. Check the lecture slides for an example.
- Compute the percentage of the user's S-star (S = 1, 2, 3, 4 or 5) reviews out of the total number of S-star reviews in the dataset.
- Example result:
+----------------------+-----+---------+-----+-----------------+
|user_id               |stars|starcount|total|percent          |
+----------------------+-----+---------+-----+-----------------+
|ZcYNpemiHw2OOpQxkVbrVg|4    |10       |332  |3.01204819277108 |
+----------------------+-----+---------+-----+-----------------+
- Where in this example row:
- stars - 4-star reviews
- starcount - this user gave 10 4-star reviews
- total - There were total of 332 4-star reviews in the reviews table
- percent - 10 out of a total of 332 is approximately 3%.
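A hedged sketch using a window function over the star_counts view registered in the previous task:

percentages = spark.sql("""
    SELECT user_id, stars, starcount,
           SUM(starcount) OVER (PARTITION BY stars) AS total,
           100.0 * starcount / SUM(starcount) OVER (PARTITION BY stars) AS percent
    FROM star_counts
""")
percentages.show(10, False)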
- Find the average number of stars for businesses that offer takeout and have a picture of the menu available.
- You will need to join 3 tables: photo, business and review
- Takeout availability can be found in the nested attributes field of the business table. Check the full schema of the business table using business.printSchema()
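A hedged SQL sketch, with the same assumptions about the photo label value and the nested attributes.RestaurantsTakeOut field as in Exercise II (check printSchema() first, as the actual names may differ):

business.registerTempTable("business")
photo.registerTempTable("photo")

avg_stars = spark.sql("""
    SELECT AVG(r.stars) AS avg_stars
    FROM review r
    JOIN business b ON r.business_id = b.business_id
    JOIN (SELECT DISTINCT business_id FROM photo WHERE label = 'menu') p
      ON r.business_id = p.business_id
    WHERE b.attributes.RestaurantsTakeOut = 'True'
""")
avg_stars.show()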
Deliverables:
- Spark Python scripts from Exercises II and III
- Output of the scripts from Exercises II and III
- The output should contain the resulting DataFrame content.
- Output should be provided either as:
- CSV output file(s)
- Or the respective df.show() command's output in a text file.
Potential issues and solutions
- Be careful with the code indentation in the Python script. Python is very strict when it comes to mixing tabs and spaces.
- Check the indentation manual for Python: https://docs.python.org/2.0/ref/indentation.html