# requires the spatial partitioning features from dask-geopandas main
!pip install -qU git+https://github.com/geopandas/dask-geopandas
This notebook looks into the performance of the STAC section from Caleb's mosaiks tutorial. The problem: given a bunch of (semi-randomly distributed) Points, find the "best" STAC item covering each point (here, a Sentinel-2 L2A item with low cloud cover).
And do it quickly. Our baseline is ~30 minutes.
import pandas as pd
import dask_geopandas
import geopandas
import shapely.geometry
import pystac_client
The dataset is 100,000 points in the US.
df = pd.read_csv(
"https://files.codeocean.com/files/verified/fa908bbc-11f9-4421-8bd3-72a4bf00427f_v2.0/data/int/applications/population/outcomes_sampled_population_CONTUS_16_640_UAR_100000_0.csv?download", # noqa: E501
index_col=0,
na_values=[-999],
)
points = df.dropna()[["lon", "lat"]]
population = df.dropna()["population"]
gdf = geopandas.GeoDataFrame(df, geometry=geopandas.points_from_xy(df.lon, df.lat))
gdf
|       | V1 | V1.1 | ID | lon | lat | population | geometry |
|---|---|---|---|---|---|---|---|
| 1 | 1 | 1 | 1225,1595 | -103.046735 | 37.932314 | 0.085924 | POINT (-103.04673 37.93231) |
| 2 | 2 | 2 | 1521,2455 | -91.202438 | 34.647579 | 0.808222 | POINT (-91.20244 34.64758) |
| 3 | 3 | 3 | 1745,865 | -113.100614 | 32.071427 | NaN | POINT (-113.10061 32.07143) |
| 4 | 4 | 4 | 828,3849 | -72.003660 | 42.116711 | 101.286320 | POINT (-72.00366 42.11671) |
| 5 | 5 | 5 | 1530,2831 | -86.024002 | 34.545552 | 28.181724 | POINT (-86.02400 34.54555) |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 99996 | 99996 | 99996 | 343,2466 | -91.050941 | 46.876806 | 1.782686 | POINT (-91.05094 46.87681) |
| 99997 | 99997 | 99997 | 634,3922 | -70.998272 | 44.067430 | 8.383357 | POINT (-70.99827 44.06743) |
| 99998 | 99998 | 99998 | 357,2280 | -93.612615 | 46.744854 | 8.110552 | POINT (-93.61261 46.74485) |
| 99999 | 99999 | 99999 | 1122,351 | -120.179647 | 39.042563 | NaN | POINT (-120.17965 39.04256) |
| 100000 | 100000 | 100000 | 1449,2691 | -87.952143 | 35.459263 | 1.972914 | POINT (-87.95214 35.45926) |

100000 rows × 7 columns
The simplest approach would be to take the unary union of all the points and search with that single geometry, but that runs into a few problems, so we have to chunk up the requests. Option 1 does this naively: take the union of the points in each chunk and use that as the `intersects` geometry.
N = 100 # also 200, 400
intersects = shapely.geometry.mapping(gdf.geometry[:N].unary_union)
import pystac_client
search_start = "2018-01-01"
search_end = "2019-12-31"
catalog = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
# The time frame in which we search for non-cloudy imagery
search = catalog.search(
collections=["sentinel-2-l2a"],
intersects=intersects,
datetime=[search_start, search_end],
query={"eo:cloud_cover": {"lt": 10}},
limit=500
)
%time result = search.get_all_items_as_dict()
len(result["features"])
See the "results" section below. Cursory profiling says we spend 91% waiting on I/O, 9% parsing JSON.
There are two issues with that previous approach: the requests run serially, and because the naive chunks aren't spatially compact, their footprints overlap and the same STAC item gets fetched many times.
We'll try to parallelize the requests now, fetching multiple results at once. If we spatially partition the data first, we solve both of those issues: nearby points end up in the same partition, so we don't have to worry about re-fetching an item multiple times, and since there shouldn't be (much) overlap between partitions, we can do the fetching in parallel.
import dask
import tlz
import dask.array
dask-geopandas implements spatial partitioning. `hilbert_distance` assigns an integer to each Point, its distance along a Hilbert space-filling curve, so that points that are nearby in lon / lat get nearby values. We'll sort by that.
%%time
ddf = dask_geopandas.from_geopandas(gdf, npartitions=1)
hd = ddf.hilbert_distance().compute()
gdf["hd"] = hd
gdf = gdf.sort_values("hd")
CPU times: user 2.68 s, sys: 35.3 ms, total: 2.72 s Wall time: 2.72 s
Then we partition the dataset, using dask.array just for its convenience functions for computing chunk sizes and slices. We'll use 8 chunks.
chunks = dask.array.core.normalize_chunks(len(gdf) // 8, (len(gdf),))
slices = dask.array.core.slices_from_chunks(chunks)
slices = [x[0] for x in slices]
gdfs = [gdf.geometry[x] for x in slices]
Each of these has a small footprint relative to the original.
[x.unary_union.convex_hull.area / gdf.unary_union.convex_hull.area for x in gdfs]
[0.13028532040879806, 0.12214785701877728, 0.10287657846703617, 0.11040811099462544, 0.12402116650254451, 0.11945109930242395, 0.19199158363292582, 0.14604634899881044]
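As a quick visual check (a sketch, assuming matplotlib is installed), we can draw each chunk's convex hull over the points; the Hilbert sort should give compact, mostly non-overlapping regions:

# Plot the points in gray and each chunk's convex hull outline on top.
ax = gdf.plot(markersize=0.1, color="lightgray", figsize=(10, 6))
hulls = geopandas.GeoSeries([chunk.unary_union.convex_hull for chunk in gdfs])
hulls.plot(ax=ax, facecolor="none", edgecolor="red")
ax.set_axis_off()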
Now run the same search on the first spatially partitioned chunk (these are "method 2" in the results table below).
N = 100 # also 200, 400, len(gdfs[0])
intersects = shapely.geometry.mapping(gdfs[0].geometry[:N].unary_union)
search_start = "2018-01-01"
search_end = "2019-12-31"
catalog = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
# The time frame in which we search for non-cloudy imagery
search = catalog.search(
collections=["sentinel-2-l2a"],
intersects=intersects,
datetime=[search_start, search_end],
query={"eo:cloud_cover": {"lt": 10}},
limit=500
)
%time result = search.get_all_items_as_dict()
len(result["features"])
OK, now parallelize that.
def f(intersects):
search_start = "2018-01-01"
search_end = "2019-12-31"
catalog = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
# The time frame in which we search for non-cloudy imagery
search = catalog.search(
collections=["sentinel-2-l2a"],
intersects=intersects,
datetime=[search_start, search_end],
query={"eo:cloud_cover": {"lt": 10}},
limit=500
)
return search.get_all_items_as_dict()
from distributed import Client, wait
client = Client()
client
LocalCluster: 4 workers × 1 thread each, 32.00 GiB total memory, dashboard at http://127.0.0.1:8787/status
%time intersects_s = [shapely.geometry.mapping(x.geometry.unary_union) for x in gdfs]
CPU times: user 1.49 s, sys: 62.4 ms, total: 1.56 s Wall time: 1.49 s
futures = client.map(f, intersects_s)
%time _ = wait(futures)
CPU times: user 6.16 s, sys: 632 ms, total: 6.79 s Wall time: 1min 58s
%time results = [fut.result() for fut in futures]
CPU times: user 17.3 s, sys: 1.86 s, total: 19.2 s Wall time: 25.4 s
import itertools
results2 = list(itertools.chain(*[x["features"] for x in results]))
len(results2)
49437
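The partitions aren't guaranteed to be perfectly disjoint, so the same STAC item can still show up in more than one partition's results. If that matters downstream, here's a minimal sketch of deduplicating the combined features by their item id:

# Keep one feature per STAC item id (duplicates across partitions are
# identical items, so it doesn't matter which copy survives).
deduped = list({feature["id"]: feature for feature in results2}.values())
len(deduped)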
| Method | n_points | n_items | time (s) | items/s | items/point |
|---|---|---|---|---|---|
| 1 | 100 | 3,819 | 15 | 254 | 38 |
| 1 | 200 | 6,783 | 42 | 162 | 33 |
| 1 | 400 | 11,581 | 40 | 290 | 29 |
| 2 | 100 | 200 | 1 | 200 | 2 |
| 2 | 200 | 356 | 2 | 178 | 1.8 |
| 2 | 400 | 545 | 2.3 | 236 | 1.3 |
| 2 | 12,500 | 9,114 | 90 | 101 | 0.7 |
| 2* | 100,000 | 49,437 | 143** | 346 | 0.5 |
* Option 2, parallelized across the 8 chunks.
** 118 s fetching items in parallel + 25 s moving the results between processes.