My scratch notebook for exploring the mybinder analytics archive
Download raw events with gsutil (~2 minutes from scratch; very quick on subsequent runs):
mkdir -p ./events/
gsutil -m rsync -r gs://binder-events-archive/ ./events/
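A quick sanity check on what was synced (a sketch; assumes the daily events-YYYY-MM-DD.jsonl layout used below):
from pathlib import Path

event_files = sorted(Path("events").glob("events-*.jsonl"))
# raises IndexError if the rsync produced nothing
print(f"{len(event_files)} daily files, {event_files[0].name} .. {event_files[-1].name}")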
Repack data with pandas for faster re-loading (~3 minutes for fresh data):
Load files with pandas and plot (~1s)
import datetime
import os
import pandas as pd
import altair as alt
from pathlib import Path
from functools import lru_cache
from urllib.request import urlretrieve
from concurrent.futures import ThreadPoolExecutor
from tqdm.notebook import tqdm
category_columns = ["schema", "version", "provider", "status", "origin"]
def categoricalize(df):
    """Ensure categorical columns are categorical
    for more efficient storage and processing.
    """
dtypes = {}
for col in category_columns:
if col in df.columns:
dtypes[col] = "category"
if dtypes:
return df.astype(dtypes)
else:
return df
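# Rough illustration of the benefit: a repetitive string column stored as a category
# is typically far smaller (toy data, not the archive):
#   toy = pd.DataFrame({"provider": ["GitHub"] * 1_000_000})
#   toy.memory_usage(deep=True).sum() / categoricalize(toy).memory_usage(deep=True).sum()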
def uncategoricalize(df):
"""revert categories
groupby is _super_ slow on categoricals
https://github.com/pandas-dev/pandas/issues/32976
"""
dtypes = {}
for col, dtype in df.dtypes.items():
if isinstance(dtype, pd.CategoricalDtype):
dtypes[col] = dtype.categories.dtype
if dtypes:
return df.astype(dtypes)
else:
return df
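# An alternative (not used here): groupby accepts observed=True, which restricts the
# result to category combinations that actually occur instead of the full cartesian
# product of the categories, e.g.
#   some_df.groupby(["provider", "status"], observed=True).size()   # hypothetical frame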
@lru_cache(maxsize=60)
def _read_df(path):
df = pd.read_json(path, lines=True)
return categoricalize(df)
@lru_cache(maxsize=10)
def _read_dfs(*paths):
dfs = [_read_df(path) for path in paths]
if len(dfs) == 1:
return dfs[0]
else:
# concatenate, preserve categoricals with new values
return (
categoricalize(pd.concat(dfs, ignore_index=True))
.sort_values(["timestamp"])
.reset_index(drop=True)
)
%%time
events_dir = Path("events")
agg_dir = Path("aggregated")
agg_dir.mkdir(exist_ok=True)
daily_dir = agg_dir.joinpath("daily")
daily_dir.mkdir(exist_ok=True)
weekly_dir = agg_dir.joinpath("weekly")
weekly_dir.mkdir(exist_ok=True)
monthly_dir = agg_dir.joinpath("monthly")
monthly_dir.mkdir(exist_ok=True)
for parent in (daily_dir, weekly_dir, monthly_dir):
parent.joinpath("by-hour").mkdir(exist_ok=True)
parent.joinpath("by-day").mkdir(exist_ok=True)
# daily-by-hour
# weekly-by-day
jsonl_fmt = f"{events_dir}/events-%Y-%m-%d.jsonl"
daily_fmt = f"{daily_dir}/daily-%Y-%m-%d.feather"
weekly_fmt = f"{weekly_dir}/weekly-%Y-%m-%d"
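# For a concrete date (arbitrary example), these expand to:
#   datetime.date(2022, 7, 3).strftime(jsonl_fmt)  -> 'events/events-2022-07-03.jsonl'
#   datetime.date(2022, 7, 3).strftime(daily_fmt)  -> 'aggregated/daily/daily-2022-07-03.feather'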
def rounded_count(df, freq="H"):
# copy to avoid leaving our new column in the df
df = df.copy()
# add counting column
df["n"] = 1
df["timestamp"] = df.timestamp.dt.round(freq)
# exclude ref from aggregations
groupby = list(set(df.columns).difference({"ref", "n"}))
    # uncategoricalize because groupby is crazy slow with categoricals
    # (much faster to copy the whole df multiple times!)
return uncategoricalize(df).groupby(groupby).n.count().reset_index()
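# Toy illustration (not archive data): two launches that round to the same hour
# collapse into a single row with n == 2:
#   toy = pd.DataFrame({"timestamp": pd.to_datetime(["2022-07-03 10:05", "2022-07-03 10:20"]),
#                       "provider": ["GitHub", "GitHub"]})
#   rounded_count(toy, freq="H")  # -> 2022-07-03 10:00:00, GitHub, n=2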
def _agg_and_save(src_list, window, date_str, debug=False):
for src in src_list:
if not os.path.exists(src):
print(f"Missing file to aggregate by {window} for {date_str}: {src}")
return
dest_fmt = str(agg_dir.joinpath("{window}/{agg}/{window}-{agg}-{date_str}.feather"))
dest_hourly = dest_fmt.format(window=window, agg="by-hour", date_str=date_str)
dest_daily = dest_fmt.format(window=window, agg="by-day", date_str=date_str)
if os.path.exists(dest_hourly) and os.path.exists(dest_daily):
if debug:
print(f"already have {dest_hourly} and {dest_daily}")
return
df = _read_dfs(*src_list)
if debug:
        print(f"Aggregating {len(df)} rows to {dest_hourly} and {dest_daily}")
h = rounded_count(df, freq="H")
h.to_feather(dest_hourly)
if debug:
print(
f"Wrote {len(h)}/{len(df)} ({len(h) / len(df):.0%}) rows to {dest_hourly}"
)
d = rounded_count(df, freq="D")
d.to_feather(dest_daily)
if debug:
print(f"Wrote {len(d)}/{len(df)} ({len(d) / len(df):.0%}) rows to {dest_daily}")
def aggregate_day(day):
    # hourly and daily counts for a single day's events
src = day.strftime(jsonl_fmt)
date_str = day.strftime("%Y-%m-%d")
_agg_and_save([src], "daily", date_str)
def aggregate_week(day):
iso_day = day.isocalendar()
week_start = day - datetime.timedelta(days=iso_day.weekday - 1)
date_str = day.strftime("%Y-w%W")
src_list = []
for i in range(7):
day = week_start + datetime.timedelta(days=i)
assert day.isocalendar().week == iso_day.week
src = day.strftime(jsonl_fmt)
src_list.append(src)
_agg_and_save(src_list, "weekly", date_str)
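# e.g. aggregate_week(datetime.date(2022, 7, 3)) (a Sunday) sets week_start to Monday
# 2022-06-27 and aggregates events-2022-06-27.jsonl .. events-2022-07-03.jsonl
# under date_str "2022-w26"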
def aggregate_month(day):
src_list = []
month = day.month
day = datetime.date(year=day.year, month=day.month, day=1)
date_str = day.strftime("%Y-%m")
while day.month == month:
src_list.append(day.strftime(jsonl_fmt))
day = day + datetime.timedelta(days=1)
_agg_and_save(src_list, "monthly", date_str)
def aggregate(start_date=datetime.date(2019, 1, 1), end_date=datetime.date.today()):
day = start_date
    total_days = (end_date - start_date).days
    days = tqdm(unit="day", desc="days", total=total_days)
    weeks = tqdm(unit="week", desc="weeks", total=total_days // 7)
    months = tqdm(unit="month", desc="months", total=total_days // 31)
while day < end_date:
aggregate_day(day)
if day.isocalendar().weekday == 7:
aggregate_week(day)
weeks.update(1)
if (day + datetime.timedelta(days=1)).month != day.month:
aggregate_month(day)
months.update(1)
day += datetime.timedelta(days=1)
days.update(1)
days.close()
weeks.close()
months.close()
aggregate()
[tqdm progress bars: 1305 days, 186 weeks, 43 months]
CPU times: user 2min 45s, sys: 16.2 s, total: 3min 1s
Wall time: 3min
!du -hs events
6.6G events
!du -hs aggregated/*
284M    aggregated/daily
237M    aggregated/monthly
245M    aggregated/weekly
import matplotlib.pyplot as plt
%%time
def get_monthly_data(by="day"):
frames = [pd.read_feather(f) for f in monthly_dir.glob(f"by-{by}/*.feather")]
return categoricalize(pd.concat(frames).sort_values("timestamp").reset_index(drop=True))
def get_weekly_data(by="day"):
frames = [pd.read_feather(f) for f in weekly_dir.glob(f"by-{by}/*.feather")]
return categoricalize(pd.concat(frames).sort_values("timestamp").reset_index(drop=True))
df = get_weekly_data()
df["origin"] = df["origin"].fillna("gke.mybinder.org")
CPU times: user 1.13 s, sys: 378 ms, total: 1.51 s
Wall time: 1.52 s
len(df)
2038508
%%time
uncategoricalize(df).groupby("provider").n.sum()
CPU times: user 170 ms, sys: 42.4 ms, total: 212 ms
Wall time: 220 ms
provider
Dataverse        347
Figshare         342
Gist          402479
Git           293521
GitHub      26183517
GitLab        102849
Hydroshare       531
Zenodo          2570
Name: n, dtype: int64
origins = {
'binder.mybinder.turing.ac.uk': "turing.mybinder.org",
"binder.mybinder.ovh": "ovh.mybinder.org",
"notebooks.gesis.org": "gesis.mybinder.org",
"gke2.mybinder.org": "gke.mybinder.org",
"gke1.mybinder.org": "gke.mybinder.org",
}
df["federation"] = df.origin.apply(lambda x: origins.get(x, x)).str.split(".").str[0]
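Spot-checking the alias mapping on one origin (illustrative scalar version of the line above):
origins.get("gke2.mybinder.org", "gke2.mybinder.org").split(".")[0]  # -> 'gke'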
df.origin.unique()
['gke.mybinder.org', 'ovh.mybinder.org', 'binder.mybinder.ovh', 'notebooks.gesis.org', 'gke.mybinder.org:443', ..., 'binder.mybinder.turing.ac.uk', 'gesis.mybinder.org', 'ovh.mybinder.org:8893', 'gke2.mybinder.org', 'gke1.mybinder.org']
Length: 11
Categories (11, object): ['binder.mybinder.ovh', 'binder.mybinder.turing.ac.uk', 'gesis.mybinder.org', 'gke.mybinder.org', ..., 'notebooks.gesis.org', 'ovh.mybinder.org', 'ovh.mybinder.org:8893', 'turing.mybinder.org']
df.columns
Index(['provider', 'schema', 'timestamp', 'spec', 'status', 'origin', 'version', 'n', 'build_token', 'federation'], dtype='object')
%%time
uncategoricalize(df).groupby("provider").n.sum()
CPU times: user 170 ms, sys: 29.8 ms, total: 200 ms
Wall time: 197 ms
provider
Dataverse        339
Figshare         342
Gist          400411
Git           290204
GitHub      25982342
GitLab        100711
Hydroshare       514
Zenodo          2558
Name: n, dtype: int64
%%time
counts = (
uncategoricalize(df).groupby(["timestamp", "federation"]).n.sum()
)
counts = counts.unstack() # .fillna(0)
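# note: without .fillna(0), days on which a federation member recorded no sessions stay NaN,
# and rolling("7D").mean() skips those NaNs instead of treating them as zero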
seven_day_counts = counts.rolling("7D").mean()
CPU times: user 267 ms, sys: 37.5 ms, total: 305 ms
Wall time: 314 ms
seven_day_counts.plot()
plt.title("Daily user sessions (7-day average)")
# counts.plot(kind="area", stacked=True)
[figure: Daily user sessions (7-day average)]
seven_day_counts.plot(kind="area", stacked=True)
plt.title("Daily user sessions (cumulative)")
[figure: Daily user sessions (cumulative)]