import yaml

# load menu
with open("mnt/city-directories/01-user-input/menu.yml", 'r') as f:
    menu = yaml.safe_load(f)

if menu['toolbox']:
    import os
    import glob
    import math
    import csv
    import warnings
    from pathlib import Path

    import geopandas as gpd
    import pandas as pd
    import numpy as np
    import pint
    import matplotlib.pyplot as plt
    import fiona
    import osmnx as ox

    import rasterio
    import rasterio.features
    from rasterio.warp import calculate_default_transform, reproject
    from rasterio.plot import show
    from rasterio.merge import merge
    from rasterio.mask import mask
    from rasterio.transform import from_bounds
    from rasterio.enums import Resampling
    from rasterio.vrt import WarpedVRT
    from rasterio.features import geometry_mask, shapes
    from rasterio.crs import CRS
    from rasterstats import zonal_stats
    from osgeo import gdal, gdalconst
    from affine import Affine

    from scipy.ndimage import generic_filter
    from skimage import measure
    from shapely.geometry import LineString, MultiPoint, mapping, shape
    from shapely.ops import linemerge, unary_union, split, snap
# SET UP ##############################################
# load city inputs files, to be updated for each city scan
with open("city_inputs.yml", 'r') as f:
    city_inputs = yaml.safe_load(f)

city = city_inputs['city_name'].replace(' ', '_').lower()
country = city_inputs['country_name'].replace(' ', '_').lower()

# load global inputs, such as data sources that generally remain the same across scans
with open("global_inputs.yml", 'r') as f:
    global_inputs = yaml.safe_load(f)

# reproject the input AOI shapefile to the correct projection (EPSG:4326)
aoi_file = gpd.read_file(city_inputs['AOI_path']).to_crs(epsg=4326)
features = aoi_file.geometry

# Define output folder ---------
output_folder = Path('mnt/city-directories/02-process-output')
if not os.path.exists(output_folder):
    os.mkdir(output_folder)
# merge pluvial rasters
def merge_pluvial_files():
    matching_files = glob.glob(os.path.join(output_folder, f"{city}_pluvial_2020_*.tif"))
    if matching_files:
        src_files_to_merge = [rasterio.open(pluvial_file) for pluvial_file in matching_files]
        try:
            merged_data, merged_transform = merge(src_files_to_merge)
            merged_crs = src_files_to_merge[0].crs
            output_file = os.path.join(output_folder, f"{city}_merged_pluvial_data.tif")
            with rasterio.open(output_file, 'w', driver='GTiff',
                               width=merged_data.shape[2], height=merged_data.shape[1],
                               count=1, dtype=merged_data.dtype,
                               crs=merged_crs, transform=merged_transform) as dst:
                dst.write(merged_data)
            print(f"Merged pluvial data saved as {output_file}")
            return output_file
        except Exception as e:
            print(f"Error occurred while merging: {e}")
            return None
        finally:
            # close the source datasets opened for merging
            for src in src_files_to_merge:
                src.close()
    else:
        print("Error: No pluvial files found.")
        return None

merged_pluvial_data = merge_pluvial_files()
# notebook output: Merged pluvial data saved as mnt/city-directories/02-process-output/goris_merged_pluvial_data.tif
# merge pluvial rasters (UTM)
def merge_pluvial_files_UTM():
    matching_files = glob.glob(os.path.join(output_folder, f"{city}_pluvial_2020_*_utm.tif"))
    if matching_files:
        src_files_to_merge = [rasterio.open(pluvial_file) for pluvial_file in matching_files]
        merged_data, merged_transform = merge(src_files_to_merge)
        merged_crs = src_files_to_merge[0].crs
        output_file = os.path.join(output_folder, f"{city}_merged_pluvial_data_utm.tif")
        with rasterio.open(output_file, 'w', driver='GTiff',
                           width=merged_data.shape[2], height=merged_data.shape[1],
                           count=1, dtype=merged_data.dtype,
                           crs=merged_crs, transform=merged_transform) as dst:
            dst.write(merged_data)
        print(f"Merged pluvial data saved as {output_file}")
        return output_file
    else:
        print("Error: No pluvial files found.")
        return None

merged_pluvial_data_utm = merge_pluvial_files_UTM()
# notebook output: Merged pluvial data saved as mnt/city-directories/02-process-output/goris_merged_pluvial_data_utm.tif
# merge fluvial rasters
def merge_fluvial_files():
    matching_files = glob.glob(os.path.join(output_folder, f"{city}_fluvial_2020_*.tif"))
    if matching_files:
        src_files_to_merge = [rasterio.open(fluvial_file) for fluvial_file in matching_files]
        merged_data, merged_transform = merge(src_files_to_merge)
        merged_crs = src_files_to_merge[0].crs
        output_file = os.path.join(output_folder, f"{city}_merged_fluvial_data.tif")
        with rasterio.open(output_file, 'w', driver='GTiff',
                           width=merged_data.shape[2], height=merged_data.shape[1],
                           count=1, dtype=merged_data.dtype,
                           crs=merged_crs, transform=merged_transform) as dst:
            dst.write(merged_data)
        print(f"Merged fluvial data saved as {output_file}")
        return output_file
    else:
        print("Error: No fluvial files found.")
        return None

merged_fluvial_data = merge_fluvial_files()
# notebook output: Merged fluvial data saved as mnt/city-directories/02-process-output/goris_merged_fluvial_data.tif
# merge fluvial rasters (UTM)
def merge_fluvial_files_UTM():
    matching_files = glob.glob(os.path.join(output_folder, f"{city}_fluvial_2020_*_utm.tif"))
    if matching_files:
        src_files_to_merge = [rasterio.open(fluvial_file) for fluvial_file in matching_files]
        merged_data, merged_transform = merge(src_files_to_merge)
        merged_crs = src_files_to_merge[0].crs  # use the CRS of the first file
        output_file = os.path.join(output_folder, f"{city}_merged_fluvial_data_utm.tif")
        with rasterio.open(output_file, 'w', driver='GTiff',
                           width=merged_data.shape[2], height=merged_data.shape[1],
                           count=1, dtype=merged_data.dtype,
                           crs=merged_crs, transform=merged_transform) as dst:
            dst.write(merged_data)
        print(f"Merged fluvial data saved as {output_file}")
        return output_file
    else:
        print("Error: No fluvial files found.")
        return None

merged_fluvial_data_UTM = merge_fluvial_files_UTM()
# notebook output: Merged fluvial data saved as mnt/city-directories/02-process-output/goris_merged_fluvial_data_utm.tif
#merge comb files and save a merged file (everything with a value is 1, otherwise 0 )
#merge coastal files and save a merged file (everything with a value is 1, otherwise 0 )
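# The two comments above describe work not yet implemented; the helper below is a
# hedged sketch of how it could be done. The f"{city}_{hazard}_2020_*.tif" naming for
# "comb" and "coastal" inputs is an assumption mirroring the pluvial/fluvial files,
# and the function is not called here because those rasters may not exist yet.
def merge_hazard_files_binary(hazard):
    matching_files = glob.glob(os.path.join(output_folder, f"{city}_{hazard}_2020_*.tif"))
    if not matching_files:
        print(f"Error: No {hazard} files found.")
        return None
    src_files_to_merge = [rasterio.open(f) for f in matching_files]
    try:
        merged_data, merged_transform = merge(src_files_to_merge)
        # binarize: any cell with a positive value becomes 1, everything else 0
        binary_data = np.where(merged_data > 0, 1, 0).astype('uint8')
        output_file = os.path.join(output_folder, f"{city}_merged_{hazard}_data.tif")
        with rasterio.open(output_file, 'w', driver='GTiff',
                           width=binary_data.shape[2], height=binary_data.shape[1],
                           count=1, dtype=binary_data.dtype,
                           crs=src_files_to_merge[0].crs, transform=merged_transform) as dst:
            dst.write(binary_data)
        print(f"Merged {hazard} data saved as {output_file}")
        return output_file
    except Exception as e:
        print(f"Error occurred while merging {hazard}: {e}")
        return None
    finally:
        for src in src_files_to_merge:
            src.close()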
# resampling data
def resample_raster(input_raster, target_shape):
    # resample raster to match the target shape
    data = input_raster.read(1, out_shape=target_shape, resampling=Resampling.nearest)
    return data
# PLOT CHECK
def plot_data():
    if menu.get('flood') and menu.get('wsf'):
        pluvial_path = os.path.join(output_folder, f"{city}_merged_pluvial_data_utm.tif")
        wsf_path = os.path.join(output_folder, f"{city}_wsf_utm.tiff")

        # Plot AOI features
        fig, ax = plt.subplots(figsize=(10, 10))
        features_utm = features.to_crs('EPSG:32638')
        features_utm.plot(ax=ax, facecolor='none', edgecolor='red')

        # Plot pluvial data
        with rasterio.open(pluvial_path) as pluvial_src:
            show(pluvial_src, ax=ax, cmap='Blues', alpha=0.5)

        # Plot WSF data
        with rasterio.open(wsf_path) as wsf_src:
            show(wsf_src, ax=ax, cmap='Reds', alpha=0.5)

        ax.set_title("Features, Pluvial Data, and WSF")
        ax.set_xlabel("Longitude")
        ax.set_ylabel("Latitude")
        plt.savefig(os.path.join(output_folder, f"{city}_flood_wsf_check.png"))  # output file name is an assumption
        plt.show()
# WSF and pluvial (Pu)
def get_pu_wsf():
    if menu.get('flood') and menu.get('wsf'):
        pu_path = os.path.join(output_folder, f"{city}_pluvial_2020_lt1_utm.tif")
        wsf_path = os.path.join(output_folder, f"{city}_wsf_utm.tiff")

        # Reproject features to UTM
        features_utm = features.to_crs('EPSG:32638')

        with rasterio.open(pu_path) as src_pluvial:
            pluvial_data, pluvial_transform = mask(src_pluvial, features_utm.geometry, crop=True)
            pluvial_data = pluvial_data[0]
            pluvial_affine = pluvial_transform
            pluvial_resolution = abs(pluvial_transform[0] * pluvial_transform[4])  # pixel area in CRS units

        with rasterio.open(wsf_path) as src_wsf:
            wsf_data, wsf_transform = mask(src_wsf, features_utm.geometry, crop=True)
            wsf_data = wsf_data[0]
            wsf_affine = wsf_transform

        # Ensure both arrays have the same shape
        min_height = min(pluvial_data.shape[0], wsf_data.shape[0])
        min_width = min(pluvial_data.shape[1], wsf_data.shape[1])
        pluvial_data = pluvial_data[:min_height, :min_width]
        wsf_data = wsf_data[:min_height, :min_width]

        # Settlement years present in the WSF data (drop background and 1985)
        unique_years = np.unique(wsf_data)
        unique_years = unique_years[unique_years != 0]
        unique_years = unique_years[unique_years != 1985]

        stats_by_year = {}
        for year in unique_years:
            masked_wsf_data = np.where(wsf_data == year, 1, 0)
            masked_flooded_data = masked_wsf_data * pluvial_data
            stats = zonal_stats(features_utm.geometry, masked_flooded_data, affine=pluvial_transform, stats="sum", nodata=-9999)
            area = stats[0]['sum'] * pluvial_resolution
            stats_by_year[year] = area
        return stats_by_year
    else:
        print("Flood or WSF menu not selected.")
        return None
# Stats by year
# Earlier version of the flooded-area-by-WSF-year calculation, kept below as a string literal (not executed)
'''
def get_pu_wsf(output_folder, city, menu, features):
if menu.get('flood') and menu.get('wsf'):
pu_path = os.path.join(output_folder, f"{city}_merged_pluvial_data_utm.tif")
wsf_path = os.path.join(output_folder, f"{city}_wsf_utm.tiff")
# Reproject features to UTM
features_utm = features.to_crs('EPSG:32638')
with rasterio.open(pu_path) as src_pluvial:
pluvial_data, pluvial_transform = mask(src_pluvial, features_utm.geometry, crop=True)
pluvial_data = pluvial_data[0]
pluvial_affine = pluvial_transform
pluvial_resolution = abs(pluvial_transform[0] * pluvial_transform[4])
with rasterio.open(wsf_path) as src_wsf:
wsf_data, wsf_transform = mask(src_wsf, features_utm.geometry, crop=True)
wsf_data = wsf_data[0]
wsf_affine = wsf_transform
# Ensure both arrays have the same shape
min_height = min(pluvial_data.shape[0], wsf_data.shape[0])
min_width = min(pluvial_data.shape[1], wsf_data.shape[1])
pluvial_data = pluvial_data[:min_height, :min_width]
wsf_data = wsf_data[:min_height, :min_width]
# Calculate flooded area for each year
pixel_areas = {}
unique_years = np.unique(wsf_data)
for year in unique_years:
if year != 0 and year >= 1986: # Exclude background value and years before 1986
# Mask WSF data for the current year
masked_wsf_data = np.where(wsf_data == year, 1, 0)
# Multiply with pluvial data to get flooded area
masked_flooded_data = masked_wsf_data * pluvial_data
# Calculate area
area = np.count_nonzero(masked_flooded_data) * pluvial_resolution
pixel_areas[year] = area
return pixel_areas
else:
print("Flood or WSF menu not selected.")
return None
# Call the function and print results
pu_wsf_areas = get_pu_wsf(output_folder, city, menu, features)
if stats_by_year is not None:
years = list(stats_by_year.keys())
areas = list(stats_by_year.values())
# Filter years for plotting
years_to_plot = [1986, 1995, 2005, 2015]
areas_to_plot = [stats_by_year.get(year, np.nan) for year in years_to_plot]
# Interpolate missing years' data for a smoother curve
interp_years = np.arange(min(years_to_plot), max(years_to_plot) + 1)
interp_areas = np.interp(interp_years, years_to_plot, areas_to_plot)
plt.figure(figsize=(10, 6))
plt.plot(interp_years, interp_areas, marker='o', linestyle='-')
plt.title('Flooded Area from 1986 to 2015 (Interpolated)')
plt.xlabel('Year')
plt.ylabel('Area (square units)')
plt.grid(True)
plt.tight_layout()
plt.show()
else:
print("No areas calculated.")
'''
# Fluvial and WSF
def get_fu_wsf():
    if menu.get('flood') and menu.get('wsf'):
        fu_path = os.path.join(output_folder, f"{city}_fluvial_2020_lt1_utm.tif")
        wsf_path = os.path.join(output_folder, f"{city}_wsf_utm.tiff")

        # Reproject features to UTM
        features_utm = features.to_crs('EPSG:32638')

        with rasterio.open(fu_path) as src_fluvial:
            fluvial_data, fluvial_transform = mask(src_fluvial, features_utm.geometry, crop=True)
            fluvial_data = fluvial_data[0]
            fluvial_affine = fluvial_transform
            fluvial_resolution = abs(fluvial_transform[0] * fluvial_transform[4])  # pixel area in CRS units

        with rasterio.open(wsf_path) as src_wsf:
            wsf_data, wsf_transform = mask(src_wsf, features_utm.geometry, crop=True)
            wsf_data = wsf_data[0]
            wsf_affine = wsf_transform

        # Ensure both arrays have the same shape
        min_height = min(fluvial_data.shape[0], wsf_data.shape[0])
        min_width = min(fluvial_data.shape[1], wsf_data.shape[1])
        fluvial_data = fluvial_data[:min_height, :min_width]
        wsf_data = wsf_data[:min_height, :min_width]

        # Get unique settlement years from the WSF data (drop background and 1985)
        unique_years = np.unique(wsf_data)
        unique_years = unique_years[unique_years != 0]
        unique_years = unique_years[unique_years != 1985]

        stats_by_year = {}
        for year in unique_years:
            masked_wsf_data = np.where(wsf_data == year, 1, 0)
            masked_flooded_data = masked_wsf_data * fluvial_data
            stats = zonal_stats(features_utm.geometry, masked_flooded_data, affine=fluvial_transform, stats="sum", nodata=-9999)
            area = stats[0]['sum'] * fluvial_resolution
            stats_by_year[year] = area
        return stats_by_year
    else:
        print("Flood or WSF menu not selected.")
        return None
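# Neither WSF overlay function above is invoked in this script; the block below is a
# hedged usage sketch. The CSV file names are assumptions, and the areas are in the
# raster CRS units (square metres for the UTM rasters).
for label, fn in [('pluvial', get_pu_wsf), ('fluvial', get_fu_wsf)]:
    try:
        stats_by_year = fn()
    except Exception as e:
        print(f"{fn.__name__} failed: {e}")
        continue
    if stats_by_year:
        out_csv = os.path.join(output_folder, f"{city}_{label}_wsf_area.csv")
        pd.DataFrame({'Year': list(stats_by_year.keys()),
                      'Flooded built-up area': list(stats_by_year.values())}).to_csv(out_csv, index=False)
        print(f"WSF flood exposure by year saved to {out_csv}")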
# Do the same for comb files
# Do the same for coastal files

# Population x Pluvial (Bramka's reference)
def normalize_raster(src, dst_shape):
    """
    Normalize the raster to a consistent shape using WarpedVRT.
    """
    src_profile = src.profile
    src_profile['count'] = 1  # Ensure only one band is read
    with WarpedVRT(src, **src_profile) as vrt:
        data = vrt.read(1, out_shape=dst_shape)
    return data
def get_pu_pop_norm():
    with open("global_inputs.yml", 'r') as f:
        global_inputs = yaml.safe_load(f)

    if menu.get('flood') and menu.get('population'):
        pu_path = os.path.join(output_folder, f"{city}_pluvial_2020_lt1.tif")
        try:
            with rasterio.open(pu_path) as pu_src:
                pu_shape = pu_src.shape
                merged_pluvial_data = pu_src.read(1)
                merged_pluvial_data_transform = pu_src.transform
        except Exception as e:
            print(f"Error opening merged pluvial data raster: {e}")
            return

        pop_path = os.path.join(output_folder, f"{city}_population.tif")
        try:
            with rasterio.open(pop_path) as pop_src:
                pop_data = normalize_raster(pop_src, pu_shape)
                pop_transform = pop_src.transform
        except Exception as e:
            print(f"Error opening population raster: {e}")
            return

        pop_data_clipped = np.clip(pop_data, 0, None)
        pop_percentile_60 = np.percentile(pop_data_clipped, 60)

        total_count = np.sum((merged_pluvial_data == 1) & (pop_data_clipped > pop_percentile_60))
        total_pixels = np.sum(merged_pluvial_data == 1)
        percentage = (total_count / total_pixels) * 100
        print(f"{percentage:.2f}% of densely populated areas are located within the rainwater flood risk zone with a minimum depth of 15 cm")

        csv_path = os.path.join(output_folder, 'pu_pop_area.csv')
        df = pd.DataFrame({'File Name': 'Combined', 'Percentage': [percentage]})
        df.to_csv(csv_path, index=False)
        print(f"Result saved to {csv_path}")
    else:
        print("Flood or population menu not selected.")
# Population x Fluvial (reuses normalize_raster defined above)
def get_fu_pop_norm():
    with open("global_inputs.yml", 'r') as f:
        global_inputs = yaml.safe_load(f)

    if menu.get('flood') and menu.get('population'):
        fu_path = os.path.join(output_folder, f"{city}_fluvial_2020_lt1.tif")
        try:
            with rasterio.open(fu_path) as fu_src:
                fu_shape = fu_src.shape
                merged_fluvial_data = fu_src.read(1)
                merged_fluvial_data_transform = fu_src.transform
        except Exception as e:
            print(f"Error opening merged fluvial data raster: {e}")
            return

        pop_path = os.path.join(output_folder, f"{city}_population.tif")
        try:
            with rasterio.open(pop_path) as pop_src:
                pop_data = normalize_raster(pop_src, fu_shape)
                pop_transform = pop_src.transform
        except Exception as e:
            print(f"Error opening population raster: {e}")
            return

        pop_data_clipped = np.clip(pop_data, 0, None)
        pop_percentile_60 = np.percentile(pop_data_clipped, 60)

        total_count = np.sum((merged_fluvial_data == 1) & (pop_data_clipped > pop_percentile_60))
        total_pixels = np.sum(merged_fluvial_data == 1)
        percentage = (total_count / total_pixels) * 100
        print(f"{percentage:.2f}% of densely populated areas are located within the fluvial flood risk zone with a minimum depth of 10 cm")

        csv_path = os.path.join(output_folder, 'fu_pop_area.csv')
        df = pd.DataFrame({'File Name': 'Combined', 'Percentage': [percentage]})
        df.to_csv(csv_path, index=False)
        print(f"Result saved to {csv_path}")
    else:
        print("Flood or population menu not selected.")
# Pu Amenities
def get_pu_am():
    with open("global_inputs.yml", 'r') as f:
        global_inputs = yaml.safe_load(f)

    if menu.get('flood') and menu.get('amenities'):
        pu_path = os.path.join(output_folder, f"{city}_pluvial_2020_lt1.tif")
        try:
            with rasterio.open(pu_path) as pu_src:
                merged_pluvial_data = pu_src.read(1)
                merged_pluvial_data_transform = pu_src.transform
                merged_pluvial_data_shape = merged_pluvial_data.shape
        except Exception as e:
            print(f"Error opening merged pluvial data raster: {e}")
            return

        stats_list = []
        for category in ['health', 'police', 'fire', 'schools']:
            shapefile_path = os.path.join(output_folder, f"{city}_osm_{category}", f"{city}_osm_{category}.shp")
            try:
                amenities = gpd.read_file(shapefile_path)
                with rasterio.open(pu_path) as src:
                    affine = src.transform
                    stats = zonal_stats(amenities, merged_pluvial_data, nodata=0, affine=affine, stats=["count"], geojson_out=True)
                count_overlap = sum([feature["properties"]["count"] for feature in stats])
                total_count = len(amenities)
                percentage = (count_overlap / total_count) * 100
                stats_list.append({'Category': category, 'Overlap': count_overlap, 'Total': total_count, 'Percentage': percentage})
                print(f"{count_overlap} of {total_count} ({percentage:.2f}%) {category} are located in a rainwater (pluvial) flood risk zone with a minimum depth of 15 cm.")
            except Exception as e:
                if category == 'fire':
                    print("Fire stations do not exist")
                else:
                    print(f"Error processing {category} shapefile: {e}")

        df = pd.DataFrame(stats_list)
        excel_file = os.path.join(output_folder, 'pu_osmpt.xlsx')
        df.to_excel(excel_file, index=False)
        print(f"Statistics saved to {excel_file}")

# Suppress warnings
warnings.filterwarnings("ignore", category=rasterio.errors.NotGeoreferencedWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
# PU Roads (earlier version kept below as a string literal, not executed)
'''
def get_pu_roads():
with open("global_inputs.yml", 'r') as f:
global_inputs = yaml.safe_load(f)
# File paths
pu_path = os.path.join(output_folder, f"{city}_pluvial_2020_lt1.tif")
roads_path = os.path.join('output/goris_edges.shp')
# Read pluvial data
try:
with rasterio.open(pu_path) as pu_src:
merged_pluvial_data = pu_src.read(1)
transform = pu_src.transform # Get the affine transformation
nodata_value = pu_src.nodata # Get the nodata value
except Exception as e:
print(f"Error opening merged pluvial data raster: {e}")
return
# Read roads data
try:
roads = gpd.read_file(roads_path)
except Exception as e:
print(f"Error reading road network shapefile: {e}")
return
# Filter highways from the roads dataset
highways = roads[roads['highway'] == 'primary']
# Convert length to meters
highways['length_m'] = highways['geometry'].length
# Get the total length of highways
total_length_highways = highways['length_m'].sum()
# Perform zonal stats to get length of flooded highways
try:
stats = zonal_stats(highways.geometry, merged_pluvial_data, affine=transform, nodata=nodata_value, stats="max")
total_length_flooded_highways = sum([s['max'] for s in stats if s['max'] is not None])
print(f"Total length of highways flooded due to pluvial conditions: {total_length_flooded_highways:.2f} meters")
# Calculate percentage of flooded roads
if total_length_highways > 0:
percentage_flooded_roads = (total_length_flooded_highways / total_length_highways) * 100
print(f"Percentage of highways flooded due to pluvial conditions: {percentage_flooded_roads:.2f}%")
else:
print("No highways found.")
except Exception as e:
print(f"Error calculating zonal statistics: {e}")
return
# Call the function
get_pu_roads()
'''
# Pluvial Roads
def get_pu_roads():
    with open("global_inputs.yml", 'r') as f:
        global_inputs = yaml.safe_load(f)

    pu_path = os.path.join(output_folder, f"{city}_pluvial_2020_lt1.tif")
    roads_path = os.path.join('output/goris_edges.shp')

    try:
        with rasterio.open(pu_path) as pu_src:
            merged_pluvial_data = pu_src.read(1)
            transform = pu_src.transform
            nodata_value = pu_src.nodata
            # Polygonize only flooded cells (value 1, as assumed elsewhere in this script)
            shapes_gen = shapes(merged_pluvial_data, transform=transform)
            merged_pluvial_polygons = [shape(geom) for geom, value in shapes_gen if value == 1]
            pluvial_geometry = gpd.GeoDataFrame(geometry=merged_pluvial_polygons, crs=pu_src.crs)
    except Exception as e:
        print(f"Error opening merged pluvial data raster: {e}")
        return

    try:
        roads = gpd.read_file(roads_path)
    except Exception as e:
        print(f"Error reading road network shapefile: {e}")
        return

    roads = roads.to_crs(pluvial_geometry.crs)

    # Filter highways with specific keywords in the "highway" column
    highways_filtered = roads[(roads["highway"] == 'primary') |
                              (roads["highway"] == 'trunk') |
                              (roads["highway"] == 'motorway')]

    # Explode the filtered highways to smaller fragments
    exploded_highways = highways_filtered.explode(index_parts=False)
    exploded_highways['length'] = exploded_highways.geometry.length  # length in the raster CRS units

    intersections = gpd.overlay(exploded_highways, pluvial_geometry, how='intersection')
    intersections['length'] = intersections.length
    total_length = intersections['length'].sum()
    total_length_highways = exploded_highways['length'].sum()
    percentage = (total_length / total_length_highways) * 100
    print(f"Total length of highways intersecting pluvial data: {total_length:.2f} meters")
    print(f"Percentage of highways intersecting pluvial data: {percentage:.4f}%")

    # Plot pluvial data and roads
    fig, ax = plt.subplots(figsize=(10, 10))
    with rasterio.open(pu_path) as pluvial_src:
        show(pluvial_src, ax=ax, cmap='Blues', alpha=0.5)
    exploded_highways.plot(ax=ax, facecolor='none', edgecolor='red')
    features.plot(ax=ax, facecolor='none', edgecolor='red')
    pluvial_geometry.plot(ax=ax, facecolor='none', edgecolor='blue')
    ax.set_title("Pluvial Roads and Pluvial Data")
    ax.set_xlabel("Longitude")
    ax.set_ylabel("Latitude")
    plt.show()
# Fu Amenities
def get_fu_am():
    with open("global_inputs.yml", 'r') as f:
        global_inputs = yaml.safe_load(f)

    if menu.get('flood') and menu.get('amenities'):
        fu_path = os.path.join(output_folder, f"{city}_fluvial_2020_lt1.tif")
        try:
            with rasterio.open(fu_path) as fu_src:
                merged_fluvial_data = fu_src.read(1)
                merged_fluvial_transform = fu_src.transform
                merged_fluvial_shape = merged_fluvial_data.shape
        except Exception as e:
            print(f"Error opening merged fluvial data raster: {e}")
            return

        stats_list = []
        for category in ['health', 'police', 'fire', 'schools']:
            shapefile_path = os.path.join(output_folder, f"{city}_osm_{category}", f"{city}_osm_{category}.shp")
            try:
                amenities = gpd.read_file(shapefile_path)
                with rasterio.open(fu_path) as src:
                    affine = src.transform
                    stats = zonal_stats(amenities, merged_fluvial_data, nodata=0, affine=affine, stats=["count"], geojson_out=True)
                count_overlap = sum([feature["properties"]["count"] for feature in stats])
                total_count = len(amenities)
                percentage = (count_overlap / total_count) * 100
                stats_list.append({'Category': category, 'Overlap': count_overlap, 'Total': total_count, 'Percentage': percentage})
                print(f"{count_overlap} of {total_count} ({percentage:.2f}%) {category} are located in a riverine flood risk zone with a minimum depth of 15 cm.")
            except Exception as e:
                if category == 'fire':
                    print("Fire stations do not exist")
                else:
                    print(f"Error processing {category} shapefile: {e}")

        df = pd.DataFrame(stats_list)
        excel_file = os.path.join(output_folder, 'fu_osmpt.xlsx')
        df.to_excel(excel_file, index=False)
        print(f"Statistics saved to {excel_file}")
# FU Roads
def get_fu_roads():
    with open("global_inputs.yml", 'r') as f:
        global_inputs = yaml.safe_load(f)

    # File paths
    fu_path = os.path.join(output_folder, f"{city}_fluvial_2020_lt1.tif")
    roads_path = os.path.join('output/goris_edges.shp')

    # Read fluvial data
    try:
        with rasterio.open(fu_path) as fu_src:
            merged_fluvial_data = fu_src.read(1)
            transform = fu_src.transform  # Get the affine transformation
            nodata_value = fu_src.nodata  # Get the nodata value
    except Exception as e:
        print(f"Error opening merged fluvial data raster: {e}")
        return

    # Read roads data
    try:
        roads = gpd.read_file(roads_path)
    except Exception as e:
        print(f"Error reading road network shapefile: {e}")
        return

    # Filter highways from the roads dataset
    highways = roads[roads['highway'] == 'primary'].copy()

    # Convert length to meters
    highways['length_m'] = highways['geometry'].length

    # Get the total length of highways
    total_length_highways = highways['length_m'].sum()

    # Perform zonal stats to get the flooded extent along highways
    try:
        stats = zonal_stats(highways.geometry, merged_fluvial_data, affine=transform, nodata=nodata_value, stats="max")
        total_length_flooded_highways = sum([s['max'] for s in stats if s['max'] is not None])
        print(f"Total length of highways flooded due to fluvial conditions: {total_length_flooded_highways:.2f} meters")

        # Calculate percentage of flooded roads
        if total_length_highways > 0:
            percentage_flooded_roads = (total_length_flooded_highways / total_length_highways) * 100
            print(f"Percentage of highways flooded due to fluvial conditions: {percentage_flooded_roads:.2f}%")
        else:
            print("No highways found.")
    except Exception as e:
        print(f"Error calculating zonal statistics: {e}")
        return
# Call the function
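# A hedged call sequence for the amenity and road functions above; wrapping each call
# in try/except is an assumption so that one missing input does not stop the whole scan.
for fn in (get_pu_am, get_fu_am, get_pu_roads, get_fu_roads):
    try:
        fn()
    except Exception as e:
        print(f"{fn.__name__} failed: {e}")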
#Comb WSF
#Comb Population
#Comb Amenities
#Coastal WSF
#Coastal Population
#Coastal Amenities
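# The comb/coastal placeholders above could reuse the amenity overlay pattern by
# parameterizing the hazard raster. This is a sketch only; the
# f"{city}_{hazard}_2020_lt1.tif" naming for comb/coastal rasters is an assumption
# mirroring the pluvial/fluvial outputs, and the function is not called here.
def get_hazard_am(hazard):
    if not (menu.get('flood') and menu.get('amenities')):
        print("Flood or amenities menu not selected.")
        return
    hazard_path = os.path.join(output_folder, f"{city}_{hazard}_2020_lt1.tif")
    try:
        with rasterio.open(hazard_path) as src:
            hazard_data = src.read(1)
            affine = src.transform
    except Exception as e:
        print(f"Error opening {hazard} raster: {e}")
        return
    stats_list = []
    for category in ['health', 'police', 'fire', 'schools']:
        shapefile_path = os.path.join(output_folder, f"{city}_osm_{category}", f"{city}_osm_{category}.shp")
        try:
            amenities = gpd.read_file(shapefile_path)
            stats = zonal_stats(amenities, hazard_data, nodata=0, affine=affine, stats=["count"], geojson_out=True)
            count_overlap = sum(feature["properties"]["count"] for feature in stats)
            total_count = len(amenities)
            stats_list.append({'Category': category, 'Overlap': count_overlap,
                               'Total': total_count, 'Percentage': (count_overlap / total_count) * 100})
        except Exception as e:
            print(f"Error processing {category} shapefile: {e}")
    pd.DataFrame(stats_list).to_excel(os.path.join(output_folder, f"{hazard}_osmpt.xlsx"), index=False)

# e.g. get_hazard_am('comb') or get_hazard_am('coastal'), once those rasters exist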