Practice 7 - Parallel Spark DataFrames
In this practice session, you will work with the Apache Spark framework in Python. You will learn how to create Spark DataFrame applications using a Jupyter Notebook.
References
- Spark DataFrame/SQL programming guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
- Spark Python DataFrame API - https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#DataFrame-Creation
- Spark DataFrame/SQL Functions - https://spark.apache.org/docs/latest/api/sql/index.html
- Spark DataFrame/SQL CheatSheet! - https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf
Dataset Description
The dataset we will analyze using the Spark DataFrame API is the Beach Weather Stations - Automated Sensors dataset from the City of Chicago. The data is collected by weather sensors at beaches along Chicago's Lake Michigan lakefront.
- Name: Beach Weather Stations - Automated Sensors
- Dataset source: https://data.cityofchicago.org/d/k7hf-8y75
- You can download it by selecting Export -> CSV
- If the download does not work, ask for the dataset file in Zulip.
- Dataset attributes (column names) are:
- Station Name: string
- Measurement Timestamp: string
- Air Temperature: double
- Wet Bulb Temperature: double
- Humidity: integer
- Rain Intensity: double
- Interval Rain: double
- Total Rain: double
- Precipitation Type: integer
- Wind Direction: integer
- Wind Speed: double
- Maximum Wind Speed: double
- Barometric Pressure: double
- Solar Radiation: integer
- Heading: integer
- Battery Life: double
- Measurement Timestamp Label: string
- Measurement ID: string
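For reference, these attributes roughly correspond to the following Spark schema. This is only a sketch for orientation; in the exercises below, the schema is instead inferred automatically with the inferSchema option, so you do not need to define it by hand.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
# Illustrative explicit schema matching the attribute list above (the exercises use inferSchema instead)
weather_schema = StructType([
    StructField("Station Name", StringType()),
    StructField("Measurement Timestamp", StringType()),
    StructField("Air Temperature", DoubleType()),
    StructField("Wet Bulb Temperature", DoubleType()),
    StructField("Humidity", IntegerType()),
    StructField("Rain Intensity", DoubleType()),
    StructField("Interval Rain", DoubleType()),
    StructField("Total Rain", DoubleType()),
    StructField("Precipitation Type", IntegerType()),
    StructField("Wind Direction", IntegerType()),
    StructField("Wind Speed", DoubleType()),
    StructField("Maximum Wind Speed", DoubleType()),
    StructField("Barometric Pressure", DoubleType()),
    StructField("Solar Radiation", IntegerType()),
    StructField("Heading", IntegerType()),
    StructField("Battery Life", DoubleType()),
    StructField("Measurement Timestamp Label", StringType()),
    StructField("Measurement ID", StringType()),
])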
Exercise 7.1. Configuring Spark Environment
We will use Docker to set up Spark and use a Jupyter Notebook for coding and deployment. A similar approach can be used in the cloud (e.g., the Azure Databricks service).
We will use the official Jupyter Spark Docker image, jupyter/pyspark-notebook.
- DockerHub link: https://hub.docker.com/r/jupyter/pyspark-notebook
- Documentation on how to use the Docker container: https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#apache-spark
- Documentation and examples: https://jupyter-docker-stacks.readthedocs.io/en/latest/index.html
- You can use your own computer if docker is installed on it, or you can create an OpenStack instance to run Jupyter.
- If you use an instance, create a virtual machine with the ubuntu22.04 OS and the m2.tiny flavor, and set the root Volume Size to 25GB (Source tab).
- Install Docker as done previously in Practice Session 2.
- Create a new folder for storing input and output files.
- Download the dataset file beach-weather-stations-automated-sensors-1.csv and move it into the input folder (either on your PC or inside the OpenStack instance if you use a VM).
- You will later use this folder as local_data_folder in the following commands.
- Use the command:
docker run --detach -p 8888:8888 -p 4040:4040 -p 4041:4041 --name spark -v local_data_folder:/home/jovyan/data quay.io/jupyter/pyspark-notebook
- Replace local_data_folder with the path to the folder where you will keep input and output data.
- NB! Make sure to use the full path to the folder.
- For example, in Windows:
docker run --detach -p 8888:8888 -p 4040:4040 -p 4041:4041 --name spark -v C:\Users\pelle\courses\cloud\2024\spark\data:/home/jovyan/data quay.io/jupyter/pyspark-notebook
- Open the Jupyter web interface:
- Use OpenStack instance IP and port 8888 if running on OpenStack - http://VIRTUAL_MACHINE_IP:8888/
- Use localhost and port 8888 if running Docker locally - http://localhost:8888/
- The page explains what to do to configure a password.
- Run the following docker command to fetch the current security token value:
docker exec -it spark jupyter server list
- Copy the token part of the command output and use it in the Jupyter Notebook web interface when setting a new password.
- NB! Make sure to remember & save the password so you do not lose it.
- Download the following Python Spark DataFrame example dataframe_example.py file
- Create a new Python notebook and copy the content of the dataframe_example.py there.
- It would be best to create the notebook in (or move it into) the shared folder so that it is also stored outside the Docker container.
- You can create input and output directories as subdirectories inside the shared folder.
- Make sure the input file is visible inside the data folder in the Jupyter Notebook web interface.
- If it is not, you might not have copied the input file into the correct folder, or you might have mounted the wrong folder when starting the Docker container.
- The example script expects the input file to be located in the data/in folder; you may have to change that part of the code:
input_folder = "data/in"
- If the wrong folder was mounted, it would be best to fix it by recreating the container so that the correct folder is mounted.
- Run the Python DataFrame notebook
- This example prints the results out directly
- If you want to store the results in the filesystem instead, you could use the df.write.format("csv") DataFrame command (a small sketch follows below).
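For example, a minimal sketch of writing a result DataFrame back into the mounted data folder might look like the following. The result_df name and the data/out path are illustrative assumptions; saving DataFrames is covered in more detail in Exercise 7.3.
# Write the DataFrame as CSV files (with a header line) into the mounted data folder
result_df.write.format("csv") \
    .option("header", True) \
    .save("data/out")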
Exercise 7.2. Familiarizing with the example Spark DataFrame script
Let's take a look at the example dataframe_example.py script.
- Dataset files are loaded directly as a Spark DataFrame using the spark.read command.
- The read command supports many different formats, such as CSV, XML, JSON, or plain text.
- We can automatically detect data types for columns by using the “inferSchema” option
- In addition, when the CSV file contains a header line, which specifies the names of the columns, we can use the “header” option to automatically read the column names from it:
weather_df = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(input_folder)
- This means we do not need to worry about removing the header lines from input files.
- To print out the first 10 rows of the DataFrame without truncating any values (the second argument is set to False), the script uses the command:
weather_df.show(10, False)
- To show DataFrame schema (structure), consisting of the names and types of all columns, the script uses:
weather_df.printSchema()
- At the end of the script, the Spark session is stopped with the spark.stop() command.
- NB! When modifying the script later, add any new code after the spark.read call, but before the spark.stop() line (a minimal sketch of the overall script structure is shown below).
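The exact content of dataframe_example.py may differ slightly, but a minimal sketch of such a script, using the commands described above, looks roughly like this (the application name and folder path are illustrative assumptions):
from pyspark.sql import SparkSession

# Create (or reuse) a local Spark session
spark = SparkSession.builder \
    .appName("Practice7DataFrameExample") \
    .getOrCreate()

input_folder = "data/in"

# Load the CSV dataset, reading column names from the header and inferring column types
weather_df = spark.read \
    .option("inferSchema", True) \
    .option("header", True) \
    .csv(input_folder)

# Show the first 10 rows without truncating values, and print the inferred schema
weather_df.show(10, False)
weather_df.printSchema()

# Any additional DataFrame operations go here, before stopping the session
spark.stop()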
Exercise 7.3. Extending the example DataFrame script
Let's take a look at and try out some typical DataFrame manipulation and data analytics operations on the dataframe:
- To reduce the number of columns in the DataFrame, use the select command to keep only the Station Name and Humidity columns, like this:
result_df = weather_df.select("Station Name", "Humidity")
result_df.show(10, False)
- To filter the content based on values inside the DataFrame columns, we can use the filter command and provide a conditional statement as an argument:
result_df = weather_df.filter("Humidity < 40")
result_df.show(10, False)
- The conditional statement supports the same kind of language as SQL conditional statements, and you can refer to the columns by their labels.
- You can use backticks (`) around column labels when you need to address ones that include spaces, like this:
result_df = weather_df.filter('Humidity < 40 and `Air Temperature` > 10')
result_df.show(10, False)
- Creating new Columns
- The Spark withColumn(new_column_name, expression) method can be used to create new columns.
- For example, if we want to create a new column by multiplying two existing columns:
weather_df = weather_df.withColumn("newColumn", weather_df["Wet Bulb Temperature"] * weather_df["Humidity"])
- Here, when we need to address a specific column in a normal Python operation (multiplication), we can use weather_df["Wet Bulb Temperature"] to refer to the Wet Bulb Temperature column. Another way to address a specific DataFrame column is with dot notation, like weather_df.newColumn, but that does not work when column names include spaces!
- We can also apply aggregation methods to compute statistics on the DataFrame through the agg() operation, like this:
result_df = weather_df.agg(sparkFun.avg("Humidity"))
result_df.show()
- The agg() function takes an aggregation function as an argument, which in this case is the average (avg) function. The average function takes a column name as an argument, specifying which column the aggregation should be applied to.
- NB! For the above command to work, you must import additional functions from the pyspark.sql.functions library. Add the following line to the start of the script:
import pyspark.sql.functions as sparkFun
- When we want to aggregate data separately for different weather stations, we can use the groupBy statement:
result_df = weather_df.groupBy("Station Name").agg(sparkFun.avg("Humidity"))
result_df.show()
- It is also possible to apply multiple different aggregation functions inside a single agg() method call by separating the functions with commas. Spark will create a separate column in the resulting DataFrame for each of the specified functions (see the sketch below).
- A nice overview of the available aggregation functions is here: https://spark.apache.org/docs/latest/api/sql/index.html
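For example, a small sketch of computing several aggregations in one agg() call; Spark gives the result columns default names such as avg(Humidity), min(Humidity), and max(Humidity):
# Compute average, minimum, and maximum Humidity per station in a single agg() call
result_df = weather_df.groupBy("Station Name") \
    .agg(sparkFun.avg("Humidity"),
         sparkFun.min("Humidity"),
         sparkFun.max("Humidity"))
result_df.show()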
- Saving DataFrames as files.
- To save the DataFrame into a file, we can use the write statement:
result_df.write.format("csv") \
    .option("header", True) \
    .option("compression", "gzip") \
    .save("output_folder")
- Similarly, you can change the save format into XML, JSON, or plain text.
- To force Spark to write output as a single file, you can use:
result_df.coalesce(1).write.format("json").save(output_folder)
- coalesce(N) re-partitions the DataFrame or RDD into N partitions.
- NB! Be careful when using coalesce(N): your program will crash if the whole DataFrame does not fit into the memory of N processes.
Individual task
- Using the Spark operations explained in this exercise, perform the following task (a rough outline is sketched after this list):
- Compute the Average Humidity of all stations (group by stations) when the Air Temperature was higher than 20.
- Store the results as CSV files.
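If you get stuck, one possible outline of this task, combining the filter, groupBy, agg, and write operations shown above, could look roughly like the following sketch. The output folder name is an illustrative assumption; verify the column names against your dataset.
# Keep only measurements taken when the Air Temperature was above 20
warm_df = weather_df.filter("`Air Temperature` > 20")

# Average Humidity per station
avg_humidity_df = warm_df.groupBy("Station Name").agg(sparkFun.avg("Humidity"))

avg_humidity_df.show(10, False)

# Store the result as CSV files (with a header line)
avg_humidity_df.write.format("csv") \
    .option("header", True) \
    .save("data/out/avg_humidity")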
Exercise 7.4. Using Spark DataFrame API to perform simple statistical analysis
Using the knowledge gained up to this point, solve the following individual tasks using Apache Spark DataFrame API:
- For each Station, compute the average Solar Radiation.
- Also compute the minimum and maximum Solar Radiation in addition to average.
- Compute these three statistics for each day and station.
- You can extract the day value from the Measurement Timestamp field using date formatting or string manipulation (removing everything but the date string) functions in the pyspark.sql.functions library (a small sketch is given after this task list).
- Spark DataFrame/SQL Functions - https://spark.apache.org/docs/latest/api/sql/index.html
- NB! If you run into issues using the date formatting functions, use string manipulation functions instead.
- Order the result by average solar radiation in descending order
- Answer: What station had the highest average solar radiation on which day? What was the value?
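For the day extraction hint above, a small sketch using a string manipulation function could look like this. It assumes sparkFun is imported as in Exercise 7.3 and that the Measurement Timestamp values start with an MM/dd/yyyy date, as in the example output below; adjust it if your data looks different.
# Take the first 10 characters of the timestamp string as the day (e.g. "07/15/2015")
weather_df = weather_df.withColumn(
    "day", sparkFun.substring("Measurement Timestamp", 1, 10))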
If you need more examples of which Spark DataFrame functions are available and how to use them, the Spark Python API has a very nice overview of available operations with simple examples here: https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
And DataFrame/SQL functions here: https://spark.apache.org/docs/latest/api/sql/index.html
The resulting DataFrame, BEFORE sorting, should look something like this:
+--------------------+----------+-------------------+-----------------------+-------------------+
|        Station Name|       day|Min Solar radiation|Average Solar radiation|Max Solar radiation|
+--------------------+----------+-------------------+-----------------------+-------------------+
|Foster Weather St...|07/15/2015|                  0|     248.69565217391303|                843|
|Oak Street Weathe...|11/07/2015|                  1|      88.82608695652173|                535|
|Foster Weather St...|12/03/2015|                  0|     19.416666666666668|                 87|
|Foster Weather St...|02/16/2016|                  0|      86.91304347826087|                708|
|Oak Street Weathe...|03/11/2016|                  2|     194.26315789473685|                717|
|Foster Weather St...|03/24/2016|                  0|                  29.05|                329|
|63rd Street Weath...|06/26/2016|                  0|                  323.0|               1123|
|63rd Street Weath...|06/27/2016|                  0|      321.0833333333333|                881|
|Foster Weather St...|08/17/2016|                  0|                    0.0|                  0|
|Oak Street Weathe...|11/04/2016|                  0|      85.21739130434783|                506|
+--------------------+----------+-------------------+-----------------------+-------------------+
- NB! This is the output using a different dataset. Your output can be different from this result.
Bonus exercise
The goal of the bonus task is to investigate the Spark DataFrame-based Machine learning library, cluster the dataset records into K different clusters using the Spark K-means clustering method, and visualize the results as a graph for the user.
This requires you to do the following (a rough sketch of one possible approach is given at the end of this exercise):
- Import additional libraries that may be needed, such as pandas, matplotlib
- Define features based on which the clustering is performed. You should use the "Humidity", "Wind Speed" columns.
- It would also be good to filter out obvious outliers from the source data (you can use filter()).
- Configure the KMeans model (set the value of k to 4) and train/fit it on the dataset.
- Apply the KMeans model to the dataset to compute which cluster each dataset record belongs to (this creates a prediction column, prediction = cluster id).
- Convert the Spark DataFrame to Pandas DataFrame
- Use the Pandas DataFrame df.plot.scatter() method to generate a scatter plot with the features as axes and the cluster id as color.
- Use the matplotlib.pyplot show() function to display the graph to the user.
The dataset also contains a large number of null values. Some machine learning methods cannot properly deal with such values. To avoid removing all rows containing null values, we can, purely for simplicity, convert all null values to 0 instead. This can be done globally in the DataFrame with the following operation:
weather_df = weather_df.na.fill(0)
- NB! You should note that this is often not a smart thing to do in machine learning, as assuming 0 values when data is missing may result in a completely wrong model!
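A rough sketch of one possible end-to-end approach is given below. The feature columns and k value follow the task description above; the outlier filter, colormap, and variable names are illustrative assumptions, so treat this as a starting point rather than a complete solution.
import matplotlib.pyplot as plt
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Replace nulls with 0 (see the note above about why this is a simplification)
clean_df = weather_df.na.fill(0)

# Optionally filter out obvious outliers, e.g. impossible humidity values
clean_df = clean_df.filter("Humidity >= 0 and Humidity <= 100")

# Assemble the selected feature columns into a single vector column
assembler = VectorAssembler(inputCols=["Humidity", "Wind Speed"], outputCol="features")
feature_df = assembler.transform(clean_df)

# Configure and fit the K-means model with k=4 clusters
kmeans = KMeans(k=4, featuresCol="features", predictionCol="prediction")
model = kmeans.fit(feature_df)

# Assign every record to a cluster (adds the "prediction" column)
clustered_df = model.transform(feature_df)

# Convert to a Pandas DataFrame and draw a scatter plot colored by cluster id
pandas_df = clustered_df.select("Humidity", "Wind Speed", "prediction").toPandas()
pandas_df.plot.scatter(x="Humidity", y="Wind Speed", c="prediction", colormap="viridis")
plt.show()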
Bonus exercise deliverables:
- Python code (.py or .ipynb)
- Output of the program (use show() to print out the snapshot of the data frames)
- Screenshot of the generated graph
Deliverables:
- Python script (.py or .ipynb) from Exercise 7.3
- Python script (.py or .ipynb) from Exercise 7.4 (only the final version; no need to save intermediate versions of the script separately)
- Output of the Python scripts from Exercises 7.3, 7.4
- The output should contain the resulting DataFrame content.
- (It is OK if the output is visible in the .ipynb file)
- Answer to the question raised at the end of Exercise 7.4.
Potential issues and solutions
- Be careful with code indentation in the Python script. Python is strict about indentation and does not allow mixing tabs and spaces.
- Check the indentation manual for Python: https://docs.python.org/2.0/ref/indentation.html