Imports and external functions
import os
import matplotlib.pyplot as plt
import numpy as np
import rasterio
from pathlib import Path
from IPython.display import Image
import copy
import xarray as xr
import bokeh.plotting as bpl
from bokeh.plotting import figure
from bokeh.layouts import row, column
from bokeh.models import ColorBar, BasicTicker, LinearColorMapper, Legend
from bokeh.io import show, output_notebook
from jupyter_dash import JupyterDash
from dash import dcc, html
Imports of custom functions for visualization
from snippets.utils import *
Imports of pandora
# Load pandora imports
import pandora
from pandora.img_tools import create_dataset_from_inputs, get_metadata
from pandora.check_configuration import check_pipeline_section, concat_conf, get_config_pipeline, check_datasets
from pandora.state_machine import PandoraMachine
from pandora import import_plugin, check_conf
(Optional) If Pandora plugins are to be used, import them
Available Pandora plugins include the libSGM plugin (pandora_plugin_libsgm), which provides the SGM optimization step used later in this notebook, and the MC-CNN plugin (pandora_plugin_mccnn).
Import installed plugins
# Load plugins
import_plugin()
Provide output directory to write results
output_dir = os.path.join(os.getcwd(), "output")
# If necessary, create output dir
Path(output_dir).mkdir(exist_ok=True, parents=True)
Provide input data
# Paths to left and right images
img_left_path = "data/Cones_LEFT.tif"
img_right_path = "data/Cones_RIGHT.tif"
# Paths to masks (None if not provided)
left_mask_path = None
right_mask_path = None
# No data
no_data_left = np.nan
no_data_right = np.nan
Read input data and convert to dataset
input_config = {
"left": {"img": img_left_path, "mask": left_mask_path, "disp": [-60, 0], "nodata": no_data_left},
"right": {"img": img_right_path, "mask": right_mask_path, "disp": None, "nodata": no_data_right},
}
img_left = create_dataset_from_inputs(input_config=input_config["left"])
img_right = create_dataset_from_inputs(input_config=input_config["right"])
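create_dataset_from_inputs returns an xarray.Dataset; printing it is a quick way to check the image grid, the disparity interval and the no-data handling (optional, a minimal sketch)
# Inspect the left dataset (image, coordinates and attributes)
print(img_left)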
Check datasets: shape, format and content
check_datasets(img_left, img_right)
show_input_images(img_left, img_right)
Load ground truth if available
# If an occlusion mask exists, adapt it to Pandora's convention before creating the ground truth
# Known valid value of the mask. If None, the lowest value in the mask is considered valid
valid_value = 1
adapted_mask_path = adapt_occlusion_mask(mask_path="data/Occlusion_LEFT.png", output_dir=output_dir, valid_value=valid_value, title="adapted_occlusion_mask")
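The adapt_occlusion_mask helper above comes from snippets.utils; a minimal sketch of the idea, assuming Pandora's input-mask convention (0 for a valid pixel, any non-zero value for an invalid one). The adapt_mask function below is hypothetical, for illustration only.
# Hypothetical equivalent of adapt_occlusion_mask (illustration only):
# pixels equal to valid_value become 0 (valid), all other pixels become 1 (invalid)
def adapt_mask(mask_path, valid_value):
    with rasterio.open(mask_path) as src:
        mask = src.read(1)
    return np.where(mask == valid_value, 0, 1).astype(np.uint8)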
# Read the ground truth image
ground_truth_input_config = {"img": "data/Cones_LEFT_GT.tif", "nodata": np.inf, "mask": adapted_mask_path, "disp":[-60, 0]}
ground_truth = create_dataset_from_inputs(input_config=ground_truth_input_config)
# Convert the ground truth to Pandora's convention:
# the disparity map is negated, and the occlusion mask is turned into a validity mask
ground_truth["disparity_map"] = xr.DataArray(np.copy(ground_truth.im), dims=['row', 'col'])
ground_truth["disparity_map"].values = -ground_truth["disparity_map"].values
# Build the validity mask: 0 for valid pixels, the occlusion flag elsewhere
ground_truth["validity_mask"] = xr.DataArray(np.copy(ground_truth.msk), dims=['row', 'col'])
ground_truth["validity_mask"].values = np.zeros(ground_truth["msk"].values.shape, dtype=int)
inv_idx = np.where(ground_truth["msk"].values != 0)
ground_truth["validity_mask"].values[inv_idx] = pandora.constants.PANDORA_MSK_PIXEL_OCCLUSION
The different types of masks can be selected for visualization
plot_disparity(ground_truth)
Define pipeline for ZNCC matching cost
user_cfg_zncc = {
'input': {
"left": {
"img": img_left_path,
"disp": [-60, 0],
"mask": left_mask_path,
},
"right": {
"img": img_right_path,
"mask": right_mask_path
}
},
'pipeline': {
'matching_cost': {'matching_cost_method': 'zncc', 'window_size': 5, 'subpix': 1},
'disparity': {'disparity_method':'wta', "invalid_disparity": "NaN"},
'refinement': {'refinement_method': 'vfit'},
'validation': {'validation_method': 'cross_checking_accurate'},
}
}
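As a side note, the same configuration can be written to a JSON file and run outside the notebook with Pandora's command-line interface (a sketch, assuming the standard pandora <config.json> <output_dir> entry point installed with the package).
import json
# Save the ZNCC configuration; it can then be run from a terminal with:
#   pandora output/cfg_zncc.json output/
with open(os.path.join(output_dir, "cfg_zncc.json"), "w") as f:
    json.dump(user_cfg_zncc, f, indent=2)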
pandora_machine = PandoraMachine()
disp_min = user_cfg_zncc["input"]["left"]["disp"][0]
disp_max = user_cfg_zncc["input"]["left"]["disp"][1]
metadata_left = get_metadata(user_cfg_zncc["input"]["left"]["img"], (disp_min, disp_max))
metadata_right = get_metadata(user_cfg_zncc["input"]["right"]["img"], disparity=None)
user_cfg_pipeline = get_config_pipeline(user_cfg_zncc)
cfg_pipeline_zncc = check_pipeline_section(user_cfg_pipeline, metadata_left, metadata_right, pandora_machine)['pipeline']
pandora_machine.run_prepare(user_cfg_zncc, img_left, img_right)
left_disp_zncc, right_disp_zncc = pandora.run(pandora_machine, img_left, img_right, user_cfg_zncc)
plot_disparity(left_disp_zncc)
compare_2_disparities(left_disp_zncc, "Disparity map Zncc", ground_truth, "Ground Truth")
Statistics are computed as defined in D. Scharstein and R. Szeliski, "A taxonomy and evaluation of dense two-frame stereo correspondence algorithms", International Journal of Computer Vision, 47(1/2/3):7-42, April-June 2002 (also Microsoft Research Technical Report MSR-TR-2001-81, November 2001), section 5.1 "Evaluation methodology", p. 11.
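The get_error helper from snippets.utils computes these statistics below; a minimal sketch of the core bad-pixel metric, assuming the two disparity maps are aligned NumPy arrays with NaN marking invalid pixels.
def bad_pixel_percentage(disp, disp_gt, threshold=1.0):
    # Percentage of valid pixels whose absolute disparity error exceeds the threshold
    diff = np.abs(disp - disp_gt)
    valid = ~np.isnan(diff)
    return 100.0 * np.count_nonzero(diff[valid] > threshold) / np.count_nonzero(valid)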
threshold = 1
error_zncc, total_bad_percentage, mean_error, std_error, invalid_percentage = get_error(left_disp_zncc, ground_truth, threshold)
print("Threshold = {}".format(threshold))
print("Total bad error point percentage = {:.2f}".format(total_bad_percentage))
print("Mean error = {:.2f}".format(mean_error))
print("Std error = {:.2f}".format(std_error))
print("Invalid point percentage = {:.2f}%".format(invalid_percentage))
compare_disparity_and_error(left_disp_zncc, "Disparity map Zncc", error_zncc, "Error with threshold 1")
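To keep the result, the ZNCC disparity map can be written to a GeoTIFF with rasterio, already imported above (a minimal sketch, assuming the output dataset exposes the disparity_map variable used throughout this notebook; no georeferencing is attached here).
# Write the left ZNCC disparity map to a single-band GeoTIFF in output_dir
disp = left_disp_zncc["disparity_map"].data.astype(np.float32)
profile = {"driver": "GTiff", "height": disp.shape[0], "width": disp.shape[1], "count": 1, "dtype": "float32"}
with rasterio.open(os.path.join(output_dir, "left_disparity_zncc.tif"), "w", **profile) as dst:
    dst.write(disp, 1)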
Define pipeline for Census matching cost and SGM optimization
user_cfg_census_sgm = {
'input': {
"left": {
"img": img_left_path,
"disp": [-60, 0],
"mask": left_mask_path,
},
"right": {
"img": img_right_path,
"mask": right_mask_path
}
},
'pipeline': {
'matching_cost': {'matching_cost_method': 'census', 'window_size': 5, 'subpix': 1},
'optimization' : {'optimization_method': 'sgm'},
'disparity': {'disparity_method':'wta', "invalid_disparity": "NaN"},
'filter': {'filter_method': 'median'},
'refinement': {'refinement_method': 'vfit'},
'validation': {'validation_method': 'cross_checking_accurate'},
}
}
Instantiate the machine and prepare it with the configuration
pandora_machine = PandoraMachine()
disp_min = user_cfg_census_sgm["input"]["left"]["disp"][0]
disp_max = user_cfg_census_sgm["input"]["left"]["disp"][1]
metadata_left = get_metadata(user_cfg_census_sgm["input"]["left"]["img"], (disp_min, disp_max))
metadata_right = get_metadata(user_cfg_census_sgm["input"]["right"]["img"], disparity=None)
user_cfg_pipeline = get_config_pipeline(user_cfg_census_sgm)
cfg_pipeline_census_sgm = check_pipeline_section(user_cfg_pipeline, metadata_left, metadata_right, pandora_machine)['pipeline']
pandora_machine.run_prepare(user_cfg_census_sgm, img_left, img_right)
Run matching cost
pandora_machine.run('matching_cost', user_cfg_census_sgm)
left_cv_census = copy.deepcopy(pandora_machine.left_cv)
right_cv_census = copy.deepcopy(pandora_machine.right_cv)
Run optimization
pandora_machine.run('optimization', user_cfg_census_sgm)
left_cv_census_sgm = copy.deepcopy(pandora_machine.left_cv)
right_cv_census_sgm = copy.deepcopy(pandora_machine.right_cv)
Run disparity
pandora_machine.run('disparity', user_cfg_census_sgm)
left_disparity_map_census_sgm = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census_sgm = copy.deepcopy(pandora_machine.right_disparity)
# Warning: the cost volume may take a while to appear (about 30 s)
plot_1_cost_volume(left_cv_census_sgm, left_disparity_map_census_sgm, "Cost volume with Census matching cost and SGM optimization")
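To see what the SGM optimization does to individual matching costs, a single pixel's cost curve can be plotted before and after optimization (a sketch, assuming the cost volume datasets expose a cost_volume DataArray with row, col and disp dimensions; the pixel coordinates are arbitrary).
row, col = 200, 200  # arbitrary pixel inside the image, adjust to the image size
plt.figure(figsize=(8, 4))
plt.plot(left_cv_census.coords["disp"].values, left_cv_census["cost_volume"].sel(row=row, col=col).values, label="Census")
plt.plot(left_cv_census_sgm.coords["disp"].values, left_cv_census_sgm["cost_volume"].sel(row=row, col=col).values, label="Census + SGM")
plt.xlabel("Disparity")
plt.ylabel("Matching cost")
plt.legend()
plt.show()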
Run filter
pandora_machine.run('filter', user_cfg_census_sgm)
left_disparity_map_census_sgm_filtered = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census_sgm_filtered = copy.deepcopy(pandora_machine.right_disparity)
compare_2_disparities(left_disparity_map_census_sgm, "Disparity map Census SGM", left_disparity_map_census_sgm_filtered, "Disparity map Census SGM filtered")
Run refinement
pandora_machine.run('refinement', user_cfg_census_sgm)
left_disparity_map_census_sgm_refined = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census_sgm_refined = copy.deepcopy(pandora_machine.right_disparity)
compare_2_disparities(left_disparity_map_census_sgm_filtered, "Disparity map Census SGM", left_disparity_map_census_sgm_refined, "Disparity map Census SGM refined")
Run validation
pandora_machine.run('validation', user_cfg_census_sgm)
left_disparity_map_census_sgm_validated = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census_sgm_validated = copy.deepcopy(pandora_machine.right_disparity)
compare_2_disparities(left_disparity_map_census_sgm_filtered, "Disparity map Census SGM", left_disparity_map_census_sgm_validated, "Disparity map Census SGM after validation")
threshold = 1
error_census_sgm, total_bad_percentage, mean_error, std_error, invalid_percentage = get_error(left_disparity_map_census_sgm_validated, ground_truth, threshold)
compare_3_disparities_and_error(left_disparity_map_census_sgm, "Disparity map Census SGM after disparity step", left_disparity_map_census_sgm_validated, "Disparity map Census SGM after validation", ground_truth, "Ground truth", error_census_sgm, "Error Census SGM after validation, threshold 1")
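For a quantitative comparison with the ZNCC pipeline above, the same statistics can be printed for the Census + SGM result, reusing the values returned by get_error.
print("Threshold = {}".format(threshold))
print("Total bad error point percentage = {:.2f}%".format(total_bad_percentage))
print("Mean error = {:.2f}".format(mean_error))
print("Std error = {:.2f}".format(std_error))
print("Invalid point percentage = {:.2f}%".format(invalid_percentage))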
Define pipeline for Census matching cost without optimization
user_cfg_census = {
'input': {
"left": {
"img": img_left_path,
"disp": [-60, 0],
"mask": left_mask_path,
},
"right": {
"img": img_right_path,
"mask": right_mask_path,
}
},
'pipeline': {
'matching_cost': {'matching_cost_method': 'census', 'window_size': 5, 'subpix': 1},
'disparity': {'disparity_method':'wta', "invalid_disparity": "NaN"},
'filter': {'filter_method': 'median'},
'refinement': {'refinement_method': 'vfit'},
'validation': {'validation_method': 'cross_checking_accurate'},
}
}
Instantiate the machine and prepare it with the configuration
pandora_machine = PandoraMachine()
disp_min = user_cfg_census["input"]["left"]["disp"][0]
disp_max = user_cfg_census["input"]["left"]["disp"][1]
metadata_left = get_metadata(user_cfg_census["input"]["left"]["img"], (disp_min, disp_max))
metadata_right = get_metadata(user_cfg_census["input"]["right"]["img"], disparity=None)
user_cfg_pipeline = get_config_pipeline(user_cfg_census)
cfg_pipeline_census = check_pipeline_section(user_cfg_pipeline, metadata_left, metadata_right, pandora_machine)['pipeline']
pandora_machine.run_prepare(user_cfg_census, img_left, img_right)
Run matching cost
pandora_machine.run('matching_cost', user_cfg_census)
left_cv_census = copy.deepcopy(pandora_machine.left_cv)
right_cv_census = copy.deepcopy(pandora_machine.right_cv)
Run disparity
pandora_machine.run('disparity', user_cfg_census)
left_disparity_map_census = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census = copy.deepcopy(pandora_machine.right_disparity)
# Cost volume from current pipeline without optimization
cv_census = get_3D_cost_volume(left_cv_census, left_disparity_map_census)
# Cost volume from previous pipeline with SGM optimization
cv_census_sgm = get_3D_cost_volume(left_cv_census_sgm, left_disparity_map_census_sgm)
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = JupyterDash(__name__, external_stylesheets=external_stylesheets)
# This check is necessary for the notebooks_tests
if app is not None:
app.layout = html.Div(children=[
html.Div([
html.Div([
html.Div(children='''
Cost volume from current pipeline without optimization.
'''),
dcc.Graph(
id='graph1',
figure=cv_census,
style={'width': '100vh', 'height': '100vh'}
),
], className="six columns"),
html.Div([
html.Div(children='''
Cost volume from previous pipeline with SGM optimization
'''),
dcc.Graph(
id='graph2',
figure=cv_census_sgm,
style={'width': '100vh', 'height': '100vh'}
),
], className="six columns"),
], className = 'row'),
])
# This check is necessary for the notebooks_tests
if app is not None:
app.run_server(mode='inline', debug=True)
Run filter
pandora_machine.run('filter', user_cfg_census)
left_disp_map_census_filtered = copy.deepcopy(pandora_machine.left_disparity)
right_disp_map_census_filtered = copy.deepcopy(pandora_machine.right_disparity)
Run refinement
pandora_machine.run('refinement', user_cfg_census)
left_disp_map_census_refined = copy.deepcopy(pandora_machine.left_disparity)
right_disp_map_census_refined = copy.deepcopy(pandora_machine.right_disparity)
Run validation
pandora_machine.run('validation', user_cfg_census)
left_disp_map_census_validated = copy.deepcopy(pandora_machine.left_disparity)
right_disp_map_census_validated = copy.deepcopy(pandora_machine.right_disparity)
# Disparity map from current pipeline without optimization and disparity map from previous pipeline
compare_2_disparities(left_disp_map_census_validated, "Disparity map Census", left_disparity_map_census_sgm_validated, "Disparity map Census with SGM optimization")
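To quantify the benefit of the SGM optimization, the same get_error helper can be applied to both validated disparity maps (a short sketch reusing the variables computed above).
threshold = 1
_, bad_census, mean_census, std_census, _ = get_error(left_disp_map_census_validated, ground_truth, threshold)
_, bad_sgm, mean_sgm, std_sgm, _ = get_error(left_disparity_map_census_sgm_validated, ground_truth, threshold)
print("Census       : bad = {:.2f}%, mean = {:.2f}, std = {:.2f}".format(bad_census, mean_census, std_census))
print("Census + SGM : bad = {:.2f}%, mean = {:.2f}, std = {:.2f}".format(bad_sgm, mean_sgm, std_sgm))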