#!/usr/bin/env python
# coding: utf-8

# ## Creating LiveOcean reference files using Kerchunk
#
# See https://github.com/fsspec/kerchunk for the latest issues/changes and https://fsspec.github.io/kerchunk/ for documentation.

# In[1]:


import fsspec
import ujson
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import pickle
import dask.bag


# Open the existing reference file and check whether new dates have been added to the Azure data store.

# In[2]:


reference_file = 's3://esip-qhub-public/LiveOcean/LiveOcean_reference.json'  # location where the final combined reference will be stored
json_dir = 's3://esip-qhub-public/LiveOcean/individual/'                     # folder where the individual reference files will be stored
# the esip-qhub-public bucket can be read without credentials, but writing to it requires credentials


# In[3]:


fs = fsspec.filesystem(protocol='s3', anon=False)        # filesystem where the references are saved
fs_data = fsspec.filesystem('abfs', account_name='pm2')  # filesystem used to open the NetCDF files


# In[4]:


nc_files = fs_data.glob('cas6-v0-u0kb/*/*.nc')                                    # get all available NetCDF files
nc_files = [f for f in nc_files if not f.split('-')[-1].split('.')[0] == '0001']  # exclude 0001 files
nc_files = sorted(['abfs://' + f for f in nc_files])                              # prepend the Azure protocol


# ETags are intended for exactly this purpose: checking that a file has not been modified since the last visit.
# See https://microsoft.github.io/AzureTipsAndTricks/blog/tip88.html

# In[5]:


# get the etags of all nc_files
new_etags = {}
for f in nc_files:
    new_etags[f] = fs_data.info(f)['etag']

# get the previously saved dictionary containing the etags at the time of the last update
with fs.open('s3://esip-qhub-public/LiveOcean/etags.pickle', 'rb') as p:
    existing_etags = pickle.load(p)

# find the files that are new or have been updated
files_to_process = []
for file in nc_files:
    if file in existing_etags:                        # the file existed at the last update
        if new_etags[file] != existing_etags[file]:   # check whether it has been modified
            files_to_process.append(file)             # add to the update list if modified
    else:
        files_to_process.append(file)                 # newly added file, add to the update list


# In[6]:


print(len(nc_files))
print(len(files_to_process))


# Generate individual reference files for this new data.

# In[ ]:


so = dict(anon=False, skip_instance_cache=True)  # arguments passed to fs_data.open


# create a unique name for each individual reference json
def file_name(file):
    p = file.split('/')
    fname = p[-1]
    dname = p[-2]
    return f'{json_dir}{dname}_{fname}.json'
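

# A quick illustration (not part of the original workflow) of the mapping `file_name` produces; the input path below is a hypothetical example of the `cas6-v0-u0kb/*/*.nc` layout.

# In[ ]:


example_nc = 'abfs://cas6-v0-u0kb/f2023.01.01/ocean_his_0002.nc'  # hypothetical file path, for illustration only
print(file_name(example_nc))
# -> s3://esip-qhub-public/LiveOcean/individual/f2023.01.01_ocean_his_0002.nc.json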


# generate the individual reference jsons
def gen_json(nc_file_url):
    with fs_data.open(nc_file_url, **so) as infile:
        # inline_threshold sets the size below which binary blocks are inlined directly in the output;
        # a higher threshold can produce a larger json file but faster loading times
        h5chunks = SingleHdf5ToZarr(infile, nc_file_url, inline_threshold=300)
        outf = file_name(nc_file_url)
        with fs.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())  # write the individual reference as json


# In[ ]:


# this spins up a cluster on qhub; if running locally, use the commented-out LocalCluster cell below instead
import os
import sys
sys.path.append(os.path.join(os.environ['HOME'], 'shared', 'users', 'lib'))
import ebdpy as ebd

profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 10
client, cluster = ebd.start_dask_cluster(profile=profile, worker_max=worker_max,
                                         region=region, use_existing_cluster=True,
                                         adaptive_scaling=True, wait_for_cluster=False,
                                         worker_profile='Small Worker', propagate_env=True)


# In[ ]:


cluster.scale(30)


# In[ ]:


# from dask.distributed import Client, LocalCluster
# cluster = LocalCluster()  # launches a scheduler and workers locally
# client = Client(cluster)  # connect to the local cluster and override the default


# In[ ]:


# files_to_process = nc_files  # uncomment to rerun everything and ensure the whole record is up to date


# In[ ]:


# here we use dask.bag to queue up the parallel computation; dask.delayed could be used similarly
b = dask.bag.from_sequence(files_to_process, npartitions=30)
b1 = b.map(gen_json)


# In[ ]:


# run the gen_json computations
_ = b1.compute(retries=5)


# In[ ]:


# close the cluster
client.close()
cluster.close()


# Now we take these individual reference files and consolidate them into a single reference file. At present MultiZarrToZarr can only combine the individual json reference files; it cannot append to an existing combined reference json.

# In[ ]:


json_list = fs.glob(f'{json_dir}*.json')              # list all the available individual reference jsons
json_list = sorted(['s3://' + f for f in json_list])  # prepend the AWS S3 protocol


# In[ ]:


topts = dict(anon=True, skip_instance_cache=True)           # target options for opening the reference jsons
ropts = dict(account_name='pm2', skip_instance_cache=True)  # remote options for opening the NetCDF files


# In[ ]:


# create the combined json
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='abfs',
                      remote_options=ropts,
                      target_options=topts,
                      concat_dims=['ocean_time'],
                      identical_dims=['lat_psi', 'lat_rho', 'lat_u', 'lat_v',
                                      'lon_psi', 'lon_rho', 'lon_u', 'lon_v'])


# In[ ]:


get_ipython().run_cell_magic('time', '', "# compute the combined json and write it to storage; at present this cannot be run in parallel\nwith fs.open(reference_file, 'wb') as f:\n    f.write(ujson.dumps(mzz.translate()).encode())\n")


# In[ ]:


# overwrite the old etags with the new ones
with fs.open('s3://esip-qhub-public/LiveOcean/etags.pickle', 'wb') as p:
    pickle.dump(new_etags, p)


# In[ ]:
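

# To sanity-check the result, the combined reference can be opened with xarray through fsspec's "reference" filesystem. This cell is a minimal sketch, not part of the original workflow: it assumes xarray and zarr are installed, and it reuses the anonymous-read target options and the account_name='pm2' remote options from above.

# In[ ]:


import xarray as xr  # assumed to be available in the environment; not imported in the workflow above

fs_ref = fsspec.filesystem('reference', fo=reference_file,
                           target_options=dict(anon=True),           # the combined reference json is publicly readable
                           remote_protocol='abfs',
                           remote_options=dict(account_name='pm2'),  # same Azure account as fs_data above
                           skip_instance_cache=True)
ds = xr.open_dataset(fs_ref.get_mapper(''), engine='zarr',
                     backend_kwargs=dict(consolidated=False), chunks={})
print(ds)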