Thomas Viehmann tv@lernapparat.de
Dieser Code und die Verfahren werden in https://lernapparat.de/rki-nowcasting/ dokumentiert.
Bitte beachten Sie die Hinweise dort zu den Bedingungen der Nutzung dieses Codes und der Berechnungsresultate.
import pandas
if 'get_ipython' in dir():
INTERACTIVE = True
%matplotlib inline
else:
INTERACTIVE = False
from matplotlib import pyplot
import numpy
import datetime
import math
import matplotlib
import scipy.stats
#import seaborn
import pathlib
#seaborn.set()
import os
import shutil
import itertools
def reverse_cumprod(x):
return x[::-1].cumprod()[::-1]
def reverse_cumsum(x):
return x[::-1].cumsum()[::-1]
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
def smooth(s, size=4):
return pandas.Series(numpy.convolve(s, numpy.full((size,), 1/size))[:-(size-1)], index=s.index, name=s.name)
bl_abbr = {"Deutschland": "D", "Baden-Württemberg": "BW", "Bayern": "BY", "Berlin": "BE", "Brandenburg": "BB", "Bremen": "HB", "Hamburg": "HH",
"Hessen": "HE", "Mecklenburg-Vorpommern": "MV", "Niedersachsen": "NI", "Nordrhein-Westfalen": "NW",
"Rheinland-Pfalz": "RP", "Saarland": "SL", "Sachsen": "SN", "Sachsen-Anhalt": "ST", "Schleswig-Holstein": "SH",
"Thüringen": "TH"}
p_output = pathlib.Path('./outputs/')
def copyfile(src, dst):
tmp = str(dst)+'.tmp'
shutil.copy(src, tmp)
os.replace(tmp, dst)
dfs = []
for date in pandas.date_range('2020-04-29', '2020-05-21', freq='D'):
dstr = date.date().isoformat()
adf = pandas.read_csv(f'data/rki_covid19.{dstr}.csv')
adf['Datenstand'] = pandas.to_datetime(adf.Datenstand.apply(lambda s: '-'.join(s.split(',')[0].split('.')[::-1])))
assert (date==adf['Datenstand'].min()) and (date==adf['Datenstand'].max())
dfs.append(adf)
large_df = pandas.concat(dfs, axis=0, sort=True)
for k in ('Meldedatum', 'Datenstand', 'Refdatum'):
large_df[k] = pandas.to_datetime(large_df[k])
large_df['delay'] = (large_df.Meldedatum - large_df.Refdatum).dt.days # nur gültig für IstErkrankungsbeginn==1
large_df['Eingangsdatum'] = large_df.Datenstand - pandas.Timedelta(days=1) # nur gültig für NeuerFall==1
large_df['delay_eingang'] = (large_df.Eingangsdatum - large_df.Refdatum).dt.days # nur gültig für NeuerFall==1
# df['Meldedatum'] = df.Meldedatum.dt.tz_localize(None)
def plot_vergleich(eigene, vergleich, titel, fn=None):
eigene_smooth = smooth(eigene)[vergleich.index.min():vergleich.index.max()]
pyplot.figure(figsize=(15, 5))
pyplot.suptitle(titel)
pyplot.subplot(1,3,1)
pyplot.plot(eigene_smooth, label='TV')
pyplot.plot(vergleich, label='RKI')
pyplot.xticks(rotation=90)
pyplot.legend()
pyplot.subplot(1,3,2)
pyplot.plot(eigene_smooth[-20:], label='TV')
pyplot.plot(vergleich[-20:], label='RKI')
pyplot.xticks(rotation=90)
pyplot.legend()
pyplot.subplot(1,3,3)
pyplot.plot(eigene_smooth[-20:]-vergleich[-20:], label='Differenz')
pyplot.xticks(rotation=90)
pyplot.legend()
if fn is not None:
pyplot.savefig(fn, bbox_inches = "tight")
pyplot.show()
pyplot.close()
def faelle_mit_erkrankungsbeginn_nach_erkrankungsbeginn(large_df):
# eigentlich müsste es Delay zum jeweiligen Eingang sein. Kennen wir i.A. aber nicht
known = large_df[(large_df.Datenstand == large_df.Datenstand.max())
& (large_df.IstErkrankungsbeginn == 1)
& (large_df.NeuerFall >= 0)
& (large_df.delay >= 0)
& (large_df.delay <= 30)].groupby(['Refdatum']).AnzahlFall.sum()
known = known.reindex(pandas.date_range('2020-01-01', large_df.Eingangsdatum.max()), fill_value=0)
return known
def faelle_ohne_erkrankungsbeginn_nach_rekonstruiertem_eingangsdatum(large_df, missing_tol=2):
unbekannte_roh = large_df[(large_df.IstErkrankungsbeginn == 0)
& (large_df.NeuerFall >= 0)].groupby(['Datenstand', 'NeuerFall', 'Meldedatum', 'Altersgruppe', 'Geschlecht']).AnzahlFall.sum()
unbekannte_roh = unbekannte_roh.reindex(pandas.MultiIndex.from_product(unbekannte_roh.index.levels, names=unbekannte_roh.index.names),
fill_value=0)
new_cases_melde = {}
old_cases = None
newer_sum = 0
for d in unbekannte_roh.index.levels[0][::-1]:
if old_cases is None:
old_cases = unbekannte_roh[d, 0]
new_cases = unbekannte_roh[d, 1]
else:
last_old_cases = old_cases
old_cases = (old_cases - unbekannte_roh.loc[d, 1]).clip(lower=0)
new_cases = last_old_cases - old_cases
newer_sum += new_cases.sum()
# Eingangsdatum = Datum vor Datenstand
new_cases_melde[d - pandas.Timedelta(days=1)] = new_cases
new_cases = pandas.DataFrame(new_cases_melde).sort_index(axis=1)
new_new = new_cases.sum(level=[1,2])
unknown_new_cases_per_day = pandas.concat([old_cases[:d-pandas.Timedelta(days=2)].unstack(level=0, fill_value=0), new_new], axis=1)
missing = old_cases[d-pandas.Timedelta(days=1):]
missing = missing[missing > 0]
# Wir verlieren für D nach Alter/Geschlecht mind. 2 Fälle, die offenbar nie als neuer Fall markiert sind.
# Für Teil-Daten ggf. mehr.
print(missing.sum())
assert missing.sum() <= missing_tol
return unknown_new_cases_per_day
def verzug_von_erkrankung_bis_eingang(large_df):
bekannte_roh = large_df[(large_df.IstErkrankungsbeginn == 1)
& (large_df.NeuerFall == 1)
& (large_df.delay_eingang >= 0)
& (large_df.delay_eingang <= 30)].groupby(['Eingangsdatum', 'Altersgruppe', 'Geschlecht', 'delay_eingang']).AnzahlFall.sum()
bekannte_roh = bekannte_roh.unstack(level=0, fill_value=0)
bekannte_roh = bekannte_roh.reindex(pandas.MultiIndex.from_product(bekannte_roh.index.levels[:-1]+[pandas.RangeIndex(0, 31)], names=bekannte_roh.index.names),
fill_value=0)
bekannte_roh_old = large_df[(large_df.IstErkrankungsbeginn == 1)
& (large_df.Datenstand == large_df.Datenstand.min())
& (large_df.NeuerFall == 0)
& (large_df.delay >= 0)
& (large_df.delay <= 30)].groupby(['Meldedatum', 'Altersgruppe', 'Geschlecht', 'delay']).AnzahlFall.sum()
bekannte_roh_old = bekannte_roh_old.reindex(pandas.MultiIndex.from_product(bekannte_roh_old.index.levels, names=bekannte_roh_old.index.names),
fill_value=0).unstack(level=0)
# wir verlieren einen Fall
assert bekannte_roh_old.loc[:, bekannte_roh.columns.min():].sum().sum() <= 1
delay_known_new_cases = pandas.concat([bekannte_roh_old.loc[:, :bekannte_roh.columns.min()-pandas.Timedelta(days=1)], bekannte_roh], axis=1)
delay_known_new_cases = delay_known_new_cases.reindex(pandas.date_range('2020-01-01',
delay_known_new_cases.columns.max()), axis=1, fill_value=0)
delay_known_new_cases.index.names = ['Altersgruppe', 'Geschlecht', 'delay']
delay_known_new_cases.columns.name = 'Eingangsdatum'
return delay_known_new_cases
def verzug_von_erkrankung_bis_eingang_nach_erkrankung(large_df):
bekannte_roh = large_df[(large_df.IstErkrankungsbeginn == 1)
& (large_df.NeuerFall == 1)
& (large_df.delay_eingang >= 0)
& (large_df.delay_eingang <= 30)].groupby(['Refdatum', 'Altersgruppe', 'Geschlecht', 'delay_eingang']).AnzahlFall.sum()
bekannte_roh_old = large_df[(large_df.IstErkrankungsbeginn == 1)
& (large_df.Datenstand == large_df.Datenstand.min())
& (large_df.NeuerFall == 0)
& (large_df.delay >= 0)
& (large_df.delay <= 30)].groupby(['Refdatum', 'Altersgruppe', 'Geschlecht', 'delay']).AnzahlFall.sum()
bekannte_roh.index.names = bekannte_roh.index.names[:-1] + ['delay']
bekannte_roh = bekannte_roh.unstack(level=0, fill_value=0)
bekannte_roh_old = bekannte_roh_old.unstack(level=0, fill_value=0)
bekannte_roh = bekannte_roh.reindex(pandas.MultiIndex.from_product(bekannte_roh.index.levels[:-1]+[pandas.RangeIndex(0, 31)],
names=bekannte_roh.index.names),
fill_value=0)
bekannte_roh_old = bekannte_roh_old.reindex(pandas.MultiIndex.from_product(bekannte_roh_old.index.levels[:-1]+[pandas.RangeIndex(0, 31)],
names=bekannte_roh_old.index.names),
fill_value=0)
return bekannte_roh.add(bekannte_roh_old, fill_value=0)
def imputiere_erkrankungsdaten(unknown_new_cases_per_day, delay_known_new_cases, reuse_tol=200, completely_missing_tol=0):
dist = None
melde_range = pandas.date_range('2020-01-01', unknown_new_cases_per_day.columns.max(), name='Refdatum')
reuse_count = 0
completely_missing_count = 0
imputed_by_day_and_cat = {}
for date in unknown_new_cases_per_day.columns:
#print ("###", dw_start, unknown_new_cases_per_day.loc[:, dw_start: dw_end].sum())
##['A00-A04', 'A05-A14', 'A15-A34', 'A35-A59', 'A60-A79', 'A80+', 'unbekannt']:
for ag in unknown_new_cases_per_day.index.levels[0]:
for g in unknown_new_cases_per_day.index.levels[1]:
section = unknown_new_cases_per_day.loc[ag, g]
section = section[date:date] # note: This is including the upper boundary!
section = section.reindex(melde_range, fill_value=0)
count = section.sum()
unnormed_dist = delay_known_new_cases.loc[ag, g].loc[:, date]
unnormed_count = unnormed_dist.sum()
if unnormed_count == 0 and count > 0:
# print("### Warning", date, ag, g, count)
reuse_count += count
elif unnormed_count > 0:
dist = unnormed_dist / unnormed_count
if count > 0 and dist is not None:
imputed_section = numpy.convolve(section, dist[::-1])[-len(section):] # we only want temporal padding
imputed_by_day_and_cat[(date, ag, g)] = pandas.Series(imputed_section, index=melde_range)
assert count - imputed_section.sum() < 2, "loosing too many cases in imputation"
elif count > 0:
completely_missing_count += count
if INTERACTIVE:
print("reused:", reuse_count, "completely missing:", completely_missing_count)
assert completely_missing_count <= completely_missing_tol # in D eigentlich nicht.
assert reuse_count <= reuse_tol # Stand 17. Mai: 179 Einzelfälle ohne andere Fälle in gleicher Kategorie in D
imputed = pandas.DataFrame(imputed_by_day_and_cat).T
imputed.index.names = ['Eingangsdatum', 'Altersgruppe', 'Geschlecht']
#imputed.columns.name = 'Eingangsdatum'
imputed = imputed.reindex(pandas.date_range('2020-01-01', large_df.Eingangsdatum.max(), name=imputed.columns.names[0]), axis=1)
return imputed
def nowcast_lawless(known_cases, imputed, delay_cases):
assert delay_cases.columns.name == "Refdatum"
counts = delay_cases.iloc[:, -8-30:].sum(level=2).T[::-1].values
cumulative_over_days_per_delay = numpy.concatenate([numpy.zeros_like(counts[:1]), counts.cumsum(0)], axis=0)
counts8 = numpy.tril(cumulative_over_days_per_delay[8:]-cumulative_over_days_per_delay[:-8]) # sum of 8 preceeding days
n_x = numpy.diagonal(counts8)
N_x = counts8.sum(1)
g_hat = n_x / N_x
F_hat = (1-g_hat[::-1]).cumprod()[::-1]
imputed_total = known_cases.add(imputed, fill_value=0)
nowcast = imputed_total.copy()
for i in range(1, len(F_hat)):
nowcast.iloc[-i] /= F_hat[i]
return nowcast
def qualitaetschecks(large_df):
with pyplot.style.context("seaborn"):
dstr = large_df.Datenstand.max().date().isoformat()
fn = f'./data/nowcast_imputed_known_from_graph_{dstr}.csv'
if not os.path.exists(fn):
print("Keine Vergleichsdaten vorhanden")
return
df_vergleich = pandas.read_csv(fn, index_col=0)
df_vergleich.index = pandas.to_datetime(df_vergleich.index)
faelle_angegeben = faelle_mit_erkrankungsbeginn_nach_erkrankungsbeginn(large_df)
smooth_known = smooth(faelle_angegeben[df_vergleich.known.index.min():df_vergleich.known.index.max()])
plot_vergleich(faelle_angegeben, df_vergleich.known, 'angegebener Erkrankungsbeginn',
p_output / f'angegeben_vs_rki.{dstr}.svg')
unknown_new_cases_per_day = faelle_ohne_erkrankungsbeginn_nach_rekonstruiertem_eingangsdatum(large_df)
nach_meldedatum = large_df[(large_df.Datenstand == large_df.Datenstand.max())
& (large_df.IstErkrankungsbeginn == 0)
& (large_df.NeuerFall >= 0)].groupby('Meldedatum').AnzahlFall.sum()
pyplot.figure()
pyplot.title('Fälle ohne Erkrankungsbeginn')
pyplot.plot(nach_meldedatum['2020-04-28':], label='nach Meldedatum')
pyplot.plot(unknown_new_cases_per_day.sum()['2020-04-28':], label='nach vermutetem Übermittlungsdatum')
pyplot.xticks(rotation=90)
pyplot.legend()
pyplot.savefig(p_output / f'melde_vs_uebermittlung.{dstr}.svg', bbox_inches = "tight")
pyplot.show()
pyplot.close()
delay_known_new_cases = verzug_von_erkrankung_bis_eingang(large_df)
pyplot.figure()
pyplot.title('Verteilung der Zeit zwischen Erkrankung und Eingang am RKI')
new_delays = delay_known_new_cases.sum(level=2).loc[:, '2020-04-29':]
pyplot.bar(new_delays.index, new_delays.sum(1)/new_delays.sum().sum(), alpha=0.5, label='neuere per Datenstand')
old_delays = delay_known_new_cases.sum(level=2).loc[:, :'2020-04-28']
pyplot.bar(new_delays.index, old_delays.sum(1)/old_delays.sum().sum(), alpha=0.5, label='ältere per Meldedatum')
pyplot.legend()
pyplot.savefig(p_output / f'erkrankung_uebermittlung_verzoegerung.{dstr}.svg', bbox_inches = "tight")
pyplot.show()
pyplot.close()
imputed_x = imputiere_erkrankungsdaten(unknown_new_cases_per_day, delay_known_new_cases,
completely_missing_tol=1, reuse_tol=300)
imputed_erkr = imputed_x.sum()
plot_vergleich(imputed_erkr, df_vergleich.imputed - df_vergleich.known, '(nur) imputierte Fälle',
p_output / f'imputiert_vs_rki.{dstr}.svg')
delay_cases_per_onset = verzug_von_erkrankung_bis_eingang_nach_erkrankung(large_df)
# reweight delays by imputed cases.
factors = (imputed_x.sum(level=(1,2)) / delay_cases_per_onset.sum(level=(0,1))).fillna(0)+1
delay_imputed_cases = delay_cases_per_onset * factors
nowcast = nowcast_lawless(faelle_angegeben, imputed_erkr, delay_imputed_cases)
plot_vergleich(nowcast, df_vergleich.nowcast, 'Nowcast (gesamt)', p_output / f'nowcast_vs_rki.{dstr}.svg')
if INTERACTIVE:
qualitaetschecks(large_df)
Keine Vergleichsdaten vorhanden
def plot_nowcast(known, imputed, nowcast, title, min_date='2020-03-02', chop_days=3, fn=None):
pyplot.figure(figsize=(15,5))
kno_s = smooth(known)[min_date:][:-chop_days]
imp_s = smooth(imputed.add(known, fill_value=0))[min_date:][:-chop_days] # nowcast ist brutto, imputed netto...
now_s = smooth(nowcast)[min_date:][:-chop_days]
r_4 = now_s[-1]/now_s[-5]
now_s7 = smooth(nowcast, size=7)[min_date:][:-chop_days]
r_7 = now_s7[-1]/now_s7[-5]
title += f" $R_4={r_4:.2f}$, $R_7={r_7:.2f}$"
pyplot.title(title)
pyplot.bar(now_s.index, now_s, label='nowcast')
pyplot.bar(imp_s.index, imp_s, label='imputiert')
pyplot.bar(kno_s.index, kno_s, label='angegeben')
pyplot.legend()
pyplot.xticks(rotation=90)
if fn is not None:
pyplot.savefig(fn, bbox_inches = "tight")
pyplot.show()
pyplot.close()
return r_4, r_7
def plot_r(nowcast, num_days=15, chop_days=3):
snc = smooth(nowcast)[-num_days-chop_days:-chop_days]
snc7 = smooth(nowcast, size=7)[-num_days-chop_days:-chop_days]
pyplot.plot(snc[4:]/snc[:-4].values, label='4-Tages-R')
pyplot.plot(snc7[4:]/snc7[:-4].values, label='7-Tages-R')
pyplot.xticks(rotation=90)
pyplot.legend()
pyplot.close()
#plot_r(nowcast)
delay_known_new_cases = verzug_von_erkrankung_bis_eingang(large_df)
delay_known_new_cases_per_refdatum = verzug_von_erkrankung_bis_eingang_nach_erkrankung(large_df)
def compute_and_plot(df, title, delay_known_new_cases, min_date='2020-03-02', chop_days=3):
dstr = df.Datenstand.max().date().isoformat()
known = faelle_mit_erkrankungsbeginn_nach_erkrankungsbeginn(df)
short = bl_abbr.get(title, title).lower().replace(' ', '_')
unknown_new_cases_per_day = faelle_ohne_erkrankungsbeginn_nach_rekonstruiertem_eingangsdatum(
df, missing_tol=20)
imputed_x = imputiere_erkrankungsdaten(unknown_new_cases_per_day, delay_known_new_cases,
completely_missing_tol=1, reuse_tol=300)
imputed = imputed_x.sum()
# reweight delays by imputated cases.
factors = (imputed_x.sum(level=(1,2)) / delay_known_new_cases_per_refdatum.sum(level=(0,1))).fillna(0)+1
delay_imputed_cases = delay_known_new_cases_per_refdatum * factors
nowcast = nowcast_lawless(known, imputed, delay_imputed_cases)
fn = p_output / f'nowcast.{short}.{dstr}.svg'
r_4, r_7 = plot_nowcast(known, imputed, nowcast, title+' '+dstr, fn=fn)
copyfile(fn, p_output / f'nowcast.{short}.latest.svg')
imputed_total = imputed.add(known, fill_value=0)
kno_s = smooth(known)[min_date:][:-chop_days]
imp_s = smooth(imputed_total)[min_date:][:-chop_days] # nowcast ist brutto, imputed netto...
now_s = smooth(nowcast)[min_date:][:-chop_days]
return (pandas.DataFrame({
short+ '_bekannt_unsmooth': known[min_date:][:-chop_days],
short+ '_imputiert_unsmooth': imputed_total[min_date:][:-chop_days],
short+ '_nowcast_unsmooth': nowcast[min_date:][:-chop_days],
short+ '_bekannt_smooth': kno_s,
short+ '_imputiert_smooth': imp_s,
short+ '_nowcast_smooth': now_s,
}).fillna(0), r_4, r_7) # sometimes we don't have cases
results = {}
for bl in list(bl_abbr.keys())+['Der Norden']:
if bl == 'Deutschland':
bl_df = large_df
elif bl == 'Der Norden':
bl_df = large_df[(large_df.Bundesland.isin({'Bremen', 'Hamburg', 'Mecklenburg-Vorpommern', 'Niedersachsen', 'Schleswig-Holstein'}))]
else:
bl_df = large_df[large_df.Bundesland == bl]
# nutzt globale Annahmen für Imputation/Nowcasting
with pyplot.style.context("seaborn"):
results[bl] = compute_and_plot(bl_df, bl, delay_known_new_cases)
7 reused: 195 completely missing: 0
1 reused: 28 completely missing: 0
18 reused: 47 completely missing: 1
1 reused: 2 completely missing: 0
3 reused: 2 completely missing: 0
3 reused: 7 completely missing: 0
1 reused: 8 completely missing: 0
4 reused: 8 completely missing: 0
0 reused: 0 completely missing: 0
4 reused: 20 completely missing: 0
11 reused: 32 completely missing: 0
2 reused: 25 completely missing: 0
2 reused: 11 completely missing: 0
2 reused: 0 completely missing: 0
11 reused: 2 completely missing: 0
5 reused: 2 completely missing: 1
2 reused: 0 completely missing: 0
6 reused: 37 completely missing: 1
results_df = pandas.concat([r[0] for r in results.values()], axis=1)
dstr = large_df.Datenstand.max().date().isoformat()
fn = p_output / f'nowcast_results.{dstr}.csv'
results_df.to_csv(fn)
copyfile(fn, p_output / 'nowcast_results.latest.csv')