import pandas as pd

# A hypothetical ESM dataset to be combined: every row is one file,
# tagged with the three coordinate columns we will aggregate over.
records = [
    ('X', 'A', '1', 'file1'),
    ('X', 'A', '2', 'file2'),
    ('X', 'B', '1', 'file3'),
    ('X', 'B', '2', 'file4'),
    ('Y', 'A', '1', 'file5'),
    ('Y', 'A', '2', 'file6'),
    ('Y', 'B', '1', 'file7'),
    ('Y', 'B', '2', 'file8'),
]
df = pd.DataFrame.from_records(records, columns=['baz', 'bar', 'foo', 'path'])
df
 | baz | bar | foo | path |
---|---|---|---|---|
0 | X | A | 1 | file1 |
1 | X | A | 2 | file2 |
2 | X | B | 1 | file3 |
3 | X | B | 2 | file4 |
4 | Y | A | 1 | file5 |
5 | Y | A | 2 | file6 |
6 | Y | B | 1 | file7 |
7 | Y | B | 2 | file8 |
# dummy functions for opening / merging data
# in the real world these are xarray functions
def open_dataset(d):
    """Stand-in for xarray.open_dataset: tag the path so calls are traceable."""
    return 'OPEN({})'.format(d)
def join_new(*d):
    """Mock combine along a new dimension; just records its arguments."""
    inner = ', '.join(d)
    return f'join_new({inner})'
def join_existing(*d):
    """Mock concatenation along an existing dimension; records its arguments."""
    inner = ', '.join(d)
    return f'join_existing({inner})'
def union(*d):
    """Mock variable-merge of several datasets; records its arguments."""
    inner = ', '.join(d)
    return f'union({inner})'
# A hypothetical esm-collection json snippet telling us how to aggregate:
# each entry pairs an aggregation type with the column it applies to,
# outermost first.
aggregations = [
    {"type": agg_type, "attribute_name": attr}
    for agg_type, attr in [
        ("union", "baz"),
        ("join_existing", "bar"),
        ("join_new", "foo"),
    ]
]
# Convert the json entries to a dictionary keyed by column name: the
# 'attribute_name' value becomes the key and the remaining fields the payload.
# A dict comprehension replaces the original copy()/del loop — no mutation,
# same insertion order.
aggregation_dict = {
    agg['attribute_name']: {k: v for k, v in agg.items() if k != 'attribute_name'}
    for agg in aggregations
}
agg_columns = list(aggregation_dict)
# the number of aggregation columns determines the level of recursion
n_agg = len(agg_columns)
aggregation_dict
{'baz': {'type': 'union'}, 'bar': {'type': 'join_existing'}, 'foo': {'type': 'join_new'}}
# Create a multiindex on the dataframe: one index level per aggregation
# column (outermost first), with the file paths left as the series values.
mi = df.set_index(agg_columns).loc[:, 'path']
def to_nested_dict(df):
    """Convert a (possibly multi-indexed) Series to a nested dict.

    Each index level becomes one layer of dict keys, outermost level first;
    the innermost layer maps the last index level to the Series values.
    Uses the public ``Index.nlevels`` attribute, which is 1 for a flat index,
    instead of duck-typing on the MultiIndex-only ``levels`` attribute.
    """
    if df.index.nlevels > 1:
        # recurse: group on the outermost level, strip it, and descend
        return {key: to_nested_dict(group.droplevel(0))
                for key, group in df.groupby(level=0)}
    # single-level index: plain {index -> value} mapping
    return df.to_dict()
# materialize the nested {baz: {bar: {foo: path}}} structure
nd = to_nested_dict(mi)
nd
{'X': {'A': {'1': 'file1', '2': 'file2'}, 'B': {'1': 'file3', '2': 'file4'}}, 'Y': {'A': {'1': 'file5', '2': 'file6'}, 'B': {'1': 'file7', '2': 'file8'}}}
# dispatch table: aggregation-type string -> combine function
agg_function_map = dict(
    union=union,
    join_existing=join_existing,
    join_new=join_new,
)
def apply_aggregation(v, level=0):
    """Walk the nested dict depth-first and combine datasets on the way up.

    ``level`` counts how many aggregation columns have been consumed so far;
    at the bottom of the hierarchy the values are actual file paths.
    """
    assert level <= n_agg
    if level == n_agg:
        # leaf: an actual path — open it
        return open_dataset(v)
    # look up which combine function applies at this depth
    column = agg_columns[level]
    combine = agg_function_map[aggregation_dict[column]['type']]
    # the keys are coordinate values; only the child datasets matter here
    children = [apply_aggregation(child, level=level + 1) for child in v.values()]
    return combine(*children)
apply_aggregation(nd)
'union(join_existing(join_new(OPEN(file1), OPEN(file2)), join_new(OPEN(file3), OPEN(file4))), join_existing(join_new(OPEN(file5), OPEN(file6)), join_new(OPEN(file7), OPEN(file8))))'