Household data: Processing Notebook
This Notebook is part of the Household Data Package of Open Power System Data. |
This Notebook handles missing data, performs calculations and aggragations and creates the output files.
Executing this script till the end will create a new version of the data package.
The Version number specifies the local directory for the data
We include a note on what has been changed.
version = '2020-04-15'
changes = 'Second release of all CoSSMic households'
# Python modules
import os
import numpy as np
import pandas as pd
import json
import yaml
import sqlite3
import itertools
import hashlib
import pytz
from shutil import copyfile
from datetime import datetime, date, timedelta, time
# Reload modules with execution of any code, to avoid having to restart
# the kernel after editing timeseries_scripts
%load_ext autoreload
%autoreload 2
# make sure the working directory is this file's directory
try:
os.chdir(home_path)
except NameError:
home_path = os.getcwd()
config_path = os.path.join(home_path, 'conf')
data_path = os.path.join(home_path, 'household_data', version, 'original_data')
out_path = os.path.join(home_path, 'household_data', version)
temp_path = os.path.join(home_path, 'household_data', 'temp')
os.makedirs(out_path, exist_ok=True)
os.makedirs(temp_path, exist_ok=True)
os.chdir(temp_path)
import logging.config
logging_file = os.path.join(config_path, 'logging.cfg')
logging.config.fileConfig(logging_file)
# Scripts from household repository package
from household.download import download
from household.read import read
from household.tools import update_sets
from household.validation import validate
from household.visualization import visualize
from household.imputation import make_equidistant, fill_nan, resample_markers
from household.make_json import make_json
# Additional verbosity like printing out additional CSV files to verify feed integrity
verbose = False
Select the time range to read and process data.
Default: all data.
Type None
to process all available data.
start_from_user = None # i.e. date(2017, 1, 1)
end_from_user = None # i.e. date(2017, 1, 31)
The raw data can be downloaded as a zip file from the OPSD Server. To do this, specify an archive version to use, that has been cached on the OPSD server as input.
archive_version = '2020-04-15'
Optionally, specify a subset of households to process.
The next cell prints the available sources and datasets.
with open(os.path.join(config_path, 'households.yml'), 'r') as f:
households = yaml.load(f.read(), Loader=yaml.FullLoader)
for household_name, household_data in households.items():
household_id = household_name.replace(' ', '').lower()
household_data['id'] = household_id
household_data['name'] = household_name
print(yaml.dump({household_name: list(household_data['series'].keys())}, default_flow_style=False))
Copy from its output and paste to following cell to get the right format.
Type subset = None
to ignore any subset selection and include all data.
subset = yaml.load('''
Example household:
- example_series
''', Loader=yaml.FullLoader)
subset = None
Now eliminate households and feeds not in subset.
if subset: # eliminate households and feeds not in subset
households = {household_name: {k: v for k, v in households[household_name].items()}
for household_name, series_list in subset.items()}
for household_name in subset.keys():
series_ignore = []
for series_name in households[household_name]['series']:
if not series_name in subset[household_name]:
series_ignore.append(series_name)
for series_name in series_ignore:
del households[household_name]['series'][series_name]
This section: download raw data to process.
If the original data does not exist, it will be downloaded from the OPSD Server and extracted in a local directory
download(out_path, version=archive_version)
This section: Read each downloaded file into a pandas-DataFrame and merge data from different sources if it has the same time resolution.
Set the title of the rows at the top of the data used to store metadata internally. The order of this list determines the order of the levels in the resulting output.
headers = ['region', 'household', 'type', 'unit', 'feed']
Loop through households and feeds to do the reading
# For each source in the household dictionary
household_data = {}
for household in households.values():
data = read(household['name'], household['dir'], household['region'], household['type'],
household['series'], headers,
start_from_user=start_from_user,
end_from_user=end_from_user)
household_data[household['id']] = data
With the raw data being the unvalidated measurements of a research prototype under development, certain steps should be taken, to remove clearly incorrect measured data points or react to other events, such as the counter reset of an energy meter.
Save the validated DataFrames to disk. This way you have the raw data to fall back to if something goes wrong in the remainder of this notebook without having to repeat the previos steps.
os.makedirs('raw_data', exist_ok=True)
for household in households.values():
data = validate(household, household_data[household['id']], config_dir=config_path, verbose=verbose)
data.columns.names = headers
data.to_pickle(os.path.join('raw_data', household['id']+'.pickle'))
Load the data series saved above
household_data = {}
for household in households.values():
household_data[household['id']] = pd.read_pickle(os.path.join('raw_data', household['id']+'.pickle'))
This section: Handle missing data and aggregation to a regular, equidistant index, as well as 15 and 60 minute intervals.
Most of the acquired household data comes in irregular time intervals and is highly unsynchronized for different data series.
To improve readability and comparability, regular intervals will be assumed and interpolated according to existing values. Gaps greater than 15 minutes will be left untouched.
os.makedirs('fixed_data', exist_ok=True)
for household in households.values():
data = make_equidistant(household, household_data[household['id']], 1)
data.to_pickle(os.path.join('fixed_data', household['id']+'.pickle'))
Load the equidistant data series
household_data = {}
for household in households.values():
household_data[household['id']] = pd.read_pickle(os.path.join('fixed_data', household['id']+'.pickle'))
Patch missing data. At this stage, only small gaps (up to 1 hour) are filled by linear interpolation. This catched most of the missing data due to daylight savings time transitions or failed radio transmissions, while filling bigger gaps with values from the prior day or prior week.
The exact locations of missing data are stored in the data_nan
DataFrames.
Where data has been interpolated, it is marked in a new column comment
. For eaxample the comment residential_004_pv;
means that in the original data, there is a gap in the solar generation timeseries from the Resident 4 in the time period where the marker appears.
Patch the datasets and display the location of missing Data in the original data.
os.makedirs('filled_data', exist_ok=True)
for household in households.values():
data, data_nan = fill_nan(household_data[household['id']], household['name'], headers, config_dir=config_path)
data.to_pickle(os.path.join('filled_data', household['id']+'.pickle'))
writer = pd.ExcelWriter(os.path.join('filled_data', household['id']+'_NaN.xlsx'))
data_nan.to_excel(writer, 'NaN')
writer.save()
if verbose:
visualize(data)
Load all patched data sets
data_sets = {}
with open(os.path.join(config_path, 'households.yml'), 'r') as f:
households_full = yaml.load(f.read(), Loader=yaml.FullLoader)
for household_name in households_full:
household_id = household_name.replace(' ', '').lower()
#data_file = os.path.join('raw_data', household_id+'.pickle')
#if os.path.isfile(data_file):
# data = pd.read_pickle(data_file)
# update_sets('raw', data, data_sets)
data_file = os.path.join('filled_data', household_id+'.pickle')
if os.path.isfile(data_file):
data = pd.read_pickle(data_file)
update_sets('1min', data, data_sets)
As some data comes in 1-minute, other (older series) in 3-minute intervals, a harmonization can be done. With most billing intervals being at 15-minute and 60-minute ranges, a resampling to those intervals can be done to improve the overview over the data.
The marker column is resampled separately in such a way that all information on where data has been interpolated is preserved.
To do this, condense the marker column to both 15 and 60 minutes resolution, resample the series and update the condensed markers
data = data_sets['1min']
minute = data.index[0].minute + (15 - data.index[0].minute % 15)
hour = data.index[0].hour
if(minute > 59):
minute = 0
hour += 1
start_15 = data.index[0].replace(hour=hour, minute=minute, second=0)
minute = data.index[-1].minute + (15 - data.index[-1].minute % 15)
hour = data.index[-1].hour
if(minute > 59):
minute = 0
hour += 1
end_15 = data.index[-1].replace(hour=hour, minute=minute, second=0)
index_15 = pd.date_range(start=start_15, end=end_15, freq='15min')
marker_15 = data['interpolated'].groupby(
pd.Grouper(freq='15min', closed='left', label='left')
).agg(resample_markers).reindex(index_15)
data_sets['15min'] = data.resample('15min').last()
data_sets['15min']['interpolated'] = marker_15
start_60 = data.index[0].replace(minute=0, second=0)
end_60 = data.index[-1].replace(minute=0, second=0)
index_60 = pd.date_range(start=start_60, end=end_60, freq='60min')
marker_60 = data['interpolated'].groupby(
pd.Grouper(freq='60min', closed='left', label='left')
).agg(resample_markers).reindex(index_60)
data_sets['60min'] = data.resample('60min').last()
data_sets['60min']['interpolated'] = marker_60
The index column of th data sets defines the start of the timeperiod represented by each row of that data set in UTC time. We include an additional column for the CE(S)T Central European (Summer-) Time, as this might help aligning the output data with other data sources.
info_cols = {'utc': 'utc_timestamp',
'cet': 'cet_cest_timestamp',
'marker': 'interpolated'}
for res_key, df in data_sets.items():
if df.empty:
continue
if df.index.tzinfo is None or df.index.tzinfo.utcoffset(df.index) is None:
df.index = df.index.tz_localize('UTC')
df.index.rename(info_cols['utc'], inplace=True)
df.insert(0, info_cols['cet'], df.index.tz_convert('Europe/Berlin'))
Create a final savepoint
os.makedirs('final_data', exist_ok=True)
for res_key, data_set in data_sets.items():
data_set.to_pickle(os.path.join('final_data', res_key+'.pickle'))
data_sets = {}
#data_sets['raw'] = pd.read_pickle(os.path.join('final_data', 'raw.pickle'))
data_sets['1min'] = pd.read_pickle(os.path.join('final_data', '1min.pickle'))
data_sets['15min'] = pd.read_pickle(os.path.join('final_data', '15min.pickle'))
data_sets['60min'] = pd.read_pickle(os.path.join('final_data', '60min.pickle'))
This section: create the metadata, both general and column-specific. All metadata we be stored as a JSON file.
# change to out_path directory
os.chdir(out_path)
os.getcwd()
make_json(data_sets, info_cols, version, changes, headers)
This section: Save as Data Package (data in CSV, metadata in JSON file). All files are saved in the directory of this notebook. Alternative file formats (SQL, XLSX) are also exported. Takes about 1 hour to run.
Cut off the data outside of [start_from_user:end_from_user]
for res_key, df in data_sets.items():
# First, convert userinput to UTC time to conform with data_set.index
if start_from_user:
start_from_user = (
pytz.timezone('Europe/Berlin')
.localize(datetime.combine(start_from_user, time()))
.astimezone(pytz.timezone('UTC')))
if end_from_user and 'min' in res_key:
end_from_user = (
pytz.timezone('Europe/Berlin')
.localize(datetime.combine(end_from_user, time()))
.astimezone(pytz.timezone('UTC'))
# appropriate offset to inlude the end of period
+ timedelta(days=1, minutes=-int(res_key[:res_key.index('min')])))
# Then cut off the data_set
data_sets[res_key] = df.loc[start_from_user:end_from_user, :]
Convert the UTC, as well as the Central European Time indexes to ISO-8601
for res_key, df in data_sets.items():
df.iloc[:,0] = df.iloc[:,0].dt.strftime('%Y-%m-%dT%H:%M:%S%z')
Data are provided in three different "shapes":
The different shapes need to be created internally befor they can be saved to files. Takes about 1 minute to run.
data_sets_singleindex = {}
data_sets_multiindex = {}
data_sets_stacked = {}
for res_key, df in data_sets.items():
# MultIndex
data_sets_multiindex[res_key + '_multiindex'] = df
# SingleIndex
df_singleindex = df.copy()
# use first 3 levels of multiindex to create singleindex
df_singleindex.columns = [
col[0] if col[0] in info_cols.values()
else next(iter([l for l in col if l in df.columns.get_level_values('region')] or []), None) + \
'_' + next(iter([l for l in col if l in df.columns.get_level_values('household')] or []), None) + \
'_' + next(iter([l for l in col if l in df.columns.get_level_values('feed')] or []), None)
for col in df.columns.values]
data_sets_singleindex[res_key + '_singleindex'] = df_singleindex
# Stacked
stacked = df.copy()
stacked.drop(info_cols['cet'], axis=1, inplace=True)
stacked.columns = stacked.columns.droplevel(['region', 'type', 'unit'])
stacked = stacked.transpose().stack(dropna=True).to_frame(name='data')
data_sets_stacked[res_key + '_stacked'] = stacked
This file format is required for the filtering function on the OPSD website. This takes about 30 seconds to complete.
for res_key, df in data_sets_singleindex.items():
if res_key.startswith('raw'):
continue
df_sql = df.copy()
df_sql.index = df_sql.index.strftime('%Y-%m-%dT%H:%M:%SZ')
table = 'household_data_' + res_key
df_sql.to_sql(table, sqlite3.connect('household_data.sqlite'),
if_exists='replace', index_label=info_cols['utc'])
Writing the full tables to Excel takes extremely long. As a workaround, only the first 5 rows are exported. The rest of the data can than be inserted manually from the _multindex.csv
files.
writer = pd.ExcelWriter('household_data.xlsx')
for res_key, df in data_sets_multiindex.items():
if res_key.startswith('raw') or res_key.startswith('1min'):
# Excel max sheet size is 1048576 rows, while raw and 1min resolution data has a lot more
continue
df_csv = df.copy()
df_csv.index = df_csv.index.strftime('%Y-%m-%dT%H:%M:%SZ')
df_csv.to_excel(writer, res_key.split('_')[0], float_format='%.3f',
merge_cells=True)
writer.save()
# itertoools.chain() allows iterating over multiple dicts at once
for res_key, df in itertools.chain(
data_sets_singleindex.items(),
data_sets_multiindex.items(),
data_sets_stacked.items()
):
filename = 'household_data_' + res_key + '.csv'
df.to_csv(filename, float_format='%.3f',
date_format='%Y-%m-%dT%H:%M:%SZ')
We publish SHA-checksums for the outputfiles on GitHub to allow verifying the integrity of outputfiles on the OPSD server.
def get_sha_hash(path, blocksize=65536):
sha_hasher = hashlib.sha256()
with open(path, 'rb') as f:
buffer = f.read(blocksize)
while len(buffer) > 0:
sha_hasher.update(buffer)
buffer = f.read(blocksize)
return sha_hasher.hexdigest()
files = os.listdir(out_path)
# Create checksums.txt in the output directory
with open('checksums.txt', 'w') as f:
for file_name in files:
if file_name.split('.')[-1] in ['csv', 'sqlite', 'xlsx']:
file_hash = get_sha_hash(file_name)
f.write('{},{}\n'.format(file_name, file_hash))
# Copy the file to root directory from where it will be pushed to GitHub,
# leaving a copy in the version directory for reference
copyfile('checksums.txt', os.path.join(home_path, 'checksums.txt'))