from splink.spark.jar_location import similarity_jar_location
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import types
# Locate the jar of custom similarity functions that is bundled with Splink
# (documented here: https://github.com/moj-analytical-services/splink_scalaudfs)
# so it can be attached to the Spark configuration below.
path = similarity_jar_location()

# NOTE: a default parallelism of 16 is only suitable for a small toy example.
conf = (
    SparkConf()
    .set("spark.driver.memory", "12g")
    .set("spark.default.parallelism", "16")
    .set("spark.jars", path)
)

# Reuse an existing SparkContext if there is one, otherwise start one with
# the configuration above, and wrap it in a SparkSession.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
spark.sparkContext.setCheckpointDir("./tmp_checkpoints")

# Register the Jaro-Winkler implementation from the jar as the SQL function
# `jaro_winkler`, returning a double.
spark.udf.registerJavaFunction(
    name="jaro_winkler",
    javaClassName="uk.gov.moj.dash.linkage.JaroWinklerSimilarity",
    returnType=types.DoubleType(),
)
22/09/19 14:39:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
import pandas as pd
# Load the input records from CSV; header=True takes the column names from
# the first row of the file.
df = spark.read.csv("./data/fake_1000.csv", header=True)
import splink.spark.spark_comparison_library as cl
# Splink model settings passed to the linker.
settings = {
    "link_type": "dedupe_only",
    # One comparison per input column that contributes to the match score.
    "comparisons": [
        cl.jaro_winkler_at_thresholds("first_name", 0.8),
        cl.jaro_winkler_at_thresholds("surname", 0.8),
        cl.levenshtein_at_thresholds("dob"),
        cl.exact_match("city", term_frequency_adjustments=True),
        cl.levenshtein_at_thresholds("email"),
    ],
    # SQL conditions over the left (l) and right (r) records; presumably only
    # pairs satisfying at least one rule are scored -- confirm in Splink docs.
    "blocking_rules_to_generate_predictions": [
        "l.first_name = r.first_name",
        "l.surname = r.surname",
    ],
    "retain_matching_columns": True,
    "retain_intermediate_calculation_columns": True,
    # NOTE(review): looks like the EM stopping tolerance on the largest
    # parameter change between iterations -- confirm in Splink docs.
    "em_convergence": 0.01,
}
from splink.spark.spark_linker import SparkLinker
# Build the Spark-backed linker over the input records and model settings.
linker = SparkLinker(df, settings)

# SQL rules over the left (l) and right (r) records, used together with the
# recall value below to estimate the probability that two random records match.
deterministic_rules = [
    "l.first_name = r.first_name and levenshtein(r.dob, l.dob) <= 1",
    "l.surname = r.surname and levenshtein(r.dob, l.dob) <= 1",
    "l.first_name = r.first_name and levenshtein(r.surname, l.surname) <= 2",
    "l.email = r.email",
]

linker.estimate_probability_two_random_records_match(
    deterministic_rules,
    recall=0.6,
)
/Users/robinlinacre/Documents/data_linking/splink_demos/venv/lib/python3.8/site-packages/pyspark/sql/dataframe.py:148: UserWarning: DataFrame.sql_ctx is an internal property, and will be removed in future releases. Use DataFrame.sparkSession instead. warnings.warn( Probability two random records match is estimated to be 0.00389. This means that amongst all possible pairwise record comparisons, one in 257.25 are expected to match. With 499,500 total possible comparisons, we expect a total of around 1,941.67 matching pairs
# Estimate the u probabilities of the model using random sampling.
# NOTE(review): target_rows presumably caps the number of sampled pairwise
# comparisons -- confirm against the Splink documentation.
linker.estimate_u_using_random_sampling(target_rows=5e5)
----- Estimating u probabilities using random sampling -----
22/09/19 14:39:14 WARN DataSource: All paths were ignored: file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_concat_with_tf
Estimated u probabilities using random sampling Your model is not yet fully trained. Missing estimates for: - first_name (no m values are trained). - surname (no m values are trained). - dob (no m values are trained). - city (no m values are trained). - email (no m values are trained).
# First EM session: block on first name and surname together.
training_blocking_rule = "l.first_name = r.first_name and l.surname = r.surname"
training_session_fname_sname = (
    linker.estimate_parameters_using_expectation_maximisation(
        training_blocking_rule
    )
)

# Second EM session: block on date of birth instead, so a different set of
# columns is involved in the blocking rule than in the first session.
training_blocking_rule = "l.dob = r.dob"
training_session_dob = (
    linker.estimate_parameters_using_expectation_maximisation(
        training_blocking_rule
    )
)
----- Starting EM training session ----- Estimating the m probabilities of the model by blocking on: l.first_name = r.first_name and l.surname = r.surname Parameter estimates will be made for the following comparison(s): - dob - city - email Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: - first_name - surname
22/09/19 14:39:20 WARN DataSource: All paths were ignored: file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_comparison_vectors_77ebe74
Iteration 1: Largest change in params was -0.53 in the m_probability of dob, level `Exact match` Iteration 2: Largest change in params was 0.0335 in probability_two_random_records_match Iteration 3: Largest change in params was 0.0129 in probability_two_random_records_match Iteration 4: Largest change in params was 0.00639 in probability_two_random_records_match EM converged after 4 iterations Your model is not yet fully trained. Missing estimates for: - first_name (no m values are trained). - surname (no m values are trained). ----- Starting EM training session ----- Estimating the m probabilities of the model by blocking on: l.dob = r.dob Parameter estimates will be made for the following comparison(s): - first_name - surname - city - email Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: - dob
22/09/19 14:39:26 WARN DataSource: All paths were ignored: file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_comparison_vectors_2bd95d3
Iteration 1: Largest change in params was -0.413 in the m_probability of surname, level `Exact match` Iteration 2: Largest change in params was 0.108 in probability_two_random_records_match Iteration 3: Largest change in params was 0.0348 in probability_two_random_records_match Iteration 4: Largest change in params was 0.0133 in probability_two_random_records_match Iteration 5: Largest change in params was 0.00593 in probability_two_random_records_match EM converged after 5 iterations Your model is fully trained. All comparisons have at least one estimate for their m and u values
# Generate pairwise predictions from the linker, passing a match-probability
# threshold of 0.9.
results = linker.predict(threshold_match_probability=0.9)
22/09/19 14:39:32 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'. 22/09/19 14:39:32 WARN DataSource: All paths were ignored: file:/Users/robinlinacre/Documents/data_linking/splink_demos/tmp_checkpoints/06909547-db7a-49ae-a2d5-e913b503cb7c/__splink__df_predict_99e9cb4
# Preview the predictions as a pandas DataFrame, limited to five rows.
results.as_pandas_dataframe(limit=5)
| | match_weight | match_probability | unique_id_l | unique_id_r | first_name_l | first_name_r | gamma_first_name | bf_first_name | surname_l | surname_r | ... | gamma_city | tf_city_l | tf_city_r | bf_city | bf_tf_adj_city | email_l | email_r | gamma_email | bf_email | match_key |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 3.371739 | 0.911904 | 220 | 223 | Logan | Logan | 2 | 86.748396 | serguFon | Ferguson | ... | 1 | 0.212792 | 0.212792 | 10.316002 | 0.259162 | l.feruson46@sahh.com | None | -1 | 1.000000 | 0 |
1 | 14.743406 | 0.999964 | 879 | 880 | Leo | Leo | 2 | 86.748396 | Webster | Webster | ... | 1 | 0.008610 | 0.008610 | 10.316002 | 6.404996 | leo.webster54@moore.biez | None | -1 | 1.000000 | 0 |
2 | 13.140266 | 0.999889 | 446 | 450 | Aisha | Aisha | 2 | 86.748396 | Bryant | None | ... | 0 | 0.011070 | 0.001230 | 0.456259 | 1.000000 | aishab64@obrien-flores.com | aishab64@obrien-flores.com | 3 | 257.458944 | 0 |
3 | 8.829126 | 0.997806 | 446 | 448 | Aisha | Aisha | 2 | 86.748396 | Bryant | BryBant | ... | 0 | 0.011070 | 0.001230 | 0.456259 | 1.000000 | aishab64@obrien-flores.com | aishab64@obrien-flores.com | 3 | 257.458944 | 0 |
4 | 6.584844 | 0.989690 | 790 | 791 | Jackson | Jackson | 2 | 86.748396 | Fisreh | Fishier | ... | 0 | 0.009840 | 0.001230 | 0.456259 | 1.000000 | j.fisher4@sullivan.com | None | -1 | 1.000000 | 0 |
5 rows × 28 columns