Users may experience out-of-memory errors when running large linkage and deduplication jobs with Splink's default settings.
This notebook demonstrates how Splink can be configured to reduce the amount of memory needed.
However, before demonstrating these configuration settings, it's important to understand why you might be running out of memory.
⚠️ Increasing the size of your Spark cluster or changing Splink configuration options other than blocking rules is unlikely to solve this problem.
Imagine you are running a deduplication job on a large input dataset called df, and one of your blocking rules is:
"l.first_name = r.first_name"
Let's assume there are 10,000 people in your dataset with the first name 'John'.
Splink will perform a join that looks a bit like this:
select *
from df as l
inner join df as r
on l.first_name = r.first_name
This will result in 10,000 * 10,000 = 100 million rows being generated for the Johns on a single node of your Spark cluster. This happens because when Spark performs a join, it hash partitions on the join key, so all records for each first_name go to the same node.
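To see how lopsided a blocking rule is before running the full job, you can estimate the comparisons generated per key. The helper below is a minimal pure-Python sketch of that arithmetic (in Spark the equivalent would be a groupBy/count on the blocking column); the function name and sample data are illustrative, not part of Splink:

```python
from collections import Counter

def comparisons_per_key(values):
    """Estimate the comparisons a single-column blocking rule generates
    per key: a key appearing n times yields n * n joined rows."""
    return {key: n * n for key, n in Counter(values).items()}

# A key with 10,000 occurrences generates 100 million comparisons on one node
first_names = ["John"] * 10_000 + ["Robin"] * 100
print(comparisons_per_key(first_names))  # {'John': 100000000, 'Robin': 10000}
```

Keys with disproportionately large counts are the ones worth tightening in your blocking rules, e.g. by blocking on first_name and dob together.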
⚠️ Increasing the size of your Spark cluster will sometimes mitigate these issues but there is a more efficient solution
This is often because Spark's default parallelism of 200 shuffle partitions is not really appropriate to the task of generating comparisons from blocking rules.
When generating record comparisons from your blocking rules, your input dataset will be split into 200 parts (jobs).
This is generally insufficient for large jobs because, whilst 200 parts may be sufficient for your input dataset, the dataset of comparisons generated by your blocking rules is typically 10 times as large or more.
Furthermore, blocking rules can often result in skew - e.g. the number of comparisons generated by 'John' is far larger than the number generated by 'Robin' - meaning that if a few 'John'-sized blocks happen to end up on the same node, it can run out of memory.
Try increasing Spark's parallelism as follows (start with 1000, but experiment with different values):
spark.conf.set("spark.sql.shuffle.partitions", "1000")
spark.conf.set("spark.default.parallelism", "1000")
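If you prefer not to set these in code, the same options can be supplied when the application is submitted; this is a sketch, and the script name here is illustrative:

```shell
spark-submit \
  --conf spark.sql.shuffle.partitions=1000 \
  --conf spark.default.parallelism=1000 \
  your_dedupe_job.py
```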
By default, Splink attempts to compute its results as a single Spark job. However, there is a known problem in Spark of long lineage, and iterative algorithms like the one used by Splink are often cited as the culprit.
Breaking the lineage means splitting the job up into smaller parts which can be computed independently. This can result in somewhat longer execution times (perhaps 30% longer), but often allows the same job to successfully complete on a smaller cluster.
Splink has some configuration options which allow you to break the lineage. This must be configured by the user because the solution will depend on the specifics of your Spark cluster and/or cloud provider.
In the remainder of this notebook, we give a simple example of breaking the lineage, using the same job as in the data deduplication quick start.
If you've tried all of the above and are still getting out of memory errors, you may need a bigger cluster, or higher memory nodes.
As a guide, we've successfully processed 15 million input records and 170 million comparisons using AWS Glue with a cluster of size 5.
The following is just boilerplate code that sets up the Spark session and sets some other non-essential configuration options:
import pandas as pd
pd.options.display.max_columns = 500
pd.options.display.max_rows = 100
import logging
logging.basicConfig()
logging.getLogger("splink").setLevel(logging.INFO)
from utility_functions.demo_utils import get_spark
spark = get_spark() # See utility_functions/demo_utils.py for how to set up Spark
In the underlying Splink algorithm, there are two sensible places to break the lineage:

- after the blocked comparisons have been generated
- after the comparisons have been scored
Splink allows the user to provide two custom functions that are executed in these two places to break the lineage.
They both take a dataframe as an argument, and require the user to break the lineage. Usually this is done by persisting, checkpointing or caching the dataframe (although in our experience, this has been unreliable). The functions below instead write the dataframe to parquet and read it back:

def blocked_comparisons_to_disk(df, spark):
    df.write.mode("overwrite").parquet("gammas_temp/")
    df_new = spark.read.parquet("gammas_temp/")
    return df_new
def scored_comparisons_to_disk(df, spark):
    df.write.mode("overwrite").parquet("df_e_temp/")
    df_new = spark.read.parquet("df_e_temp/")
    return df_new
Run the Splink job, providing the custom functions as arguments

from splink import Splink
df = spark.read.parquet("data/fake_1000.parquet")
settings = {
    "link_type": "dedupe_only",
    "blocking_rules": [
        "l.first_name = r.first_name",
        "l.surname = r.surname",
        "l.dob = r.dob"
    ],
    "comparison_columns": [
        {
            "col_name": "first_name",
            "num_levels": 3,
            "term_frequency_adjustments": True
        },
        {
            "col_name": "surname",
            "num_levels": 3,
            "term_frequency_adjustments": True
        },
        {
            "col_name": "dob"
        },
        {
            "col_name": "city"
        },
        {
            "col_name": "email"
        }
    ],
    "additional_columns_to_retain": ["group"],
    "em_convergence": 0.01
}
from splink import Splink
linker = Splink(
    settings,
    df,
    spark,
    break_lineage_blocked_comparisons=blocked_comparisons_to_disk,
    break_lineage_scored_comparisons=scored_comparisons_to_disk,
)
df_e = linker.get_scored_comparisons()
INFO:splink.iterate:Iteration 0 complete
INFO:splink.model:The maximum change in parameters was 0.49493410587310793 for key surname, level 2
INFO:splink.iterate:Iteration 1 complete
INFO:splink.model:The maximum change in parameters was 0.12059885263442993 for key surname, level 2
INFO:splink.iterate:Iteration 2 complete
INFO:splink.model:The maximum change in parameters was 0.039048194885253906 for key surname, level 0
INFO:splink.iterate:Iteration 3 complete
INFO:splink.model:The maximum change in parameters was 0.017310582101345062 for key dob, level 1
INFO:splink.iterate:Iteration 4 complete
INFO:splink.model:The maximum change in parameters was 0.014871925115585327 for key email, level 0
INFO:splink.iterate:Iteration 5 complete
INFO:splink.model:The maximum change in parameters was 0.011928170919418335 for key email, level 0
INFO:splink.iterate:Iteration 6 complete
INFO:splink.model:The maximum change in parameters was 0.009333670139312744 for key email, level 1
INFO:splink.iterate:EM algorithm has converged
# Inspect main dataframe that contains the match scores
cols_to_inspect = ["match_probability","unique_id_l","unique_id_r","group_l", "group_r", "first_name_l","first_name_r","surname_l","surname_r","dob_l","dob_r","city_l","city_r","email_l","email_r",]
df_e.toPandas()[cols_to_inspect].sort_values(["unique_id_l", "unique_id_r"]).head(5)
|   | match_probability | unique_id_l | unique_id_r | group_l | group_r | first_name_l | first_name_r | surname_l | surname_r | dob_l | dob_r | city_l | city_r | email_l | email_r |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | 0.955579 | 0 | 1 | 0 | 0 | Julia | Julia | None | Taylor | 2015-10-29 | 2015-07-31 | London | London | hannah88@powers.com | hannah88@powers.com |
| 1 | 0.955579 | 0 | 2 | 0 | 0 | Julia | Julia | None | Taylor | 2015-10-29 | 2016-01-27 | London | London | hannah88@powers.com | hannah88@powers.com |
| 0 | 0.771380 | 0 | 3 | 0 | 0 | Julia | Julia | None | Taylor | 2015-10-29 | 2015-10-29 | London | None | hannah88@powers.com | hannah88opowersc@m |
| 4 | 0.939962 | 1 | 2 | 0 | 0 | Julia | Julia | Taylor | Taylor | 2015-07-31 | 2016-01-27 | London | London | hannah88@powers.com | hannah88@powers.com |
| 3 | 0.022550 | 1 | 3 | 0 | 0 | Julia | Julia | Taylor | Taylor | 2015-07-31 | 2015-10-29 | London | None | hannah88@powers.com | hannah88opowersc@m |