import pandas as pd
import wmfdata as wmf
from datetime import datetime, timedelta
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from IPython.display import clear_output
pd.options.display.max_columns = None
pd.options.display.max_rows = 250
spark_session = wmf.spark.get_active_session()
if spark_session is None:
    spark_session = wmf.spark.create_custom_session(
        master="yarn",
        app_name='cx-deletion-rate-factors',
        spark_config={
            "spark.driver.memory": "24g",
            "spark.driver.memoryOverhead": "4g",
            "spark.dynamicAllocation.maxExecutors": 90,
            "spark.executor.memory": "12g",
            "spark.executor.memoryOverhead": "8g",
            "spark.sql.shuffle.partitions": 128,
            "spark.driver.maxResultSize": "16g",
            "spark.executor.cores": 16
        }
    )
spark_session.sparkContext.setLogLevel("ERROR")
clear_output()
spark_session
SparkSession - hive
mwh_snapshot = '2024-01'
end_dt = datetime.strptime('2023-12-31', '%Y-%m-%d').date()
start_dt = end_dt - timedelta(days=365 * 2)  # two-year analysis window
def cr_spark_view(df, tbl_name, session=spark_session):
    # replace attribute to avoid the following error:
    # https://stackoverflow.com/questions/76404811/attributeerror-dataframe-object-has-no-attribute-iteritems
    df.iteritems = df.items
    # create a temporary view within the given spark session
    df_spark = session.createDataFrame(df)
    df_spark.createOrReplaceTempView(tbl_name)
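As a quick illustration of the helper (a hypothetical throwaway view, not used in the analysis):
# illustrative only: expose a small pandas frame to Spark SQL
cr_spark_view(pd.DataFrame({'x': [1, 2]}), 'tiny_view')
spark_session.sql("SELECT COUNT(*) AS n FROM tiny_view").show()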
%%time
cx_query = f"""
SELECT
DISTINCT translation_id AS cx_id,
CONCAT(translation_source_language, 'wiki') AS source_db,
CONCAT(translation_target_language, 'wiki') AS target_db,
translation_source_revision_id AS source_rev_id,
translation_target_revision_id AS target_rev_id,
translation_start_timestamp AS cx_start_ts,
translation_last_updated_timestamp AS cx_update_ts,
TIMESTAMPDIFF(SECOND, translation_start_timestamp, translation_last_updated_timestamp) AS cx_duration_secs,
CAST(JSON_EXTRACT(translation_progress, '$.mt') AS FLOAT) AS mt_pct,
CAST(JSON_EXTRACT(translation_progress, '$.human') AS FLOAT) AS human_pct
FROM
cx_translations
JOIN
cx_corpora AS cc
ON translation_id = cc.cxc_translation_id
WHERE
translation_status = 'published'
AND translation_last_updated_timestamp <> translation_start_timestamp
AND translation_source_revision_id <> 0
AND translation_target_revision_id <> 0
AND DATE(translation_start_timestamp) >= DATE('{start_dt}')
AND DATE(translation_last_updated_timestamp) <= DATE('{end_dt}')
"""
cx_data = wmf.mariadb.run(cx_query, dbs='wikishared')
cx_data.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 630597 entries, 0 to 630596
Data columns (total 10 columns):
 #   Column            Non-Null Count   Dtype
---  ------            --------------   -----
 0   cx_id             630597 non-null  int64
 1   source_db         630597 non-null  object
 2   target_db         630597 non-null  object
 3   source_rev_id     630597 non-null  int64
 4   target_rev_id     630597 non-null  int64
 5   cx_start_ts       630597 non-null  object
 6   cx_update_ts      630597 non-null  object
 7   cx_duration_secs  630597 non-null  float64
 8   mt_pct            630597 non-null  float64
 9   human_pct         630597 non-null  float64
dtypes: float64(3), int64(3), object(4)
memory usage: 48.1+ MB
CPU times: user 4.04 s, sys: 1.08 s, total: 5.11 s
Wall time: 17.3 s
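A quick sanity check on the extract (a sketch, assuming cx_data as loaded above; the MT/human percentages should fall within a bounded range and durations should be positive):
# illustrative check of the CX extract, not part of the pipeline
cx_data[['mt_pct', 'human_pct', 'cx_duration_secs']].describe()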
%%time
cr_spark_view(cx_data, 'cx_data_base')
CPU times: user 35.6 s, sys: 417 ms, total: 36 s Wall time: 38.2 s
%%time
cx_data_ext_query = f"""
SELECT
cx.*,
page_id AS source_page_id,
mwh.event_timestamp AS source_rev_ts,
DATE(mwh.event_timestamp) AS source_rev_dt
FROM
cx_data_base cx
JOIN
wmf.mediawiki_history mwh
ON cx.source_db = mwh.wiki_db
AND cx.source_rev_id = mwh.revision_id
WHERE
mwh.snapshot = '{mwh_snapshot}'
"""
cx_data_ext = wmf.spark.run(cx_data_ext_query)
cx_data_ext.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 629359 entries, 0 to 629358
Data columns (total 13 columns):
 #   Column            Non-Null Count   Dtype
---  ------            --------------   -----
 0   cx_id             629359 non-null  int64
 1   source_db         629359 non-null  object
 2   target_db         629359 non-null  object
 3   source_rev_id     629359 non-null  int64
 4   target_rev_id     629359 non-null  int64
 5   cx_start_ts       629359 non-null  object
 6   cx_update_ts      629359 non-null  object
 7   cx_duration_secs  629359 non-null  float64
 8   mt_pct            629359 non-null  float64
 9   human_pct         629359 non-null  float64
 10  source_page_id    629359 non-null  int64
 11  source_rev_ts     629359 non-null  object
 12  source_rev_dt     629359 non-null  object
dtypes: float64(3), int64(4), object(6)
memory usage: 62.4+ MB
CPU times: user 7.54 s, sys: 808 ms, total: 8.35 s
Wall time: 7min 2s
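The inner join against mediawiki_history silently drops translations whose source revision is absent from the snapshot (about 1,200 rows here); a quick check, assuming both frames as loaded above:
# rows lost to the inner join on source revisions
print(len(cx_data) - len(cx_data_ext), 'of', len(cx_data), 'rows dropped')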
%%time
cr_spark_view(cx_data_ext, 'cx_data')
CPU times: user 45.4 s, sys: 429 ms, total: 45.8 s Wall time: 46 s
Data sources for the factors query below: wmf.mediawiki_history, wmf.mediawiki_wikitext_history, and the content gap metrics tables (content_gap_metrics).
# spark UDF to check user rights level
def classify_user_groups(groups):
    # defensive: treat a missing group list as no rights level
    if groups is None:
        return 'none'
    ipb_exempt = 'ipblock-exempt'
    confirmed_rights = {'autoconfirmed', 'confirmed', 'autoreviewed', 'autoeditor'}
    other_groups = confirmed_rights.union({ipb_exempt})
    if len(groups) == 1 and ipb_exempt in groups:
        # IP-block exemption alone confers no rights level
        return 'none'
    elif set(groups).issubset(other_groups):
        # only (auto)confirmed-style groups, possibly plus ipblock-exempt;
        # note that an empty group list also lands here
        return 'confirmed'
    else:
        # any further group (e.g. extendedconfirmed, sysop) counts as extended
        return 'extended'
classify_user_groups_udf = udf(classify_user_groups, StringType())
spark_session.udf.register("classify_user_groups", classify_user_groups_udf)
<function __main__.classify_user_groups(groups)>
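A quick spot-check of the classification logic in plain Python (illustrative; the Spark UDF wrapper is not needed for this):
# expected classifications for a few representative group lists
assert classify_user_groups(['ipblock-exempt']) == 'none'
assert classify_user_groups(['autoconfirmed']) == 'confirmed'
assert classify_user_groups(['autoconfirmed', 'ipblock-exempt']) == 'confirmed'
assert classify_user_groups(['autoconfirmed', 'sysop']) == 'extended'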
%%time
query = f"""
WITH
base AS (
SELECT
cx.cx_id,
source_db,
source_rev_dt,
source_rev_id,
source_page_id,
gap.is_human AS is_source_human,
mwt.revision_text_bytes AS source_bytes,
mwh.wiki_db AS target_db,
event_timestamp AS target_rev_ts,
DATE(event_timestamp) AS target_rev_dt,
mwh.page_id AS target_page_id,
mwh.revision_id AS target_rev_id,
mwh.revision_text_bytes AS target_bytes,
ARRAY_CONTAINS(revision_tags, 'mobile edit') AS is_mobile_edit,
cx_start_ts,
cx_update_ts,
cx_duration_secs,
mt_pct,
human_pct,
event_user_text AS user_name,
CASE
WHEN event_user_seconds_since_previous_revision IS NULL AND event_user_revision_count = 1 THEN TRUE
ELSE FALSE
END AS is_first_edit,
CASE
WHEN event_user_revision_count > 0 AND event_user_revision_count <= 10 THEN '1-10'
WHEN event_user_revision_count > 10 AND event_user_revision_count <= 99 THEN '11-99'
WHEN event_user_revision_count > 99 AND event_user_revision_count <= 999 THEN '100-999'
WHEN event_user_revision_count > 999 AND event_user_revision_count <= 4999 THEN '1000-4999'
WHEN event_user_revision_count > 4999 THEN '5000+'
ELSE NULL
END AS user_edit_bucket,
CLASSIFY_USER_GROUPS(event_user_groups) AS user_rights_level,
event_user_seconds_since_previous_revision AS time_since_prev_edit,
CASE
WHEN revision_is_deleted_by_page_deletion THEN TRUE
ELSE FALSE
END AS is_page_deleted
FROM
wmf.mediawiki_history mwh
JOIN
canonical_data.wikis
ON database_code = wiki_db
JOIN
cx_data cx
ON mwh.wiki_db = cx.target_db
AND mwh.revision_id = cx.target_rev_id
LEFT JOIN
content_gap_metrics.content_gap_features gap
ON cx.source_db = gap.wiki_db
AND cx.source_page_id = gap.page_id
JOIN
wmf.mediawiki_wikitext_history mwt
ON cx.source_db = mwt.wiki_db
AND cx.source_rev_id = mwt.revision_id
WHERE
mwh.snapshot = '{mwh_snapshot}'
AND mwt.snapshot = '{mwh_snapshot}'
AND database_group = 'wikipedia'
AND DATE(event_timestamp) > DATE('{start_dt}')
AND DATE(event_timestamp) <= DATE('{end_dt}')
AND event_entity = 'revision'
AND event_type = 'create'
AND page_namespace_is_content
AND NOT mwh.event_user_is_anonymous
AND mwh.revision_parent_id = 0
AND SIZE(event_user_is_bot_by) = 0
AND (
ARRAY_CONTAINS(revision_tags, 'contenttranslation')
OR ARRAY_CONTAINS(revision_tags, 'contenttranslation-v2')
)
AND status = 'open'
AND visibility = 'public'
AND editability = 'public'
AND SIZE(event_user_blocks_historical) = 0
),
quality_info_source AS (
SELECT
base.source_db,
base.source_page_id,
gap.quality_score AS source_quality,
gap.standard_quality AS is_source_std_quality,
ROW_NUMBER() OVER (
PARTITION BY gap.wiki_db, gap.page_id
ORDER BY time_bucket DESC) AS rn
FROM
base
JOIN
content_gap_metrics.metric_features gap
ON base.source_db = gap.wiki_db
AND base.source_page_id = gap.page_id
WHERE
TO_DATE(time_bucket || '-01', 'yyyy-MM-dd') <= DATE(source_rev_dt)
),
source_quality_at_rev AS (
SELECT
*
FROM
quality_info_source
WHERE
rn = 1),
quality_info_target AS (
SELECT
base.target_db,
base.target_page_id,
gap.quality_score AS target_quality,
gap.standard_quality AS is_target_std_quality,
ROW_NUMBER() OVER (
PARTITION BY gap.wiki_db, gap.page_id
ORDER BY time_bucket DESC) AS rn
FROM
base
JOIN
content_gap_metrics.metric_features gap
ON base.target_db = gap.wiki_db
AND base.target_page_id = gap.page_id
WHERE
TO_DATE(time_bucket || '-01', 'yyyy-MM-dd') <= DATE(target_rev_dt)
),
target_quality_at_rev AS (
SELECT
*
FROM
quality_info_target
WHERE
rn = 1)
SELECT
a.*,
source_quality,
is_source_std_quality,
target_quality,
is_target_std_quality,
SUM(CASE WHEN b.target_rev_ts BETWEEN a.target_rev_ts - INTERVAL 1 HOUR AND a.target_rev_ts THEN 1 ELSE 0 END) AS creations_1hr,
SUM(CASE WHEN b.target_rev_ts BETWEEN a.target_rev_ts - INTERVAL 6 HOURS AND a.target_rev_ts THEN 1 ELSE 0 END) AS creations_6hr,
SUM(CASE WHEN b.target_rev_ts BETWEEN a.target_rev_ts - INTERVAL 24 HOURS AND a.target_rev_ts THEN 1 ELSE 0 END) AS creations_24hr,
SUM(CASE WHEN b.target_rev_ts BETWEEN a.target_rev_ts - INTERVAL 72 HOURS AND a.target_rev_ts THEN 1 ELSE 0 END) AS creations_72hr,
SUM(CASE WHEN b.target_rev_ts BETWEEN a.target_rev_ts - INTERVAL 7 DAYS AND a.target_rev_ts THEN 1 ELSE 0 END) AS creations_7days,
SUM(CASE WHEN b.target_rev_ts BETWEEN a.target_rev_ts - INTERVAL 15 DAYS AND a.target_rev_ts THEN 1 ELSE 0 END) AS creations_15days,
SUM(CASE WHEN b.target_rev_ts BETWEEN a.target_rev_ts - INTERVAL 30 DAYS AND a.target_rev_ts THEN 1 ELSE 0 END) AS creations_30days
FROM
base a
JOIN
base b ON a.target_db = b.target_db
AND a.user_name = b.user_name
LEFT JOIN
source_quality_at_rev sq
ON a.source_db = sq.source_db AND a.source_page_id = sq.source_page_id
LEFT JOIN
target_quality_at_rev tq
ON a.target_db = tq.target_db AND a.target_page_id = tq.target_page_id
GROUP BY
a.cx_id,
a.source_db,
a.source_rev_id,
a.source_page_id,
a.target_page_id,
a.source_rev_dt,
a.target_rev_dt,
a.target_db,
a.target_rev_id,
a.cx_start_ts,
a.cx_update_ts,
a.cx_duration_secs,
a.mt_pct,
a.human_pct,
a.target_rev_ts,
a.user_name,
a.user_edit_bucket,
a.user_rights_level,
a.time_since_prev_edit,
a.is_first_edit,
a.is_page_deleted,
a.target_bytes,
a.source_bytes,
a.is_source_human,
source_quality,
is_source_std_quality,
target_quality,
is_target_std_quality,
a.is_mobile_edit
"""
output = wmf.spark.run(query)
output.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 549145 entries, 0 to 549144
Data columns (total 36 columns):
 #   Column                 Non-Null Count   Dtype
---  ------                 --------------   -----
 0   cx_id                  549145 non-null  int64
 1   source_db              549145 non-null  object
 2   source_rev_dt          549145 non-null  object
 3   source_rev_id          549145 non-null  int64
 4   source_page_id         549145 non-null  int64
 5   is_source_human        545741 non-null  object
 6   source_bytes           549145 non-null  int64
 7   target_db              549145 non-null  object
 8   target_rev_ts          549145 non-null  object
 9   target_rev_dt          549145 non-null  object
 10  target_page_id         549145 non-null  int64
 11  target_rev_id          549145 non-null  int64
 12  target_bytes           549110 non-null  float64
 13  is_mobile_edit         549145 non-null  bool
 14  cx_start_ts            549145 non-null  object
 15  cx_update_ts           549145 non-null  object
 16  cx_duration_secs       549145 non-null  float64
 17  mt_pct                 549145 non-null  float64
 18  human_pct              549145 non-null  float64
 19  user_name              549145 non-null  object
 20  is_first_edit          549145 non-null  bool
 21  user_edit_bucket       549145 non-null  object
 22  user_rights_level      549145 non-null  object
 23  time_since_prev_edit   536482 non-null  float64
 24  is_page_deleted        549145 non-null  bool
 25  source_quality         545025 non-null  float32
 26  is_source_std_quality  545582 non-null  float64
 27  target_quality         511906 non-null  float32
 28  is_target_std_quality  517819 non-null  float64
 29  creations_1hr          549145 non-null  int64
 30  creations_6hr          549145 non-null  int64
 31  creations_24hr         549145 non-null  int64
 32  creations_72hr         549145 non-null  int64
 33  creations_7days        549145 non-null  int64
 34  creations_15days       549145 non-null  int64
 35  creations_30days       549145 non-null  int64
dtypes: bool(3), float32(2), float64(7), int64(13), object(11)
memory usage: 135.6+ MB
CPU times: user 39.7 s, sys: 7.77 s, total: 47.5 s
Wall time: 1h 57min 42s
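The creations_* columns come from the self-join in the query above: for each CX creation, the same user's creations on the same target wiki within a trailing window are counted. A pandas cross-check for the 24-hour window (a sketch, assuming output as loaded above; results may differ slightly at window boundaries, since SQL BETWEEN is closed on both ends while rolling defaults to a right-closed window):
# trailing 24h creation count per (target wiki, user), current row included
chk = output[['target_db', 'user_name', 'target_rev_ts']].copy()
chk['target_rev_ts'] = pd.to_datetime(chk['target_rev_ts'])
chk = chk.sort_values(['target_db', 'user_name', 'target_rev_ts'])
chk['creations_24hr_check'] = (
    chk.assign(n=1)
       .groupby(['target_db', 'user_name'])
       .rolling('24h', on='target_rev_ts')['n']
       .sum()
       .to_numpy()
)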
output.to_csv('secrets/cx_data_raw.tsv', sep='\t', index=False)
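With the raw extract saved, the headline quantity of interest is straightforward to compute (a sketch, assuming output as built above):
# overall deletion rate of published CX translations, plus a split by experience
print(f"overall deletion rate: {output['is_page_deleted'].mean():.2%}")
print(output.groupby('user_edit_bucket')['is_page_deleted'].mean().sort_index())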