Graph processing with Spark GraphFrames
We will take a look at how to process graph data using Spark GraphFrames and will continue using the Yelp dataset.
A new small sample of the Yelp dataset has been prepared for developing and testing graph processing applications in the lab. It consists of businesses from 3 cities (Fairview, Burton, Bridgeville), their Yelp reviews, and the users who reviewed them. After we build a friendship and review graph from this smaller sample, it will contain 2935 vertices and 130985 edges.
NB! You must download the new Yelp dataset sample, as the sample that we have used in previous labs is not suitable for generating reasonable graphs!
References
GraphFrames
- Spark GraphFrame Programming guide: https://graphframes.github.io/user-guide.html
- Spark Lecture slides: https://courses.cs.ut.ee/LTAT.06.005/2018_fall/uploads/Main/L12_2018.pdf
- Spark GraphFrame API: https://graphframes.github.io/api/python/graphframes.html#graphframes.GraphFrame
DataFrames
- Spark DataFrame/SQL programming guide: https://spark.apache.org/docs/2.3.2/sql-programming-guide.html
- Spark DataFrame/SQL Functions: https://spark.apache.org/docs/2.3.2/api/sql/index.html
Dataset Description
The dataset that we will analyze is the Yelp Dataset. It contains Yelp businesses, reviews, and user data for use in personal, educational, and academic purposes. It is freely available for teaching about databases and NLP. The dataset consists of 6 JSON files: businesses, reviews, users, checkins, tips, and photos.
NB! Using this dataset requires that you read and agree with the Dataset Challenge Dataset Agreement.
- Name: Yelp Dataset
- Location: https://www.yelp.com/dataset
- Download link for the new smaller sample of the dataset for testing in the lab: https://owncloud.ut.ee/owncloud/index.php/s/g87NS2QXssHTPtt (Ask for password from the lab supervisor)
- Documentation: https://www.yelp.com/dataset/documentation/main
- Location of the large dataset in the cluster: /tmp/yelp
Dataset tables that we use in this lab are: Review, User, and Business. The tables are joined on the user_id (Review ↔ User) and business_id (Review ↔ Business) keys. As we are dealing with JSON files, the order of the attributes may differ from the documentation, and some attributes might in turn contain nested structures. Check the dataset documentation or Spark's df.printSchema() command output for the complete list of (nested) attributes (a minimal sketch of this check follows below).
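For example, a minimal sketch of such a schema check (the file name review.json is an assumption; use the actual file names of the downloaded sample):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("yelp_schema_check").getOrCreate()

# File name is an assumption -- adjust it to the JSON files in the downloaded sample
reviews = spark.read.json("input_folder/review.json")
reviews.printSchema()  # prints the complete list of (nested) attributes
```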
Exercise 12.1. Using Graphframe extension in PyCharm IDE
We will continue using Python PyCharm IDE to simplify working with GraphFrames and DataFrames in Spark. You will download and run an example Spark Python GraphFrame script and verify that you can successfully run GraphFrame applications in your computer. You can use this script as a template in the following exercises.
NB! You must download the new Yelp dataset sample, as the sample that we have used in previous labs is not suitable for generating reasonable graphs!
- Either create a new Python Spark project in PyCharm (just like in the previous lab) or reuse your previous project.
- Download the GraphFrames source code zip file from GraphFrames GitHub releases page: https://github.com/graphframes/graphframes/releases
- Unpack the downloaded zip file.
- Add the python folder that is located inside the unpacked folder to your PyCharm Python interpreter paths, just like we did with Spark pyspark.zip and py4j-0.10.7-src.zip in Exercise 8.2. Configuring PyCharm for Spark Python (except you need to add the whole python folder instead of zip files).
- Download the Python Spark GraphFrame graphframe_example.py file and move it inside your PySpark project.
- Download a new smaller sample of the dataset for development and testing in the lab from here: https://owncloud.ut.ee/owncloud/index.php/s/g87NS2QXssHTPtt (Ask for password from the lab supervisor)
- Create a new folder for input files (Does not have to be inside your Spark Python project) and move the Yelp dataset JSON files there.
- Run the graphframe_example.py Python script to check that everything is working as needed (a minimal sketch of the general structure of such a script is shown after this list).
  - Its argument should be: input_folder
  - You should use the location of the downloaded new Yelp dataset sample folder as the input_folder
- If you get errors, check the Potential issues and solutions section at the end of this page or ask the lab supervisor for guidance.
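For reference, a GraphFrame is always built from a vertex DataFrame with an id column and an edge DataFrame with src and dst columns. The sketch below only illustrates that general pattern under assumed file names and column selections; it is not the actual graphframe_example.py.

```python
import sys
from pyspark.sql import SparkSession
from graphframes import GraphFrame

# The input folder is passed as the only command line argument
input_folder = sys.argv[1]

spark = SparkSession.builder.appName("graphframe_sketch").getOrCreate()

# Vertices must have an "id" column; edges must have "src" and "dst" columns.
# The file names and selected columns below are assumptions for illustration only.
users = spark.read.json(input_folder + "/user.json")
vertices = users.selectExpr("user_id as id", "name")

reviews = spark.read.json(input_folder + "/review.json")
edges = reviews.selectExpr("user_id as src", "business_id as dst", "stars")

g = GraphFrame(vertices, edges)
g.vertices.show(5)
g.edges.show(5)
```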
Exercise 12.2. Using GraphFrame API
Your task in this exercise will be to manipulate the GraphFrame from the example script by adding additional Edge and Vertex attributes to it and to run a BFS query on the modified GraphFrame.
- Add additional attributes to both Vertices and Edges
- Change the example GraphFrame script to modify what attributes are assigned to Vertex and Edge DataFrames. Do the following changes:
- Add review_count attribute to Vertices
- Both the source DataFrames (user and business) already have that column so you can simply select it into the Vertex DataFrames.
- Add useful attribute to Edges.
- Select the useful column from review DataFrame for reviewed type Edges.
- You can leave it as an empty string "" for friends type edges for now.
- Write a Breadth First Search query (g.bfs(source_filter, destination_filter, edge_filter)) to find shortest paths from users named Eva to businesses that have at least 10 reviews, through reviewed edges that gave 5 stars (a minimal sketch follows this list).
  - You need to add an edge filter to be able to filter by edge attributes.
  - Be careful: the edge filter is applied to all edges in the path. You do not want to filter out friendship edges (which have no stars value). Check the lecture slides for an example of an edge filter.
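A minimal sketch of both steps of this exercise, assuming user, business, review, and friendship DataFrames similar to those built in the example script (the friends DataFrame with columns user_id and friend_id, as well as the exact column selections, are assumptions):

```python
from graphframes import GraphFrame

# Vertices: keep the review_count column from both source DataFrames
user_vertices = users.selectExpr("user_id as id", "name", "review_count")
business_vertices = businesses.selectExpr("business_id as id", "name", "review_count")
vertices = user_vertices.union(business_vertices)

# Edges: reviewed edges carry stars and useful, friendship edges get "" for useful
reviewed_edges = reviews.selectExpr(
    "user_id as src", "business_id as dst",
    "'reviewed' as relationship", "stars", "cast(useful as string) as useful")
friend_edges = friends.selectExpr(
    "user_id as src", "friend_id as dst",
    "'friends' as relationship",
    "cast(null as double) as stars", "'' as useful")
edges = reviewed_edges.union(friend_edges)

g = GraphFrame(vertices, edges)

# BFS: the edge filter must still let friendship edges through.
# Depending on how your vertices are built, the destination filter may also
# need a vertex type check so that it only matches businesses.
paths = g.bfs(
    "name = 'Eva'",
    "review_count >= 10",
    edgeFilter="relationship = 'friends' OR (relationship = 'reviewed' AND stars = 5)")
paths.show()
```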
Exercise 12.3. Using Motif finding
One of the most interesting methods for graph analytics in Apache Spark is Motif Finding, which allows users to use labelled patterns to search for sets of edges with a user-specified structure and inter-dependencies using the g.find(pattern) operation.
Spark GraphFrames returns all found sets of edges that match the user-specified pattern as a single DataFrame, and they can be manipulated using DataFrame operations (such as filter, group, limit and aggregations).
Familiarize yourself with the Motif Finding functionality from the GraphFrames user guide here: https://graphframes.github.io/user-guide.html#motif-finding
Use the motif finding operation results = g.find(pattern) to:
- Find how many users have given a 5 star review to businesses from two different cities (a minimal sketch of this task follows the full list below).
  - Write a pattern that contains two edges: user has reviewed business1 and user has reviewed business2.
    - User reviewed a business example: (u)-[r1]->(b1)
  - Filter the returned edge sets (using results.filter()) to specify that the businesses (b1 and b2) should be from different cities and that both of the review edges should be 5 star reviews.
    - Filtering by review stars example: r1.stars = 5
    - Filtering by cities being different: b1.city != b2.city
- Find the top 5 businesses which have received the most conflicting reviews from pairs of friends.
- Definition of Conflicting reviews from a pair of friends: Two users who are friends gave the opposite review score to the same business:
- Friend A: 2 star or less
- Friend B: 4 star or more
- Write a pattern for finding all possible edge sets where:
- Vertices u1 and u2 (users) are connected through two different (review) Edges to the same Vertex b (company).
- u1 and u2 are connected through an (friend) Edge.
- NB! You will not be able to specify Vertex types (user or company) and Edge relationship types (reviewed or friend) before the next filtering step!
- Filter the results by:
- b is of type company
- Two review scores must be conflicting
- Edge between users u1 and u2 should be of type friend
- Group the found paths by the Business vertex, compute count, order by count in descending manner and limit results to 5.
- Find the top 5 pairs of users who are not friends, but have given out the most reviews with exactly the same number of stars to the same businesses.
- Write a pattern and a filter for finding all such possible edge sets.
- Check the Motif finding guide for how to specify edges that should not exist in the pattern.
- Group the found paths by the user pair (user1, user2), compute count, order by count in descending manner and limit results to 5.
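A minimal sketch of the first task, assuming the GraphFrame g from Exercise 12.2 and that business vertices carry the city attribute:

```python
from pyspark.sql import functions as F

# Two review edges from the same user to two (possibly different) businesses
results = g.find("(u)-[r1]->(b1); (u)-[r2]->(b2)")

five_star_two_cities = results.filter(
    (F.col("r1.relationship") == "reviewed") &
    (F.col("r2.relationship") == "reviewed") &
    (F.col("r1.stars") == 5) & (F.col("r2.stars") == 5) &
    (F.col("b1.city") != F.col("b2.city")))

# A matching user can appear in several rows, so count distinct users
print(five_star_two_cities.select("u.id").distinct().count())

# For the third task, an edge that must NOT exist is written with "!",
# e.g. "(u1)-[r1]->(b); (u2)-[r2]->(b); !(u1)-[]->(u2)"
```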
Bonus Exercise
The bonus task is to take the single source Shortest Path example from the lecture slides (https://courses.cs.ut.ee/LTAT.06.005/2018_fall/uploads/Main/L12_2018.pdf), which was implemented using aggregateMessages(), and modify it in two ways (a minimal sketch of a weighted aggregateMessages() step follows the list below). The modifications that you need to do are similar to the bonus task from Practice 5 - Graph Processing in MapReduce.
- Modify the application to send/generate messages only after the distance of the source vertex has changed. You will probably need to add an additional state column to the Vertices to keep track of whether the distance has been updated since the previous iteration. The goal is to reduce the amount of work the program needs to perform and also the number of messages.
- Currently, the example considers all graph edges to be the same length (1 step).
- Add positive weights to each edge and modify the application to take edge weights into account when finding shortest paths.
- You can choose yourself what the weight of the edges should be (the transfer cost from source Vertex to destination Vertex through the specific Edge).
- For reviewed type edges, the weight could be the reverse of the stars value: weight = 6 - stars. (Be careful not to assign weights to friendship edges with the same formula.)
- For friendship type edges, the weight could be some kind of similarity (or distance) measure between two user attributes. For example: log2(abs(VertexSrc.review_count - VertexDst.review_count))
- PS! In some cases, you may find that it is easier to avoid computing weights ahead of time for each Edge and instead use the source and destination Vertex attributes to compute the transmission weight/cost when generating messages for the aggregateMessages() operation. That is also allowed.
- Deliverables of the bonus task
- Spark Python GraphFrame script(s)
- Output of the application, either as JSON/CSV files or as a text file that contains the df.show() command(s) output.
- General (high-level) description of the modifications that needed to be done to the example from the lecture slides.
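A minimal sketch of one weighted-distance iteration with aggregateMessages(), assuming the vertices carry a distance column and the edges a weight column (both are assumptions that your full solution would have to create and maintain):

```python
from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F

# Send a candidate distance along every edge, from src towards dst
msg_to_dst = AM.src["distance"] + AM.edge["weight"]

# Every vertex keeps the smallest candidate distance it received
new_distances = g.aggregateMessages(
    F.min(AM.msg).alias("new_distance"),
    sendToDst=msg_to_dst)

# Join back and keep the minimum of the old and the new distance.
# A full solution would repeat this until no distance changes, tracked
# with the additional state column described in the first modification.
updated_vertices = (g.vertices
                    .join(new_distances, on="id", how="left_outer")
                    .withColumn("distance",
                                F.least(F.col("distance"), F.col("new_distance")))
                    .drop("new_distance"))
```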
Deliverables:
- Spark Python scripts from Exercises 12.2, 12.3 (Can be the same script)
- Output of the scripts from Exercises 12.2, 12.3
- The output should contain the resulting DataFrame content.
- Output should be provided either as:
- JSON or CSV output file(s)
- Or the respective df.show() command's output in a text file (a minimal sketch of both options is shown below).
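A minimal sketch of both output options, assuming results holds the final DataFrame:

```python
# Option 1: write the resulting DataFrame out as JSON file(s)
results.coalesce(1).write.json("output_folder")

# Option 2: print the DataFrame and redirect the script's standard output
# into a text file, e.g.:  python my_script.py input_folder > output.txt
results.show(20, truncate=False)
```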
Potential issues and solutions
- When using Spark GraphFrames in Windows, you may get an error about Spark not being able to delete the temporary job folder. It will not cause your application to fail; it simply means that the job will leave behind folders that should be deleted later. Check the content of the error messages to see where these folders are located.
- If you get an error about the slf4j library when you first try to run a Spark GraphFrame application:
  - The easiest way to solve the issue is to clear the Maven (.m2/repository folder inside your home directory) and Ivy (.ivy2/cache and .ivy2/jars folders inside your home directory) repositories of the conflicting slf4j libraries and their folders.
  - In the worst case, deleting the whole .ivy2 and .m2 folders should solve the issue. However, this means that all Maven and Ivy dependencies need to be downloaded again!
- If you get an error about not enough memory or out of memory, you can increase the memory by using:
  - In the local machine:

        spark = SparkSession \
            .builder \
            .appName(appName) \
            .config("spark.driver.memory", "2g") \
            .getOrCreate()

  - In the cluster:

        spark = SparkSession \
            .builder \
            .appName(appName) \
            .config("spark.driver.memory", "2g") \
            .config("spark.executor.memory", "2g") \
            .getOrCreate()
- "The system cannot find the path specified" error when executing your Spark application for the first time.
- Check that your SPARK_HOME environment variable value is correct
- You can also set SPARK_HOME location programmatically inside your script in the following way:
os.environ['SPARK_HOME'] = "/home/pelle/soft/spark-2.3.2"
- It is suggested to use Java 8 as the default Java in your computer.
- NB! Java 9 and 10 will not work without issues!
- Be careful with the code indentation in Python script. Python is very strict when it comes to mixing tabs and spaces.
- Check indentation manual for Python: https://docs.python.org/2.0/ref/indentation.html
- "Python in worker has different version 2.7 than that in driver 3.6" error:
- Check that the default Python version in your computer is the same version you use inside your PyCharm project.