Stream Data Processing: Spark Streaming
In this practice session we will take a look at how to process stream data using both the Spark Streaming and Spark Structured Streaming APIs in Python. You will learn how to create, run, and test streaming applications in Spark and how to modify their behavior.
References
- Spark Streaming programming guide: https://spark.apache.org/docs/2.3.2/streaming-programming-guide.html
- Spark Structured Streaming guide: https://spark.apache.org/docs/2.3.2/structured-streaming-programming-guide.html
- Lecture slides: https://courses.cs.ut.ee/LTAT.06.005/2018_fall/uploads/Main/L11_2018.pdf
- Spark Python API - https://spark.apache.org/docs/latest/api/python/index.html
- Spark Python Streaming API: https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html
- Spark Python Streaming examples: https://github.com/apache/spark/tree/master/examples/src/main/python/streaming
Exercise 11.1. Running a Spark Streaming WordCount example in PyCharm
To start with, let's download and run an example file-stream-based Spark Streaming WordCount application. We will check how to deal with input files and take a look at the Spark Streaming web console and the streaming metrics displayed there.
- Either create a new Python Spark project in PyCharm or reuse your previous project.
- Download Spark Streaming WordCount example: streamFileExample.py
- Create an empty input folder into which we will later create input files.
- Create a run configuration for the streamFileExample.py file.
- streamFileExample.py takes one argument: input_folder
- Run the streamFileExample.py Python script.
- PS! Spark streaming applications keep running indefinitely - until you stop them.
- Initially your application will have no data to process. File-based Spark Streaming applications use the content of newly created files in the input folder as the input stream.
- Create new input files and move them into the input directory.
- The files must have a creation date later than the start time of the application, so simply moving old files will not work.
- Use the command line command cp file_location new_path to copy a file into the input folder.
- You can also use touch file_name to reset the file modified timestamp if copying files does not work for you.
- streamFileExample.py prints the results out directly into the console and does not store them in the file system.
- Spark also starts a web interface for your application in the background that can be accessed by going to: http://localhost:4040/
- In some cases, the web interface port may differ.
- Open the web interface of your Spark Streaming application and navigate to the Streaming tab. Take a look at the Scheduling Delay and Processing Time metrics.
- The Input Rate metric is not used when using textFileStream and will stay at 0 regardless of how much data is being processed.
- This is because Spark depends on Receivers to report the input rate, and textFileStream-based streaming does not use Receivers.
- Try to generate a larger number of input files between batches to see if you can make the Processing Time rise higher.
- A simple way to generate additional files fast for testing is to copy the same input file multiple times, each time with a new name. For example:
cp pg00032.txt stream_input/76.txt
cp pg00032.txt stream_input/77.txt
cp pg00032.txt stream_input/78.txt
- Take a screenshot of the Spark Streaming interface at http://localhost:4040/ while your streaming application is running; this screenshot is the deliverable from this exercise.
- Example screenshot: sampleScreen.png
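For orientation, a file-stream WordCount along the lines of streamFileExample.py typically looks roughly like the sketch below. This is only an illustrative sketch: the application name, the 5-second batch interval, and the exact transformations are assumptions, and the provided script may differ in details.

# Minimal file-stream WordCount sketch (details are assumptions,
# not the exact contents of streamFileExample.py)
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    input_folder = sys.argv[1]

    sc = SparkContext(appName="StreamFileWordCount")
    ssc = StreamingContext(sc, 5)  # 5-second micro-batch interval (assumed)

    # Every file newly created inside input_folder becomes part of the input stream
    lines = ssc.textFileStream(input_folder)
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()  # prints the first elements of each micro-batch to the console

    ssc.start()
    ssc.awaitTermination()  # keeps running until the application is stopped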
Exercise 11.2. Stateful Streaming RDD operations and storing result as text files
The Exercise 11.1 WordCount example computes the word count separately for every micro-batch. Combining the output of each micro-batch into a single WordCount would not be difficult, but it would require an additional job that processes the output of all previous micro-batches. Instead, let's use Spark Streaming stateful operations, which make it possible to keep running aggregations in memory and update them at every micro-batch.
- We first need to set up a checkpointing directory to use stateful Spark Streaming operations. Stateful operations cache their running results in memory, and checkpointing is therefore needed to be able to recover from faults.
- Add the following line after the StreamingContext object ssc has been created:
ssc.checkpoint("checkpoint")
- Replace the reduceByKey() operation with the updateStateByKey() operation.
- updateStateByKey() will generate a new aggregated DStream just like reduceByKey() when it is first applied. However, it will keep the aggregation results in memory as intermediate state and update the values at every following batch interval using the supplied user-defined aggregation function.
- Also replace the lambda function (lambda a, b: a + b), which was previously inside reduceByKey(), with: lambda a, b: sum(a) + (b or 0)
- Here a is the list of new values and b is the value of the current state, meaning the current count of this word.
- Use sum(a) instead of just a because a may now be a list of values instead of a single value.
- Use (b or 0) instead of just b because b will be None if the previous state (for some key) does not exist.
- Change the application to write the results out as files.
- You can use the saveAsTextFiles(prefix) DStream operation to achieve this, as sketched below.
- prefix is the start of the output folder location. Spark stores the output of each micro-batch in a separate folder whose path starts with the prefix and ends with the timestamp of the micro-batch.
- If you want the output files to be inside a specific folder, use something like output_folder/out as the prefix. Spark will then generate output folders with names like out-1543153935000 inside output_folder.
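Putting the steps above together, a stateful version of the WordCount that writes its results out as text files could look roughly like the following sketch. The checkpoint directory name, batch interval, and output prefix are assumptions; adapt them to your own script.

# Stateful WordCount sketch with updateStateByKey() and saveAsTextFiles()
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def update_count(new_values, current_state):
    # new_values is the list of counts for a word in the current batch,
    # current_state is None if the word has not been seen before
    return sum(new_values) + (current_state or 0)

if __name__ == "__main__":
    input_folder = sys.argv[1]

    sc = SparkContext(appName="StatefulStreamWordCount")
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("checkpoint")  # required for stateful operations

    counts = ssc.textFileStream(input_folder) \
                .flatMap(lambda line: line.split(" ")) \
                .map(lambda word: (word, 1)) \
                .updateStateByKey(update_count)

    # Each micro-batch is written into a new folder named output_folder/out-<timestamp>
    counts.saveAsTextFiles("output_folder/out")

    ssc.start()
    ssc.awaitTermination()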
Exercise 11.3. Running a Spark Structured Streaming WordCount example
Let's now take a look at a Spark Structured Streaming example. The main differences are that it uses the DataFrame API and that aggregations are stateful by default.
- Download Spark Structured Streaming WordCount example: structStreamExample.py and add it to your PyCharm project.
- Use the same input folder as you used in Exercise 11.1.
- Create a run configuration for the structStreamExample.py file.
- structStreamExample.py takes one argument: input_folder
- Run the structStreamExample.py Python script.
- Just like with Spark Streaming, Spark starts a web interface for your application in the background that can be accessed by going to: http://localhost:4040/
- However, there will be no separate Streaming interface/tab.
- In some cases, the web interface port may differ.
- Create new input files in the input directory just like you did in Exercise 11.1
- structStreamExample.py prints the results out directly into the console and does not store them in the file system.
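For comparison, a Structured Streaming WordCount in the spirit of structStreamExample.py could look roughly like the sketch below. The provided script may differ in details; the application name and the use of the console sink with complete output mode are assumptions.

# Structured Streaming WordCount sketch (details are assumptions)
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == "__main__":
    input_folder = sys.argv[1]

    spark = SparkSession.builder.appName("StructStreamWordCount").getOrCreate()

    # Newly created text files in input_folder become rows of a streaming DataFrame
    lines = spark.readStream.format("text").load(input_folder)

    words = lines.select(F.explode(F.split(lines.value, " ")).alias("word"))
    counts = words.groupBy("word").count()  # aggregations are stateful by default

    query = counts.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()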
Exercise 11.4. Processing csv files with Spark Structured Streaming
Let's now take a look at how to process csv files (the Unclaimed bank accounts dataset) using Structured Streaming and how to change the streaming application's behavior in various ways.
Dataset Description
- Name: Unclaimed bank accounts
- Location: https://opendata.socrata.com/Government/Unclaimed-bank-accounts/n2rk-fwkj
- Dataset attributes (column names) are:
- Last / Business Name - string
- First Name - string
- Balance - double
- Address - string
- City - string
- Last Transaction Date - string
- Bank Name - string
- Start by modifying the structStreamExample.py script from the previous exercise.
- Change the input format to be csv.
- You will have to create the schema manually, specifying which columns and column types will be read from the csv format input stream.
- You can find a Python example of how to load a csv format stream and how to specify a user-defined schema here: https://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#creating-streaming-dataframes-and-streaming-datasets
- Use data from the Unclaimed_bank_accounts.csv file as input
- You can use the following Python script to continuously generate smaller input files from a single larger file.
- Download the example streaming file generator: genStreamFiles.py
- Its arguments should be: input_file output_folder nr_of_lines delay
- The genStreamFiles.py script reads the content of the input_file and every delay seconds generates a new file that contains nr_of_lines lines from the input file.
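As a starting point for the csv input, defining the schema manually and reading the generated files as a stream could look roughly like the following sketch. The column names are assumptions derived from the dataset description above, and the header option may need to be adjusted depending on whether the generated files contain a header line.

# Sketch of a user-defined schema for the Unclaimed bank accounts csv stream
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("BankAccountsStream").getOrCreate()

# Column names below are assumptions; use whatever names you prefer,
# but the column order and types must match the csv files
schema = StructType([
    StructField("last_business_name", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("balance", DoubleType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("last_transaction_date", StringType(), True),
    StructField("bank_name", StringType(), True),
])

input_folder = sys.argv[1]  # the folder genStreamFiles.py writes into
accounts = spark.readStream \
    .schema(schema) \
    .option("header", "false") \
    .csv(input_folder)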
- Modify the application to group data by bank_name and compute the sum, count, and average of Balance as new columns. (PS! You can generate multiple aggregation columns at the same time using .agg(fun1(), fun2(), fun3(), ...).)
- Let's change the grouping to also group by the arrival time of the data records so that the aggregations will be generated for every (user-defined) time period.
- Add a new timestamp column to the input DataFrame (before the groupBy operation) using the withColumn() operation:
import datetime
import time
dfWithTimeStamp = yourDataFrame.withColumn("timestamp", F.lit(datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')).cast("timestamp"))
- NB! The lit() function from the pyspark.sql.functions library lets you turn the result of plain Python code into a column value without having to register a UDF.
- Modify the groupBy("bank") call to also include a 1-minute window (window duration 1 minute, step 1 minute) over the timestamp as one of the grouping keys: groupBy("bank", F.window("timestamp", "1 minutes", "1 minutes"))
- As a result, the running aggregations will be computed for every 1-minute time period.
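Continuing from the accounts streaming DataFrame in the previous sketch, the timestamp column, the windowed grouping, and the three aggregations could be expressed roughly as follows. The column names bank_name and balance are assumptions; use the names your own schema defines.

# Sketch of windowed grouping with multiple aggregations (column names assumed)
import datetime
import time
from pyspark.sql import functions as F

# Add a timestamp column using F.lit(), as described in the step above
current_time = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
with_ts = accounts.withColumn("timestamp", F.lit(current_time).cast("timestamp"))

# Group by bank and by 1-minute windows over the timestamp,
# computing sum, count and average of the balance in one .agg() call
aggregated = with_ts \
    .groupBy("bank_name", F.window("timestamp", "1 minutes", "1 minutes")) \
    .agg(F.sum("balance").alias("sum_balance"),
         F.count("balance").alias("count_balance"),
         F.avg("balance").alias("avg_balance"))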
- Change the output mode type of the streaming query.
- Change the outputMode from complete first to append and then to update.
- Leave the outputMode at update.
- Modify the streaming query definition/options to trigger (micro-batch) processing every 20 seconds.
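Finally, the streaming query itself, with the update output mode and a 20-second micro-batch trigger, could be defined roughly like this, continuing from the aggregated DataFrame in the sketch above:

# Sketch of the streaming query definition with update mode and a 20-second trigger
query = aggregated.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="20 seconds") \
    .start()

query.awaitTermination()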
Bonus Exercise
The bonus task is to investigate the limitations of Spark Structured Streaming. Try to implement streaming TF-IDF using Spark Structured Streaming and investigate whether it is possible or whether there are technical limitations that make it infeasible using a single Spark Structured Streaming application.
- The Structured Streaming example from Exercise 11.3 and the DataFrame TF-IDF code from the DataFrame abstraction for distributed data processing lecture slides should provide you with a reasonable starting point.
- The application should use file based streaming and text documents as input (such as the Gutenberg books we have used in some of the previous practice sessions).
- You can also use the example Python file generator from Exercise 11.4 to continuously generate small text files.
Bonus task deliverables
- Python script(s) you created
- General description of whether you succeeded in implementing streaming TF-IDF using Spark Structured Streaming or not
- (Only if applicable) Description of the Spark Structured Streaming limitations that make it infeasible to implement TF-IDF using a single Spark Structured Streaming application.
- You should provide references to any external materials you use as sources.
NB! Considering that this bonus task is rather exploratory this time, don't hesitate to ask for clarification from the lab supervisor.
Deliverables:
- Screenshot of the Spark Streaming web interface from Exercise 11.1
- Example screenshot: sampleScreen.png
- Python scripts from Exercises 11.2, 11.4
- Output of the Python scripts from Exercises 11.2, 11.4
- Either as:
- text/csv output files (you should provide files from at least two micro-batches)
- Or as console output that has been saved to a text file.
Potential issues and solutions
- Do not assign Spark Job names that have spaces in them.
- Make sure the SPARK_HOME system environment variable has been configured properly.
- You can also set the SPARK_HOME location programmatically from inside your script in the following way:
import os
os.environ['SPARK_HOME'] = "/home/pelle/soft/spark-2.3.2"
- When using Windows: verify that you also have the HADOOP_HOME environment variable set up and that it points to the Hadoop folder we set up in the previous labs. It must include the winutils.exe file inside the HADOOP_HOME/bin folder.
- Verify that JAVA_HOME is set properly.
- It is suggested to use Java 8 as the default Java on your computer.
- NB! You should avoid using paths that contain spaces in system environment variables.
- Windows provides 8.3 short names for many folders, such as PROGRA~1 for Program Files.
- You can use dir /x folder to check a folder's short name.