Big Data Lab: Apache Spark DataFrames and SQL
In this lab we will look at how to process data in parallel using Spark DataFrames and the SQL language, working with the Yelp dataset. A smaller sample of the Yelp dataset has been prepared for developing and testing data processing applications in the lab.
References
- Spark DataFrame
- Spark DataFrame Python API: https://spark.apache.org/docs/2.4.8/api/python/pyspark.sql.html#pyspark.sql.DataFrame
- Spark DataFrame/SQL programming guide: https://spark.apache.org/docs/2.4.8/sql-programming-guide.html
- Spark DataFrame Lecture slides
- Spark SQL:
- Spark DataFrame/SQL programming guide: https://spark.apache.org/docs/latest/sql-getting-started.html
- Spark SQL Functions - https://spark.apache.org/docs/latest/api/sql/index.html
- Spark (Python) SQL examples: https://github.com/apache/spark/blob/master/examples/src/main/python/sql/
Dataset Description
The dataset that we will analyze is the Yelp Dataset. It contains Yelp businesses, reviews, and user data for use in personal, educational, and academic purposes. It is freely available for teaching about databases and NLP. The dataset consists of 6 JSON files: Businesses, reviews, users, checkins, tips, and photos.
NB! Using this dataset requires that you read and agree with the Dataset Challenge Dataset Agreement.
- Name: Yelp Dataset
- Location: https://www.yelp.com/dataset
- Download links for the new smaller samples of the dataset for testing in the lab:
- A: General sample: https://owncloud.ut.ee/owncloud/s/B7fWxJdN9WBkcof
- Ask for the password from the lab supervisor (hint: code of the course)
- Documentation: https://www.yelp.com/dataset/documentation/main
Dataset tables are: Review, User, Business, Checkin, Tip, and Photo.
The tables can be joined through shared key attributes such as user_id and business_id. As we are dealing with JSON files, the order of the attributes may differ from the documentation. Some attributes might in turn contain nested structures. Check the dataset documentation or Spark's df.printSchema() command output for the complete list of (nested) attributes.
Preparing Spark
This section contains the steps needed to use Spark on your computer.
We will use Docker to set up Spark and use a Jupyter Notebook for coding and deployment. A similar approach can be used in the cloud (e.g., the Azure Databricks service).
We will use the official Jupyter Spark jupyter/pyspark-notebook
Docker image.
- DockerHub link: https://hub.docker.com/r/jupyter/pyspark-notebook
- Documentation on how to use the Docker container: https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#apache-spark
- Documentation and examples: https://jupyter-docker-stacks.readthedocs.io/en/latest/index.html
- You can use your own computer if Docker is installed, or you can create a virtual machine (for example, in ETAIS, the Azure cloud, etc.) to run Jupyter.
- Create a local new folder for storing Jupyter notebook files and input and output files.
- Download the dataset files and move them into the input folder
- You will later use other folders inside this created folder as input and output of Python scripts
- Use the command:
docker run --detach -p 8888:8888 -p 4040:4040 -p 4041:4041 --name spark -v local_data_folder:/home/jovyan/data quay.io/jupyter/pyspark-notebook
- Replace local_data_folder with the full path to the folder where you will keep input and output data.
- NB! Make sure to use the full path to the folder.
- For example, in Windows:
docker run --detach -p 8888:8888 -p 4040:4040 -p 4041:4041 --name spark -v C:\Users\pelle\courses\cloud\2024\spark\data:/home/jovyan/data quay.io/jupyter/pyspark-notebook
- Open the Jupyter web interface:
- Use localhost and port 8888 if running Docker locally - http://localhost:8888/
- The page explains what to do to configure a password.
- Run the following docker command to fetch the current security token value:
docker exec -it spark jupyter server list
- Copy the token part of the command output and use it on the Jupyter Notebook web interface when entering a new password.
- NB! Make sure to remember & save the password so you do not lose it.
Exercise I: Running the first Spark script
You will download and run an example Spark Python DataFrame script to check that everything works. You can use this script as a template in the following exercises.
- Download the following Python Spark DataFrame example file, dataframe_example.py, and copy its content into the Jupyter notebook.
- Download the smaller sample of the dataset for development and testing in the lab from here: https://owncloud.ut.ee/owncloud/s/B7fWxJdN9WBkcof (Ask for the password from the lab supervisor)
- Create a new folder for input files (keep it inside the folder shared with Docker container) and move the Yelp dataset JSON folders there.
- Run the Jupyter notebook to check that everything is working as needed.
- When you first try the example script, you should use the location of the Yelp small_review folder as the input_folder
- The script expects the path of the folder inside the container to be:
input_folder = "/home/jovyan/data/yelp/small_review"
- Adjust it as needed, if the path is slightly different inside your container.
Familiarize yourself with the content of the example Spark DataFrame script.
- Dataset files are loaded directly as a Spark DataFrame using the spark.read command.
- The read command supports many different formats, such as CSV, XML, JSON, or plain text.
- We can automatically detect data types for columns by using the “inferSchema” option
- In addition, when the CSV file contains a header line, which specifies the names of the columns, we can use the “header” option to automatically read the column names from there
dataset = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(input_folder)
- This means we do not need to worry about removing the header lines from input files.
- To print out the first 10 rows of the DataFrame without truncating any values (the second argument is set to False), the script uses the command:
dataset.show(10, False)
- To show DataFrame schema (structure), consisting of the names and types of all columns, the script uses:
dataset.printSchema()
- At the end of the script, the Spark session is stopped with the spark.stop() command.
- NB! When modifying the script later, make sure you add any new code after the spark.read line, but before the spark.stop() line.
Exercise II: Processing data using Spark DataFrames
Use the Spark DataFrame API to solve the following data processing tasks.
- Extend the previous example to load all Yelp tables as separate DataFrames
- Meaning not only the review DataFrame
- Something like the snippets below:
review = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(small_review)
business = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(small_business)
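- One possible way to load all six tables in a loop is sketched below. The base path and the small_<table> folder names are assumptions about how the sample dataset is laid out, so adjust them to match your container:
base = "/home/jovyan/data/yelp"  # adjust to match your container
tables = {}
for name in ["review", "business", "user", "checkin", "tip", "photo"]:
    # each sample folder is assumed to be named small_<table name>
    tables[name] = spark.read \
        .option("inferSchema", True) \
        .option("header", True) \
        .csv(base + "/small_" + name)
review = tables["review"]
business = tables["business"]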
- Find the top 5 users who gave out the most 5-star reviews.
- Write a select statement that queries the review table
- Group the review table by user_id and review stars S (S = 1,2,3,4 or 5) and compute the count of rows inside each group as starcount (starcount = the number of times the user gave S stars).
- This grouped table should look something like this:
+----------------------+-----+---------+
|user_id               |stars|starcount|
+----------------------+-----+---------+
|Zckxoi9_9btUFld8X10x-g|5    |1        |
|ZcVOYdRznoCpQx62Sl0mAA|2    |1        |
+----------------------+-----+---------+
- Filter the result to rows where the stars value equals 5 and order by the count of review stars (a minimal DataFrame sketch follows below).
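- A minimal DataFrame sketch of this task, assuming the review DataFrame loaded above and the user_id and stars columns from the dataset documentation:
from pyspark.sql import functions as F

# count reviews per (user_id, stars) pair
star_counts = review.groupBy("user_id", "stars") \
    .agg(F.count("*").alias("starcount"))

# keep only the 5-star groups and order by the counts
top5 = star_counts.filter(F.col("stars") == 5) \
    .orderBy(F.col("starcount").desc())

top5.show(5, False)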
- Find the average star rating (stars) of businesses that offer takeout and have a picture of the menu available.
- You will need to join 3 tables: photo, business and review
- Takeout availability can be found in the nested attributes field of the business table (a join sketch follows below). Check the full schema of the business table using
business.printSchema()
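- A rough sketch of the join. The attributes.RestaurantsTakeOut field and the photo label value "menu" are assumptions based on the public Yelp documentation, so verify the exact (nested) names with printSchema() first:
from pyspark.sql import functions as F

# business_ids of businesses that offer takeout (field name is an assumption)
takeout_ids = business \
    .filter(F.col("attributes.RestaurantsTakeOut") == "True") \
    .select("business_id")

# business_ids that have a menu photo (label value is an assumption)
menu_ids = photo.filter(F.col("label") == "menu") \
    .select("business_id").distinct()

# join with the reviews and average the review stars
review.select("business_id", "stars") \
    .join(takeout_ids, "business_id") \
    .join(menu_ids, "business_id") \
    .agg(F.avg("stars").alias("avg_stars")) \
    .show()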
- Create a Pivot table over cities and review date month and year, with average stars as the aggregation.
- You will need to join the review table (stars, date) with the business table (city) before applying the pivot (a rough sketch is given after the example output below).
- If you have a conflict between identically named columns from the two tables, you can remove columns (before joining) by calling df.drop("column")
- Check the Lecture slides for a Pivot example.
- This will allow us to get an overview of how the review scores of businesses in different cities have changed over time.
- Pivot table
- Rows: City
- Columns: Year & Month ("2018-02", for example)
- Cell value: Average stars
- Example (limited) output:
+---------------+-------+-------+-------+-------+-------+-------+
|           city|2018-02|2018-03|2018-04|2018-05|2018-06|2018-07|
+---------------+-------+-------+-------+-------+-------+-------+
|          Tempe|    5.0|   null|    4.0|    4.0|   null|   null|
|North Las Vegas|   null|   null|   null|    5.0|   null|   null|
|        Phoenix|    3.0|    4.4|    4.0|   3.63|   3.67|   null|
|          Savoy|   null|   null|   null|   null|   null|   null|
|      Wickliffe|   null|   null|   null|   null|   null|   null|
|         Monroe|   null|   null|   null|   null|   null|   null|
+---------------+-------+-------+-------+-------+-------+-------+
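- A possible pivot sketch, assuming the review date column can be formatted as yyyy-MM; the column names are taken from the dataset documentation:
from pyspark.sql import functions as F

# keep only the needed columns to avoid name clashes (both tables have "stars")
joined = review.select("business_id", "stars", "date") \
    .join(business.select("business_id", "city"), "business_id")

# derive a year-month column and pivot on it
pivoted = joined \
    .withColumn("month", F.date_format("date", "yyyy-MM")) \
    .groupBy("city") \
    .pivot("month") \
    .agg(F.avg("stars"))

pivoted.show(10, False)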
Exercise III: Processing data using Spark SQL
We will download and run an example Spark Python SQL script to check that everything is working. You can use this script as a template in the following tasks.
- Create a new Jupyter notebook.
- Download the following Python Spark SQL example sql_example.py and copy its content into the new notebook.
- Run the SQL example Jupyter notebook to check that everything is working as needed.
- When you first try the example script, you should use the location of the Yelp small_review folder as the input_folder
- The script expects the path of the folder inside the container to be:
input_folder = "/home/jovyan/data/yelp/small_review"
- Adjust it as needed, if the path is slightly different inside your container.
NB! You must solve the following data processing tasks by using SQL statements, and you should avoid using DataFrame API and Spark RDD commands. However, you can still use the DataFrame read/write commands and other commands which provide information about the structure and content of the tables, such as:
df.printSchema()
df.show(nrOfRows)
df.registerTempTable("table_name")
spark.read & df.write
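As a small illustration of these allowed commands, the pattern below loads a table with spark.read, registers it as a SQL table, and then runs a query with spark.sql (the input path is the same assumption as in the example script, so adjust it as needed):
review = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv("/home/jovyan/data/yelp/small_review")  # adjust the path as needed

# make the DataFrame queryable from SQL under the name "review"
review.registerTempTable("review")

spark.sql("SELECT COUNT(*) AS review_count FROM review").show()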
Tasks:
- Find the top 5 users who gave out the most 5-star reviews.
- Write a select statement that queries the review table
- Group the review table by user_id and review stars S (S = 1,2,3,4 or 5) and compute the count of rows inside each group as starcount (starcount = the number of times the user gave S stars).
- This grouped table should look something like this:
+----------------------+-----+---------+
|user_id               |stars|starcount|
+----------------------+-----+---------+
|Zckxoi9_9btUFld8X10x-g|5    |1        |
|ZcVOYdRznoCpQx62Sl0mAA|2    |1        |
+----------------------+-----+---------+
- Filter the result to rows where the stars value equals 5 and order by the count of review stars (a minimal SQL sketch follows below).
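- A minimal SQL sketch of this task, assuming the review DataFrame has been registered as a temp table named review, as shown earlier:
top5 = spark.sql("""
    SELECT user_id, stars, COUNT(*) AS starcount
    FROM review
    GROUP BY user_id, stars
    HAVING stars = 5
    ORDER BY starcount DESC
    LIMIT 5
""")
top5.show(5, False)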
- For each user and review-star value S (S = 1,2,3,4 or 5), compute the percentage of all S-star reviews that this user gave.
- You can reuse the intermediate (user_id, stars, starcount) table from the previous task, but remove the ordering and filtering.
- Compute the total sum of the review star counts for each possible stars value (S = 1,2,3,4 or 5) as a new column, total.
- You can use the ... OVER (PARTITION BY ...) statement. Check the lecture slides for an example.
- Compute the percentage of the user's S-star (S = 1,2,3,4 or 5) reviews out of the total number of S-star reviews in the dataset (a window-function sketch follows after the example below).
- Example result:
+----------------------+-----+---------+-----+-----------------+
|user_id               |stars|starcount|total|percent          |
+----------------------+-----+---------+-----+-----------------+
|ZcYNpemiHw2OOpQxkVbrVg|4    |10       |332  |3.01204819277108 |
+----------------------+-----+---------+-----+-----------------+
- Where in this example row:
- stars - 4-star reviews
- starcount - this user gave 10 4-star reviews
- total - there were a total of 332 4-star reviews in the review table
- percent - 10 out of a total of 332 is approximately 3%.
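- A possible sketch of the window-function approach, building on the (user_id, stars, starcount) grouping from the previous task; it assumes the review temp table is registered, and the intermediate table name is illustrative:
# unordered, unfiltered (user_id, stars, starcount) table from the previous task
per_user = spark.sql("""
    SELECT user_id, stars, COUNT(*) AS starcount
    FROM review
    GROUP BY user_id, stars
""")
per_user.registerTempTable("user_star_counts")

result = spark.sql("""
    SELECT user_id, stars, starcount,
           SUM(starcount) OVER (PARTITION BY stars) AS total,
           starcount * 100.0 / SUM(starcount) OVER (PARTITION BY stars) AS percent
    FROM user_star_counts
""")
result.show(10, False)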
- Find the average star rating (stars) of businesses that offer takeout and have a picture of the menu available.
- You will need to join 3 tables: photo, business and review
- Takeout availability can be found in the nested attributes field of the business table (a SQL join sketch follows below). Check the full schema of the business table using
business.printSchema()
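- A rough SQL sketch of the three-table join, assuming the business and photo DataFrames are also registered as temp tables; the attributes.RestaurantsTakeOut field and the 'menu' label value are assumptions based on the public Yelp documentation, so verify them with printSchema():
business.registerTempTable("business")
photo.registerTempTable("photo")

avg_stars = spark.sql("""
    SELECT AVG(r.stars) AS avg_stars
    FROM review r
    JOIN business b ON r.business_id = b.business_id
    JOIN (SELECT DISTINCT business_id FROM photo WHERE label = 'menu') p
      ON r.business_id = p.business_id
    WHERE b.attributes.RestaurantsTakeOut = 'True'
""")
avg_stars.show()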
Deliverables:
- Jupyter notebooks from Exercises II and III
- The output of the Jupyter notebooks from Exercises II and III should be included inside the submitted notebooks (or provided separately).
- The output should contain the respective df.show() commands, which show (print out) the results of each of the tasks.
Potential issues and solutions
- Be careful with code indentation in the Python scripts. Python is very strict when it comes to mixing tabs and spaces.
- Check the indentation manual for Python: https://docs.python.org/2.0/ref/indentation.html