See https://github.com/fsspec/kerchunk for the latest issues/changes and https://fsspec.github.io/kerchunk/ for documentation
import fsspec
import ujson
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import pickle
import dask.bag
Open the existing reference file and check whether new dates have been added to the Azure data store
reference_file = 's3://esip-qhub-public/LiveOcean/LiveOcean_reference.json' #Location where final reference will be stored
json_dir = 's3://esip-qhub-public/LiveOcean/individual/' #folder where individual reference files will be stored
# the esip-qhub-public bucket can be read from without credentials but requires credentials to write to
fs = fsspec.filesystem(protocol='s3', anon=False) #filesystem where the reference files are saved
fs_data = fsspec.filesystem('abfs', account_name='pm2') #filesystem used to open the netcdf files
nc_files = fs_data.glob('cas6-v0-u0kb/*/*.nc') #get all available netcdf files
nc_files = [f for f in nc_files if not f.split('-')[-1].split('.')[0] == '0001'] #exclude 0001 files
nc_files = sorted(['abfs://'+f for f in nc_files]) #prepend azure protocol
Etags are intended for exactly this purpose: checking that a file has not been modified since the last visit (see https://microsoft.github.io/AzureTipsAndTricks/blog/tip88.html).
#get etags of all nc_files
new_etags = {}
for f in nc_files:
    new_etags[f] = fs_data.info(f)['etag']
#get previously saved dictionary containing etags at time of last update
with fs.open('s3://esip-qhub-public/LiveOcean/etags.pickle', 'rb') as p:
    existing_etags = pickle.load(p)
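On a very first run the etags pickle will not exist yet and the open above will fail. A minimal guarded version (a sketch, assuming the same path as above) could look like:
etags_path = 's3://esip-qhub-public/LiveOcean/etags.pickle'
if fs.exists(etags_path):
    with fs.open(etags_path, 'rb') as p:
        existing_etags = pickle.load(p)
else:
    existing_etags = {}  # first run: no saved etags yet, so every file counts as new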
#get files that are new or updated
files_to_process = []
for file in nc_files:
    if file in existing_etags: # the file existed at the last update
        if new_etags[file] != existing_etags[file]: # check whether it has been modified
            files_to_process.append(file) # add to the update list if modified
    else:
        files_to_process.append(file) # the file is new, so add it to the update list
print(len(nc_files))
print(len(files_to_process))
1416
0
Generate individual reference files for this new data
so = dict(anon=False, skip_instance_cache=True) #arguments to fs_data.open
#create unique name for each individual reference json
def file_name(file):
    p = file.split('/')
    fname = p[-1]
    dname = p[-2]
    return f'{json_dir}{dname}_{fname}.json'
#generate the individual reference jsons.
def gen_json(nc_file_url):
    with fs_data.open(nc_file_url, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, nc_file_url, inline_threshold=300)
        # inline_threshold adjusts the size below which binary blocks are included directly in the output;
        # a higher inline threshold can result in a larger json file but a faster loading time
        outf = file_name(nc_file_url)
        with fs.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode()) # write the individual references as json
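Before fanning the work out across a cluster, it can be worth generating one reference serially as a sanity check (a sketch, assuming files_to_process is non-empty):
# quick serial test of gen_json on a single file before the parallel run
if files_to_process:
    gen_json(files_to_process[0])
    print(fs.ls(json_dir)[:3])  # confirm an individual reference json landed in json_dir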
# this spins up a cluster on qhub; if running locally, use the commented-out cell below this one instead
import os
import sys
sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
import ebdpy as ebd
profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 10
client, cluster = ebd.start_dask_cluster(profile=profile, worker_max=worker_max,
                                         region=region, use_existing_cluster=True,
                                         adaptive_scaling=True, wait_for_cluster=False,
                                         worker_profile='Small Worker',
                                         propagate_env=True)
cluster.scale(30)
#from dask.distributed import Client, LocalCluster
#cluster = LocalCluster() # Launches a scheduler and workers locally
#client = Client(cluster) # Connect to distributed cluster and override default
# files_to_process = nc_files # uncomment to rerun the whole thing and ensure everything is up to date
# here we use dask bag to queue up the parallel computation; dask delayed could similarly be used
b = dask.bag.from_sequence(files_to_process, npartitions=30)
b1 = b.map(gen_json)
# run the gen_json computations
_ = b1.compute(retries=5)
#close the cluster
client.close()
cluster.close()
Now we take these individual reference files and consolidate them into a single reference file. At present MultiZarrToZarr will only combine the individual json reference files and cannot append to an existing combined reference json.
json_list = fs.glob(f'{json_dir}*.json') #list all the available individual reference jsons
json_list = sorted(['s3://'+f for f in json_list]) #prepend the AWS S3 protocol
topts = dict(anon=True, skip_instance_cache=True) # target options for opening the reference jsons
ropts = dict(account_name='pm2', skip_instance_cache=True) # remote options for opening the netcdf files
# create the combined json
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='abfs',
                      remote_options=ropts,
                      target_options=topts,
                      concat_dims=['ocean_time'],
                      identical_dims=['lat_psi', 'lat_rho', 'lat_u', 'lat_v',
                                      'lon_psi', 'lon_rho', 'lon_u', 'lon_v'])
%%time
#compute the combined json and write it to storage. At present this cannot be run in parallel.
with fs.open(reference_file, 'wb') as f:
    f.write(ujson.dumps(mzz.translate()).encode())
#overwrite old etags with new ones
with fs.open('s3://esip-qhub-public/LiveOcean/etags.pickle', 'wb') as p:
    pickle.dump(new_etags, p)
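As a final check, the combined reference can be opened lazily with xarray through fsspec's reference filesystem (a sketch; the target/remote options mirror those used for MultiZarrToZarr above):
import xarray as xr

fs_ref = fsspec.filesystem('reference', fo=reference_file,
                           target_options=topts,   # options for reading the reference json from S3
                           remote_protocol='abfs',
                           remote_options=ropts)   # options for reading the original netcdf chunks
ds = xr.open_dataset(fs_ref.get_mapper(''), engine='zarr',
                     backend_kwargs=dict(consolidated=False))
print(ds)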