We would like to create mosaics of the OPERA DSWx-HLS product that are relatively cloud-free over different parts of the world. In this notebook, we demonstrate generating these mosaics over Australia and California. The California state boundary shapefile (CA_State_TIGER2016.shp) comes from the 2016 US Census TIGER dataset.
# GIS imports
from datetime import datetime
import geopandas as gpd
import rasterio
from rasterio.merge import merge
from rasterio.crs import CRS
from rasterio.warp import transform_bounds, calculate_default_transform, reproject, Resampling
from shapely import Polygon
from shapely.geometry import box
import fiona
# misc imports
from pystac_client import Client
import numpy as np
from collections import defaultdict
from multiprocessing import Pool
from pathlib import Path
import os
import json
import sys
sys.path.append('../../../')
from src.dswx_utils import intersection_percent
# web imports
from urllib.request import urlopen
# URL of CMR service
# URL of the CMR STAC service (no trailing slash, so the provider path below joins cleanly)
STAC_URL = 'https://cmr.earthdata.nasa.gov/cloudstac'
# Set up the PySTAC client
provider_cat = Client.open(STAC_URL)
catalog = Client.open(f'{STAC_URL}/POCLOUD/')
collections = ['OPERA_L3_DSWX-HLS_V1']
# We would like to create mosaics for April 2023
date_range = "2023-04-01/2023-04-30"
start_date = datetime(2023, 4, 1)
stop_date = datetime(2023, 4, 30)
# Load the geometry for Australia to retrieve a bounding box for our search
# (note: gpd.datasets was removed in geopandas 1.0; this call requires an older geopandas)
world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))
australia_shape = world[world['name']=='Australia']
bbox = australia_shape.iloc[0].geometry.bounds
aoi = box(bbox[0], bbox[1], bbox[2], bbox[3])
print(bbox)
search_params = {"collections": collections,
                 "intersects": aoi.__geo_interface__,
                 "datetime": [start_date, stop_date],
                 "max_items": 1000}
search = catalog.search(**search_params)
items = search.get_all_items()
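Before filtering, it can help to take a quick look at what came back. A small, optional check (the asset keys printed here are whatever the DSWx-HLS items expose):

print(f'{len(items)} items returned by the search')
if len(items) > 0:
    sample = items[0]
    print(sample.id)                   # granule ID
    print(list(sample.assets.keys()))  # available assets, e.g. the WTR layer and metadata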
def filter_by_cloud_cover(item, threshold=10):
    xml_url = item.assets['metadata'].href
    # circumvent occasional invalid URLs
    try:
        response = urlopen(xml_url)
        data_json = json.loads(response.read())  # despite the .xml extension, the metadata files are formatted as JSON
    except Exception:
        return False
    for attribute in data_json['AdditionalAttributes']:
        if attribute['Name'] == 'PercentCloudCover':
            return int(attribute['Values'][0]) <= threshold
    return False  # no cloud cover attribute found
filtered_items = list(filter(filter_by_cloud_cover, items))
print(len(filtered_items))
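The intersection_percent helper imported from src.dswx_utils could additionally be used to drop tiles that barely overlap the AOI. A sketch, under the assumption that it takes a STAC item and a GeoJSON-like geometry and returns the overlap as a percentage (check the helper's actual signature before use):

# Assumed signature: intersection_percent(item, geometry) -> float percentage
well_covered_items = [item for item in filtered_items
                      if intersection_percent(item, aoi.__geo_interface__) > 50]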
def return_granule(item):
    return item.assets['0_B01_WTR'].href
filtered_urls = list(map(return_granule, filtered_items))
print(len(filtered_urls))
output_path = Path('../data/australia')
if not output_path.exists():
    output_path.mkdir(parents=True)
def download_data(file_list):
    for f in file_list:
        os.system(f"wget {f} -nc -q -P {output_path}")  # don't clobber, and download quietly
# We can parallelize the downloads by splitting the URL list into chunks,
# one per worker process
url_chunks = np.array_split(filtered_urls, os.cpu_count())
with Pool() as pool:
    _ = pool.map(download_data, url_chunks)
nfiles = len(list(output_path.glob('OPERA*.tif')))
print(nfiles)
# DSWx files have colormaps associated with them. Let's save one for later use
with rasterio.open(list(output_path.glob('OPERA*.tif'))[0]) as ds:
    dswx_colormap = ds.colormap(1)
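rasterio returns the colormap as a dictionary mapping pixel values to RGBA tuples; a few entries can be inspected directly:

# Peek at the first few colormap entries (pixel value -> (R, G, B, A))
for value in list(dswx_colormap)[:5]:
    print(value, dswx_colormap[value])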
# Organize the downloaded files into folders by CRS. If the notebook has been
# run before, pick up files that were already organized into CRS subfolders.
files_by_crs = defaultdict(list)
for f in [f for f in output_path.iterdir() if f.is_dir()]:
    files_by_crs[f.name] = list(f.glob("OPERA*.tif"))
for f in output_path.glob('*.tif'):
    with rasterio.open(f) as ds:
        files_by_crs[ds.profile['crs'].to_string()].append(f)
def organize_by_crs(crs, file_list):
    current_output_path = output_path/crs
    if not current_output_path.exists():
        current_output_path.mkdir()
    for f in file_list:
        f.rename(current_output_path/f.name)

_ = list(map(organize_by_crs, files_by_crs.keys(), files_by_crs.values()))
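A quick sanity check that the reorganization worked; this counts the files now sitting in each CRS folder:

for crs_folder in [f for f in output_path.iterdir() if f.is_dir()]:
    print(crs_folder.name, len(list(crs_folder.glob('OPERA*.tif'))))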
# This function takes a list of files in the same CRS, mosaics them, and reprojects the
# result to EPSG:4326. Merging with method='min' keeps the lowest pixel value wherever
# tiles overlap; since the cloud and fill classes are encoded with the highest values in
# the WTR layer, this favors clear observations.
def process_file_batch(epsg_code, file_batch, output_filename, resolution_reduction_factor=2):
    dst_crs = 'EPSG:4326'
    merged_img, merged_transform = merge(file_batch, method='min')
    merged_output_bounds = rasterio.transform.array_bounds(merged_img.shape[-2], merged_img.shape[-1], merged_transform)
    kwargs = {
        "src_crs": epsg_code,
        "dst_crs": dst_crs,
        "width": merged_img.shape[-1],
        "height": merged_img.shape[-2],
        "left": merged_output_bounds[0],
        "bottom": merged_output_bounds[1],
        "right": merged_output_bounds[2],
        "top": merged_output_bounds[3],
        "dst_width": merged_img.shape[-1] // resolution_reduction_factor,
        "dst_height": merged_img.shape[-2] // resolution_reduction_factor
    }
    dst_transform, width, height = calculate_default_transform(**kwargs)
    with rasterio.open(file_batch[0]) as src:
        dst_kwargs = src.profile.copy()
        dst_kwargs.update({
            'height': height,
            'width': width,
            'transform': dst_transform,
            'crs': dst_crs
        })
        with rasterio.open(output_filename, 'w', **dst_kwargs) as dst:
            reproject(
                source=merged_img,
                destination=rasterio.band(dst, 1),
                src_transform=merged_transform,
                dst_transform=dst_transform,
                src_crs=src.crs,
                dst_crs=dst_crs,
                resampling=Resampling.nearest
            )
            dst.write_colormap(1, dswx_colormap)
    return output_filename
# Mosaicking a large number of files in one shot is time- and memory-intensive.
# Instead, we mosaic chunks of files in parallel, reducing the resolution of each
# mosaic by a factor of 2. The mosaics generated in this step are then combined
# in the same way below.
for key in files_by_crs.keys():
    mosaic_folder = (output_path/key/'mosaics')
    mosaic_folder.mkdir(parents=True, exist_ok=True)
    filenames = list((output_path/key).glob('*.tif'))
    filename_chunks = np.array_split(filenames, 30)
    output_filename = 'temp_{}_{}.tif'
    function_inputs = [(key, chunk, mosaic_folder/output_filename.format(key, str(count).zfill(4)))
                       for count, chunk in enumerate(filename_chunks) if len(chunk) > 0]
    with Pool() as pool:
        output_files = pool.starmap(process_file_batch, function_inputs)
mosaic_list = []
for folder in output_path.iterdir():
    # skip the outputs folder (and anything that isn't a CRS folder)
    if not folder.is_dir() or folder.name == 'outputs':
        continue
    mosaic_list.extend((folder/'mosaics').glob('*.tif'))
final_mosaic_path = Path('../data/australia/outputs')
if not final_mosaic_path.exists():
    final_mosaic_path.mkdir()
process_file_batch('EPSG:4326', mosaic_list, Path(final_mosaic_path / 'final_mosaic.tif'))
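To inspect the result, the final mosaic can be rendered with matplotlib (not imported above; a minimal sketch that ignores the colormap):

import matplotlib.pyplot as plt

with rasterio.open(final_mosaic_path / 'final_mosaic.tif') as ds:
    # decimated read to keep memory use modest
    mosaic = ds.read(1, out_shape=(ds.height // 10, ds.width // 10))

plt.figure(figsize=(10, 8))
plt.imshow(mosaic, interpolation='nearest')
plt.title('DSWx mosaic over Australia, April 2023')
plt.show()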
ca_state = gpd.read_file('../data/shapefiles/california_state/CA_State_TIGER2016.shp')
ca_geom = ca_state.iloc[0].geometry
ca_bounds = ca_geom.bounds
# If the shapefile is not in EPSG:4326, transform its bounds
if ca_state.crs.to_epsg() != 4326:
    ca_bounds = transform_bounds(CRS.from_epsg(ca_state.crs.to_epsg()), CRS.from_epsg(4326), *ca_bounds)
print(ca_bounds)
aoi = box(ca_bounds[0], ca_bounds[1], ca_bounds[2], ca_bounds[3])
# Searching for DSWx granules over California.
# The search can use either a bounding box or intersection with a shape; here we
# intersect with the box built from California's bounds.
# URL of the CMR STAC service
STAC_URL = 'https://cmr.earthdata.nasa.gov/cloudstac'
# Set up the PySTAC client
provider_cat = Client.open(STAC_URL)
catalog = Client.open(f'{STAC_URL}/POCLOUD/')
collections = ['OPERA_L3_DSWX-HLS_V1']
date_range = "2023-04-01/2023-04-30"
start_date = datetime(2023, 4, 1)
stop_date = datetime(2023, 4, 30)
search_params = {"collections": collections,
                 "intersects": aoi.__geo_interface__,
                 "datetime": [start_date, stop_date],
                 "max_items": 1000}
search = catalog.search(**search_params)
items = search.get_all_items()
filtered_items = list(filter(filter_by_cloud_cover, items))
print(len(filtered_items))
filtered_urls = list(map(return_granule, filtered_items))
print(len(filtered_urls))
output_path = Path('../data/california')
if not output_path.exists():
    output_path.mkdir(parents=True)
# We can parallelize the downloads, as before
url_chunks = np.array_split(filtered_urls, os.cpu_count())
with Pool() as pool:
    _ = pool.map(download_data, url_chunks)
nfiles = len(list(output_path.glob('*.tif')))
print(nfiles)
# Organize the downloaded files into folders by CRS, picking up any files
# already organized into CRS subfolders on a previous run
files_by_crs = defaultdict(list)
for f in [f for f in output_path.iterdir() if f.is_dir()]:
    files_by_crs[f.name] = list(f.glob("OPERA*.tif"))
for f in output_path.glob('*.tif'):
    with rasterio.open(f) as ds:
        files_by_crs[ds.profile['crs'].to_string()].append(f)

_ = list(map(organize_by_crs, files_by_crs.keys(), files_by_crs.values()))
# DSWx files have colormaps associated with them. Let's save one for later use.
# After organizing, files live under output_path/<CRS>/, so look the first file up there.
first_key = next(iter(files_by_crs))
first_file = output_path / first_key / files_by_crs[first_key][0].name
with rasterio.open(first_file) as ds:
    dswx_colormap = ds.colormap(1)
for key in files_by_crs.keys():
    mosaic_folder = (output_path/key/'mosaics')
    mosaic_folder.mkdir(parents=True, exist_ok=True)
    filenames = list((output_path/key).glob('*.tif'))
    filename_chunks = np.array_split(filenames, 30)
    output_filename = 'temp_{}_{}.tif'
    # resolution_reduction_factor=1 keeps the full resolution for the California mosaics
    function_inputs = [(key, chunk, mosaic_folder/output_filename.format(key, str(count).zfill(4)), 1)
                       for count, chunk in enumerate(filename_chunks) if len(chunk) > 0]
    with Pool() as pool:
        output_files = pool.starmap(process_file_batch, function_inputs)
mosaic_list = []
for folder in output_path.iterdir():
    if not folder.is_dir() or folder.name == 'outputs':
        continue
    mosaic_list.extend((folder/'mosaics').glob('*.tif'))
final_mosaic_path = Path('../data/california/outputs')
if not final_mosaic_path.exists():
    final_mosaic_path.mkdir()
process_file_batch('EPSG:4326', mosaic_list, Path(final_mosaic_path / 'final_mosaic.tif'), 1)
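Because the search used California's bounding box, the final mosaic covers more than the state itself. If a clipped product is desired, the mosaic could be masked with the state geometry using rasterio.mask, sketched below with the paths and variables defined above:

from rasterio.mask import mask

# Bring the state geometry into the mosaic's CRS (EPSG:4326) before masking
ca_geom_4326 = ca_state.to_crs(epsg=4326).iloc[0].geometry

with rasterio.open(final_mosaic_path / 'final_mosaic.tif') as ds:
    clipped, clipped_transform = mask(ds, [ca_geom_4326], crop=True)
    clipped_profile = ds.profile.copy()

clipped_profile.update({'height': clipped.shape[-2],
                        'width': clipped.shape[-1],
                        'transform': clipped_transform})

with rasterio.open(final_mosaic_path / 'final_mosaic_clipped.tif', 'w', **clipped_profile) as dst:
    dst.write(clipped)
    dst.write_colormap(1, dswx_colormap)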