Cloud Computing (LTAT.06.008) 2025/26 spring

Practice 7 - Parallel Spark DataFrames

In this practice session, you will work with the Apache Spark framework in Python. You will learn how to create Spark DataFrame applications using a Jupyter notebook.

References

  • Spark DataFrame/SQL programming guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
  • Spark Python DataFrame API - https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#DataFrame-Creation
  • Spark DataFrame/SQL Functions - https://spark.apache.org/docs/latest/api/sql/index.html
  • Spark DataFrame/SQL CheatSheet! - https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf

Scoring & grading

  • Scoring server: https://cloudscoring.cs.ut.ee/
  • Check that Lab07-X scoring checks become green
  • Check the Deliverable section for additional things you need to submit by the end of the lab.

Dataset Description

The dataset we will analyze using the Spark DataFrame API is the Beach Weather Stations - Automated Sensors dataset from the City of Chicago. The data is collected by weather sensors at beaches along Chicago's Lake Michigan lakefront.

  • Name: Beach Weather Stations - Automated Sensors
  • Dataset source: https://data.cityofchicago.org/d/k7hf-8y75
    • You can download it by selecting Export -> CSV.
    • If the download does not work, ask for the dataset file on Zulip.
  • Dataset attributes (column names) are:
    1. Station Name: string
    2. Measurement Timestamp: string
    3. Air Temperature: double
    4. Wet Bulb Temperature: double
    5. Humidity: integer
    6. Rain Intensity: double
    7. Interval Rain: double
    8. Total Rain: double
    9. Precipitation Type: integer
    10. Wind Direction: integer
    11. Wind Speed: double
    12. Maximum Wind Speed: double
    13. Barometric Pressure: double
    14. Solar Radiation: integer
    15. Heading: integer
    16. Battery Life: double
    17. Measurement Timestamp Label: string
    18. Measurement ID: string

Exercise 7.1. Preparing Storage for Docker on OpenStack

Before setting up Docker, you should attach an additional volume to your OpenStack instance to ensure there is enough storage for Docker images and container data.

  • Step 1: Create and attach a new volume in OpenStack
    • In OpenStack, first unlock your instance if you locked it in Lab1.
    • Then in the OpenStack dashboard, navigate to Volumes -> Volumes and click Create Volume.
      • Give the volume a descriptive name (e.g., Pseudonym-volume) and set the size to 30 GB.
      • Leave the other settings as default and click Create Volume.
    • Once the volume is created, click the dropdown arrow next to it and select Manage Attachments.
      • Select your virtual machine instance from the list and click Attach Volume.
      • Take note of the device name shown after attaching (e.g., /dev/sdb). You will need this in the next steps.
  • Step 2: Format and mount the volume
    • SSH into your virtual machine and identify the newly attached disk: lsblk
    • You should see the new volume listed (e.g., sdb). Format it with the ext4 filesystem (replace sdb with your actual device name if different):
    • sudo mkfs.ext4 /dev/sdb
    • Create the mount point and mount the volume:
    • sudo mkdir -p /mount
      sudo mount /dev/sdb /mount
      
  • Step 3: Make the mount persistent across reboots
    • Find the UUID of the new volume:
    • sudo blkid /dev/sdb
    • Copy the UUID value from the output, then open the fstab file for editing:
    • sudo nano /etc/fstab
    • Add the following line at the end of the file, replacing YOUR-UUID-HERE with the UUID you copied:
    • UUID=YOUR-UUID-HERE /mount ext4 defaults,nofail 0 2
    • Save the file and verify the configuration is correct by running:
    • sudo mount -a
      • If the command produces no errors, the volume is correctly configured and will automatically mount on reboot.
    • After this edit, the last line of your /etc/fstab file should be the UUID entry you added above.
  • Step 4: Configure Docker to store all data on the new volume
    • By default, Docker stores all of its data (images, containers, volumes) in /var/lib/docker. We will redirect it to /mount to make use of the extra storage.
    • Stop the Docker and containerd services if they are already running:
    •  sudo systemctl stop docker.socket
       sudo systemctl stop docker 
       sudo systemctl stop containerd
      
    • Open the Docker daemon configuration file that you created in Lab1:
    • sudo nano /etc/docker/daemon.json
    • Modify the file to include the data-root setting, like this:
    • {
        "default-address-pools": [{ "base":"172.80.0.0/16","size":24 }],
        "data-root": "/mount/docker"
      }
      
    • Before starting Docker, move the existing Docker data directory to the new volume:
    • sudo mv /var/lib/docker /mount/docker
      • This ensures any images or containers already present are preserved and accessible from the new location.
    • Containerd (the container runtime used by Docker) also stores data separately in /var/lib/containerd. Move it to the new volume as well:
    • sudo mv /var/lib/containerd /mount/containerd
    • Create the containerd configuration directory and generate the default configuration file:
    • sudo mkdir -p /etc/containerd
      containerd config default | sudo tee /etc/containerd/config.toml
      
    • Open the configuration file:
    • sudo nano /etc/containerd/config.toml
    • Find the root line near the top of the file and change it to point to the new location:
    • root = "/mount/containerd"
    • Make sure the containerd data directory exists on the volume (this does nothing if the move above already created it):
    • sudo mkdir -p /mount/containerd
    • Now restart containerd and Docker and verify they are using the new data roots:
    • sudo systemctl start containerd
      sudo systemctl start docker
      docker info | grep "Docker Root Dir"
      sudo containerd config dump | grep root
      
      • docker info should show Docker Root Dir: /mount/docker.
      • containerd config dump should show root = "/mount/containerd".
    • NB! If your VM is running low on disk space at any point (e.g., when pulling large images such as pyspark-notebook), you can free up space by removing unused Docker images and build cache with:
    • docker image prune -a
      docker builder prune -a
      
      • docker image prune -a removes all images not referenced by any existing container.
      • docker builder prune -a removes all build cache.
      • Make sure you do not need any of the listed images before running these commands.
    • If the root disk (/) is still low on space, you can also clean up system logs:
    • sudo journalctl --vacuum-size=500M
      sudo rm -f /var/log/auth.log.*
      sudo rm -f /var/log/syslog.*
      
      • journalctl --vacuum-size=500M trims the systemd journal to at most 500 MB.
      • The rm commands delete old log archives.
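When editing /etc/docker/daemon.json by hand in Step 4, it is easy to drop a setting added in Lab1. The sketch below shows one way to add data-root while keeping every other key. It assumes the file holds a single JSON object; the helper name with_data_root is hypothetical, and writing the result back to /etc/docker/daemon.json (with sudo) is left to you.

```python
import json


def with_data_root(config_text: str, data_root: str) -> str:
    """Return daemon.json text with "data-root" set, keeping all other keys."""
    # An empty file is treated as an empty JSON object.
    config = json.loads(config_text) if config_text.strip() else {}
    config["data-root"] = data_root
    return json.dumps(config, indent=2)


# Example input: the address-pool setting shown in Step 4.
existing = '{"default-address-pools": [{"base": "172.80.0.0/16", "size": 24}]}'
updated = with_data_root(existing, "/mount/docker")
print(updated)
```

Parsing and re-serializing the file also catches JSON syntax errors, such as a missing comma, before Docker is restarted.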

Exercise 7.2. Configuring Spark Environment

We will use Docker to set up Spark and use Jupyter Notebook for coding and running the applications. A similar approach can be used in the cloud (e.g., with the Azure Databricks service).

We will use the official Jupyter Spark jupyter/pyspark-notebook Docker image.

  • DockerHub link: https://hub.docker.com/r/jupyter/pyspark-notebook
  • Documentation on how to use the Docker container: https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#apache-spark
  • Documentation and examples: https://jupyter-docker-stacks.readthedocs.io/en/latest/index.html
  • Use your OpenStack instance from the previous labs to run Jupyter.
  • Create a new folder for Lab7 in the home directory.
  • Create a new folder for storing input and output files: mkdir /home/ubuntu/Lab7/data
    • Download the dataset beach-weather-stations-automated-sensors-1.csv file and move it into the input folder.
    • You will later use this folder as local_data_folder in the following commands.
  • Use the command: docker run --detach -p 8888:8888 -p 4040:4040 -p 4041:4041 --name spark -v local_data_folder:/home/jovyan/data quay.io/jupyter/pyspark-notebook
    • Replace local_data_folder with a path to a folder where you will keep input and output data.
      • NB! Make sure to use the full path to the folder.
    • For example: docker run --detach -p 8888:8888 -p 4040:4040 -p 4041:4041 --name spark -v /home/ubuntu/Lab7/data:/home/jovyan/data quay.io/jupyter/pyspark-notebook
  • Open the Jupyter web interface:
    • Use OpenStack instance IP and port 8888 - http://VIRTUAL_MACHINE_IP:8888/
    • The page explains what to do to configure a password.
      • Run the following docker command to fetch the current security token value:
        • docker exec -it spark jupyter server list
      • Copy the token part of the command output and use it in the Jupyter Notebook web interface when entering a new password.
        • NB! Make sure to remember & save the password so you do not lose it.
  • Download the following Python Spark DataFrame example file: dataframe_example.py
  • Create a new Python notebook and copy the content of dataframe_example.py into it.
    • It would be best to create the notebook in (or move it into) the shared folder so that it is also stored outside the Docker container.
    • You can create input and output directories as subdirectories inside the shared folder.
  • Make sure the input file is visible inside the data folder in the Jupyter Notebook web interface.
    • If it is not, you might not have copied the input file into the correct folder, or you might have mounted the wrong folder when starting the Docker container.
      • The example script expects the input file to be located in the data/in folder; if your file is elsewhere, change that part of the code: input_folder = "data/in"
      • It would be best to fix it by recreating the container so that the correct folder is mounted.
  • Run the Python DataFrame notebook
    • This example prints the results out directly
    • If you want to store the results in a filesystem instead, you could use the df.write.format("csv") DataFrame command
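A quick way to confirm that the input file is visible from inside the notebook is to list the folder before starting Spark. The sketch below is only an illustration: the helper name list_input_files is hypothetical, and data/in is the relative path that the example script is described as expecting.

```python
from pathlib import Path


def list_input_files(folder: str, pattern: str = "*.csv") -> list[str]:
    """Return the sorted names of files in `folder` that match `pattern`."""
    path = Path(folder)
    if not path.is_dir():
        # Usually means the wrong host folder was mounted into the container.
        return []
    return sorted(p.name for p in path.glob(pattern) if p.is_file())


# "data/in" is relative to the notebook's working directory (an assumption;
# adjust it if your notebook lives somewhere else).
files = list_input_files("data/in")
print(files if files else "No CSV files found: check the -v mount and the input path")
```

If the list is empty, recreating the container with the correct -v path is usually a cleaner fix than editing the script.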

Exercise 7.3. Familiarizing with the example Spark DataFrame script

Let's take a look at the example dataframe_example.py script.

  • Dataset files are loaded directly as a Spark DataFrame using the spark.read command.
    • The read command supports many different formats, such as CSV, XML, JSON, or plain text.
    • We can automatically detect the data types of columns by using the “inferSchema” option.
    • In addition, when the CSV file contains a header line that specifies the names of the columns, we can use the “header” option to read the column names from it automatically.
    •     weather_df = spark.read \
                        .option("inferSchema", True) \
                        .option("header", True) \
                        .csv(input_folder)
    • This means we do not need to worry about removing the header lines from input files.
  • To print out the first 10 rows of the DataFrame without truncating any values (the second argument is set to False), the script uses the command:
    • weather_df.show(10, False)
  • To show DataFrame schema (structure), consisting of the names and types of all columns, the script uses:
    • weather_df.printSchema()
  • At the end of the script, the Spark session is stopped with the spark.stop() command.
  • NB! When modifying the script later, add any new code after the spark.read, but before the spark.stop() code lines.

Exercise 7.4. Extending the example DataFrame script

Let's try out some typical DataFrame manipulation and data analytics operations on the DataFrame:

  1. To reduce the number of columns in the DataFrame, use the select command. For example, to keep only the Station Name and Humidity columns:
    • result_df = weather_df.select("Station Name", "Humidity")
    • result_df.show(10, False)
  2. To filter the content based on some values inside the DataFrame columns, we can use the filter command and provide a conditional statement as an argument to the filter command.
    • result_df = weather_df.filter("Humidity < 40")
    • result_df.show(10, False)
    • The conditional statement supports the same kind of language as SQL conditional statements, and you can refer to the columns by their labels.
    • You can use backticks (`) around column labels when you need to address ones that include spaces, like this:
      • result_df = weather_df.filter('Humidity < 40 and `Air Temperature` > 10')
      • result_df.show(10, False)
  3. Creating new Columns
    • The Spark withColumn(new_column_name, expression) method creates a new column.
    • For example, if we want to create a new column by multiplying two existing columns:
      • weather_df = weather_df.withColumn("newColumn", weather_df["Wet Bulb Temperature"] * weather_df["Humidity"])
        • Here, weather_df["Wet Bulb Temperature"] addresses the Wet Bulb Temperature column so that it can be used in an ordinary Python expression (multiplication). Another way to address a DataFrame column is with dot notation, like this: weather_df.newColumn, but this does not work when the column name includes spaces!
  4. We can also apply aggregation methods to compute statistics on the dataframe through the agg() operation, like this:
    • result_df = weather_df.agg(sparkFun.avg("Humidity"))
    • result_df.show()
      • The agg() function takes the aggregation function as an argument, which in this case is the average (avg) function. The average function takes the column's name as an argument, specifying which column the aggregation function should be applied to.
      • NB! For the above command to work, you must import additional functions from the pyspark.sql.functions library. Add the following line to the start of the script:
        • import pyspark.sql.functions as sparkFun
    • When we want to aggregate data separately for different weather stations, we can use the groupBy statement:
      • result_df = weather_df.groupBy("Station Name").agg(sparkFun.avg("Humidity"))
      • result_df.show()
    • It is also possible to apply multiple different aggregation functions inside a single agg() method call by separating the functions by commas. Spark will create a separate column in the resulting DataFrame for each of the specified functions.
    • A nice overview of available aggregation functions is here:
      • https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-aggregate-functions.html
  5. Saving DataFrames as files.
    • To save the DataFrame into a file, we can use the write statement:
    • result_df.write.format("csv")  \
               .option("header", True) \
               .option("compression", "gzip") \
               .save("output_folder")
    • Similarly, you can change the save format into XML, JSON, or plain text.
    • To force Spark to write output as a single file, you can use:
      • result_df.coalesce(1).write.format("json").save(output_folder)
      • coalesce(N) reduces the DataFrame or RDD to at most N partitions.
      • NB! Be careful when using coalesce(N): your program will crash if the whole DataFrame does not fit into the memory of N processes.

Individual task

  • Using the Spark operations explained in this exercise, perform the following task:
    1. Compute the Average Humidity of all stations (group by stations) when the Air Temperature was higher than 20.
    2. Store the results as CSV files.

Exercise 7.5. Using Spark DataFrame API to perform simple statistical analysis

Using the knowledge gained up to this point, solve the following individual tasks using Apache Spark DataFrame API:

  1. For each Station, compute the average Solar Radiation.
  2. Also compute the minimum and maximum Solar Radiation in addition to average.
  3. Compute these three statistics for each day and station.
    • You can extract the day value from the Measurement Timestamp field using date formatting or string manipulation (removing everything but the date string) functions in the pyspark.sql.functions library
      • Spark DataFrame/SQL Functions - https://spark.apache.org/docs/latest/api/sql/index.html
      • NB! If you run into issues using the date formatting functions, use string manipulation functions instead.
  4. Order the result by average solar radiation in descending order.
    • Answer: Which station had the highest average solar radiation, and on which day? What was the value?

If you need some more examples of which Spark DataFrame functions are available and how to use them, then the Spark Python API has a very nice overview of available operations with simple examples here:

  • https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#DataFrame-Creation

And DataFrame/SQL functions here:

  • https://spark.apache.org/docs/latest/api/sql/index.html

The resulting DataFrame, BEFORE sorting, should look something like this:

+--------------------+----------+-------------------+-----------------------+-------------------+
|        Station Name|       day|Min Solar radiation|Average Solar radiation|Max Solar radiation|
+--------------------+----------+-------------------+-----------------------+-------------------+
|Foster Weather St...|07/15/2015|                  0|     248.69565217391303|                843|
|Oak Street Weathe...|11/07/2015|                  1|      88.82608695652173|                535|
|Foster Weather St...|12/03/2015|                  0|     19.416666666666668|                 87|
|Foster Weather St...|02/16/2016|                  0|      86.91304347826087|                708|
|Oak Street Weathe...|03/11/2016|                  2|     194.26315789473685|                717|
|Foster Weather St...|03/24/2016|                  0|                  29.05|                329|
|63rd Street Weath...|06/26/2016|                  0|                  323.0|               1123|
|63rd Street Weath...|06/27/2016|                  0|      321.0833333333333|                881|
|Foster Weather St...|08/17/2016|                  0|                    0.0|                  0|
|Oak Street Weathe...|11/04/2016|                  0|      85.21739130434783|                506|
+--------------------+----------+-------------------+-----------------------+-------------------+
  • NB! This is the output using a different dataset. Your output can be different from this result.

Bonus exercise

The goal of the bonus task is to investigate the Spark DataFrame-based Machine learning library, cluster the dataset records into K different clusters using the Spark K-means clustering method, and visualize the results as a graph for the user.

This requires you to:

  1. Import additional libraries that may be needed, such as pandas, matplotlib
  2. Define features based on which the clustering is performed. You should use the "Humidity", "Wind Speed" columns.
    • It would also be good to filter out (can use filter()) obvious outliers from the source data.
  3. Configure the kMeans model (set the value of k to be 4) and train/fit it to the dataset
  4. Apply the kMeans model to the dataset to compute into which cluster each dataset record belongs (this creates a prediction column, prediction = cluster id).
  5. Convert the Spark DataFrame to Pandas DataFrame
  6. Use the Pandas DataFrame df.plot.scatter() method to generate a scatterplot with the features as axes and the cluster id as color.
  7. Use matplotlib.pyplot show() function to display the graph visually to the user.

Also, the dataset contains a large number of null values. Some machine learning methods cannot properly deal with such values. To avoid removing all rows containing null values, we can convert all null values to 0 instead, purely for the sake of simplicity. This can be done globally in the DataFrame (for numeric columns) with the following operation: weather_df = weather_df.na.fill(0)

  • NB! You should note that this is often not a smart thing to do in machine learning, as assuming 0 values when data is missing may result in a completely wrong model!
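A small arithmetic illustration of why zero-filling can mislead, written in plain Python with made-up readings: ignoring missing values (as Spark's avg() does with nulls) and replacing them with 0 give very different means.

```python
from statistics import mean

# Made-up humidity readings; None marks a missing sensor value.
readings = [60, None, 70, None, 80]

# Option 1: ignore missing values, as Spark's avg() does with nulls.
mean_ignoring_missing = mean(v for v in readings if v is not None)

# Option 2: replace missing values with 0, as na.fill(0) would.
mean_zero_filled = mean(0 if v is None else v for v in readings)

print(mean_ignoring_missing, mean_zero_filled)
```

Here the two means are 70 and 42, so a model trained on the zero-filled column would see humidity values that never occurred.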

Bonus deliverables:

  1. Python code (.py or .ipynb)
  2. Output of the program (use show() to print out the snapshot of the data frames)
  3. Screenshot of the generated graph

Deliverables:

  • Make sure all the checks on Nagios are green.
  • Python script (.py or .ipynb) from Exercise 7.4
  • Python script (.py or .ipynb) from Exercise 7.5 (only the final version; no need to save intermediate versions of the script separately)
  • Output of the Python scripts from Exercises 7.4, 7.5
    • The output should contain the resulting DataFrame content.
    • (It is OK if the output is visible in the .ipynb file.)
  • Answer to the question raised at the end of Exercise 7.5.

Potential issues and solutions

  • Be careful with code indentation in Python scripts. Python is strict about mixing tabs and spaces.
    • Check the indentation section of the Python reference: https://docs.python.org/3/reference/lexical_analysis.html#indentation