import numpy as np
from openpiv import filters
from openpiv.lib import replace_nans
from typing import Optional, Tuple
def replace_outliers(
u: np.ndarray,
v: np.ndarray,
invalid_mask: np.ndarray,
grid_mask: np.ndarray,
w: Optional[np.ndarray]=None,
method: str="localmean",
max_iter: int=5,
tol: float=1e-3,
kernel_size: int=1,
)-> Tuple[np.ndarray, np.ndarray,np.ndarray]:
"""Replace invalid vectors in an velocity field using an iterative image
inpainting algorithm.
The algorithm is the following:
1) For each element in the arrays of the ``u`` and ``v`` components,
replace it by a weighted average
of the neighbouring elements which are not invalid themselves. The
weights depends of the method type. If ``method=localmean`` weight
are equal to 1/( (2*kernel_size+1)**2 -1 )
2) Several iterations are needed if there are adjacent invalid elements.
If this is the case, inforation is "spread" from the edges of the
missing regions iteratively, until the variation is below a certain
threshold.
Parameters
----------
u : 2d or 3d np.ndarray
the u velocity component field
v : 2d or 3d np.ndarray
the v velocity component field
w : 2d or 3d np.ndarray
the w velocity component field
max_iter : int
the number of iterations
kernel_size : int
the size of the kernel, default is 1
method : str
the type of kernel used for repairing missing vectors
Returns
-------
uf : 2d or 3d np.ndarray
the smoothed u velocity component field, where invalid vectors have
been replaced
vf : 2d or 3d np.ndarray
the smoothed v velocity component field, where invalid vectors have
been replaced
wf : 2d or 3d np.ndarray
the smoothed w velocity component field, where invalid vectors have
been replaced
"""
# we shall now replace NaNs only at invalid_mask positions,
# regardless the grid_mask (which is a user-provided masked region)
u[invalid_mask] = np.nan
v[invalid_mask] = np.nan
wf = np.empty_like(u)
uf = replace_nans(
u, method=method, max_iter=max_iter, tol=tol,
kernel_size=kernel_size
)
vf = replace_nans(
v, method=method, max_iter=max_iter, tol=tol,
kernel_size=kernel_size
)
if isinstance(w, np.ndarray):
w[invalid_mask] = np.nan
wf = replace_nans(
w, method=method, max_iter=max_iter, tol=tol,
kernel_size=kernel_size
)
# reinforce grid_mask
uf = np.ma.masked_array(uf, mask=grid_mask)
vf = np.ma.masked_array(vf, mask=grid_mask)
wf = np.ma.masked_array(wf, mask=grid_mask)
return uf, vf, wf
# bottom right corner is masked
u = np.ma.copy(np.ones((5,5)))
u[3:,3:] = np.ma.masked
u[1,1] = np.nan
grid_mask = u.mask.copy()
u, grid_mask
(masked_array( data=[[1.0, 1.0, 1.0, 1.0, 1.0], [1.0, nan, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, --, --], [1.0, 1.0, 1.0, --, --]], mask=[[False, False, False, False, False], [False, False, False, False, False], [False, False, False, False, False], [False, False, False, True, True], [False, False, False, True, True]], fill_value=1e+20), array([[False, False, False, False, False], [False, False, False, False, False], [False, False, False, False, False], [False, False, False, True, True], [False, False, False, True, True]]))
# verify that replace_nans does not know about the mask
replace_nans(u, 1, 1e-3)
masked_array( data=[[1.0, 1.0, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, --, --], [1.0, 1.0, 1.0, --, --]], mask=[[False, False, False, False, False], [False, False, False, False, False], [False, False, False, False, False], [False, False, False, True, True], [False, False, False, True, True]], fill_value=1e+20)
u, grid_mask
(masked_array( data=[[1.0, 1.0, 1.0, 1.0, 1.0], [1.0, nan, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, --, --], [1.0, 1.0, 1.0, --, --]], mask=[[False, False, False, False, False], [False, False, False, False, False], [False, False, False, False, False], [False, False, False, True, True], [False, False, False, True, True]], fill_value=1e+20), array([[False, False, False, False, False], [False, False, False, False, False], [False, False, False, False, False], [False, False, False, True, True], [False, False, False, True, True]]))
# but if we put nan on the masked region, it
# destroys the mask and the replace_nans also works there
# we shall later re-enforce the mask
u[3,3] = np.nan
u, grid_mask
(masked_array( data=[[1.0, 1.0, 1.0, 1.0, 1.0], [1.0, nan, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, nan, --], [1.0, 1.0, 1.0, --, --]], mask=[[False, False, False, False, False], [False, False, False, False, False], [False, False, False, False, False], [False, False, False, False, True], [False, False, False, True, True]], fill_value=1e+20), array([[False, False, False, False, False], [False, False, False, False, False], [False, False, False, False, False], [False, False, False, True, True], [False, False, False, True, True]]))
invalid_mask = np.isnan(u.data)
invalid_mask
array([[False, False, False, False, False], [False, True, False, False, False], [False, False, False, False, False], [False, False, False, True, False], [False, False, False, False, False]])
uf,_,_ = replace_outliers(u,u,invalid_mask,grid_mask)
print(f'u \n {u}')
print(f'uf \n',uf)
u [[1.0 1.0 1.0 1.0 1.0] [1.0 nan 1.0 1.0 1.0] [1.0 1.0 1.0 1.0 1.0] [1.0 1.0 1.0 nan --] [1.0 1.0 1.0 -- --]] uf [[1.0 1.0 1.0 1.0 1.0] [1.0 1.0 1.0 1.0 1.0] [1.0 1.0 1.0 1.0 1.0] [1.0 1.0 1.0 -- --] [1.0 1.0 1.0 -- --]]