"""Spatial statistical tools to estimate uncertainties related to DEMs"""
from __future__ import annotations
import math as m
import multiprocessing as mp
import os
import random
import warnings
from functools import partial
from typing import Callable, Union, Iterable, Optional, Sequence, Any
import itertools
import matplotlib.pyplot as plt
from numba import njit
import numpy as np
import pandas as pd
from scipy import integrate
from scipy.optimize import curve_fit
from skimage.draw import disk
from scipy.stats import binned_statistic, binned_statistic_2d, binned_statistic_dd
from xdem.spatial_tools import nmad
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
import skgstat as skg
from skgstat import models
[docs]def nd_binning(values: np.ndarray, list_var: Iterable[np.ndarray], list_var_names=Iterable[str], list_var_bins: Optional[Union[int,Iterable[Iterable]]] = None,
statistics: Iterable[Union[str, Callable, None]] = ['count', np.nanmedian ,nmad], list_ranges : Optional[Iterable[Sequence]] = None) \
-> pd.DataFrame:
"""
N-dimensional binning of values according to one or several explanatory variables.
Values input is a (N,) array and variable input is a list of flattened arrays of similar dimensions (N,).
For more details on the format of input variables, see documentation of scipy.stats.binned_statistic_dd.
:param values: values array (N,)
:param list_var: list (L) of explanatory variables array (N,)
:param list_var_names: list (L) of names of the explanatory variables
:param list_var_bins: count, or list (L) of counts or custom bin edges for the explanatory variables; defaults to 10 bins
:param statistics: list (X) of statistics to be computed; defaults to count, median and nmad
:param list_ranges: list (L) of minimum and maximum ranges to bin the explanatory variables; defaults to min/max of the data
:return:
"""
# we separate 1d, 2d and nd binning, because propagating statistics between different dimensional binning is not always feasible
# using scipy because it allows for several dimensional binning, while it's not straightforward in pandas
if list_var_bins is None:
list_var_bins = (10,) * len(list_var_names)
elif isinstance(list_var_bins,int):
list_var_bins = (list_var_bins,) * len(list_var_names)
# flatten the arrays if this has not been done by the user
values = values.ravel()
list_var = [var.ravel() for var in list_var]
statistics_name = [f if isinstance(f,str) else f.__name__ for f in statistics]
# get binned statistics in 1d: a simple loop is sufficient
list_df_1d = []
for i, var in enumerate(list_var):
df_stats_1d = pd.DataFrame()
# get statistics
for j, statistic in enumerate(statistics):
stats_binned_1d, bedges_1d = binned_statistic(var,values,statistic=statistic,bins=list_var_bins[i],range=list_ranges)[:2]
# save in a dataframe
df_stats_1d[statistics_name[j]] = stats_binned_1d
# we need to get the middle of the bins from the edges, to get the same dimension length
df_stats_1d[list_var_names[i]] = pd.IntervalIndex.from_breaks(bedges_1d,closed='left')
# report number of dimensions used
df_stats_1d['nd'] = 1
list_df_1d.append(df_stats_1d)
# get binned statistics in 2d: all possible 2d combinations
list_df_2d = []
if len(list_var)>1:
combs = list(itertools.combinations(list_var_names, 2))
for i, comb in enumerate(combs):
var1_name, var2_name = comb
# corresponding variables indexes
i1, i2 = list_var_names.index(var1_name), list_var_names.index(var2_name)
df_stats_2d = pd.DataFrame()
for j, statistic in enumerate(statistics):
stats_binned_2d, bedges_var1, bedges_var2 = binned_statistic_2d(list_var[i1],list_var[i2],values,statistic=statistic
,bins=[list_var_bins[i1],list_var_bins[i2]]
,range=list_ranges)[:3]
# get statistics
df_stats_2d[statistics_name[j]] = stats_binned_2d.flatten()
# derive interval indexes and convert bins into 2d indexes
ii1 = pd.IntervalIndex.from_breaks(bedges_var1,closed='left')
ii2 = pd.IntervalIndex.from_breaks(bedges_var2,closed='left')
df_stats_2d[var1_name] = [i1 for i1 in ii1 for i2 in ii2]
df_stats_2d[var2_name] = [i2 for i1 in ii1 for i2 in ii2]
# report number of dimensions used
df_stats_2d['nd'] = 2
list_df_2d.append(df_stats_2d)
# get binned statistics in nd, without redoing the same stats
df_stats_nd = pd.DataFrame()
if len(list_var)>2:
for j, statistic in enumerate(statistics):
stats_binned_2d, list_bedges = binned_statistic_dd(list_var,values,statistic=statistic,bins=list_var_bins,range=list_ranges)[0:2]
df_stats_nd[statistics_name[j]] = stats_binned_2d.flatten()
list_ii = []
# loop through the bin edges and create IntervalIndexes from them (to get both
for bedges in list_bedges:
list_ii.append(pd.IntervalIndex.from_breaks(bedges,closed='left'))
# create nd indexes in nd-array and flatten for each variable
iind = np.meshgrid(*list_ii)
for i, var_name in enumerate(list_var_names):
df_stats_nd[var_name] = iind[i].flatten()
# report number of dimensions used
df_stats_nd['nd'] = len(list_var_names)
# concatenate everything
list_all_dfs = list_df_1d + list_df_2d + [df_stats_nd]
df_concat = pd.concat(list_all_dfs)
# commenting for now: pd.MultiIndex can be hard to use
# df_concat = df_concat.set_index(list_var_names)
return df_concat
[docs]def get_empirical_variogram(dh: np.ndarray, coords: np.ndarray, **kwargs) -> pd.DataFrame:
"""
Get empirical variogram from skgstat.Variogram object
:param dh: elevation differences
:param coords: coordinates
:return: empirical variogram (variance, lags, counts)
"""
# deriving empirical variogram variance, bin, and count
try:
V = skg.Variogram(coordinates=coords, values=dh, normalize=False, **kwargs)
# return V.to_DataFrame()
exp = V.experimental
bins = V.bins
count = np.zeros(V.n_lags)
tmp_count = np.fromiter((g.size for g in V.lag_classes()), dtype=int)
count[0:len(tmp_count)] = tmp_count
# there are still some exceptions not well handled by skgstat
except ZeroDivisionError:
n_lags = kwargs.get('n_lags') or 10
exp, bins, count = (np.zeros(n_lags)*np.nan for i in range(3))
df = pd.DataFrame()
df = df.assign(exp=exp, bins=bins, count=count)
return df
[docs]def wrapper_get_empirical_variogram(argdict: dict, **kwargs) -> pd.DataFrame:
"""
Multiprocessing wrapper for get_empirical_variogram
:param argdict: Keyword argument to pass to get_empirical_variogram()
:return: empirical variogram (variance, lags, counts)
"""
print('Working on subsample '+str(argdict['i']) + ' out of '+str(argdict['max_i']))
return get_empirical_variogram(dh=argdict['dh'], coords=argdict['coords'], **kwargs)
[docs]def random_subset(dh: np.ndarray, coords: np.ndarray, nsamp: int) -> tuple[Union[np.ndarray, Any], Union[np.ndarray, Any]]:
"""
Subsampling of elevation differences with random coordinates
:param dh: elevation differences
:param coords: coordinates
:param nsamp: number of sammples for subsampling
:return: subsets of dh and coords
"""
if len(coords) > nsamp:
# TODO: maybe we can also introduce something to sample without replacement between all samples?
subset = np.random.choice(len(coords), nsamp, replace=False)
coords_sub = coords[subset]
dh_sub = dh[subset]
else:
coords_sub = coords
dh_sub = dh
return dh_sub, coords_sub
[docs]def create_circular_mask(shape: Union[int, Sequence[int]], center: Optional[list[float]] = None, radius: Optional[float] = None) -> np.ndarray:
"""
Create circular mask on a raster, defaults to the center of the array and it's half width
:param shape: shape of array
:param center: center
:param radius: radius
:return:
"""
w, h = shape
if center is None: # use the middle of the image
center = (int(w / 2), int(h / 2))
if radius is None: # use the smallest distance between the center and image walls
radius = min(center[0], center[1], w - center[0], h - center[1])
# skimage disk is not inclusive (correspond to distance_from_center < radius and not <= radius)
mask = np.zeros(shape, dtype=bool)
rr, cc = disk(center=center,radius=radius,shape=shape)
mask[rr, cc] = True
# manual solution
# Y, X = np.ogrid[:h, :w]
# dist_from_center = np.sqrt((X - center[0]) ** 2 + (Y - center[1]) ** 2)
# mask = dist_from_center < radius
return mask
[docs]def create_ring_mask(shape: Union[int, Sequence[int]], center: Optional[list[float]] = None, in_radius: float = 0., out_radius: Optional[float] = None) -> np.ndarray:
"""
Create ring mask on a raster, defaults to the center of the array and a circle mask of half width of the array
:param shape: shape of array
:param center: center
:param in_radius: inside radius
:param out_radius: outside radius
:return:
"""
w, h = shape
if out_radius is None:
center = (int(w / 2), int(h / 2))
out_radius = min(center[0], center[1], w - center[0], h - center[1])
mask_inside = create_circular_mask((w,h),center=center,radius=in_radius)
mask_outside = create_circular_mask((w,h),center=center,radius=out_radius)
mask_ring = np.logical_and(~mask_inside,mask_outside)
return mask_ring
[docs]def ring_subset(dh: np.ndarray, coords: np.ndarray, inside_radius: float = 0, outside_radius: float = 0) -> tuple[Union[np.ndarray, Any], Union[np.ndarray, Any]]:
"""
Subsampling of elevation differences within a ring/disk (to sample points at similar pairwise distances)
:param dh: elevation differences
:param coords: coordinates
:param inside_radius: radius of inside ring disk in pixels
:param outside_radius: radius of outside ring disk in pixels
:return: subsets of dh and coords
"""
# select random center coordinates
nx, ny = np.shape(dh)
center_x = np.random.choice(nx, 1)
center_y = np.random.choice(ny, 1)
mask_ring = create_ring_mask((nx,ny),center=(center_x,center_y),in_radius=inside_radius,out_radius=outside_radius)
dh_ring = dh[mask_ring]
coords_ring = coords[mask_ring]
return dh_ring, coords_ring
[docs]def sample_multirange_empirical_variogram(dh: np.ndarray, gsd: float = None, coords: np.ndarray = None,
nsamp: int = 10000, range_list: list = None, nrun: int = 1, nproc: int = 1,
**kwargs) -> pd.DataFrame:
"""
Wrapper to sample multi-range empirical variograms from the data.
If no option is passed, a varying binning is used with adapted ranges and data subsampling
:param dh: elevation differences
:param gsd: ground sampling distance (if array is 2D on structured grid)
:param coords: coordinates, to be used only with a flattened elevation differences array and passed as an array of \the pairs of coordinates: one dimension equal to two and the other to that of the flattened elevation differences
:param range_list: successive ranges with even binning
:param nsamp: number of samples to randomly draw from the elevation differences
:param nrun: number of samplings
:param nproc: number of processing cores
:return: empirical variogram (variance, lags, counts)
"""
# checks
dh = dh.squeeze()
if coords is None and gsd is None:
raise TypeError('Must provide either coordinates or ground sampling distance.')
elif gsd is not None and dh.ndim == 1:
raise TypeError('Array must be 2-dimensional when providing only ground sampling distance')
elif coords is not None and dh.ndim != 1:
raise TypeError('Coordinate array must be provided with 1-dimensional input array')
elif coords is not None and (coords.shape[0] != 2 and coords.shape[1] != 2):
raise TypeError('One dimension of the coordinates array must be of length equal to 2')
# defaulting to xx and yy if those are provided
if coords is not None:
if coords.shape[0] == 2 and coords.shape[1] != 2:
coords = np.transpose(coords)
else:
x, y = np.meshgrid(np.arange(0, dh.shape[0] * gsd, gsd), np.arange(0, dh.shape[1] * gsd, gsd))
coords = np.dstack((x.flatten(), y.flatten())).squeeze()
dh = dh.flatten()
# COMMENTING: custom binning is not supported by skgstat yet...
# if no range list is specified, define a default one based on the spatial extent of the data and its resolution
# if 'bin_func' not in kwargs.keys():
# if range_list is None:
#
# # define max range as half the maximum distance between coordinates
# max_range = np.sqrt((np.max(coords[:,0])-np.min(coords[:,0]))**2+(np.max(coords[:,1])-np.min(coords[:,1]))**2)/2
#
# # get the ground sampling distance
# if gsd is None:
# est_gsd = np.abs(coords[0,0] - coords[0,1])
# else:
# est_gsd = gsd
#
# # define ranges as multiple of the resolution until they get close to the maximum range
# range_list = []
# new_range = gsd
# while new_range < max_range/10:
# range_list.append(new_range)
# new_range *= 10
# range_list.append(max_range)
#
# else:
# if range_list is not None:
# print('Both range_list and bin_func are defined for binning: defaulting to bin_func')
# default value we want to use (kmeans is failing)
if 'bin_func' not in kwargs.keys():
kwargs.update({'bin_func': 'even'})
if 'n_lags' not in kwargs.keys():
kwargs.update({'n_lags': 100})
# estimate variogram
if nrun == 1:
# subsetting
dh_sub, coords_sub = random_subset(dh, coords, nsamp)
# getting empirical variogram
print(dh_sub.shape)
print(coords_sub.shape)
df = get_empirical_variogram(dh=dh_sub, coords=coords_sub, **kwargs)
df['exp_sigma'] = np.nan
else:
# multiple run only work for an even binning function for now (would need a customized binning not supported by skgstat)
if kwargs.get('bin_func') is None:
raise ValueError('Binning function must be "even" when doing multiple runs.')
# define max range as half the maximum distance between coordinates
max_range = np.sqrt((np.max(coords[:, 0])-np.min(coords[:, 0]))**2 +
(np.max(coords[:, 1])-np.min(coords[:, 1]))**2)/2
# also need a cutoff value to get the exact same bins
if 'maxlag' not in kwargs.keys():
kwargs.update({'maxlag': max_range})
# TODO: somewhere here we could think of adding random sampling without replacement
if nproc == 1:
print('Using 1 core...')
list_df_nb = []
for i in range(nrun):
dh_sub, coords_sub = random_subset(dh, coords, nsamp)
df = get_empirical_variogram(dh=dh_sub, coords=coords_sub, **kwargs)
df['run'] = i
list_df_nb.append(df)
else:
print('Using '+str(nproc) + ' cores...')
list_dh_sub = []
list_coords_sub = []
for i in range(nrun):
dh_sub, coords_sub = random_subset(dh, coords, nsamp)
list_dh_sub.append(dh_sub)
list_coords_sub.append(coords_sub)
pool = mp.Pool(nproc, maxtasksperchild=1)
argsin = [{'dh': list_dh_sub[i], 'coords': list_coords_sub[i], 'i':i, 'max_i':nrun} for i in range(nrun)]
list_df = pool.map(partial(wrapper_get_empirical_variogram, **kwargs), argsin, chunksize=1)
pool.close()
pool.join()
list_df_nb = []
for i in range(10):
df_nb = list_df[i]
df_nb['run'] = i
list_df_nb.append(df_nb)
df = pd.concat(list_df_nb)
# group results, use mean as empirical variogram, estimate sigma, and sum the counts
df_grouped = df.groupby('bins', dropna=False)
df_mean = df_grouped[['exp']].mean()
df_sig = df_grouped[['exp']].std()
df_count = df_grouped[['count']].sum()
df_mean['bins'] = df_mean.index.values
df_mean['exp_sigma'] = df_sig['exp']
df_mean['count'] = df_count['count']
df = df_mean
return df
[docs]def fit_model_sum_vgm(list_model: list[str], emp_vgm_df: pd.DataFrame) -> tuple[Callable, list[float]]:
"""
Fit a multi-range variogram model to an empirical variogram, weighted based on sampling and elevation errors
:param list_model: list of variogram models to sum for the fit: from short-range to long-ranges
:param emp_vgm_df: empirical variogram
:return: modelled variogram
"""
# TODO: expand to other models than spherical, exponential, gaussian (more than 2 arguments)
def vgm_sum(h, *args):
fn = 0
i = 0
for model in list_model:
fn += skg.models.spherical(h, args[i], args[i+1])
# fn += vgm(h, model=model,crange=args[i],psill=args[i+1])
i += 2
return fn
# use shape of empirical variogram to assess rough boundaries/first estimates
n_average = np.ceil(len(emp_vgm_df.exp.values) / 10)
exp_movaverage = np.convolve(emp_vgm_df.exp.values, np.ones(int(n_average))/n_average, mode='valid')
grad = np.gradient(exp_movaverage, 2)
# maximum variance
max_var = np.max(exp_movaverage)
# to simplify things for scipy, let's provide boundaries and first guesses
p0 = []
bounds = []
for i in range(len(list_model)):
# use largest boundaries possible for our problem
psill_bound = [0, max_var]
range_bound = [0, emp_vgm_df.bins.values[-1]]
# use psill evenly distributed
psill_p0 = ((i+1)/len(list_model))*max_var
# use corresponding ranges
# this fails when no empirical value crosses this (too wide binning/nugget)
# ind = np.array(np.abs(exp_movaverage-psill_p0)).argmin()
# range_p0 = emp_vgm_df.bins.values[ind]
range_p0 = ((i+1)/len(list_model))*emp_vgm_df.bins.values[-1]
# TODO: if adding other variogram models, add condition here
# add bounds and guesses with same order as function arguments
bounds.append(range_bound)
bounds.append(psill_bound)
p0.append(range_p0)
p0.append(psill_p0)
bounds = np.transpose(np.array(bounds))
if np.all(np.isnan(emp_vgm_df.exp_sigma.values)):
valid = ~np.isnan(emp_vgm_df.exp.values)
cof, cov = curve_fit(vgm_sum, emp_vgm_df.bins.values[valid],
emp_vgm_df.exp.values[valid], method='trf', p0=p0, bounds=bounds)
else:
valid = np.logical_and(~np.isnan(emp_vgm_df.exp.values), ~np.isnan(emp_vgm_df.exp_sigma.values))
cof, cov = curve_fit(vgm_sum, emp_vgm_df.bins.values[valid], emp_vgm_df.exp.values[valid],
method='trf', p0=p0, bounds=bounds, sigma=emp_vgm_df.exp_sigma.values[valid])
# rewriting the output function: couldn't find a way to pass this with functool.partial because arguments are unordered
def vgm_sum_fit(h):
fn = 0
i = 0
for model in list_model:
fn += skg.models.spherical(h, cof[i], cof[i+1])
# fn += vgm(h, model=model,crange=args[i],psill=args[i+1])
i += 2
return fn
return vgm_sum_fit, cof
[docs]def exact_neff_sphsum_circular(area: float, crange1: float, psill1: float, crange2: float, psill2: float) -> float:
"""
Number of effective samples derived from exact integration of sum of spherical variogram models over a circular area.
The number of effective samples serves to convert between standard deviation/partial sills and standard error
over the area.
If SE is the standard error, SD the standard deviation and N_eff the number of effective samples, we have:
SE = SD / sqrt(N_eff) => N_eff = SD^2 / SE^2 => N_eff = (PS1 + PS2)/SE^2 where PS1 and PS2 are the partial sills
estimated from the variogram models, and SE is estimated by integrating the variogram models with parameters PS1/PS2
and R1/R2 where R1/R2 are the correlation ranges.
Source: Rolstad et al. (2009), appendix: http://dx.doi.org/10.3189/002214309789470950
:param area: circular area
:param crange1: range of short-range variogram model
:param psill1: partial sill of short-range variogram model
:param crange2: range of long-range variogram model
:param psill2: partial sill of long-range variogram model
:return: number of effective samples
"""
# short range variogram
c1 = psill1 # partial sill
a1 = crange1 # short correlation range
# long range variogram
c1_2 = psill2
a1_2 = crange2 # long correlation range
h_equiv = np.sqrt(area / np.pi)
# hypothesis of a circular shape to integrate variogram model
if h_equiv > a1_2:
std_err = np.sqrt(c1 * a1 ** 2 / (5 * h_equiv ** 2) + c1_2 * a1_2 ** 2 / (5 * h_equiv ** 2))
elif (h_equiv < a1_2) and (h_equiv > a1):
std_err = np.sqrt(c1 * a1 ** 2 / (5 * h_equiv ** 2) + c1_2 * (1-h_equiv / a1_2+1 / 5 * (h_equiv / a1_2) ** 3))
else:
std_err = np.sqrt(c1 * (1-h_equiv / a1+1 / 5 * (h_equiv / a1) ** 3) +
c1_2 * (1-h_equiv / a1_2+1 / 5 * (h_equiv / a1_2) ** 3))
return (psill1 + psill2)/std_err**2
[docs]def neff_circ(area: float, list_vgm: list[tuple[float,str,float]]) -> float:
"""
Number of effective samples derived from numerical integration for any sum of variogram models a circular area
(generalization of Rolstad et al. (2009): http://dx.doi.org/10.3189/002214309789470950)
The number of effective samples N_eff serves to convert between standard deviation/partial sills and standard error
over the area: SE = SD / sqrt(N_eff) if SE is the standard error, SD the standard deviation.
:param area: area
:param list_vgm: variogram functions to sum
:returns: number of effective samples
"""
psill_tot = 0
for vario in list_vgm:
psill_tot += vario[2]
def hcov_sum(h):
fn = 0
for vario in list_vgm:
crange, model, psill = vario
fn += h*(cov(h, crange, model=model, psill=psill))
return fn
h_equiv = np.sqrt(area / np.pi)
full_int = integrate_fun(hcov_sum, 0, h_equiv)
std_err = np.sqrt(2*np.pi*full_int / area)
return psill_tot/std_err**2
[docs]def neff_rect(area: float, width: float, crange1: float, psill1: float, model1: str = 'Sph', crange2: float = None,
psill2: float = None, model2: str = None) -> float:
"""
Number of effective samples derived from numerical integration for a sum of 2 variogram functions over a rectangular area
:param area: area
:param width: width of rectangular area
:param crange1: correlation range of first variogram
:param psill1: partial sill of first variogram
:param model1: model of first variogram
:param crange2: correlation range of second variogram
:param psill2: partial sill of second variogram
:param model2: model of second variogram
:returns: number of effective samples
"""
def hcov_sum(h, crange1=crange1, psill1=psill1, model1=model1, crange2=crange2, psill2=psill2, model2=model2):
if crange2 is None or psill2 is None or model2 is None:
return h*(cov(h, crange1, model=model1, psill=psill1))
else:
return h*(cov(h, crange1, model=model1, psill=psill1)+cov(h, crange2, model=model2, psill=psill2))
width = min(width, area/width)
full_int = integrate_fun(hcov_sum, 0, width/2)
bin_int = np.linspace(width/2, area/width, 100)
for i in range(len(bin_int)-1):
low = bin_int[i]
upp = bin_int[i+1]
mid = bin_int[i] + (bin_int[i+1] - bin_int[i])/2
piec_int = integrate_fun(hcov_sum, low, upp)
full_int += piec_int * 2/np.pi*np.arctan(width/(2*mid))
std_err = np.sqrt(2*np.pi*full_int / area)
if crange2 is None or psill2 is None or model2 is None:
return psill1 / std_err ** 2
else:
return (psill1 + psill2) / std_err ** 2
[docs]def integrate_fun(fun: Callable, low_b: float, upp_b: float) -> float:
"""
Numerically integrate function between upper and lower bounds
:param fun: function
:param low_b: lower bound
:param upp_b: upper bound
:return: integral
"""
return integrate.quad(fun, low_b, upp_b)[0]
[docs]def cov(h: float, crange: float, model: str = 'Sph', psill: float = 1., kappa: float = 1/2, nugget: float = 0) -> Callable:
"""
Covariance function based on variogram function (COV = STD - VGM)
:param h: spatial lag
:param crange: correlation range
:param model: model
:param psill: partial sill
:param kappa: smoothing parameter for Exp Class
:param nugget: nugget
:returns: covariance function
"""
return (nugget + psill) - vgm(h, crange, model=model, psill=psill, kappa=kappa)
[docs]def vgm(h: float, crange: float, model: str = 'Sph', psill: float = 1., kappa: float = 1/2, nugget: float = 0):
"""
Compute variogram model function (Spherical, Exponential, Gaussian or Exponential Class)
:param h: spatial lag
:param crange: correlation range
:param model: model
:param psill: partial sill
:param kappa: smoothing parameter for Exp Class
:param nugget: nugget
:returns: variogram function
"""
c0 = nugget # nugget
c1 = psill # partial sill
a1 = crange # correlation range
s = kappa # smoothness parameter for Matern class
if model == 'Sph': # spherical model
if h < a1:
vgm = c0 + c1 * (3 / 2 * h / a1-1 / 2 * (h / a1) ** 3)
else:
vgm = c0 + c1
elif model == 'Exp': # exponential model
vgm = c0 + c1 * (1-np.exp(-h / a1))
elif model == 'Gau': # gaussian model
vgm = c0 + c1 * (1-np.exp(- (h / a1) ** 2))
elif model == 'Exc': # stable exponential model
vgm = c0 + c1 * (1-np.exp(-(h / a1)**s))
return vgm
[docs]def std_err_finite(std: float, neff_tot: float, neff: float) -> float:
"""
Standard error of subsample of a finite ensemble
:param std: standard deviation
:param neff_tot: maximum number of effective samples
:param neff: number of effective samples
:return: standard error
"""
return std * np.sqrt(1 / neff_tot * (neff_tot - neff) / neff_tot)
[docs]def std_err(std: float, neff: float) -> float:
"""
Standard error
:param std: standard deviation
:param neff: number of effective samples
:return: standard error
"""
return std * np.sqrt(1 / neff)
[docs]def distance_latlon(tup1: tuple, tup2: tuple, earth_rad: float = 6373000) -> float:
"""
Distance between two lat/lon coordinates projected on a spheroid
ref: https://stackoverflow.com/questions/19412462/getting-distance-between-two-points-based-on-latitude-longitude
:param tup1: lon/lat coordinates of first point
:param tup2: lon/lat coordinates of second point
:param earth_rad: radius of the earth in meters
:return: distance
"""
lat1 = m.radians(abs(tup1[1]))
lon1 = m.radians(abs(tup1[0]))
lat2 = m.radians(abs(tup2[1]))
lon2 = m.radians(abs(tup2[0]))
dlon = lon2 - lon1
dlat = lat2 - lat1
a = m.sin(dlat / 2)**2 + m.cos(lat1) * m.cos(lat2) * m.sin(dlon / 2)**2
c = 2 * m.atan2(m.sqrt(a), m.sqrt(1 - a))
distance = earth_rad * c
return distance
[docs]def kernel_sph(xi: float, x0: float, a1: float) -> float:
# TODO: homogenize kernel/variogram use
"""
Spherical kernel
:param xi: position of first point
:param x0: position of second point
:param a1: range of kernel
:return: covariance between the two points
"""
if np.abs(xi - x0) > a1:
return 0
else:
return 1 - 3 / 2 * np.abs(xi-x0) / a1 + 1 / 2 * (np.abs(xi-x0) / a1) ** 3
[docs]def part_covar_sum(argsin: tuple) -> float:
"""
Multiprocessing wrapper for covariance summing
:param argsin: Tupled argument for covariance calculation
:return: covariance sum
"""
list_tuple_errs, corr_ranges, list_area_tot, list_lat, list_lon, i_range = argsin
n = len(list_tuple_errs)
part_var_err = 0
for i in i_range:
for j in range(n):
d = distance_latlon((list_lon[i], list_lat[i]), (list_lon[j], list_lat[j]))
for k in range(len(corr_ranges)):
part_var_err += kernel_sph(0, d, corr_ranges[k]) * list_tuple_errs[i][k] * list_tuple_errs[j][k] * \
list_area_tot[i] * list_area_tot[j]
return part_var_err
[docs]def double_sum_covar(list_tuple_errs: list[float], corr_ranges: list[float], list_area_tot: list[float],
list_lat: list[float], list_lon: list[float], nproc: int = 1) -> float:
"""
Double sum of covariances for propagating multi-range correlated errors between disconnected spatial ensembles
:param list_tuple_errs: list of tuples of correlated errors by range, by ensemble
:param corr_ranges: list of correlation ranges
:param list_area_tot: list of areas of ensembles
:param list_lat: list of center latitude of ensembles
:param list_lon: list of center longitude of ensembles
:param nproc: number of cores to use for multiprocessing
:returns: sum of covariances
"""
n = len(list_tuple_errs)
if nproc == 1:
print('Deriving double covariance sum with 1 core...')
var_err = 0
for i in range(n):
for j in range(n):
d = distance_latlon((list_lon[i], list_lat[i]), (list_lon[j], list_lat[j]))
for k in range(len(corr_ranges)):
var_err += kernel_sph(0, d, corr_ranges[k]) * list_tuple_errs[i][k] * list_tuple_errs[j][k] * \
list_area_tot[i] * list_area_tot[j]
else:
print('Deriving double covariance sum with '+str(nproc)+' cores...')
pack_size = int(np.ceil(n/nproc))
argsin = [(list_tuple_errs, corr_ranges, list_area_tot, list_lon, list_lat, np.arange(
i, min(i+pack_size, n))) for k, i in enumerate(np.arange(0, n, pack_size))]
pool = mp.Pool(nproc, maxtasksperchild=1)
outputs = pool.map(part_covar_sum, argsin, chunksize=1)
pool.close()
pool.join()
var_err = np.sum(np.array(outputs))
area_tot = 0
for j in range(len(list_area_tot)):
area_tot += list_area_tot[j]
var_err /= np.nansum(area_tot) ** 2
return np.sqrt(var_err)
[docs]def patches_method(dh : np.ndarray, mask: np.ndarray[bool], gsd : float, area_size : float, perc_min_valid: float = 80.,
patch_shape: str = 'circular',nmax : int = 1000, verbose: bool = False) -> pd.DataFrame:
"""
Patches method for empirical estimation of the standard error over an integration area
:param dh: elevation differences
:param mask: mask of sampled terrain
:param gsd: ground sampling distance
:param area_size: size of integration area
:param perc_min_valid: minimum valid area in the patch
:param patch_shape: shape of patch ['circular' or 'rectangular']
:param nmax: maximum number of patch to sample
#TODO: add overlap option?
:return: tile, mean, median, std and count of each patch
"""
# first, remove non sampled area (but we need to keep the 2D shape of raster for patch sampling)
dh = dh.squeeze()
valid_mask = np.logical_and(np.isfinite(dh), mask)
dh[~valid_mask] = np.nan
# divide raster in cadrants where we can sample
nx, ny = np.shape(dh)
count = len(dh[~np.isnan(dh)])
print('Number of valid pixels: ' + str(count))
nb_cadrant = int(np.floor(np.sqrt((count * gsd ** 2) / area_size) + 1))
# rectangular
nx_sub = int(np.floor((nx - 1) / nb_cadrant))
ny_sub = int(np.floor((ny - 1) / nb_cadrant))
# radius size for a circular patch
rad = int(np.floor(np.sqrt(area_size/np.pi * gsd ** 2)))
tile, mean_patch, med_patch, std_patch, nb_patch = ([] for i in range(5))
# create list of all possible cadrants
list_cadrant = [[i, j] for i in range(nb_cadrant) for j in range(nb_cadrant)]
u = 0
# keep sampling while there is cadrants left and below maximum number of patch to sample
while len(list_cadrant) > 0 and u < nmax:
check = 0
while check == 0:
rand_cadrant = random.randint(0, len(list_cadrant)-1)
i = list_cadrant[rand_cadrant][0]
j = list_cadrant[rand_cadrant][1]
check_x = int(np.floor(nx_sub*(i+1/2)))
check_y = int(np.floor(ny_sub*(j+1/2)))
if mask[check_x, check_y]:
check = 1
list_cadrant.remove(list_cadrant[rand_cadrant])
tile.append(str(i) + '_' + str(j))
if patch_shape == 'rectangular':
patch = dh[nx_sub * i:nx_sub * (i + 1), ny_sub * j:ny_sub * (j + 1)].flatten()
elif patch_shape == 'circular':
center_x = np.floor(nx_sub*(i+1/2))
center_y = np.floor(ny_sub*(j+1/2))
mask = create_circular_mask((nx, ny), center=(center_x, center_y), radius=rad)
patch = dh[mask]
else:
raise ValueError('Patch method must be rectangular or circular.')
nb_pixel_total = len(patch)
nb_pixel_valid = len(patch[np.isfinite(patch)])
if nb_pixel_valid > np.ceil(perc_min_valid / 100. * nb_pixel_total):
u=u+1
if verbose:
print('Found valid cadrant ' + str(u)+ ' (maximum: '+str(nmax)+')')
mean_patch.append(np.nanmean(patch))
med_patch.append(np.nanmedian(patch.filled(np.nan) if isinstance(patch, np.ma.masked_array) else patch))
std_patch.append(np.nanstd(patch))
nb_patch.append(nb_pixel_valid)
df = pd.DataFrame()
df = df.assign(tile=tile, mean=mean_patch, med=med_patch, std=std_patch, count=nb_patch)
return df
[docs]def plot_vgm(df: pd.DataFrame, fit_fun: Callable = None):
fig, ax = plt.subplots(1)
if np.all(np.isnan(df.exp_sigma)):
ax.scatter(df.bins, df.exp, label='Empirical variogram', color='blue')
else:
ax.errorbar(df.bins, df.exp, yerr=df.exp_sigma, label='Empirical variogram (1-sigma s.d)')
if fit_fun is not None:
x = np.linspace(0, np.max(df.bins), 10000)
y = fit_fun(x)
ax.plot(x, y, linestyle='dashed', color='black', label='Model fit', zorder=30)
ax.set_xlabel('Lag (m)')
ax.set_ylabel(r'Variance [$\mu$ $\pm \sigma$]')
ax.legend(loc='best')
ax.grid()
plt.show()