Hajusandmetöötlus pilves (Distributed Data Processing in the Cloud, LTAT.06.005), 2018/19 fall

Stream Data Processing: Spark Streaming

In this practice session we will look at how to process stream data using both the Spark Streaming and Spark Structured Streaming APIs in Python. You will learn how to create, run and test streaming applications in Spark and how to modify their behavior.

References

  • Spark Streaming programming guide: https://spark.apache.org/docs/2.3.2/streaming-programming-guide.html
  • Spark Structured Streaming guide: https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html
  • Lecture slides: https://courses.cs.ut.ee/LTAT.06.005/2018_fall/uploads/Main/L11_2018.pdf
  • Spark Python API: https://spark.apache.org/docs/latest/api/python/index.html
  • Spark Python Streaming API: https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html
  • Spark Python Streaming examples: https://github.com/apache/spark/tree/master/examples/src/main/python/streaming

Exercise 11.1. Running a Spark Streaming WordCount example in PyCharm

To start with, let's download and run an example file-stream-based Spark Streaming WordCount application. We will check how to deal with input files and take a look at the Spark Streaming web console and the streaming metrics displayed there.

  • Either create a new Python Spark project in PyCharm or reuse your previous project.
  • Download Spark Streaming WordCount example: streamFileExample.py
  • Create an empty input folder into which we will later create input files.
  • Create a run configuration for the streamFileExample.py file.
    • streamFileExample takes one argument: input_folder
  • Run the streamFileExample.py Python script.
    • PS! Spark streaming applications keep running indefinitely - until you stop them.
  • Initially your application will have no data to process. File-based Spark Streaming applications use the content of newly created files in the input folder as the input stream.
  • Create new input files and move them into the input directory.
    • The files must have a modification time later than the start time of the application, so simply moving old files will not work.
    • Use the command cp file_location new_path to copy a file into the input folder.
    • You can also use touch file_name to reset the file's modification timestamp if copying files does not work for you.
  • streamFileExample.py prints the results out directly into the console and does not store them in the file system.
  • Spark also starts a web interface for your application in the background that can be accessed by going to: http://localhost:4040/
    • In some cases the web interface port may differ: if port 4040 is already in use, Spark tries the next port (4041, 4042, ...).
    • Open the web interface of your Spark Streaming application and navigate to the Streaming tab. Take a look at the Scheduling Delay and Processing Time metrics.
      • Input rate is not used when using textFileStream and will stay 0 regardless of how much data is being processed.
      • This is because Spark depends on Receivers to report input rate and textFileStream based streaming does not use Receivers.
    • Try to generate a larger number of input files between batches to see if you can make the Processing Time rise.
      • A simple way to quickly generate additional files for testing is to copy the same input file multiple times, each time with a new name. For example:
        • cp pg00032.txt stream_input/76.txt
          cp pg00032.txt stream_input/77.txt
          cp pg00032.txt stream_input/78.txt
    • Take a screenshot of the Spark Streaming interface at http://localhost:4040/ while your streaming application is running, as the deliverable for this exercise.
    • Example screenshot: sampleScreen.png
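
The copy commands above can also be scripted. The following is a minimal sketch, not part of the course materials: the function name feed_input_folder and its parameters are made up for illustration. It relies on the fact that shutil.copy() does not preserve the modification time of the source file, so every copy looks new to the streaming application.

```python
import os
import shutil
import tempfile


def feed_input_folder(source_file, input_folder, copies, start_index=0):
    """Copy source_file into input_folder `copies` times under new names.

    shutil.copy() copies the content and permission bits but not the
    modification time, so each copy gets a fresh timestamp.
    """
    os.makedirs(input_folder, exist_ok=True)
    created = []
    for i in range(start_index, start_index + copies):
        target = os.path.join(input_folder, "{}.txt".format(i))
        shutil.copy(source_file, target)
        created.append(target)
    return created


if __name__ == "__main__":
    # Self-contained demo in a temporary directory (hypothetical file names).
    work_dir = tempfile.mkdtemp()
    book = os.path.join(work_dir, "book.txt")
    with open(book, "w") as f:
        f.write("to be or not to be\n")
    for path in feed_input_folder(book, os.path.join(work_dir, "stream_input"), 3, 76):
        print(path)
```

Calling the function repeatedly with a different start_index while the streaming application is running should produce a visible change in the Processing Time graph.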

Exercise 11.2. Stateful Streaming RDD operations and storing result as text files

The Exercise 11.1 WordCount example computes the word count separately for every micro-batch. Combining the output of each micro-batch into a single WordCount would not be difficult, but it would require an additional job that processes the output of all previous micro-batches. Instead, let's use Spark Streaming stateful operations, which keep running aggregations in memory and update them at every micro-batch.

  1. We first need to set up a checkpointing directory to use stateful Spark Streaming operations. Stateful operations cache their running results in memory, so checkpointing is needed to be able to recover from faults.
    • Add the following line: ssc.checkpoint("checkpoint") after the StreamingContext object ssc has been created.
  2. Replace the reduceByKey() operation with the updateStateByKey() operation.
    • When it is first applied, updateStateByKey will generate a new aggregated DStream just like reduceByKey. However, it will keep the aggregation results in memory as intermediate state and update the values at every following batch interval using the supplied user-defined aggregation function.
  3. Also replace the lambda function (lambda a, b: a + b), which was previously inside reduceByKey(), with:
    • lambda a, b: sum(a) + (b or 0)
    • Here a is the list of new values and b is the current state, meaning the current count of this word.
      • sum(a) instead of just a because a is now a list of values instead of a single value.
      • (b or 0) instead of just b because b will be None if the previous state (for some key) does not exist.
  4. Change the application to write the results out as files.
    • You can use the saveAsTextFiles(prefix) DStream operation to achieve this.
    • prefix is the start of the output folder location. Spark stores the output of each micro-batch in a separate folder whose path starts with the prefix and ends with the timestamp of the micro-batch.
    • If you want the output files to be inside a specific folder, use something like: output_folder/out as prefix. Spark will then generate output files with names like out-1543153935000 inside the output_folder.
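
The steps above can be put together roughly as follows. This is a sketch under assumptions, since streamFileExample.py itself is not reproduced here: the function names, the application name and the 10 second batch interval are illustrative and may differ from the downloaded script. The state update function is kept separate from the Spark wiring so that its behavior can be checked without starting Spark.

```python
def update_count(new_values, running_count):
    """State update function for updateStateByKey.

    new_values: list of counts that arrived for one word in this micro-batch.
    running_count: previous total for the word, or None if the word is new.
    """
    return sum(new_values) + (running_count or 0)


def run_stateful_word_count(input_folder, output_prefix, batch_seconds=10):
    # pyspark is imported here so that update_count can be tried without Spark.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="StatefulWordCount")
    ssc = StreamingContext(sc, batch_seconds)
    ssc.checkpoint("checkpoint")  # required by stateful operations

    lines = ssc.textFileStream(input_folder)
    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .updateStateByKey(update_count))

    # One output folder per micro-batch: <output_prefix>-<timestamp>
    counts.saveAsTextFiles(output_prefix)
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    # Simulate two micro-batches for a single word without starting Spark.
    state = update_count([1, 1, 1], None)  # first batch, no previous state
    state = update_count([1, 1], state)    # second batch adds to the state
    print(state)  # 5
```

To run it against Spark, call run_stateful_word_count("stream_input", "output_folder/out") instead of the simulation in the last block.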

Exercise 11.3. Running a Spark Structured Streaming WordCount example

Let's now take a look at a Spark Structured Streaming example. The main differences are that it uses the DataFrame API and that aggregations are stateful by default.

  • Download Spark Structured Streaming WordCount example: structStreamExample.py and add it to your PyCharm project.
  • Use the same input folder as you used in Exercise 11.1.
  • Create a run configuration for the structStreamExample.py file.
    • structStreamExample.py takes one argument: input_folder
  • Run the structStreamExample.py Python script.
  • Just like with Spark Streaming, Spark starts a web interface for your application in the background that can be accessed by going to: http://localhost:4040/
    • However, there will be no separate Streaming interface/tab
    • In some cases, web interface port may differ.
  • Create new input files in the input directory just like you did in Exercise 11.1
  • structStreamExample.py prints the results out directly into the console and does not store them in the file system.
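
For orientation, a Structured Streaming WordCount over a folder of text files might look like the sketch below. It is modeled on the word count in the Structured Streaming programming guide, not on structStreamExample.py, so the function names and the application name are assumptions and the downloaded script may be organized differently.

```python
import sys


def build_word_counts(lines):
    """Turn a streaming DataFrame of text lines into running word counts."""
    from pyspark.sql import functions as F

    # Split every line on spaces and emit one row per word.
    words = lines.select(F.explode(F.split(lines.value, " ")).alias("word"))
    # The aggregation is stateful: counts accumulate across micro-batches.
    return words.groupBy("word").count()


def main(input_folder):
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()
    # The text source produces a single string column named "value".
    lines = spark.readStream.text(input_folder)
    query = (build_word_counts(lines)
             .writeStream
             .outputMode("complete")  # print the whole result table every time
             .format("console")
             .start())
    query.awaitTermination()


if __name__ == "__main__" and len(sys.argv) == 2:
    main(sys.argv[1])
```

Note that there is no explicit state handling here, in contrast to updateStateByKey in Exercise 11.2: the running counts are maintained by the groupBy aggregation itself.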

Exercise 11.4. Processing csv files with Spark Structured Streaming

Let's now take a look at how to process csv files (the Unclaimed bank accounts dataset) using Structured Streaming and how to change the streaming application's behavior in various ways.

Dataset Description

  • Name: Unclaimed bank accounts
  • Location: https://opendata.socrata.com/Government/Unclaimed-bank-accounts/n2rk-fwkj
  • Dataset attributes (column names) are:
    1. Last / Business Name - string
    2. First Name - string
    3. Balance - double
    4. Address - string
    5. City - string
    6. Last Transaction Date - string
    7. Bank Name - string
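
The attribute list above could be turned into a user-defined schema along these lines. This is only a sketch: the short column names (last_name, bank and so on) are assumptions chosen for this example, and whether the input files contain a header row depends on how they are generated, so no header option is set here.

```python
# (column name, Spark SQL type name) pairs in the same order as the csv columns.
# The short column names are assumptions made for this sketch.
COLUMNS = [
    ("last_name", "string"),
    ("first_name", "string"),
    ("balance", "double"),
    ("address", "string"),
    ("city", "string"),
    ("last_transaction_date", "string"),
    ("bank", "string"),
]


def build_schema():
    """Build a StructType from COLUMNS (imports pyspark only when called)."""
    from pyspark.sql.types import StructType

    schema = StructType()
    for name, type_name in COLUMNS:
        schema = schema.add(name, type_name)
    return schema


def read_csv_stream(spark, input_folder):
    """Create a streaming DataFrame over csv files appearing in input_folder."""
    return (spark.readStream
            .option("sep", ",")
            .schema(build_schema())
            .csv(input_folder))
```

Streaming file sources need the schema up front, which is why it is passed explicitly instead of being inferred from the data.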
  1. Start by modifying the structStreamExample.py script from the previous exercise.
  2. Change the input format to be csv.
    • You will have to create the schema manually, specifying which columns and column types will be read from the csv input stream.
    • You can find a Python example of how to load a csv stream and how to specify a user-defined schema here: https://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#creating-streaming-dataframes-and-streaming-datasets
  3. Use data from the Unclaimed_bank_accounts.csv file as input
    • You can use the following Python script to continuously generate smaller input files from a single larger file.
      • Download the example streaming file generator: genStreamFiles.py
      • Its arguments should be: input_file output_folder nr_of_lines delay
      • The genStreamFiles.py script reads the content of input_file and, every delay seconds, generates a new file that contains nr_of_lines lines from the input file.
  4. Modify the application to group data by bank_name and compute the sum, count and average of Balance as new columns (PS! You can generate multiple aggregation columns at the same time using .agg(fun1(), fun2(), fun3(), ...) ).
  5. Let's change the grouping to also group by the arrival time of the data records, so that the aggregations will be generated for every (user-defined) time period.
    • Add a new timestamp column to the input DataFrame (before groupBy operation) using withColumn() operation.
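      • import datetime
        import time
        from pyspark.sql import functions as F
        dfWithTimeStamp = yourDataFrame.withColumn("timestamp", F.lit(datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')).cast("timestamp"))
        • NB! The lit() function from the pyspark.sql.functions module wraps a plain Python value into a literal column, so you can use the result of simple Python expressions without having to register a UDF. Note that the Python expression is evaluated once on the driver, when this line runs, not once per record.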
    • Modify the groupBy("bank") to also include a 1-minute window (window duration 1 minute, step 1 minute) over timestamp as one of the grouping keys:
      • groupBy("bank", F.window("timestamp", "1 minutes", "1 minutes"))
      • As a result, the running aggregations will be computed for every 1-minute time period.
  6. Change the output mode type of streaming query.
    • Change the outputMode from complete first to append and then to update.
    • Leave the outputMode at update.
  7. Modify the streaming query definition/options to trigger (micro-batch) processing every 20 seconds.
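
Steps 4 to 7 can be sketched as two small functions. The column names bank, balance and timestamp, the output column aliases and the console sink are assumptions made for this illustration. The functions expect a streaming DataFrame that already has a timestamp column (step 5).

```python
def aggregate_per_bank(df):
    """Sum, count and average of balance per bank and 1-minute window.

    Assumes df has "bank", "balance" and "timestamp" columns.
    """
    from pyspark.sql import functions as F

    return (df.groupBy("bank", F.window("timestamp", "1 minute", "1 minute"))
              .agg(F.sum("balance").alias("sum_balance"),
                   F.count("balance").alias("count_balance"),
                   F.avg("balance").alias("avg_balance")))


def start_query(aggregated):
    """Start a console query in update mode, triggered every 20 seconds."""
    return (aggregated.writeStream
            .outputMode("update")                  # only rows changed in this batch
            .trigger(processingTime="20 seconds")  # one micro-batch every 20 seconds
            .format("console")
            .option("truncate", "false")           # do not cut off long column values
            .start())
```

A typical call sequence would be query = start_query(aggregate_per_bank(dfWithTimeStamp)) followed by query.awaitTermination().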

Bonus Exercise

The bonus task is to investigate the limitations of Spark Structured Streaming. Try to implement Streaming TF-IDF using Spark Structured Streaming and investigate whether it is possible or whether there are technical limitations that make it infeasible in a single Spark Structured Streaming application.

  • Structured streaming example from Exercise 11.3 and the DataFrame TF-IDF code from the DataFrame abstraction for distributed data processing lecture slides should provide you with a reasonable starting point.
  • The application should use file based streaming and text documents as input (such as the Gutenberg books we have used in some of the previous practice sessions).
  • You can also use the example Python file generator from Exercise 11.4 to continuously generate small text files.
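
If you prefer to write your own generator for text input, the behavior described for genStreamFiles.py in Exercise 11.4 can be approximated as below. This is a sketch, not the course script: the function names, the output file naming and the extra max_files parameter are assumptions added for illustration.

```python
import os
import tempfile
import time


def generate_stream_files(input_file, output_folder, nr_of_lines, delay, max_files=None):
    """Write consecutive chunks of nr_of_lines lines from input_file into new files.

    Sleeps `delay` seconds between files. max_files is an extra parameter
    (not part of the described script) that makes the sketch easy to stop.
    Returns the list of created file paths.
    """
    os.makedirs(output_folder, exist_ok=True)
    created = []
    with open(input_file, encoding="utf-8") as source:
        chunk = []
        for line in source:
            chunk.append(line)
            if len(chunk) == nr_of_lines:
                created.append(_write_chunk(chunk, output_folder, len(created)))
                chunk = []
                if max_files is not None and len(created) >= max_files:
                    return created
                time.sleep(delay)
        if chunk:  # leftover lines at the end of the input file
            created.append(_write_chunk(chunk, output_folder, len(created)))
    return created


def _write_chunk(lines, output_folder, index):
    """Write one chunk into a new, sequentially numbered file."""
    path = os.path.join(output_folder, "part_{:05d}.txt".format(index))
    with open(path, "w", encoding="utf-8") as target:
        target.writelines(lines)
    return path
```

For example, generate_stream_files("pg00032.txt", "stream_input", 100, 5) would create a new 100-line file every 5 seconds until the book has been consumed.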

Bonus task deliverables

  1. Python script(s) you created
  2. General description of whether you succeeded in implementing Streaming TF-IDF using Spark Structured Streaming or not
  3. (Only if applicable) Description of the Spark Structured Streaming limitations that make it infeasible to implement TF-IDF in a single Spark Structured Streaming application.
    • You should provide references to any external materials you use as sources.

NB! Considering that this bonus task is rather exploratory this time, don't hesitate to ask the lab supervisor for clarification.

Deliverables:

  • Screenshot of the Spark Streaming web interface from Exercise 11.1
    • Example screenshot: sampleScreen.png
  • Python scripts from Exercises 11.2, 11.4
  • Output of the Python scripts from Exercises 11.2, 11.4
    • Either as:
      1. text/csv Output files (should provide files from at least two micro-batches)
      2. Or as console output that has been saved in a text file.

Potential issues and solutions

  • Do not assign Spark Job names that have spaces in them.
  • Make sure SPARK_HOME System Environment variable has been configured properly.
    • You can also set SPARK_HOME location programmatically from inside your script in the following way:
      • import os
      • os.environ['SPARK_HOME'] = "/home/pelle/soft/spark-2.3.2"
  • When using Windows: verify that you also have the HADOOP_HOME environment variable set up, and that it links to the Hadoop folder we set up in previous labs. It must include the winutils.exe file inside the HADOOP_HOME/bin folder.
  • Verify that JAVA_HOME is set properly.
    • It is suggested to use Java 8 as the default Java in your computer.
  • NB! You should avoid using paths in System environment variables that have spaces in them.
    • Windows provides 8.3 short names for many folders, such as PROGRA~1 for Program Files.
    • You can use dir /x folder to check the folder's short name.
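
The environment checks listed above can be automated with a few lines of Python. This is a small sketch with a made-up function name, check_spark_environment. It only tests whether the variables are set and free of spaces, and does not verify that the paths point to working installations.

```python
import os


def check_spark_environment(env=None, on_windows=None):
    """Return a list of human-readable problems found in the environment."""
    env = os.environ if env is None else env
    if on_windows is None:
        on_windows = os.name == "nt"

    required = ["SPARK_HOME", "JAVA_HOME"]
    if on_windows:
        required.append("HADOOP_HOME")  # needed for winutils.exe

    problems = []
    for name in required:
        value = env.get(name)
        if not value:
            problems.append("{} is not set".format(name))
        elif " " in value:
            problems.append("{} contains a space: {}".format(name, value))
    return problems


if __name__ == "__main__":
    for problem in check_spark_environment():
        print("WARNING:", problem)
```

Running the script before starting PyCharm prints one warning per problem and nothing if the checked variables look fine.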