#!/usr/bin/env python
# coding: utf-8

# In[1]:


# require spatial partitioning features from dask-geopandas main
get_ipython().system('pip install -qU git+https://github.com/geopandas/dask-geopandas')


# # STAC query perf
#
# This notebook looks into the performance of the STAC section from Caleb's mosaiks tutorial. The problem: given a bunch of (semi-randomly distributed) Points, find the "best" STAC item for each, where
#
# 1. The STAC item covers the point
# 2. probably other stuff... haven't checked.
#
# And do it quickly. Our baseline is ~30 minutes.

# In[1]:


import pandas as pd
import dask_geopandas
import geopandas
import shapely.geometry
import pystac_client


# The dataset is 100,000 points in the US.

# In[2]:


df = pd.read_csv(
    "https://files.codeocean.com/files/verified/fa908bbc-11f9-4421-8bd3-72a4bf00427f_v2.0/data/int/applications/population/outcomes_sampled_population_CONTUS_16_640_UAR_100000_0.csv?download",  # noqa: E501
    index_col=0,
    na_values=[-999],
)
points = df.dropna()[["lon", "lat"]]
population = df.dropna()["population"]
gdf = geopandas.GeoDataFrame(df, geometry=geopandas.points_from_xy(df.lon, df.lat))
gdf


# ## Option 1: Request with a semi-complex geometry
#
# Take the unary union of the points and request that. But there are a few problems:
#
# 1. We physically can't POST the JSON representation of the full MultiPoint geometry; it's too big.
# 2. Even if we could, it's not clear how well the database would handle such a complex geometry.
#
# So we have to chunk up the requests. Option 1 does this naively: take the union of the points in each chunk and use that as the `intersects` geometry.

# In[41]:


N = 100  # also 200, 400
intersects = shapely.geometry.mapping(gdf.geometry[:N].unary_union)


# In[ ]:


import pystac_client

search_start = "2018-01-01"
search_end = "2019-12-31"

catalog = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")

# The time frame in which we search for non-cloudy imagery
search = catalog.search(
    collections=["sentinel-2-l2a"],
    intersects=intersects,
    datetime=[search_start, search_end],
    query={"eo:cloud_cover": {"lt": 10}},
    limit=500,
)

get_ipython().run_line_magic('time', 'result = search.get_all_items_as_dict()')
len(result["features"])


# See the "Results" section below. Cursory profiling says we spend 91% of the time waiting on I/O and 9% parsing JSON.
#
# ## Option 2: Parallelize... carefully
#
# There are two issues with the previous approach:
#
# 1. The points are randomly distributed, so we'd re-fetch the same items many times. A single STAC item will cover many points. Caleb's code handled this by checking whether a point was already covered before fetching an item.
# 2. It's serial.
#
# We'll try to parallelize the requests now, fetching multiple results at once. If we spatially partition the data, we solve both issues: nearby points end up in the same partition, so we don't have to worry about re-fetching an item multiple times, and since there shouldn't be (much) overlap between partitions, we can do the fetching in parallel.

# In[4]:


import dask
import tlz
import dask.array


# dask-geopandas implements spatial partitioning. `hilbert_distance` assigns an integer to each Point so that points that are nearby in lon / lat are also nearby along a Hilbert curve. We'll sort by that.
# In[5]:


get_ipython().run_cell_magic('time', '', 'ddf = dask_geopandas.from_geopandas(gdf, npartitions=1)\nhd = ddf.hilbert_distance().compute()\ngdf["hd"] = hd\ngdf = gdf.sort_values("hd")\n')


# Then we partition the dataset (using dask.array just for its convenience functions for figuring out chunk sizes and slicing). We'll have 8 chunks.

# In[6]:


chunks = dask.array.core.normalize_chunks(len(gdf) // 8, (len(gdf),))
slices = dask.array.core.slices_from_chunks(chunks)
slices = [x[0] for x in slices]
gdfs = [gdf.geometry[x] for x in slices]


# Each of these has a small footprint relative to the original.

# In[7]:


[x.unary_union.convex_hull.area / gdf.unary_union.convex_hull.area for x in gdfs]


# Now generate the results (these are "Method 2" in the table below).

# In[ ]:


N = 100  # also 200, 400, len(gdfs[0])
intersects = shapely.geometry.mapping(gdfs[0].geometry[:N].unary_union)

search_start = "2018-01-01"
search_end = "2019-12-31"

catalog = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")

# The time frame in which we search for non-cloudy imagery
search = catalog.search(
    collections=["sentinel-2-l2a"],
    intersects=intersects,
    datetime=[search_start, search_end],
    query={"eo:cloud_cover": {"lt": 10}},
    limit=500,
)

get_ipython().run_line_magic('time', 'result = search.get_all_items_as_dict()')
len(result["features"])


# OK, now parallelize that.

# In[166]:


def f(intersects):
    search_start = "2018-01-01"
    search_end = "2019-12-31"

    catalog = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")

    # The time frame in which we search for non-cloudy imagery
    search = catalog.search(
        collections=["sentinel-2-l2a"],
        intersects=intersects,
        datetime=[search_start, search_end],
        query={"eo:cloud_cover": {"lt": 10}},
        limit=500,
    )
    return search.get_all_items_as_dict()


# In[171]:


from distributed import Client, wait


# In[168]:


client = Client()
client


# In[170]:


get_ipython().run_line_magic('time', 'intersects_s = [shapely.geometry.mapping(x.geometry.unary_union) for x in gdfs]')


# In[173]:


futures = client.map(f, intersects_s)
get_ipython().run_line_magic('time', '_ = wait(futures)')


# In[174]:


get_ipython().run_line_magic('time', 'results = [fut.result() for fut in futures]')


# In[176]:


import itertools


# In[183]:


results2 = list(itertools.chain(*[x["features"] for x in results]))


# In[184]:


len(results2)


# ## Results
#
# | Method | n_points | n_items | time (s) | items/s | items/point |
# | ------ | -------- | ------- | -------- | ------- | ----------- |
# | 1      | 100      | 3,819   | 15       | 254     | 38          |
# | 1      | 200      | 6,783   | 42       | 162     | 33          |
# | 1      | 400      | 11,581  | 40       | 290     | 29          |
# | 2      | 100      | 200     | 1        | 200     | 2           |
# | 2      | 200      | 356     | 2        | 178     | 1.8         |
# | 2      | 400      | 545     | 2.3      | 236     | 1.3         |
# | 2      | 12,500   | 9,114   | 90       | 101     | 0.7         |
# | 2*     | 100,000  | 49,437  | 143**    | 346     | 0.5         |
#
# \* Option 2, parallelized.
#
# \*\* 118s fetching items + 25s moving results between processes.
#
# ## Thoughts
#
# 1. This is pretty complicated, and I'm not really sure where it would live. Maybe pystac-client, but it's not going to do spatial partitioning. Maybe the endpoint? I guess user code for now...
# 2. The serialization is painful (25s on top of the ~2 minutes waiting for the endpoint). Given that we're still spending ~90% of our time on I/O, it might be worth trying an async client; then we could parallelize even further on a single thread. At some point we'd block the event loop on JSON parsing, which would be bad, but that's only ~10% of the time, and with a faster JSON parser it would be even lower.
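# As a rough sketch of the async idea in thought 2: instead of going through `pystac_client`,
# we could POST the chunked `intersects` geometries straight to the STAC `/search` endpoint with
# an async HTTP client and gather the requests on a single thread. This is only an illustration:
# it assumes `httpx` is installed, hard-codes the same search parameters used above, and ignores
# pagination (each chunk could return more than `limit` items), so it isn't a drop-in replacement
# for the searches above.

# In[ ]:


import asyncio
import itertools

import httpx  # assumption: any async HTTP client would work here

STAC_SEARCH_URL = "https://planetarycomputer.microsoft.com/api/stac/v1/search"


async def search_chunk(client, intersects):
    # One STAC API POST /search per spatial chunk, mirroring the parameters used above.
    body = {
        "collections": ["sentinel-2-l2a"],
        "intersects": intersects,
        "datetime": "2018-01-01T00:00:00Z/2019-12-31T23:59:59Z",
        "query": {"eo:cloud_cover": {"lt": 10}},
        "limit": 500,
    }
    response = await client.post(STAC_SEARCH_URL, json=body)
    response.raise_for_status()
    # NOTE: a real implementation would follow the "next" link to page through results.
    return response.json()["features"]


async def search_all(intersects_s):
    # Issue all of the chunked searches concurrently on one thread.
    async with httpx.AsyncClient(timeout=120) as client:
        feature_lists = await asyncio.gather(
            *(search_chunk(client, intersects) for intersects in intersects_s)
        )
    return list(itertools.chain.from_iterable(feature_lists))


# In the notebook (IPython supports top-level await):
# features = await search_all(intersects_s)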