#!/usr/bin/env python # coding: utf-8 # # Test filters on masked arrays # In[1]: import numpy as np from openpiv import filters from openpiv.lib import replace_nans from typing import Optional, Tuple # In[2]: def replace_outliers( u: np.ndarray, v: np.ndarray, invalid_mask: np.ndarray, grid_mask: np.ndarray, w: Optional[np.ndarray]=None, method: str="localmean", max_iter: int=5, tol: float=1e-3, kernel_size: int=1, )-> Tuple[np.ndarray, np.ndarray,np.ndarray]: """Replace invalid vectors in an velocity field using an iterative image inpainting algorithm. The algorithm is the following: 1) For each element in the arrays of the ``u`` and ``v`` components, replace it by a weighted average of the neighbouring elements which are not invalid themselves. The weights depends of the method type. If ``method=localmean`` weight are equal to 1/( (2*kernel_size+1)**2 -1 ) 2) Several iterations are needed if there are adjacent invalid elements. If this is the case, inforation is "spread" from the edges of the missing regions iteratively, until the variation is below a certain threshold. Parameters ---------- u : 2d or 3d np.ndarray the u velocity component field v : 2d or 3d np.ndarray the v velocity component field w : 2d or 3d np.ndarray the w velocity component field max_iter : int the number of iterations kernel_size : int the size of the kernel, default is 1 method : str the type of kernel used for repairing missing vectors Returns ------- uf : 2d or 3d np.ndarray the smoothed u velocity component field, where invalid vectors have been replaced vf : 2d or 3d np.ndarray the smoothed v velocity component field, where invalid vectors have been replaced wf : 2d or 3d np.ndarray the smoothed w velocity component field, where invalid vectors have been replaced """ # we shall now replace NaNs only at invalid_mask positions, # regardless the grid_mask (which is a user-provided masked region) u[invalid_mask] = np.nan v[invalid_mask] = np.nan wf = np.empty_like(u) uf = replace_nans( u, method=method, max_iter=max_iter, tol=tol, kernel_size=kernel_size ) vf = replace_nans( v, method=method, max_iter=max_iter, tol=tol, kernel_size=kernel_size ) if isinstance(w, np.ndarray): w[invalid_mask] = np.nan wf = replace_nans( w, method=method, max_iter=max_iter, tol=tol, kernel_size=kernel_size ) # reinforce grid_mask uf = np.ma.masked_array(uf, mask=grid_mask) vf = np.ma.masked_array(vf, mask=grid_mask) wf = np.ma.masked_array(wf, mask=grid_mask) return uf, vf, wf # In[3]: # bottom right corner is masked u = np.ma.copy(np.ones((5,5))) u[3:,3:] = np.ma.masked u[1,1] = np.nan grid_mask = u.mask.copy() u, grid_mask # In[4]: # verify that replace_nans does not know about the mask replace_nans(u, 1, 1e-3) # In[5]: u, grid_mask # In[6]: # but if we put nan on the masked region, it # destroys the mask and the replace_nans also works there # we shall later re-enforce the mask u[3,3] = np.nan u, grid_mask # In[7]: invalid_mask = np.isnan(u.data) invalid_mask # In[8]: uf,_,_ = replace_outliers(u,u,invalid_mask,grid_mask) print(f'u \n {u}') print(f'uf \n',uf)