#!/usr/bin/env python
# coding: utf-8

# # Pandora: a new stereo matching framework
#
# *Cournet, M., Sarrazin, E., Dumas, L., Michel, J., Guinet, J., Youssefi, D., Defonte, V., Fardet, Q., 2020.
# Ground-truth generation and disparity estimation for optical satellite imagery.
# ISPRS - International Archives of the Photogrammetry, Remote Sensing and Spatial Information Sciences.*

# # Analysis demo

# Imports and external functions

# In[ ]:

import os
import matplotlib.pyplot as plt
import numpy as np
import rasterio
from pathlib import Path
from IPython.display import Image
import copy
import xarray as xr

# In[ ]:

import bokeh.plotting as bpl
from bokeh.plotting import figure
from bokeh.layouts import row, column
from bokeh.models import ColorBar, BasicTicker, LinearColorMapper, Legend
from bokeh.io import show, output_notebook
from jupyter_dash import JupyterDash
from dash import dcc, html

# Imports of custom functions for visualization

# In[ ]:

from snippets.utils import *

# Imports of pandora

# In[ ]:

# Load pandora imports
import pandora
from pandora.img_tools import create_dataset_from_inputs, get_metadata
from pandora.check_configuration import (
    check_pipeline_section,
    concat_conf,
    get_config_pipeline,
    check_datasets,
)
from pandora.state_machine import PandoraMachine
from pandora import import_plugin, check_conf

# (Optional) If Pandora plugins are to be used, import them
# Available Pandora Plugins include :
# - MC-CNN Matching cost computation
# - SGM Optimization

# ## 1. Load plugins, input images and ground truth

# Import installed plugins

# In[ ]:

# Load plugins
import_plugin()

# Provide output directory to write results

# In[ ]:

output_dir = Path.cwd() / "output"
# If necessary, create output dir
output_dir.mkdir(parents=True, exist_ok=True)
# The directory is handed on as a plain string from here on
output_dir = str(output_dir)

# Provide input data

# In[ ]:

# Paths to left and right images
img_left_path = "data/Cones_LEFT.tif"
img_right_path = "data/Cones_RIGHT.tif"

# Paths to masks (None if not provided)
left_mask_path = None
right_mask_path = None

# No data
no_data_left = np.nan
no_data_right = np.nan

# Read input data and convert to dataset

# In[ ]:

input_config = {
    "left": {
        "img": img_left_path,
        "mask": left_mask_path,
        "disp": [-60, 0],
        "nodata": no_data_left,
    },
    "right": {
        "img": img_right_path,
        "mask": right_mask_path,
        "disp": None,
        "nodata": no_data_right,
    },
}

# In[ ]:

img_left = create_dataset_from_inputs(input_config=input_config["left"])
img_right = create_dataset_from_inputs(input_config=input_config["right"])

# Check datasets: shape, format and content

# In[ ]:

check_datasets(img_left, img_right)

# ### Visualize input data

# In[ ]:

show_input_images(img_left, img_right)

# Load ground truth if available

# In[ ]:

# If occlusion mask exists, adapt it to Pandora's convention before creating the ground truth
# Known valid value of the mask. If None, the lowest value in the mask will be considered
valid_value = 1
adapted_mask_path = adapt_occlusion_mask(
    mask_path="data/Occlusion_LEFT.png",
    output_dir=output_dir,
    valid_value=valid_value,
    title="adapted_occlusion_mask",
)

# Read image of ground truth
ground_truth_input_config = {
    "img": "data/Cones_LEFT_GT.tif",
    "nodata": np.inf,
    "mask": adapted_mask_path,
    "disp": [-60, 0],
}
ground_truth = create_dataset_from_inputs(input_config=ground_truth_input_config)

# Convert disparity map to Pandora's convention
ground_truth["disparity_map"] = -ground_truth.im
# Every non-zero mask value is flagged as an occlusion, everything else is left at 0
ground_truth["validity_mask"] = xr.where(
    ground_truth.msk != 0,
    pandora.constants.PANDORA_MSK_PIXEL_OCCLUSION,
    0,
)

# ### Visualize ground truth

# The different types of masks can be selected for visualization

# In[ ]:

plot_disparity(ground_truth)

# ## 1. Run Pandora and analyze output disparity and statistics

# ### Define pipeline for Zncc matching cost

# In[ ]:

user_cfg_zncc = {
    "input": {
        "left": {
            "img": img_left_path,
            "disp": [-60, 0],
            "mask": left_mask_path,
        },
        "right": {
            "img": img_right_path,
            "mask": right_mask_path,
        },
    },
    "pipeline": {
        "matching_cost": {"matching_cost_method": "zncc", "window_size": 5, "subpix": 1},
        "disparity": {"disparity_method": "wta", "invalid_disparity": "NaN"},
        "refinement": {"refinement_method": "vfit"},
        "validation": {"validation_method": "cross_checking_accurate"},
    },
}

# ### Instantiate and run the machine with the configuration

# In[ ]:

pandora_machine = PandoraMachine()

# In[ ]:

# Lower and upper disparity bounds of the left image, as declared in the configuration
disp_min, disp_max = user_cfg_zncc["input"]["left"]["disp"]

# In[ ]:

metadata_left = get_metadata(user_cfg_zncc["input"]["left"]["img"], (disp_min, disp_max))
metadata_right = get_metadata(user_cfg_zncc["input"]["right"]["img"], disparity=None)
user_cfg_pipeline = get_config_pipeline(user_cfg_zncc)
checked_cfg_zncc = check_pipeline_section(
    user_cfg_pipeline, metadata_left, metadata_right, pandora_machine
)
cfg_pipeline_zncc = checked_cfg_zncc["pipeline"]

# In[ ]:
pandora_machine.run_prepare(user_cfg_zncc, img_left, img_right)

# In[ ]:

left_disp_zncc, right_disp_zncc = pandora.run(pandora_machine, img_left, img_right, user_cfg_zncc)

# ### Visualize output disparity

# In[ ]:

plot_disparity(left_disp_zncc)

# ### Visually compare the disparity map with the ground truth

# In[ ]:

compare_2_disparities(left_disp_zncc, "Disparity map Zncc", ground_truth, "Ground Truth")

# ### Obtain error and statistics between the disparity map and the ground truth at a given threshold

# Statistics are computed as defined in D. Scharstein and R. Szeliski. A taxonomy and evaluation of dense
# two-frame stereo correspondence algorithms.
# International Journal of Computer Vision, 47(1/2/3):7-42, April-June 2002.
# Microsoft Research Technical Report MSR-TR-2001-81, November 2001, part 5.1 Evaluation methodology, pp 11.

# In[ ]:

threshold = 1
error_zncc, total_bad_percentage, mean_error, std_error, invalid_percentage = get_error(
    left_disp_zncc, ground_truth, threshold
)

# In[ ]:

print(f"Threshold = {threshold}")
# Both percentages are printed with a trailing "%" so the two lines read consistently
print(f"Total bad error point percentage = {total_bad_percentage:.2f}%")
print(f"Mean error = {mean_error:.2f}")
print(f"Std error = {std_error:.2f}")
print(f"Invalid point percentage = {invalid_percentage:.2f}%")

# ### Visualize disparity map and its error at a given threshold

# In[ ]:

# The title is built from `threshold` so it cannot drift from the value actually used
compare_disparity_and_error(
    left_disp_zncc,
    "Disparity map Zncc",
    error_zncc,
    f"Error with threshold {threshold}",
)

# ## 2. Run Pandora step by step and analyze intermediate results

# Define pipeline for Census matching cost and SGM optimization

# In[ ]:

user_cfg_census_sgm = {
    "input": {
        "left": {
            "img": img_left_path,
            "disp": [-60, 0],
            "mask": left_mask_path,
        },
        "right": {
            "img": img_right_path,
            "mask": right_mask_path,
        },
    },
    "pipeline": {
        "matching_cost": {"matching_cost_method": "census", "window_size": 5, "subpix": 1},
        "optimization": {"optimization_method": "sgm"},
        "disparity": {"disparity_method": "wta", "invalid_disparity": "NaN"},
        "filter": {"filter_method": "median"},
        "refinement": {"refinement_method": "vfit"},
        "validation": {"validation_method": "cross_checking_accurate"},
    },
}

# Instantiate and run the machine with the configuration

# In[ ]:

pandora_machine = PandoraMachine()

# In[ ]:

disp_min, disp_max = user_cfg_census_sgm["input"]["left"]["disp"]

# In[ ]:

metadata_left = get_metadata(user_cfg_census_sgm["input"]["left"]["img"], (disp_min, disp_max))
metadata_right = get_metadata(user_cfg_census_sgm["input"]["right"]["img"], disparity=None)
user_cfg_pipeline = get_config_pipeline(user_cfg_census_sgm)
cfg_pipeline_census_sgm = check_pipeline_section(
    user_cfg_pipeline, metadata_left, metadata_right, pandora_machine
)["pipeline"]

# In[ ]:

pandora_machine.run_prepare(user_cfg_census_sgm, img_left, img_right)

# Run matching cost

# In[ ]:

# After each step the machine's outputs are deep-copied so that the intermediate
# result stays available for comparison once later steps have run.
pandora_machine.run("matching_cost", user_cfg_census_sgm)
left_cv_census = copy.deepcopy(pandora_machine.left_cv)
right_cv_census = copy.deepcopy(pandora_machine.right_cv)

# Run optimization

# In[ ]:

pandora_machine.run("optimization", user_cfg_census_sgm)
left_cv_census_sgm = copy.deepcopy(pandora_machine.left_cv)
right_cv_census_sgm = copy.deepcopy(pandora_machine.right_cv)

# Run disparity

# In[ ]:

pandora_machine.run("disparity", user_cfg_census_sgm)
left_disparity_map_census_sgm = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census_sgm = copy.deepcopy(pandora_machine.right_disparity)

# ### Visualize Census cost volume with optimization

# In[ ]:

# Warning : cost volume may take a long time to appear (30s)
plot_1_cost_volume(
    left_cv_census_sgm,
    left_disparity_map_census_sgm,
    "Cost volume with Census matching cost and SGM optimization",
)

# Run filter

# In[ ]:

pandora_machine.run("filter", user_cfg_census_sgm)
left_disparity_map_census_sgm_filtered = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census_sgm_filtered = copy.deepcopy(pandora_machine.right_disparity)

# ### Visualize disparity map before and after filter

# In[ ]:

compare_2_disparities(
    left_disparity_map_census_sgm,
    "Disparity map Census SGM",
    left_disparity_map_census_sgm_filtered,
    "Disparity map Census SGM filtered",
)

# Run refinement

# In[ ]:

pandora_machine.run("refinement", user_cfg_census_sgm)
left_disparity_map_census_sgm_refined = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census_sgm_refined = copy.deepcopy(pandora_machine.right_disparity)

# ### Visualize disparity map before and after refinement

# In[ ]:

# The "before" map here is the filtered one, so it is labelled as such
compare_2_disparities(
    left_disparity_map_census_sgm_filtered,
    "Disparity map Census SGM filtered",
    left_disparity_map_census_sgm_refined,
    "Disparity map Census SGM refined",
)

# Run validation

# In[ ]:

pandora_machine.run("validation", user_cfg_census_sgm)
left_disparity_map_census_sgm_validated = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census_sgm_validated = copy.deepcopy(pandora_machine.right_disparity)

# ### Visualize disparity map before and after validation

# In[ ]:

# Validation runs right after refinement in this pipeline, so the "before" map
# is the refined one (not the filtered one).
compare_2_disparities(
    left_disparity_map_census_sgm_refined,
    "Disparity map Census SGM refined",
    left_disparity_map_census_sgm_validated,
    "Disparity map Census SGM after validation",
)

# ### Obtain error and statistics between the disparity map and the ground truth at a given threshold

# In[ ]:

threshold = 1
# The error map is displayed below as "after validation", so it is computed
# from the validated disparity map rather than from the raw disparity-step output.
error_census_sgm, total_bad_percentage, mean_error, std_error, invalid_percentage = get_error(
    left_disparity_map_census_sgm_validated, ground_truth, threshold
)

# ### Visualize 3 chosen disparity maps and one error map at a given threshold

# In[ ]:

compare_3_disparities_and_error(
    left_disparity_map_census_sgm,
    "Disparity map Census SGM after disparity step",
    left_disparity_map_census_sgm_validated,
    "Disparity map Census SGM after validation",
    ground_truth,
    "Ground truth",
    error_census_sgm,
    f"Error Census SGM after validation, threshold {threshold}",
)

# ## 2.1 To see the effect of SGM optimization, define pipeline for Census matching cost WITHOUT SGM optimization

# In[ ]:

user_cfg_census = {
    "input": {
        "left": {
            "img": img_left_path,
            "disp": [-60, 0],
            "mask": left_mask_path,
        },
        "right": {
            "img": img_right_path,
            "mask": right_mask_path,
        },
    },
    "pipeline": {
        "matching_cost": {"matching_cost_method": "census", "window_size": 5, "subpix": 1},
        "disparity": {"disparity_method": "wta", "invalid_disparity": "NaN"},
        "filter": {"filter_method": "median"},
        "refinement": {"refinement_method": "vfit"},
        "validation": {"validation_method": "cross_checking_accurate"},
    },
}

# Instantiate and run the machine with the configuration

# In[ ]:

pandora_machine = PandoraMachine()

# In[ ]:

disp_min, disp_max = user_cfg_census["input"]["left"]["disp"]

# In[ ]:

metadata_left = get_metadata(user_cfg_census["input"]["left"]["img"], (disp_min, disp_max))
metadata_right = get_metadata(user_cfg_census["input"]["right"]["img"], disparity=None)
user_cfg_pipeline = get_config_pipeline(user_cfg_census)
cfg_pipeline_census = check_pipeline_section(
    user_cfg_pipeline, metadata_left, metadata_right, pandora_machine
)["pipeline"]

# In[ ]:

pandora_machine.run_prepare(user_cfg_census, img_left, img_right)

# Run matching cost

# In[ ]:

# NOTE: this overwrites left_cv_census / right_cv_census from the Census+SGM section above
pandora_machine.run("matching_cost", user_cfg_census)
left_cv_census = copy.deepcopy(pandora_machine.left_cv)
right_cv_census = copy.deepcopy(pandora_machine.right_cv)

# Run disparity

# In[ ]:

pandora_machine.run("disparity", user_cfg_census)
left_disparity_map_census = copy.deepcopy(pandora_machine.left_disparity)
right_disparity_map_census = copy.deepcopy(pandora_machine.right_disparity)

# ### Compare Census cost volume without and with optimization

# In[ ]:

# Cost volume from current pipeline without optimization
cv_census = get_3D_cost_volume(left_cv_census, left_disparity_map_census)
# Cost volume from previous pipeline with SGM optimization
cv_census_sgm = get_3D_cost_volume(left_cv_census_sgm, left_disparity_map_census_sgm)

# In[ ]:

external_stylesheets = ["https://codepen.io/chriddyp/pen/bWLwgP.css"]
app = JupyterDash(__name__, external_stylesheets=external_stylesheets)

# In[ ]:


def _cost_volume_panel(caption, graph_id, figure):
    """Return one layout column (CSS class "six columns"): a caption above a graph.

    :param caption: text displayed above the graph
    :param graph_id: Dash component id given to the graph
    :param figure: figure object displayed by the graph
    :return: the ``html.Div`` holding the caption and the graph
    """
    return html.Div(
        [
            html.Div(children=caption),
            dcc.Graph(
                id=graph_id,
                figure=figure,
                style={"width": "100vh", "height": "100vh"},
            ),
        ],
        className="six columns",
    )


# This check is necessary for the notebooks_tests
if app is not None:
    app.layout = html.Div(
        children=[
            html.Div(
                [
                    _cost_volume_panel(
                        "Cost volume from current pipeline without optimization.",
                        "graph1",
                        cv_census,
                    ),
                    _cost_volume_panel(
                        "Cost volume from previous pipeline with SGM optimization",
                        "graph2",
                        cv_census_sgm,
                    ),
                ],
                className="row",
            ),
        ]
    )

# #### Unfortunately, the following function does not work with Binder, given the current network
# #### incompatibilities between Binder-Jupyter-Bokehserver.
#
# #### To use it, please run the notebook locally.
# In[ ]:

# This check is necessary for the notebooks_tests
if app is not None:
    app.run_server(debug=True, mode="inline")

# Run filter

# In[ ]:

# As in the previous steps, the machine's left/right disparity maps are
# deep-copied after each stage so every intermediate result is kept.
pandora_machine.run("filter", user_cfg_census)
left_disp_map_census_filtered = copy.deepcopy(pandora_machine.left_disparity)
right_disp_map_census_filtered = copy.deepcopy(pandora_machine.right_disparity)

# Run refinement

# In[ ]:

pandora_machine.run("refinement", user_cfg_census)
left_disp_map_census_refined = copy.deepcopy(pandora_machine.left_disparity)
right_disp_map_census_refined = copy.deepcopy(pandora_machine.right_disparity)

# Run validation

# In[ ]:

pandora_machine.run("validation", user_cfg_census)
left_disp_map_census_validated = copy.deepcopy(pandora_machine.left_disparity)
right_disp_map_census_validated = copy.deepcopy(pandora_machine.right_disparity)

# ### Compare disparity map without and with optimization

# In[ ]:

# Disparity map from current pipeline without optimization and disparity map from previous pipeline
compare_2_disparities(
    left_disp_map_census_validated,
    "Disparity map Census",
    left_disparity_map_census_sgm_validated,
    "Disparity map Census with SGM optimization",
)