#!/usr/bin/env python # coding: utf-8 # # Aggregate statistics for mybinder.org # # # My scratch notebook for exploring the [mybinder analytics archive](https://archive.analytics.mybinder.org) # # 1. Download raw events with gsutil (very quick after first run, ~2 minutes to start): # # ```bash # mkdir -p ./events/ # gsutil -m rsync -r gs://binder-events-archive/ ./events/ # ``` # 2. Repack data with pandas for faster re-loading (~3 minutes for fresh data): # - convert categorical columns to pandas categoricals (saves space, some loading time, but # - serialize to feather # - aggregate counts, dropping ref column and rounding timestamps. This significantly reduces the row count, # without losing any _useful_ information. Ref could be kept without _too_ much cost, I think. # - Aggregations are: # - files per day, week, or month # - counts per day or hour # # 3. Load files with pandas and plot (~1s) # In[1]: import datetime import pandas as pd import altair as alt from pathlib import Path from functools import lru_cache from urllib.request import urlretrieve from concurrent.futures import ThreadPoolExecutor from tqdm.notebook import tqdm category_columns = ["schema", "version", "provider", "status", "origin"] def categoricalize(df): """Ensure categorical columns are categorical For more efficient storage, processing """ dtypes = {} for col in category_columns: if col in df.columns: dtypes[col] = "category" if dtypes: return df.astype(dtypes) else: return df def uncategoricalize(df): """revert categories groupby is _super_ slow on categoricals https://github.com/pandas-dev/pandas/issues/32976 """ dtypes = {} for col, dtype in df.dtypes.items(): if isinstance(dtype, pd.CategoricalDtype): dtypes[col] = dtype.categories.dtype if dtypes: return df.astype(dtypes) else: return df @lru_cache(maxsize=60) def _read_df(path): df = pd.read_json(path, lines=True) return categoricalize(df) @lru_cache(maxsize=10) def _read_dfs(*paths): dfs = [_read_df(path) for path in paths] if len(dfs) == 1: return dfs[0] else: # concatenate, preserve categoricals with new values return ( categoricalize(pd.concat(dfs, ignore_index=True)) .sort_values(["timestamp"]) .reset_index(drop=True) ) # In[2]: get_ipython().run_cell_magic('time', '', 'events_dir = Path("events")\nagg_dir = Path("aggregated")\nagg_dir.mkdir(exist_ok=True)\ndaily_dir = agg_dir.joinpath("daily")\ndaily_dir.mkdir(exist_ok=True)\nweekly_dir = agg_dir.joinpath("weekly")\nweekly_dir.mkdir(exist_ok=True)\nmonthly_dir = agg_dir.joinpath("monthly")\nmonthly_dir.mkdir(exist_ok=True)\n\nfor parent in (daily_dir, weekly_dir, monthly_dir):\n parent.joinpath("by-hour").mkdir(exist_ok=True)\n parent.joinpath("by-day").mkdir(exist_ok=True)\n\n# daily-by-hour\n# weekly-by-day\n\njsonl_fmt = f"{events_dir}/events-%Y-%m-%d.jsonl"\ndaily_fmt = f"{daily_dir}/daily-%Y-%m-%d.feather"\nweekly_fmt = f"{weekly_dir}/weekly-%Y-%m-%d"\n\ndef rounded_count(df, freq="H"):\n # copy to avoid leaving our new column in the df\n df = df.copy()\n # add counting column\n df["n"] = 1\n df["timestamp"] = df.timestamp.dt.round(freq)\n # exclude ref from aggregations\n groupby = list(set(df.columns).difference({"ref", "n"}))\n # uncategoricalize because groupby is crazy slow with categoricals\n # must faster to copy the whole df multiple times!\n return uncategoricalize(df).groupby(groupby).n.count().reset_index()\n\n\ndef _agg_and_save(src_list, window, date_str, debug=False):\n for src in src_list:\n if not os.path.exists(src):\n print(f"Missing file to aggregate by {window} for {date_str}: {src}")\n return\n dest_fmt = str(agg_dir.joinpath("{window}/{agg}/{window}-{agg}-{date_str}.feather"))\n dest_hourly = dest_fmt.format(window=window, agg="by-hour", date_str=date_str)\n dest_daily = dest_fmt.format(window=window, agg="by-day", date_str=date_str)\n\n if os.path.exists(dest_hourly) and os.path.exists(dest_daily):\n if debug:\n print(f"already have {dest_hourly} and {dest_daily}")\n return\n\n df = _read_dfs(*src_list)\n if debug:\n print(f"Aggregating {len(df)} rows to {dest_hourly} and {dest_hourly}")\n\n h = rounded_count(df, freq="H")\n h.to_feather(dest_hourly)\n if debug:\n print(\n f"Wrote {len(h)}/{len(df)} ({len(h) / len(df):.0%}) rows to {dest_hourly}"\n )\n\n d = rounded_count(df, freq="D")\n d.to_feather(dest_daily)\n if debug:\n print(f"Wrote {len(d)}/{len(df)} ({len(d) / len(df):.0%}) rows to {dest_daily}")\n\n\ndef aggregate_day(day):\n # hourly counts by r\n src = day.strftime(jsonl_fmt)\n date_str = day.strftime("%Y-%m-%d")\n _agg_and_save([src], "daily", date_str)\n\n\ndef aggregate_week(day):\n iso_day = day.isocalendar()\n week_start = day - datetime.timedelta(days=iso_day.weekday - 1)\n date_str = day.strftime("%Y-w%W")\n\n src_list = []\n for i in range(7):\n day = week_start + datetime.timedelta(days=i)\n assert day.isocalendar().week == iso_day.week\n src = day.strftime(jsonl_fmt)\n src_list.append(src)\n _agg_and_save(src_list, "weekly", date_str)\n\n\ndef aggregate_month(day):\n src_list = []\n month = day.month\n day = datetime.date(year=day.year, month=day.month, day=1)\n date_str = day.strftime("%Y-%m")\n while day.month == month:\n src_list.append(day.strftime(jsonl_fmt))\n day = day + datetime.timedelta(days=1)\n _agg_and_save(src_list, "monthly", date_str)\n\n\ndef aggregate(start_date=datetime.date(2019, 1, 1), end_date=datetime.date.today()):\n day = start_date\n total_days = int((end_date - start_date).total_seconds() // (24 * 3600))\n days = tqdm(unit="day", desc="days", total=total_days)\n weeks = tqdm(unit="week", desc="weeks", total = total_days // 7)\n months = tqdm(unit="month", desc="months", total = total_days // 31)\n\n while day < end_date:\n aggregate_day(day)\n if day.isocalendar().weekday == 7:\n aggregate_week(day)\n weeks.update(1)\n if (day + datetime.timedelta(days=1)).month != day.month:\n aggregate_month(day)\n months.update(1)\n day += datetime.timedelta(days=1)\n days.update(1)\n days.close()\n weeks.close()\n months.close()\n\n\naggregate()\n') # In[3]: get_ipython().system('du -hs events') # In[4]: get_ipython().system('du -hs aggregated/*') # In[5]: import matplotlib.pyplot as plt # In[6]: get_ipython().run_cell_magic('time', '', '\ndef get_monthly_data(by="day"):\n frames = [pd.read_feather(f) for f in monthly_dir.glob(f"by-{by}/*.feather")]\n return categoricalize(pd.concat(frames).sort_values("timestamp").reset_index(drop=True))\n\ndef get_weekly_data(by="day"):\n frames = [pd.read_feather(f) for f in weekly_dir.glob(f"by-{by}/*.feather")]\n return categoricalize(pd.concat(frames).sort_values("timestamp").reset_index(drop=True))\n\n\ndf = get_weekly_data()\ndf.origin.fillna("gke.mybinder.org", inplace=True)\n') # In[7]: len(df) # In[8]: df.n.sum() # In[9]: get_ipython().run_cell_magic('time', '', 'uncategoricalize(df).groupby("provider").n.sum()\n') # In[10]: origins = { 'binder.mybinder.turing.ac.uk': "turing.mybinder.org", "binder.mybinder.ovh": "ovh.mybinder.org", "ovh2.mybinder.org": "ovh.mybinder.org", "ovh-test.mybinder.org": "ovh.mybinder.org", "notebooks.gesis.org": "gesis.mybinder.org", "notebooks-test.gesis.org": "gesis.mybinder.org", "gke2.mybinder.org": "gke.mybinder.org", "gke1.mybinder.org": "gke.mybinder.org", } df["federation"] = df.origin.apply(lambda x: origins.get(x, x)).str.split(".").str[0] list(df.origin.unique()), list(df.federation.unique()) # jovian.ml flooded unique repos that weren't really unique # In[11]: df.loc[df.spec.str.contains("jovian.ml"), "spec"] = "jovian.ml/$ref" # In[14]: get_ipython().run_cell_magic('time', '', 'uncategoricalize(df).groupby("provider").n.sum().sort_values(ascending=False)\n') # In[15]: print(f"{df.n.sum():,d}") # In[16]: monthly_count = df.groupby([df.timestamp.dt.strftime("%Y-%m")]).n.sum().cumsum().reset_index() monthly_count.head() # In[17]: alt.Chart( monthly_count, title="Total user sessions on mybinder.org", width=300, height=300, ).mark_line().encode( x=alt.X("timestamp:T", title="date"), y=alt.Y("n", title="sessions"), ) # Create derivative 'repo' column, stripping unresolved ref from spec # # 1. git/gitlab have url-encoded repo as first part, with ref after '/' # 2. gist/github have user/(repo|gist)[/ref], so repo is first _two_ parts # In[18]: df["repo"] = df.spec strip_spec = df.provider.isin({"Git", "GitLab"}) df.loc[strip_spec, ["repo"]] = df[strip_spec].spec.str.split("/", n=1).str[0] github = df.provider.isin({"GitHub", "Gist"}) df.loc[github, ["repo"]] = df[github].spec.str.split("/", n=2).str[:2].str.join("/") # In[19]: print(f"Total unique repos ever: {len(df.repo.unique()):,d}") # In[20]: print(f"Total unique repos in 2022: {len(df[df.timestamp.dt.year==2022].repo.unique()):,d}") # In[21]: len(df[["provider", "repo"]].apply(lambda row: f"{row.provider}:{row.repo}", axis=1).unique()) # In[22]: df.provider.value_counts() # In[23]: df.repo.value_counts().head(10) # In[24]: #without_jovyan = df[~df.spec.str.contains("jovian.ml")] #d = without_jovyan monthly = df.groupby([df.timestamp.dt.strftime("%Y-%m"), 'repo']).n.sum().reset_index().groupby('timestamp').repo.count().reset_index() # In[25]: monthly.tail() # In[26]: alt.Chart(monthly[:-1], title="Monthly unique repositories").mark_bar().encode(x="timestamp:T", y="repo:Q") # In[27]: yearly = df.groupby([df.timestamp.dt.strftime("%Y"), 'repo']).n.sum().reset_index().groupby('timestamp').repo.count().reset_index() alt.Chart(yearly).mark_bar().encode(x="timestamp", y="repo") # In[28]: get_ipython().run_cell_magic('time', '', 'counts = (\n uncategoricalize(df).groupby(["timestamp", "federation"]).n.sum()\n)\ncounts = counts.unstack() # .fillna(0)\nseven_day_counts = counts.rolling("7D").mean()\n') # In[29]: seven_day_counts.plot() plt.title("Daily user sessions (7-day average)") # counts.plot(kind="area", stacked=True) # In[30]: seven_day_counts.plot(kind="area", stacked=True) plt.title("Daily user sessions (cumulative)")