#!/usr/bin/env python
# coding: utf-8

# # Aggregating mybinder.org events archive
#
# Collect data from https://archive.analytics.mybinder.org
# and aggregate it into fewer, more efficient .feather files,
# for easier loading and plotting of ALL THE DATA.
#
# Rather than keeping all the original event data, the `ref` column is dropped
# and builds are counted per _spec_ (not quite the same as repo, since spec includes the _unresolved_ ref)
# on a given time interval (hourly or daily).
# This reduces the storage size by ~120x without losing any data I'm interested in.
#
# The archive isn't that big, but load times are long with the original JSON,
# while the aggregated counts fit in a pretty small (~100MB) set of feather files.
#
#
# | which | files | total size |
# | ----- | ----- | ---------- |
# | original `.jsonl` | 1199 | 6.3GB |
# | daily files, counts by hour `.feather` | 1199 | 147MB |
# | monthly files, counts by day `.feather` | 40 | 56MB |

# In[1]:


import datetime
import os

import pandas as pd
import altair as alt

from pathlib import Path
from functools import lru_cache
from urllib.request import urlretrieve
from concurrent.futures import ThreadPoolExecutor

from tqdm.notebook import tqdm

category_columns = ["schema", "version", "provider", "status", "origin"]


def categoricalize(df):
    """Ensure categorical columns are categorical

    For more efficient storage and processing
    """
    dtypes = {}
    for col in category_columns:
        if col in df.columns:
            dtypes[col] = "category"
    if dtypes:
        return df.astype(dtypes)
    else:
        return df


def uncategoricalize(df):
    """revert categories

    groupby is _super_ slow on categoricals
    https://github.com/pandas-dev/pandas/issues/32976
    """
    dtypes = {}
    for col, dtype in df.dtypes.items():
        if isinstance(dtype, pd.CategoricalDtype):
            dtypes[col] = dtype.categories.dtype
    if dtypes:
        return df.astype(dtypes)
    else:
        return df


@lru_cache(maxsize=60)
def _read_df(path):
    df = pd.read_json(path, lines=True)
    return categoricalize(df)


@lru_cache(maxsize=10)
def _read_dfs(*paths):
    dfs = [_read_df(path) for path in paths]
    if len(dfs) == 1:
        return dfs[0]
    else:
        # concatenate, preserving categoricals with new values
        return (
            categoricalize(pd.concat(dfs, ignore_index=True))
            .sort_values(["timestamp"])
            .reset_index(drop=True)
        )


# Quickest way to get the whole archive (~6.5G):
#
# ```bash
# gsutil -m rsync -r gs://binder-events-archive/ ./events/
# ```

# In[2]:


events_dir = Path("events")
agg_dir = Path("aggregated")
agg_dir.mkdir(exist_ok=True)
daily_dir = agg_dir.joinpath("daily")
daily_dir.mkdir(exist_ok=True)
weekly_dir = agg_dir.joinpath("weekly")
weekly_dir.mkdir(exist_ok=True)
monthly_dir = agg_dir.joinpath("monthly")
monthly_dir.mkdir(exist_ok=True)
for parent in (daily_dir, weekly_dir, monthly_dir):
    parent.joinpath("by-hour").mkdir(exist_ok=True)
    parent.joinpath("by-day").mkdir(exist_ok=True)

# daily-by-hour
# weekly-by-day
jsonl_fmt = f"{events_dir}/events-%Y-%m-%d.jsonl"
daily_fmt = f"{daily_dir}/daily-%Y-%m-%d.feather"
weekly_fmt = f"{weekly_dir}/weekly-%Y-%m-%d"


def rounded_count(df, freq="H"):
    # copy to avoid leaving our new column in the caller's df
    df = df.copy()
    # add counting column
    df["n"] = 1
    df["timestamp"] = df.timestamp.dt.round(freq)
    # exclude ref from aggregations
    groupby = list(set(df.columns).difference({"ref", "n"}))
    # uncategoricalize because groupby is crazy slow with categoricals;
    # much faster to copy the whole df multiple times!
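    # (each row of the result is the number of events sharing the same values
    # for all grouped columns within one rounded interval; passing
    # observed=True to groupby is another common workaround for the
    # categorical slowdown)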
    return uncategoricalize(df).groupby(groupby).n.count().reset_index()


def _agg_and_save(src_list, window, date_str, debug=False):
    for src in src_list:
        if not os.path.exists(src):
            print(f"Missing file to aggregate by {window} for {date_str}: {src}")
            return
    dest_fmt = str(agg_dir.joinpath("{window}/{agg}/{window}-{agg}-{date_str}.feather"))
    dest_hourly = dest_fmt.format(window=window, agg="by-hour", date_str=date_str)
    dest_daily = dest_fmt.format(window=window, agg="by-day", date_str=date_str)
    if os.path.exists(dest_hourly) and os.path.exists(dest_daily):
        if debug:
            print(f"already have {dest_hourly} and {dest_daily}")
        return
    df = _read_dfs(*src_list)
    if debug:
        print(f"Aggregating {len(df)} rows to {dest_hourly} and {dest_daily}")
    h = rounded_count(df, freq="H")
    h.to_feather(dest_hourly)
    if debug:
        print(
            f"Wrote {len(h)}/{len(df)} ({len(h) / len(df):.0%}) rows to {dest_hourly}"
        )
    d = rounded_count(df, freq="D")
    d.to_feather(dest_daily)
    if debug:
        print(f"Wrote {len(d)}/{len(df)} ({len(d) / len(df):.0%}) rows to {dest_daily}")


def aggregate_day(day):
    # aggregate a single day's events
    src = day.strftime(jsonl_fmt)
    date_str = day.strftime("%Y-%m-%d")
    _agg_and_save([src], "daily", date_str)


def aggregate_week(day):
    iso_day = day.isocalendar()
    week_start = day - datetime.timedelta(days=iso_day.weekday - 1)
    date_str = day.strftime("%Y-w%W")
    src_list = []
    for i in range(7):
        day = week_start + datetime.timedelta(days=i)
        assert day.isocalendar().week == iso_day.week
        src = day.strftime(jsonl_fmt)
        src_list.append(src)
    _agg_and_save(src_list, "weekly", date_str)


def aggregate_month(day):
    src_list = []
    month = day.month
    day = datetime.date(year=day.year, month=day.month, day=1)
    date_str = day.strftime("%Y-%m")
    while day.month == month:
        src_list.append(day.strftime(jsonl_fmt))
        day = day + datetime.timedelta(days=1)
    _agg_and_save(src_list, "monthly", date_str)


def aggregate(start_date=datetime.date(2019, 1, 1), end_date=datetime.date.today()):
    day = start_date
    total_days = int((end_date - start_date).total_seconds() // (24 * 3600))
    days = tqdm(unit="day", desc="days", total=total_days)
    weeks = tqdm(unit="week", desc="weeks", total=total_days // 7)
    months = tqdm(unit="month", desc="months", total=total_days // 30)
    while day < end_date:
        aggregate_day(day)
        if day.isocalendar().weekday == 7:
            aggregate_week(day)
            weeks.update(1)
        if (day + datetime.timedelta(days=1)).month != day.month:
            aggregate_month(day)
            months.update(1)
        day += datetime.timedelta(days=1)
        days.update(1)
    days.close()
    weeks.close()
    months.close()


aggregate()


# In[3]:


import matplotlib.pyplot as plt


# In[4]:


import glob


def get_monthly_data(by="day"):
    frames = [pd.read_feather(f) for f in monthly_dir.glob(f"by-{by}/*.feather")]
    return categoricalize(
        pd.concat(frames).sort_values("timestamp").reset_index(drop=True)
    )


def get_weekly_data(by="day"):
    frames = [pd.read_feather(f) for f in weekly_dir.glob(f"by-{by}/*.feather")]
    return categoricalize(
        pd.concat(frames).sort_values("timestamp").reset_index(drop=True)
    )


df = get_monthly_data()
df["origin"] = df.origin.fillna("gke.mybinder.org")


# In[5]:


len(df)


# In[6]:


origins = {
    "binder.mybinder.turing.ac.uk": "turing.mybinder.org",
    "binder.mybinder.ovh": "ovh.mybinder.org",
    "notebooks.gesis.org": "gesis.mybinder.org",
    "gke2.mybinder.org": "gke.mybinder.org",
    "gke1.mybinder.org": "gke.mybinder.org",
}
df["federation"] = df.origin.apply(lambda x: origins.get(x, x)).str.split(".").str[0]
# df.origin.unique()


# In[7]:


df2021 = df[df.timestamp.dt.year == 2021].copy()
len(df2021)


# In[8]:


print(f"Total launches in 2021: {df2021.n.sum():,d}")
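# A quick illustrative sketch (an added example, not part of the aggregation
# pipeline): the derived `federation` column makes a per-member breakdown of
# the 2021 launch totals a one-liner, using the same `n` counting column as
# above.

# In[ ]:


df2021.groupby("federation").n.sum().sort_values(ascending=False)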
# Create derivative 'repo' column, stripping unresolved ref from spec
#
# 1. git/gitlab have url-encoded repo as first part, with ref after '/'
# 2. gist/github have user/(repo|gist)[/ref], so repo is the first _two_ parts

# In[9]:


# start with spec as-is, most providers don't include ref
df2021["repo"] = df2021.spec

strip_spec = df2021.provider.isin({"Git", "GitLab"})
df2021.loc[strip_spec, "repo"] = df2021[strip_spec].spec.str.split("/", n=1).str[0]

github = df2021.provider.isin({"GitHub", "Gist"})
df2021.loc[github, "repo"] = (
    df2021[github].spec.str.split("/", n=2).str[:2].str.join("/")
)


# In[10]:


print(f"Total unique repos in 2021: {len(df2021.repo.unique()):,d}")


# In[11]:


len(
    df2021[["provider", "repo"]]
    .apply(lambda row: f"{row.provider}:{row.repo}", axis=1)
    .unique()
)


# In[12]:


df.provider.value_counts()


# In[13]:


df2021.repo.value_counts().head(10)


# In[14]:


counts = uncategoricalize(df).groupby(["timestamp", "federation"]).n.sum()
counts = counts.unstack()  # .fillna(0)
seven_day_counts = counts.rolling("7D").mean()
seven_day_counts.plot()
plt.title("Daily user sessions (7-day average)")
# counts.plot(kind="area", stacked=True)


# In[15]:


seven_day_counts.plot(kind="area", stacked=True)
plt.title("Daily user sessions (stacked by federation member)")
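# One more illustrative sketch (an added example; `shares` is just a local
# name): normalizing the 7-day averages to each federation member's share of
# daily sessions, which should make shifts between members easier to see than
# absolute counts.

# In[ ]:


shares = seven_day_counts.div(seven_day_counts.sum(axis=1), axis=0)
shares.plot(kind="area", stacked=True)
plt.title("Share of daily user sessions by federation member (7-day average)")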