We would like to create mosaics of the OPERA DSWx (Dynamic Surface Water eXtent) product that are relatively cloud-free over different parts of the world. In this notebook, we demonstrate generating these mosaics over Australia and California. The California state boundary used below is the TIGER/Line-derived shapefile CA_State_TIGER2016.shp.
# GIS imports
import geopandas as gpd
import rasterio
from rasterio.merge import merge
from rasterio.crs import CRS
from rasterio.warp import transform_bounds, calculate_default_transform, reproject, Resampling
from shapely import Polygon
import fiona
# misc imports
from pystac_client import Client
import numpy as np
from collections import defaultdict
from multiprocessing import Pool
from pathlib import Path
import os
import json
# web imports
from urllib.request import urlopen
# URL of CMR service
STAC_URL = 'https://cmr.earthdata.nasa.gov/stac'
# Setup PySTAC client
provider_cat = Client.open(STAC_URL)
catalog = Client.open(f'{STAC_URL}/POCLOUD/')
collections = ["OPERA_L3_DSWX-HLS_PROVISIONAL_V1"]
# We would like to create mosaics for April 2023
date_range = "2023-04-01/2023-04-30"
# Load the geometry for Australia to retrieve bounding boxes for our search
world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))
australia_shape = world[world['name']=='Australia']
bbox = australia_shape.iloc[0].geometry.bounds
print(bbox)
opts = {
'bbox' : bbox,
'collections': collections,
'datetime' : date_range,
# querying by cloud cover does not work (04/27/23)
# We will instead filter results by parsing the associated XML files for each granule
# 'query':{
# 'eo:cloud_cover':{
# 'lt': 10
# },
# }
}
search = catalog.search(**opts)
items = search.item_collection()  # get_all_items() is deprecated in pystac-client
def filter_by_cloud_cover(item, threshold=10):
    xml_url = item.assets['metadata'].href

    # Despite the .xml extension, the metadata document is JSON-formatted,
    # so we parse it with a JSON reader
    response = urlopen(xml_url)
    data_json = json.loads(response.read())

    for attribute in data_json['AdditionalAttributes']:
        if attribute['Name'] == 'PercentCloudCover':
            c_cover = int(attribute['Values'][0])
            return c_cover <= threshold

    # Exclude granules that do not report a PercentCloudCover attribute
    return False
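The threshold parameter defaults to 10% cloud cover. A stricter cut can be applied without redefining the function, e.g. with functools.partial (a minimal sketch; the 5% value is illustrative):
from functools import partial
# Keep only granules reporting at most 5% cloud cover
strict_filter = partial(filter_by_cloud_cover, threshold=5)
very_clear_items = list(filter(strict_filter, items))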
filtered_items = list(filter(filter_by_cloud_cover, items))
print(len(filtered_items))
604
def return_granule(item):
return item.assets['0_B01_WTR'].href
filtered_urls = list(map(return_granule, filtered_items))
print(len(filtered_urls))
output_path = Path('../data/australia')
if not output_path.exists():
output_path.mkdir(parents=True)
def download_data(file_list):
    for f in file_list:
        try:
            # -nc: skip files that already exist; -q: quiet; -P: target directory
            os.system(f"wget {f} -nc -q -P {output_path}")
        except Exception:
            pass
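Note that downloading granules from the NASA archive generally requires Earthdata Login credentials, which wget reads from ~/.netrc (some environments additionally need a cookie file, per the Earthdata wget instructions). A typical entry, with placeholder values:
machine urs.earthdata.nasa.gov
    login YOUR_EARTHDATA_USERNAME
    password YOUR_EARTHDATA_PASSWORD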
# We can parallelize the downloads
url_chunks = np.array_split(filtered_urls, 30)
with Pool() as pool:
_ = pool.map(download_data, url_chunks)
nfiles = len(list(output_path.glob('OPERA*.tif')))
print(nfiles)
# DSWx files have colormaps associated with them. Let's save it for later use
with rasterio.open(list(output_path.glob('OPERA*.tif'))[0]) as ds:
dswx_colormap = ds.colormap(1)
# Organize the downloaded files into folders by CRS. DSWx tiles are distributed on UTM grids,
# and rasterio's merge requires all inputs to share a CRS, so we mosaic each CRS group separately.
files_by_crs = defaultdict(list)
for f in output_path.glob('*.tif'):
    with rasterio.open(f) as ds:
        files_by_crs[ds.profile['crs'].to_string()].append(f)
def organize_by_crs(crs, file_list):
current_output_path = output_path/crs
if not current_output_path.exists():
current_output_path.mkdir()
for f in file_list:
f.rename(current_output_path/f.name)
_ = list(map(organize_by_crs, files_by_crs.keys(), files_by_crs.values()))
# This function takes a list of files in the same CRS, mosaics them,
# and reprojects the result to EPSG:4326.
def process_file_batch(epsg_code, file_batch, output_filename, resolution_reduction_factor=2):
    dst_crs = 'EPSG:4326'

    # Mosaic the batch; 'min' keeps the lowest pixel value where tiles overlap
    merged_img, merged_transform = merge(file_batch, method='min')
    merged_output_bounds = rasterio.transform.array_bounds(
        merged_img.shape[-2], merged_img.shape[-1], merged_transform
    )

    # Compute the output grid, reducing resolution by the requested factor
    kwargs = {
        "src_crs": epsg_code,
        "dst_crs": dst_crs,
        "width": merged_img.shape[-1],
        "height": merged_img.shape[-2],
        "left": merged_output_bounds[0],
        "bottom": merged_output_bounds[1],
        "right": merged_output_bounds[2],
        "top": merged_output_bounds[3],
        "dst_width": merged_img.shape[-1] // resolution_reduction_factor,
        "dst_height": merged_img.shape[-2] // resolution_reduction_factor,
    }
    dst_transform, width, height = calculate_default_transform(**kwargs)

    # Reuse the profile of the first input file, updated for the new grid
    with rasterio.open(file_batch[0]) as src:
        dst_kwargs = src.profile.copy()
        dst_kwargs.update({
            'height': height,
            'width': width,
            'transform': dst_transform,
            'crs': dst_crs,
        })

        with rasterio.open(output_filename, 'w', **dst_kwargs) as dst:
            reproject(
                source=merged_img,
                destination=rasterio.band(dst, 1),
                src_transform=merged_transform,
                dst_transform=dst_transform,
                src_crs=src.crs,
                dst_crs=dst_crs,
                resampling=Resampling.nearest,  # nearest preserves the categorical water classes
            )
            dst.write_colormap(1, dswx_colormap)

    return output_filename
# Mosaicking a large number of files in one go is time- and memory-intensive.
# Instead, we mosaic chunks in parallel, reducing the resolution of each mosaic by a factor of 2.
# The intermediate mosaics generated in this step are then combined the same way in a final step.
for key in files_by_crs.keys():
mosaic_folder = (output_path/key/'mosaics')
mosaic_folder.mkdir(parents=True, exist_ok=True)
filenames = list((output_path/key).glob('*.tif'))
filename_chunks = np.array_split(filenames, 30)
output_filename = 'temp_{}_{}.tif'
function_inputs = [
    (key, chunk, mosaic_folder / output_filename.format(key, str(count).zfill(4)))
    for count, chunk in enumerate(filename_chunks) if len(chunk) > 0
]
with Pool() as pool:
output_files = pool.starmap(process_file_batch, function_inputs)
mosaic_list = []
for folder in output_path.iterdir():
    if not folder.is_dir() or folder.name == 'outputs':
        continue  # skip the final-output folder and any stray files
    mosaic_list.extend((folder / 'mosaics').glob('*.tif'))
final_mosaic_path = Path('../data/australia/outputs')
if not final_mosaic_path.exists():
final_mosaic_path.mkdir()
process_file_batch('EPSG:4326', mosaic_list, final_mosaic_path / 'final_mosaic.tif')
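To sanity-check the result, the final mosaic can be rendered with its embedded colormap. A minimal sketch, assuming matplotlib is available (figure size and title are illustrative choices):
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

with rasterio.open(final_mosaic_path / 'final_mosaic.tif') as ds:
    mosaic = ds.read(1)
    cmap_entries = ds.colormap(1)

# Convert the GDAL-style {value: (r, g, b, a)} mapping to a matplotlib colormap
rgba = [tuple(v / 255 for v in cmap_entries.get(k, (0, 0, 0, 255))) for k in range(256)]
cmap = ListedColormap(rgba)

plt.figure(figsize=(10, 8))
plt.imshow(mosaic, cmap=cmap, vmin=0, vmax=255, interpolation='nearest')
plt.title('DSWx mosaic over Australia, April 2023')
plt.axis('off')
plt.show()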
ca_state = gpd.read_file('../data/shapefiles/california_state/CA_State_TIGER2016.shp')
ca_geom = ca_state.iloc[0].geometry
ca_bounds = ca_geom.bounds
if ca_state.crs.to_epsg() != 4326:
ca_bounds = transform_bounds(CRS.from_epsg(ca_state.crs.to_epsg()), CRS.from_epsg(4326), *ca_bounds)
print(ca_bounds)
(-124.48201686078049, 32.52883673637252, -114.13122247508855, 42.00950826967186)
# Searching for DSWx over California
# Search options can use either a bounding box or intersection with a shape; see below
STAC_URL = 'https://cmr.earthdata.nasa.gov/stac'
# Setup PySTAC client
provider_cat = Client.open(STAC_URL)
catalog = Client.open(f'{STAC_URL}/POCLOUD/')
collections = ["OPERA_L3_DSWX-HLS_PROVISIONAL_V1"]
date_range = "2023-04-01/2023-04-30"
opts = {
'bbox' : ca_bounds,
# uncomment to search by shape
# however, this currently returns an API error (05/15/23)
# 'intersects' : ca_geom,
'collections': collections,
'datetime' : date_range,
# querying by cloud cover does not work (04/27/23)
# We will instead filter results by parsing the associated XML files for each granule
# 'query':{
# 'eo:cloud_cover':{
# 'lt': 10
# },
# }
}
search = catalog.search(**opts)
items = search.item_collection()
filtered_items = list(filter(filter_by_cloud_cover, items))
print(len(filtered_items))
Running the same search with 'intersects': ca_geom in place of 'bbox' currently fails with an APIError; the abbreviated traceback ends with:
APIError: {"message":"If the problem persists please contact cmr-support@earthdata.nasa.gov","errors":["An unexpected error occurred. We have been alerted and are working to resolve the problem.","request entity too large"]}
filtered_urls = list(map(return_granule, filtered_items))
print(len(filtered_urls))
output_path = Path('../data/california')
if not output_path.exists():
output_path.mkdir(parents=True)
# We can parallelize the downloads
url_chunks = np.array_split(filtered_urls, 30)
with Pool() as pool:
_ = pool.map(download_data, url_chunks)
nfiles = len(list(output_path.glob('*.tif')))
print(nfiles)
# Organize downloaded files into folders by CRS
files_by_crs = defaultdict(list)
for f in output_path.glob('*.tif'):
    with rasterio.open(f) as ds:
        files_by_crs[ds.profile['crs'].to_string()].append(f)
_ = list(map(organize_by_crs, files_by_crs.keys(), files_by_crs.values()))
# DSWx files have colormaps associated with them. Let's save one for later use
with rasterio.open(next(iter(files_by_crs.values()))[0]) as ds:
    dswx_colormap = ds.colormap(1)
for key in files_by_crs.keys():
mosaic_folder = (output_path/key/'mosaics')
mosaic_folder.mkdir(parents=True, exist_ok=True)
filenames = list((output_path/key).glob('*.tif'))
filename_chunks = np.array_split(filenames, 30)
output_filename = 'temp_{}_{}.tif'
# A resolution_reduction_factor of 1 keeps these mosaics at full resolution
function_inputs = [
    (key, chunk, mosaic_folder / output_filename.format(key, str(count).zfill(4)), 1)
    for count, chunk in enumerate(filename_chunks) if len(chunk) > 0
]
with Pool() as pool:
output_files = pool.starmap(process_file_batch, function_inputs)
mosaic_list = []
for folder in output_path.iterdir():
    if not folder.is_dir() or folder.name == 'outputs':
        continue  # skip the final-output folder and any stray files
    mosaic_list.extend((folder / 'mosaics').glob('*.tif'))
final_mosaic_path = Path('../data/california/outputs')
if not final_mosaic_path.exists():
final_mosaic_path.mkdir()
process_file_batch('EPSG:4326', mosaic_list, final_mosaic_path / 'final_mosaic.tif', 1)