Machine Learning in Apache Spark (Python)
We will take a look at how to use the Spark DataFrame-based Machine Learning API to perform machine learning operations on the Census-Income (KDD) dataset, which contains a large number of numerical and textual columns. We will use the Spark ML API to create an ML pipeline that performs feature transformation and selection on the dataset and builds a classification model for predicting how much salary people earn based on their census data. We will mainly concentrate on feature preparation and selection, but we will also try out several different classifiers and experiment with changing their parameters.
References
- Spark ML lib documentation: https://spark.apache.org/docs/latest/ml-guide.html
- Spark DataFrame/SQL programming guide: https://spark.apache.org/docs/2.3.2/sql-programming-guide.html
- Spark DataFrame/SQL Functions - https://spark.apache.org/docs/2.3.2/api/sql/index.html
- Lecture slides: https://courses.cs.ut.ee/2018/DDPC/fall/uploads/Main/L14_2018.pdf
Dataset Description
The dataset that we will analyze is the Census-Income (KDD) dataset from the UCI machine learning repository. This data was originally extracted from the US census bureau database.
The original goal of this dataset is to predict the income level of persons based on their census record. Incomes have been divided into two binary classes:
- Below $50K (value: `- 50000`)
- Above $50K (value: `50000+`)
Description of the dataset
- Name: Census-Income (KDD)
- Location: https://archive.ics.uci.edu/ml/datasets/Census-Income+%28KDD%29
- Download the census.tar.gz file
- Documentation: https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.names
- Files:
- Training set: census-income.data
- Testing set: census-income.test
The dataset columns are described in the dataset documentation (the census-income.names file linked above).
Preparation: Using the Spark ML library
We will continue using the PyCharm Python IDE to simplify working with DataFrames in Spark. You will download and run an example Spark Python machine learning library script to check that everything is working. This script requires you to also download the Iris dataset from the UCI machine learning repository. You can use this script as a template in the following exercises.
- Either create a new Python Spark project in PyCharm or reuse your previous project.
- Download the Python Spark ML_lib_example.py file and move it inside your PySpark project.
- NB! This script does not use the main dataset of the lab! Download the Iris dataset file from the UCI machine learning repository: https://archive.ics.uci.edu/ml/datasets/iris
- You will find the direct link to the iris.data file here: https://archive.ics.uci.edu/ml/machine-learning-databases/iris/
- Create a new folder for input files (it does not have to be inside your Spark Python project) and move the dataset files there.
- You will need to install the numpy package into your PyCharm project interpreter.
  - Go to `File -> Settings -> Project -> Project Interpreter`
  - Click the plus sign at the top right.
  - Search for the numpy package and install it.
- Run the ML_lib_example.py Python script to check that everything is working as needed.
  - Its argument should be: `input_folder`
    - You should use the location of the downloaded dataset folder as the `input_folder`.
- Check that everything works.
- If you get errors, check the Potential issues and solutions section at the end of this page or ask the lab supervisor for guidance.
Exercise 14.1. Feature preparation and transformation
In this exercise we will load the Census-Income dataset files and prepare the feature vector for the classification task. We will create a feature vector from all dataset columns (except the class column) by converting textual columns into categorical index columns, and we will create a dataset preparation pipeline to which we can later add the classification task.
- Download the Census-Income dataset census.tar.gz file.
- It contains census-income.test and census-income.data files.
- Create `dataset_testing` and `dataset_training` dataframes by loading the census-income.test and census-income.data files as CSV files (a minimal reading sketch is shown below).
  - Don't forget to use `.option("inferSchema", True)`
  - The dataset files do not contain a header line, so do not set the `.option("header", True)` option when reading the CSV files.
    - Column labels will simply be: `_c0, _c1, ..., _c41`
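For reference, reading the two files could look roughly like the sketch below. The `input_folder` handling and the SparkSession creation are assumptions; adapt them to how your script (e.g. the ML_lib_example.py template) is already structured.

```python
import sys
from pyspark.sql import SparkSession

input_folder = sys.argv[1]
spark = SparkSession.builder.appName("census-income-ml").getOrCreate()

# inferSchema lets Spark detect numerical vs. string column types;
# the files have no header line, so columns are auto-named _c0 ... _c41
dataset_training = spark.read.option("inferSchema", True) \
    .csv(input_folder + "/census-income.data")
dataset_testing = spark.read.option("inferSchema", True) \
    .csv(input_folder + "/census-income.test")
```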
- The resulting dataframes contain 42 columns. The last column (`_c41`) is the class column. The rest of the columns are various census attributes.
- In the dataset description, it is suggested to drop the column `_c24`, so let's do that for both of the dataframes before we continue:

      dataset_testing = dataset_testing.drop("_c24")
      dataset_training = dataset_training.drop("_c24")
- Most of the Spark ML methods require a feature column that contains a sparse vector of the features (instead of dynamically selecting X dataframe columns as features). Let's create the feature column using the Spark ML `VectorAssembler` method.
  - `VectorAssembler` takes a list of DataFrame column labels (inputCols) and generates a new features column (outputCol).
  - The script from the previous exercise contains an example of how to use VectorAssembler.
- To avoid having to create the list of columns manually, we can manipulate the list of all dataframe column names (dataframe.columns) and provide it as an argument to the `VectorAssembler` method.
  - You can fetch the list of DataFrame columns using `cols = dataframe.columns`
  - Remove the last column from the column list to avoid passing the class column to the machine learning methods: `feature_cols = cols[0:-1]`
  - Let's pass the list of remaining columns to the `VectorAssembler`:

        assembler = VectorAssembler(
            inputCols=feature_cols,
            outputCol="features")

  - We can then apply the VectorAssembler on the training dataframe to create a new dataframe with a new features column: `training_set = assembler.transform(dataset_training)`
- Execute the script. You WILL receive an error indicating that the feature vector must not contain string values.
  - This happens because we passed all DataFrame columns to the VectorAssembler. However, many ML algorithms cannot utilize textual columns.
- Let's convert the textual dataframe columns into categorical index columns using the Spark ML `StringIndexer` method.
  - `StringIndexer` takes a textual column, creates a list of all unique textual values in the column, and replaces the original string values with their index in that list.
  - Example: `indexer = StringIndexer(inputCol="_c1", outputCol="s_c1")`
- Unfortunately, we need to convert textual columns one column at a time. Let's create a Pipeline that contains a StringIndexer for each column we want to convert and apply it as a single operation.
- To avoid excessive manual work, let's first generate a list of all textual columns of the training dataframe dataset_training.
  - Fetch the list of dataframe column labels and types using: `dtypes = dataset_training.dtypes`. The list contains tuples of `(label, data_type)`. For example: `("_c37", "string")`
  - Let's also make sure that we remove the class column (the last column) because we don't want to accidentally pass it along to the machine learning methods as a feature: `dtypes = dtypes[0:-1]`
  - We can use a Python list comprehension to generate the list of string column labels: `string_cols = [c for (c, t) in dtypes if t == "string"]`
    - This code goes through all tuples in the dtypes list and adds the column label to the list if the column type is string.
  - Similarly, we can extract the list of all non-string columns: `numerical_cols = [c for (c, t) in dtypes if t != "string"]`
- Let's now create a Pipeline that contains a list of StringIndexers, one for each textual column: `pipeline = Pipeline(stages=indexers)`
  - Let's generate the list of string indexers for the pipeline based on the list of string columns:

        indexers = []
        for col in string_cols:
            indexer = StringIndexer(inputCol=col, outputCol="s" + col)
            indexers.append(indexer)
        pipeline = Pipeline(stages=indexers)

  - This will add new columns to the dataset, one for each textual column, with the letter "s" added as a prefix to the column name.
  - Add code to build the StringIndexer models: `model_pipeline = pipeline.fit(dataset_training)`
  - And finally, apply the pipeline on the training DataFrame to generate a new dataframe: `dataset_training_converted = model_pipeline.transform(dataset_training)`
  - You can check the content of the transformed dataframe by using `dataset_training_converted.show(10, False)`
- Now, let's modify the VectorAssembler's inputCols and use the new converted columns together with all numerical columns.
  - Let's first generate the list of new converted column names by adding "s" to each of their names as a prefix: `converted_cols = ["s" + col for col in string_cols]`
  - And then combine converted_cols and numerical_cols into the list of columns which should be used as the feature vector:

        assembler = VectorAssembler(
            inputCols=converted_cols + numerical_cols,
            outputCol="features")
- Let's also create a StringIndexer for converting the class column into a category index column named label: `labelIndexer = StringIndexer(inputCol="_c41", outputCol="label")`
- Now let's put everything together into a new Pipeline which executes all the steps in sequence.
  - NB! You should replace the previous pipeline.

        pipeline = Pipeline(stages=indexers + [assembler, labelIndexer])
        model = pipeline.fit(dataset_training)
        predictions = model.transform(dataset_testing)
        predictions.show(10, False)

  - indexers is the list of all the string-converting pipeline stages, assembler creates the feature vector, and labelIndexer creates the class category column.
  - PS! Remove the `training_set = assembler.transform(dataset_training)` line. As we are calling the assembler through the pipeline, we no longer need to apply it directly on the training dataframe.
In the next exercise we will add a classification stage to the pipeline.
Exercise 14.2. Classification
In this exercise we will build a classification model, add it to the pipeline created in the previous exercise, fit the pipeline on the training set and apply it to the testing set. Initially we will use `RandomForestClassifier`, but it will be simple to switch it out for other classifiers in the pipeline.
- Read about the RandomForestClassifier from: https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
- Also check the Python example of configuring RandomForestClassifier on that page
- Create and configure a RandomForestClassifier `classifier` object.
  - Make sure to at least configure the label and features columns: `labelCol="label"`, `featuresCol="features"` (these are the default labels, but we may modify them later).
- Add the `classifier` to the end of the machine learning pipeline, after the `labelIndexer` stage (a minimal sketch is shown below).
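A minimal sketch of what this could look like, assuming the `indexers`, `assembler` and `labelIndexer` objects from Exercise 14.1 already exist:

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# the classifier reads the assembled feature vector and the indexed label column
classifier = RandomForestClassifier(labelCol="label", featuresCol="features")

# same pipeline as before, with the classifier appended after labelIndexer
pipeline = Pipeline(stages=indexers + [assembler, labelIndexer, classifier])
model = pipeline.fit(dataset_training)
predictions = model.transform(dataset_testing)
```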
stage. - Run the modified pipeline, and check that a new "prediction" column was added to the predictions dataframe.
- Let's add a binary classification evaluator which will automatically compute the accuracy of the result.

      evaluator = BinaryClassificationEvaluator(
          labelCol="label", rawPredictionCol="rawPrediction")
      accuracy = evaluator.evaluate(predictions)
      print("Test Error = %g" % (1.0 - accuracy))

  - Do not add it to the pipeline; leave it as a separate operation after the predictions dataframe has been created.
- Let's also print out the cross table of classes and predictions: `predictions.crosstab("prediction", "label").show(30, False)`
- While the accuracy of the model appears to be good at first glance, we can see that it only classifies the largest class (Below $50K) correctly.
- This is because of the skewed class balance in the dataset.
- After all, if a binary dataset contains 95% of class A and 5% class B, we can easily achieve 95% accuracy if we always predict that a record is class A.
- Let's modify the training dataset to contain a balanced number of records from both classes (one possible approach is sketched below, after this list).
  - Divide the training set DataFrame into two DataFrames based on the class column (_c41) value:
    - Below $50K
    - Above $50K
    - You can use the dataframe filter method (`filtered_df = dataframe.filter()`) to create the two dataframes based on the class column (_c41) value.
  - Count how many records are in the Above $50K DataFrame.
  - Sample the same amount of records from the Below $50K DataFrame.
    - You can use `sampled_df = dataframe.sample(fraction)` to sample a fraction of the DataFrame.
    - We need to specify a fraction because the sampling is performed in parallel: an equal fraction is sampled from each partition of the DataFrame.
    - When you want to sample `N` elements from the DataFrame `D`, you can compute the respective fraction as: `1.0*N/D.count()`
  - Union the Above $50K DataFrame with the sampled Below $50K DataFrame to create a new training set which contains the same amount of records from both classes.
  - Make sure not to modify the original dataset_training dataframe.
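A rough sketch of one way to do the balancing. The exact class value strings below are assumptions; check the real values first with `dataset_training.groupBy("_c41").count().show(truncate=False)`, as they may contain leading spaces and a trailing period.

```python
# split the training set by class value (the literal strings are assumptions)
above_df = dataset_training.filter(dataset_training["_c41"] == " 50000+.")
below_df = dataset_training.filter(dataset_training["_c41"] == " - 50000.")

# sample roughly as many Below-$50K records as there are Above-$50K records
fraction = 1.0 * above_df.count() / below_df.count()
balanced_training = above_df.union(below_df.sample(fraction=fraction))
```

Note that `sample()` is approximate, so the two classes will not be exactly equal in size; that is fine for this exercise.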
- Modify the `RandomForestClassifier` parameters and investigate their effect on the accuracy of the result (a configuration sketch is shown below).
  - The main parameters to change are:
    - numTrees - how many trees to generate
    - maxDepth - how deep the generated trees can be
    - subsamplingRate - what fraction of the training set to sample for training each random tree
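For example, the classifier configuration could look roughly like this (the values below are arbitrary starting points, not recommended settings):

```python
from pyspark.ml.classification import RandomForestClassifier

# try different values and compare the resulting test error
classifier = RandomForestClassifier(labelCol="label", featuresCol="features",
                                    numTrees=50, maxDepth=10, subsamplingRate=0.8)
```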
- Replace the `RandomForestClassifier` in the pipeline with at least two other classifiers (for example DecisionTreeClassifier, NaiveBayes or GBTClassifier); a swap sketch is shown below.
  - Check what parameters you can change and investigate their effect on the accuracy.
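Swapping the classifier only requires changing the classifier stage; the rest of the pipeline stays the same. A sketch with GBTClassifier (the maxIter value is just illustrative):

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

# gradient-boosted trees instead of a random forest; the other stages are unchanged
classifier = GBTClassifier(labelCol="label", featuresCol="features", maxIter=20)
pipeline = Pipeline(stages=indexers + [assembler, labelIndexer, classifier])
```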
- Which one (if any) improved the accuracy of the result? (Balanced training set)
- Did you have to change their parameters to get a better result than with the initial `RandomForestClassifier`?
Exercise 14.3. Feature selection
The number of features that our classifier uses is very large. Let's add the Spark ML ChiSqSelector feature selection method to our ML pipeline to automatically choose the N best features before we build the machine learning model.
- Read about the ChiSqSelector from: https://spark.apache.org/docs/latest/ml-features.html#chisqselector
- Also check the Python example on that page
- Create and configure a ChiSqSelector `selector` object.
  - At least configure the numTopFeatures parameter.
- Add the `selector` to the machine learning pipeline between the `labelIndexer` and `classifier` stages (a sketch is shown below).
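One detail to keep in mind: ChiSqSelector writes the selected features into a new output column, so the classifier should read that column instead of the original "features" column. A sketch under those assumptions (numTopFeatures=50 and the column name "selectedFeatures" are example choices, not recommendations):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.classification import RandomForestClassifier

selector = ChiSqSelector(numTopFeatures=50, featuresCol="features",
                         labelCol="label", outputCol="selectedFeatures")

# the classifier now reads the reduced feature vector produced by the selector
classifier = RandomForestClassifier(labelCol="label", featuresCol="selectedFeatures")
pipeline = Pipeline(stages=indexers + [assembler, labelIndexer, selector, classifier])
```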
. - Run the application, check the effect of multiple different numTopFeatures parameter values on the result and choose the value that gives best accuracy.
- You do not have to check every single possible value of numTopFeatures, but at least investigate a reasonable selection of low, medium and high numbers of top features.
- Did using ChiSqSelector improve the accuracy of the result? Which parameter values gave the best result? (Balanced training set)
Bonus Exercise
- The bonus task is to implement exercises 14.1, 14.2 and 14.3 in SparkR!
- It is OK to choose just one classification algorithm and parameter configuration (the final one you used in your lab solution).
- Take the script that you completed by the end of Exercise 14.3 and adapt it into SparkR.
- It is suggested to use RStudio as an R IDE.
- You can find a small guide on how to configure SparkR in R scripts here: https://spark.apache.org/docs/latest/sparkr.html#starting-up-from-rstudio
- PS! One thing to note is that in SparkR the automatically generated dataframe column labels will be named differently!
- It is OK to ask the lab supervisor for help if you run into any technical issues using SparkR in RStudio on your computer.
- Deliverables of the bonus task:
  - The R script you created
  - Output files or a snapshot of the console output that shows the results of your script
  - Answer: Did you achieve the same accuracy with the same parameters as in the Python version of your script?
Deliverables:
- Spark Python script from Exercise 14.3
- It should contain all steps from 14.1 to 14.3.
- Output of the script after Exercise 14.3
  - Output should be provided as a text file that contains the output of the respective `df.show()` and/or `print()` commands.
PS! The final error rate should be somewhere between 10% and 16% after training on the balanced training set. It is possible to achieve better accuracy. However, if you achieve an error rate close to 0%, it probably means you accidentally passed the class column into the features vector!
Potential issues and solutions
- If you get an error about maxBins, then you should configure a higher maxBins value (such as maxBins=64) in the classifier you use.
  - For example: `classifier = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10, maxBins=64)`
- If you get an error about unseen categories/labels, it is because the StringIndexer was built on only the training dataset, and the testing dataset contains labels that were not available when building the StringIndexer model.
  - One solution is to add `.setHandleInvalid("skip")` to the StringIndexer() object to ignore previously unseen categories (see the sketch below).
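For example, the indexer-generation loop from Exercise 14.1 could be adjusted roughly like this:

```python
# skip rows whose category value was not seen when the indexer model was fitted
indexers = []
for col in string_cols:
    indexer = StringIndexer(inputCol=col, outputCol="s" + col).setHandleInvalid("skip")
    indexers.append(indexer)
```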
- If you get an error about a column having too many values when using ChiSqSelector, you can drop column _c24 from the dataframes:

      dataset_testing = dataset_testing.drop("_c24")
      dataset_training = dataset_training.drop("_c24")

  - You should apply this fix as early as possible, just after reading in the data files.