Processing data with Apache Spark
In this practice session you will start working with the Apache Spark framework in Python. You will learn how to set it up without having to install or configure Spark on your computer, and how to create Spark applications. We will create several Spark applications that perform tasks similar to those in previous labs to get an overview of how Spark differs from Pig and MapReduce.
References
- Spark programming guide: https://spark.apache.org/docs/latest/programming-guide.html
- Spark Python API - https://spark.apache.org/docs/latest/api/python/index.html
- Spark Python RDD functions: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
- Spark Lecture slides: https://courses.cs.ut.ee/LTAT.06.005/2018_fall/uploads/Main/L8_2018.pdf
- Spark Python examples: https://github.com/apache/spark/tree/master/examples/src/main/python
Exercise 8.1. Preparing environment for Apache Spark
- NB! Make sure your computer has Python installed and that it is accessible from the command line.
  - You can choose to use either Python 2.7 or 3.6, but make sure that the version you choose is set as the default Python on your computer.
  - In Windows, check that the Python folder has been added to the Windows PATH variable.
- Download a pre-built Apache Spark distribution from: https://spark.apache.org/downloads.html
  - Choose the following options:
    - Version 2.3.2
    - Pre-built for Apache Hadoop 2.7 and later
- Unpack the downloaded spark-2.3.2-bin-hadoop2.7.tgz archive
- Add a new SPARK_HOME System Environment variable which points to the unpacked Spark distribution folder. (Inside that folder should be a README.md file and a bin folder)
  - You can also set the SPARK_HOME location programmatically inside your script in the following way:

        import os
        os.environ['SPARK_HOME'] = "/home/pelle/soft/spark-2.3.2"
- Verify that you also have the HADOOP_HOME environment variable set up, and that it links to the Hadoop folder we set up in previous labs. It must include the winutils.exe file inside the bin folder.
- Verify that JAVA_HOME is set properly.
  - It is suggested to use Java 8 as the default Java on your computer.
- NB! You should avoid using paths in System environment variables that have spaces in them.
  - Windows provides short 8.3-format names for many folders, such as PROGRA~1 for Program Files.
    - You can use dir /x folder to check a folder's short name.
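If you want to double-check the Exercise 8.1 environment setup from Python, the following small sketch prints the variables Spark depends on (the commented-out example paths are placeholders, not required locations):

    import os

    # Optionally set the locations programmatically; these example paths are placeholders.
    # os.environ['SPARK_HOME'] = "C:/soft/spark-2.3.2-bin-hadoop2.7"
    # os.environ['HADOOP_HOME'] = "C:/soft/hadoop"

    # Print the variables Spark relies on, so missing values or paths with spaces stand out.
    for var in ("SPARK_HOME", "HADOOP_HOME", "JAVA_HOME"):
        print("{0} = {1}".format(var, os.environ.get(var, "<not set>")))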
Exercise 8.2. Configuring PyCharm for Spark Python
We will use the PyCharm Python IDE to simplify working with Spark Python scripts. We will download and run an example Spark WordCount script.
- Download and install the community edition of PyCharm Python IDE.
- Open PyCharm and create a new Python project.
  - Create a new VirtualEnv.
    - NB! Make sure the Python version is the same as the default Python of your computer (either 2.7 or 3.6)
    - Make sure to activate Inherit global site-packages
- Open the PyCharm settings (File->Settings)
  - Open the Project Interpreter settings: Project: Name -> Project Interpreter
  - Click the Gear button on the upper right side of the interpreter settings window and choose Show All..
- Choose your virtual environment and click on the Show paths button on the right hand side
  - Click on the Add button on the right hand side
  - Browse to the previously unpacked Spark distribution folder and add two zip files (pyspark.zip & py4j-0.10.7-src.zip) from the python\lib folder
- Download the following Python Spark WordCount example wordcount.py file and move it inside your PySpark project.
  - You will find other Python Spark examples in the examples\src\main\python folder inside the Spark distribution you previously unpacked.
- Try to run wordcount.py (You will get an error about the missing arguments, but it will generate a run configuration for it)
- Modify the run configuration of the wordcount.py script.
- Create a new folder for input files (Does not have to be inside your Spark Python project)
- Add full path to the input folder as the first argument of the script
- Add some text files into the input folder
- Run the Python WordCount script. If it works without errors then you have successfully set up Apache Spark in PyCharm.
  - This example prints the results out directly to the console
    - If you want to store the results into a filesystem instead, use the saveAsTextFile RDD action
  - This script checks how many arguments were given. You will have to modify this part if you want to add additional parameters, such as an output folder location (a sketch of such a modification follows below)
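For reference, here is a hedged sketch of how the WordCount example could be adapted to take an output folder as a second argument and store the results with saveAsTextFile (the exact structure of your downloaded wordcount.py may differ slightly):

    from __future__ import print_function

    import sys
    from operator import add

    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        # Expect an input folder and an output folder (the output folder must not exist yet).
        if len(sys.argv) != 3:
            print("Usage: wordcount <input_folder> <output_folder>", file=sys.stderr)
            sys.exit(-1)

        spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

        lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda line: line.split(" ")) \
                      .map(lambda word: (word, 1)) \
                      .reduceByKey(add)

        # Write results to the filesystem instead of printing them to the console.
        counts.saveAsTextFile(sys.argv[2])

        spark.stop()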
Exercise 8.3. Processing unclaimed bank accounts
- Create a new Spark Python script.
- Add Unclaimed_bank_accounts.csv as input file
- Load in the Unclaimed_bank_accounts.csv file as an RDD.
  - The RDD should contain tuples of length 7: ('Last / Business Name', 'First Name', 'Balance', 'Address', 'City', 'Last Transaction', 'bank_name')
  - To parse csv files correctly, you can use:

        bank_accounts = spark.read.option("header", "true").csv(input_folder).rdd.map(lambda x: tuple(x))

    - This code line takes advantage of the csv loading library from the Spark DataFrame API to read in the csv file and converts the resulting DataFrame Rows back into RDD tuples.
- A fast way to check the content of an RDD is to use:

        print(rdd.take(3))

  - This command extracts 3 tuples out of the RDD as a list and prints the list out into the console.
- Now, let's solve exercises 3.2 and 3.3 from Practice 3 - Processing data with MapReduce (a combined sketch of these steps follows at the end of this exercise):
  - For each City, calculate the average account balance.
  - For each City, generate the 3 highest and 3 lowest balances.
  - We can do it by applying 3 functions: map, groupByKey and map
    - Create a map which extracts the values we are interested in as key and value pairs and returns a tuple of: (city, balance) (TIP: take a look at the Spark Python WordCount example in the lecture slides)
    - Group the data by city using groupByKey
    - Apply a function through the map() operation to process each group at a time and find the top 3 and bottom 3 balances.
      - If map is applied to the result of groupByKey (or groupBy), it is almost the same as the Reduce method in Hadoop MapReduce: its input is a tuple of (key, values), where values is an iterable object containing all values inside this group.
      - Here is an example of a user defined sum function mySumfun() applied inside map() (after applying groupByKey() to group values by key):

            def mySumfun(tuple):
                (key, values) = tuple
                sum = 0
                for value in values:
                    sum = sum + value
                return (key, sum)

      - Function inside map() should return a value or a tuple of values
- Change the grouping to contain both City and Bank Name
  - In the map function before groupByKey, assign a two element tuple (city, bank) of these fields as the Key (the Key is the first element in the RDD tuple)
- Now, analyse unclaimed bank accounts from only a specific last_transaction year.
  - You should use the filter transformation.
  - The value of the specific year should be given as an extra program argument to your Spark script.
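To tie the pieces of this exercise together, here is a rough sketch of one possible pipeline (the script name, the argument order, and the way the year is matched against the Last Transaction field are assumptions; adapt them to your own solution and data):

    from __future__ import print_function

    import sys
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: bank_accounts.py <input_folder> <year>", file=sys.stderr)
            sys.exit(-1)

        input_folder = sys.argv[1]
        year = sys.argv[2]          # e.g. "2002"

        spark = SparkSession.builder.appName("UnclaimedBankAccounts").getOrCreate()

        # (last_name, first_name, balance, address, city, last_transaction, bank_name) tuples
        bank_accounts = spark.read.option("header", "true").csv(input_folder) \
                             .rdd.map(lambda x: tuple(x))

        # Keep only accounts whose last transaction happened in the given year.
        # NB: this assumes the year appears as a substring of the Last Transaction field;
        # adjust the check to match the actual date format in your csv.
        filtered = bank_accounts.filter(lambda row: year in str(row[5]))

        # Key by (city, bank_name), value is the balance as a float.
        pairs = filtered.map(lambda row: ((row[4], row[6]), float(row[2])))

        def summarize(group):
            (key, balances) = group
            balances = sorted(balances)
            average = sum(balances) / len(balances)
            top3 = balances[-3:]
            bottom3 = balances[:3]
            return (key, average, top3, bottom3)

        results = pairs.groupByKey().map(summarize)

        print(results.take(5))

        spark.stop()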
Exercise 8.4. Information retrieval - TF-IDF
Implement TF-IDF from the MapReduce in Information Retrieval lab in Spark. Our goal is to compute TF-IDF for each document and word combination in the dataset.
The formula for TF-IDF is:
TFIDF(Word, Document) = n/N * log(D/m)
where:
- n: number of occurrences of Word in Document
- N: total number of words in Document
- D: total number of documents in the dataset
- m: number of documents Word occurs in
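As a quick worked example (the base of the logarithm is not fixed above; the natural logarithm is assumed here), a word occurring n=3 times in a document of N=100 words, and appearing in m=10 out of D=50 documents, gets TFIDF = 3/100 * log(50/10) ≈ 0.048:

    from math import log

    # Hypothetical values for illustration only.
    n, N, D, m = 3, 100, 50, 10
    tfidf = (float(n) / N) * log(float(D) / m)
    print(tfidf)  # ~0.0483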
How to extract Document name/path
- NB! If you have trouble finding a way to get the name of the file an input line was taken from, you can use the following code to achieve this:

        import pyspark.sql.functions as F
        lines = spark.read.text(input_folder).withColumn("file", F.input_file_name()).rdd.map(lambda x: tuple(x))

  - The resulting lines RDD contains (line, file) tuples.
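If you prefer to work with just the bare file name rather than the full file:/// URI returned by input_file_name(), a small follow-up map can reorder the tuple and strip the path (using os.path.basename here is just one possible approach):

    import os

    # Turn (line, file) into (file_name, line) so the document name can be used as the key.
    docs_lines = lines.map(lambda t: (os.path.basename(t[1]), t[0]))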
Generating word tuples from text lines
- To explode/flatten a line of words into multiple tuples of words, you can apply either lines.flatMap(fun) or lines.flatMapValues(fun). Our current case is different from the WordCount example because the lines RDD contains (line, file) tuples instead of simply lines, if you followed the previous step.
  - flatMapValues(fun) can be used to flatten/explode only the value field, if the input RDD contains (Key, Value) tuples. But you first have to change the order of the elements in the tuple (using a map() transformation), so that file is the first element (Key) and line is the second element (Value) in the RDD tuples. (A sketch of this approach follows below.)
  - If you use flatMap(fun), then your flatMap function should generate a list of (word, file) tuples instead of simply a list of words.
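Here is a minimal sketch of the flatMapValues() route, assuming the (file_name, line) ordering produced in the previous sketch:

    # docs_lines contains (file_name, line) tuples; flatMapValues() explodes each line
    # into one (file_name, word) tuple per word.
    words = docs_lines.flatMapValues(lambda line: line.split())

    # Equivalent alternative with flatMap(), building the tuples ourselves:
    # words = docs_lines.flatMap(lambda t: [(t[0], w) for w in t[1].split()])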
Working with nested tuples in Python
- When you need to work with tuples that contain other tuples, you can address the tuple elements in the following ways:

        tuple = ('x', ('y', 'z'))
        (a, b) = tuple
        print(a)            # x
        print(b)            # ('y', 'z')
        (c, d) = b
        print(c)            # y
        print(tuple[1][0])  # y
Computing TF/IDF
There are several approaches to compute TF-IDF using Spark. For example, you can:
- Follow the same step-by-step MapReduce-like approach we took in Lab 4 to compute TF-IDF.
- Or compute the n, N and m values as separate RDDs and use join() (or cogroup()) operations to combine the individual RDDs into a single one to compute TF-IDF.
For the MapReduce-like approach we took in Lab 4 (a sketch of these steps follows below):
- Calculate the word count for each word and document as n.
  - Group the words RDD by (Word, Document) and count each row in the group as n
  - Output should be a tuple of ((Word, Document), n)
- Calculate the total number of words for each document as N
  - Group the previous RDD by Document and sum all the n's in the whole document as N
  - Output should be a tuple of (Document, (Word, n, N))
- Calculate the number of documents each word occurs in as m
  - Group the previous RDD by Word and compute m as the number of tuples (inside each group)
  - Output should be a tuple of (Word, (Document, n, N, m))
- Calculate the TF-IDF value for each word and document pair.
  - Apply map and compute TF-IDF based on n, N, m and D.
  - Output should be a tuple of ((Word, Document), TF-IDF)
- It may be a bit too complicated to use reduceByKey or foldByKey type operations when value is a tuple. Feel free to use groupByKey() and flatMap() (or map()) instead.
  - Here is an example of a user defined function myfun() applied inside flatMap() (after applying groupBy()) when the key is Document and the values contain (word, n) tuples:

        def myfun(tuple):
            (doc, values) = tuple
            outputTuples = []
            for value in values:
                (word, n) = value
                outputTuples.append((doc, word, n))
            return outputTuples

  - Function inside flatMap() should return a list of values or tuples. Each element in the output list will be one value/tuple inside the resulting RDD.
The computed TF-IDF results should be written to an output folder instead of just printing them out into the console.
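To make the chain of steps above more concrete, here is a hedged end-to-end sketch of the MapReduce-like approach (the script name, argument order, use of os.path.basename for the document name, and the natural logarithm are assumptions rather than requirements):

    from __future__ import print_function

    import os
    import sys
    from math import log

    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: tfidf.py <input_folder> <output_folder>", file=sys.stderr)
            sys.exit(-1)

        spark = SparkSession.builder.appName("PythonTFIDF").getOrCreate()

        # (line, file) tuples, reordered into (doc, word) pairs
        lines = spark.read.text(sys.argv[1]) \
                     .withColumn("file", F.input_file_name()) \
                     .rdd.map(lambda x: tuple(x))
        words = lines.map(lambda t: (os.path.basename(t[1]), t[0])) \
                     .flatMapValues(lambda line: line.split())

        # D: total number of documents in the dataset
        D = words.map(lambda t: t[0]).distinct().count()

        # n: occurrences of each (word, doc) pair
        n_rdd = words.map(lambda t: ((t[1], t[0]), 1)).groupByKey() \
                     .map(lambda t: (t[0], len(t[1])))          # ((word, doc), n)

        # N: total number of words per document, kept next to each (word, n)
        def add_N(t):
            (doc, values) = t                                   # values: iterable of (word, n)
            values = list(values)
            N = sum(n for (word, n) in values)
            return [(word, (doc, n, N)) for (word, n) in values]

        n_N_rdd = n_rdd.map(lambda t: (t[0][1], (t[0][0], t[1]))) \
                       .groupByKey().flatMap(add_N)             # (word, (doc, n, N))

        # m: number of documents each word occurs in, then the final TF-IDF value
        def add_m_and_tfidf(t):
            (word, values) = t                                  # values: iterable of (doc, n, N)
            values = list(values)
            m = len(values)
            return [((word, doc), (float(n) / N) * log(float(D) / m))
                    for (doc, n, N) in values]

        tfidf = n_N_rdd.groupByKey().flatMap(add_m_and_tfidf)   # ((word, doc), tfidf)

        tfidf.saveAsTextFile(sys.argv[2])
        spark.stop()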
Bonus Exercise
- Extend your Spark Python TF-IDF program to:
  - Filter out common English stop-words (and, if, else, ...)
    - The list of stop-words must be read in from a separate file and you must use broadcast variables to pass the content of this list along to your Spark functions. (A sketch of using a broadcast variable follows after this list.)
    - You can find examples of stop-word lists here: https://github.com/igorbrigadir/stopwords
  - Find the Top N (N=10 initially) highest weighted words for each document.
    - Configure N to be read as an additional argument to your Spark Python TF-IDF application
  - Generate a reverse index of Words to Documents based on all the top N highest weighted words. Something like this:
    - word1 -> Document1, Document5
    - word2 -> Document3,
    - ..
    - wordN -> DocumentX, ..., DocumentY
  - The resulting index should not include any typical English stop-words.
    - It should also not contain any unwanted special characters. You can make a few exceptions, such as -.
- Deliverables
  - Extended TF-IDF Python script
  - Output file(s) of the script
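Here is a minimal sketch of reading the stop-word list on the driver and sharing it through a broadcast variable (it assumes a stop-words file with one word per line, a SparkSession named spark, and a words RDD of (document, word) tuples as in the earlier sketch):

    # Read the stop-word list on the driver (one word per line assumed) and broadcast it.
    with open(stopwords_file) as f:
        stopwords = set(line.strip().lower() for line in f if line.strip())

    stopwords_bc = spark.sparkContext.broadcast(stopwords)

    # Drop (doc, word) pairs whose word is a stop-word; the broadcast value is read inside the task.
    words = words.filter(lambda t: t[1].lower() not in stopwords_bc.value)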
Deliverables:
- Python scripts from Exercises 8.3, 8.4
- Output of the Python scripts from Exercises 8.3, 8.4
Potential issues and solutions
- "The system cannot find the path specified" error when executing your Spark application for the first time.
  - Check that your SPARK_HOME environment variable value is correct
  - It is suggested to use Java 8 as the default Java on your computer.
- Be careful with the code indentation in your Python script. Python is very strict when it comes to mixing tabs and spaces.
  - Check the indentation manual for Python: https://docs.python.org/2.0/ref/indentation.html
- "Python in worker has different version 2.7 than that in driver 3.6" error
  - Check that Python 3.X (and not Python 2.X) is the default Python on your computer.
- Issues with input path location in Windows
  - To avoid relative path errors in Windows you may have to specify a valid full path for the Spark warehouse directory. Change the SparkSession line and add the spark.sql.warehouse.dir configuration:

        spark = SparkSession \
            .builder \
            .appName("PythonWordCount") \
            .config('spark.sql.warehouse.dir', 'file:///C:/Users/pjakovits/SparkWarehouse') \
            .getOrCreate()