"""Volume change calculation tools (aimed for glaciers)."""
from __future__ import annotations
import warnings
from typing import Callable, Optional, Union
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import rasterio.fill
import scipy.interpolate
from tqdm import tqdm
import xdem
[docs]def hypsometric_binning(ddem: np.ndarray, ref_dem: np.ndarray, bins: Union[float, np.ndarray] = 50.0,
kind: str = "fixed", aggregation_function: Callable = np.median) -> pd.DataFrame:
"""
Separate the dDEM in discrete elevation bins.
The elevation bins will be calculated based on all ref_dem valid pixels.
ddem may contain NaN/masked values over the same area, they will be excluded before the aggregation.
It is assumed that the dDEM is calculated as 'ref_dem - dem' (not 'dem - ref_dem').
:param ddem: The dDEM as a 2D or 1D array.
:param ref_dem: The reference DEM as a 2D or 1D array.
:param bins: The bin size, count, or array, depending on the binning method ('kind').
:param kind: The kind of binning to do. Choices: ['fixed', 'count', 'quantile', 'custom'].
:param aggregation_function: The function to aggregate the elevation values within a bin. Defaults to the median.
:returns: A Pandas DataFrame with elevation bins and dDEM statistics.
"""
assert ddem.shape == ref_dem.shape
# Convert ddem mask into NaN
ddem, _ = xdem.spatial_tools.get_array_and_mask(ddem)
# Extract only the valid values, i.e. valid in ref_dem
valid_mask = ~xdem.spatial_tools.get_mask(ref_dem)
ddem = np.array(ddem[valid_mask])
ref_dem = np.array(ref_dem.squeeze()[valid_mask])
if isinstance(bins, np.ndarray):
zbins = bins
elif kind == "fixed":
zbins = np.arange(ref_dem.min(), ref_dem.max() + bins + 1e-6, step=bins) # +1e-6 in case min=max (1 point)
elif kind == "count":
# Make bins between mean_dem.min() and a little bit above mean_dem.max().
# The bin count has to be bins + 1 because zbins[0] will be a "below min value" bin, which will be irrelevant.
zbins = np.linspace(ref_dem.min(), ref_dem.max() + 1e-6 / bins, num=int(bins + 1))
elif kind == "quantile":
# Make the percentile steps. The bins + 1 is explained above.
steps = np.linspace(0, 100, num=int(bins) + 1)
zbins = np.fromiter(
(np.percentile(ref_dem, step) for step in steps),
dtype=float
)
# The uppermost bin needs to be a tiny amount larger than the highest value to include it.
zbins[-1] += 1e-6
elif kind == "custom":
zbins = bins # type: ignore
else:
raise ValueError(f"Invalid bin kind: {kind}. Choices: ['fixed', 'count', 'quantile', 'custom']")
# Generate bins and get bin indices from the mean DEM
indices = np.digitize(ref_dem, bins=zbins)
# Calculate statistics for each bin.
# If no values exist, all stats should be nans (except count with should be 0)
# medians, means, stds, nmads = (np.zeros(shape=bins.shape[0] - 1, dtype=ddem.dtype) * np.nan, ) * 4
values = np.zeros(shape=zbins.shape[0] - 1, dtype=ddem.dtype) * np.nan
counts = np.zeros_like(values, dtype=int)
for i in np.arange(indices.min(), indices.max() + 1):
values_in_bin = ddem[indices == i]
# Remove possible Nans
values_in_bin = values_in_bin[np.isfinite(values_in_bin)]
# Skip if no values are in the bin.
if values_in_bin.shape[0] == 0:
continue
try:
values[i - 1] = aggregation_function(values_in_bin)
counts[i - 1] = values_in_bin.shape[0]
except IndexError as exception:
# If custom bins were added, i may exceed the bin range, which will be silently ignored.
if kind == "custom" and "out of bounds" in str(exception):
continue
raise exception
# Collect the results in a dataframe
output = pd.DataFrame(
index=pd.IntervalIndex.from_breaks(zbins),
data=np.vstack([
values, counts
]).T,
columns=["value", "count"]
)
return output
[docs]def interpolate_hypsometric_bins(hypsometric_bins: pd.DataFrame, value_column="value", method="polynomial", order=3,
count_threshold: Optional[int] = None) -> pd.DataFrame:
"""
Interpolate hypsometric bins using any valid Pandas interpolation technique.
NOTE: It will not extrapolate!
:param hypsometric_bins: Bins where nans will be interpolated.
:param value_column: The name of the column in 'hypsometric_bins' to use as values.
:param method: Any valid Pandas interpolation technique (e.g. 'linear', 'polynomial', 'ffill', 'bfill').
:param order: The polynomial order to use. Only used if method='polynomial'.
:param count_threshold: Optional. A pixel count threshold to exclude during the curve fit (requires a 'count' column).
:returns: Bins interpolated with the chosen interpolation method.
"""
# Copy the bins that will be filled.
bins = hypsometric_bins.copy()
# Temporarily set the index to be the midpoint (for interpolation)
bins.index = bins.index.mid
# Set all bins that are under a (potentially) specified count to NaN (they should be excluded from interpolation)
if count_threshold is not None:
assert "count" in hypsometric_bins.columns, "'count' not a column in the dataframe"
bins_under_threshold = bins["count"] < count_threshold
bins.loc[bins_under_threshold, value_column] = np.nan
# Count number of valid (finite) values
nvalids = np.count_nonzero(np.isfinite(bins[value_column]))
if nvalids <= order + 1:
# Cannot interpolate -> leave as it is
warnings.warn("Not enough valid bins for interpolation -> returning copy", UserWarning)
return hypsometric_bins.copy()
else:
# Interpolate all bins that are NaN.
bins[value_column] = bins[value_column].interpolate(method=method, order=order, limit_direction="both")
# If some points were temporarily set to NaN (to exclude from the interpolation), re-set them.
if count_threshold is not None:
bins.loc[bins_under_threshold, value_column] = hypsometric_bins.loc[bins_under_threshold.values, value_column]
# Return the index to intervals instead of the midpoint.
bins.index = hypsometric_bins.index
return bins
[docs]def fit_hypsometric_bins_poly(hypsometric_bins: pd.DataFrame, value_column: str = "value", degree: int = 3,
iterations: int = 1, count_threshold: Optional[int] = None) -> pd.Series:
"""
Fit a polynomial to the hypsometric bins.
:param hypsometric_bins: Bins where nans will be interpolated.
:param value_column: The name of the column in 'hypsometric_bins' to use as values.
:param degree: The degree of the polynomial to use.
:param iterations: The number of iterations to run. \
At each iteration, values with residuals larger than 3 times the residuals' standard deviation are excluded.
:param count_threshold: Optional. A pixel count threshold to exclude during the curve fit (requires a 'count' column).
:returns: Bins replaced by the polynomial fit.
"""
bins = hypsometric_bins.copy()
bins.index = bins.index.mid
if count_threshold is not None:
assert "count" in hypsometric_bins.columns, f"'count' not a column in the dataframe"
bins_under_threshold = bins["count"] < count_threshold
bins.loc[bins_under_threshold, value_column] = np.nan
# Remove invalid bins
valids = np.isfinite(np.asarray(bins[value_column]))
for k in range(iterations):
# Fit polynomial
x = bins.index[valids]
y = bins[value_column][valids]
pcoeff = np.polyfit(x, y, deg=degree)
# Calculate residuals
interpolated_values = np.polyval(pcoeff, bins.index)
residuals = interpolated_values - bins[value_column]
residuals_std = np.nanstd(residuals.values)
# Filter outliers further than 3 std
valids_old = np.copy(valids)
valids[np.abs(residuals.values) > 3*residuals_std] = False
if np.array_equal(valids, valids_old):
break
# Save as pandas' DataFrame
output = pd.DataFrame(
index=hypsometric_bins.index,
data=np.vstack([
interpolated_values, bins["count"]
]).T,
columns=["value", "count"]
)
return output
[docs]def calculate_hypsometry_area(ddem_bins: Union[pd.Series, pd.DataFrame], ref_dem: np.ndarray,
pixel_size: Union[float, tuple[float, float]],
timeframe: str = "reference") -> pd.Series:
"""
Calculate the associated representative area of the given dDEM bins.
By default, the area bins will be representative of the mean timing between the reference and nonreference DEM:
elevations = ref_dem - (h_vs_dh_funcion(ref_dem) / 2)
This can be changed to either "reference":
elevations = ref_dem
Or "nonreference":
elevations = ref_dem - h_vs_dh_function(ref_dem)
:param ddem_bins: A Series or DataFrame of dDEM values. If a DataFrame is given, the column 'value' will be used.
:param ref_dem: The reference DEM. This should not have any NaNs.
:param pixel_size: The xy or (x, y) size of the reference DEM pixels in georeferenced coordinates.
:param timeframe: The time at which the area bins are representative. Choices: "reference", "nonreference", "mean"
:returns: The representative area within the given dDEM bins.
"""
assert not np.any(np.isnan(ref_dem)), "The given reference DEM has NaNs. No NaNs are allowed to calculate area!"
if timeframe not in ["reference", "nonreference", "mean"]:
raise ValueError(f"Argument 'timeframe={timeframe}' is invalid. Choices: ['reference', 'nonreference', 'mean']")
if isinstance(ddem_bins, pd.DataFrame):
ddem_bins = ddem_bins["value"]
assert not np.any(np.isnan(ddem_bins.values)), "The dDEM bins cannot contain NaNs. Remove or fill them first."
# Generate a continuous elevation vs. dDEM function
ddem_func = scipy.interpolate.interp1d(ddem_bins.index.mid, ddem_bins.values,
kind="linear", fill_value="extrapolate")
# Generate average elevations by subtracting half of the dDEM's values to the reference DEM
if timeframe == "mean":
elevations = ref_dem - (ddem_func(ref_dem.data) / 2)
elif timeframe == "reference":
elevations = ref_dem
elif timeframe == "nonreference":
elevations = ref_dem - ddem_func(ref_dem.data)
# Extract the bins from the dDEM series and compute the frequency of points in the bins.
bins = np.r_[[ddem_bins.index.left[0]], ddem_bins.index.right]
bin_counts = np.histogram(elevations, bins=bins)[0]
# Multiply the bin counts with the pixel area to get the full area.
bin_area = bin_counts * (pixel_size ** 2 if not isinstance(pixel_size, tuple) else pixel_size[0] * pixel_size[1])
# Put this in a series which will be returned.
output = pd.Series(index=ddem_bins.index, data=bin_area)
return output
[docs]def linear_interpolation(array: Union[np.ndarray, np.ma.masked_array], max_search_distance: int = 10,
extrapolate: bool = False, force_fill: bool = False) -> np.ndarray:
"""
Interpolate a 2D array using rasterio's fillnodata.
:param array: An array with NaNs or a masked array to interpolate.
:param max_search_distance: The maximum number of pixels to search in all directions to find values \
to interpolate from. The default is 10.
:param extrapolate: if False, will remove pixels that have been extrapolated by fillnodata. Default is False.
:param force_fill: if True, will replace all remaining gaps by the median of all valid values. Default is False.
:returns: A filled array with no NaNs
"""
# Create a mask for where nans exist
nan_mask = xdem.spatial_tools.get_mask(array)
interpolated_array = rasterio.fill.fillnodata(array.copy(), mask=(~nan_mask).astype("uint8"),
max_search_distance=max_search_distance)
# Remove extrapolated values: gaps up to the size of max_search_distance are kept,
# but surfaces that artifically grow on the edges are removed
if not extrapolate:
interp_mask = cv2.morphologyEx((~nan_mask).squeeze().astype('uint8'), cv2.MORPH_CLOSE,
kernel=np.ones((max_search_distance - 1, )*2)).astype('bool')
if np.ndim(array) == 3:
interpolated_array[:, ~interp_mask] = np.nan
else:
interpolated_array[~interp_mask] = np.nan
# Fill the nans (values outside of the value boundaries) with the median value
# This triggers a warning with np.masked_array's because it ignores the mask
if force_fill:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
interpolated_array[np.isnan(interpolated_array)] = np.nanmedian(array)
else:
# If input is masked array, return a masked array
extrap_mask = (interpolated_array != array.data)
if isinstance(array, np.ma.masked_array):
interpolated_array = np.ma.masked_array(interpolated_array, mask=(nan_mask & ~extrap_mask))
return interpolated_array.reshape(array.shape)
[docs]def hypsometric_interpolation(voided_ddem: Union[np.ndarray, np.ma.masked_array],
ref_dem: Union[np.ndarray, np.ma.masked_array],
mask: np.ndarray) -> np.ma.masked_array:
"""
Interpolate a dDEM using hypsometric interpolation within the given mask.
Using `ref_dem`, elevation bins of constant height (hard-coded to 50 m for now) are created.
Gaps in `voided-ddem`, within the provided `mask`, are filled with the median dDEM value within that bin.
:param voided_ddem: A dDEM with voids (either an array with nans or a masked array).
:param ref_dem: The reference DEM in the dDEM comparison.
:param mask: A mask to delineate the area that will be interpolated (True means hypsometric will be used).
"""
# Get ddem array with invalid pixels converted to NaN and mask of invalid pixels
ddem, ddem_mask = xdem.spatial_tools.get_array_and_mask(voided_ddem)
# Get ref_dem array with invalid pixels converted to NaN and mask of invalid pixels
dem, dem_mask = xdem.spatial_tools.get_array_and_mask(ref_dem)
# Make sure the mask does not have e.g. the shape (1, height, width)
mask = mask.squeeze()
# A mask of inlier values: The union of the mask and the inverted exclusion masks of both rasters.
inlier_mask = mask & (~ddem_mask & ~dem_mask)
if np.count_nonzero(inlier_mask) == 0:
warnings.warn("No valid data found within mask, returning copy", UserWarning)
return np.copy(ddem)
# Estimate the elevation dependent gradient.
gradient = xdem.volume.hypsometric_binning(
ddem[inlier_mask],
dem[inlier_mask]
)
# Interpolate possible missing elevation bins in 1D - no extrapolation done here
interpolated_gradient = xdem.volume.interpolate_hypsometric_bins(gradient)
gradient_model = scipy.interpolate.interp1d(
interpolated_gradient.index.mid,
interpolated_gradient["value"].values,
fill_value="extrapolate"
)
# Create an idealized dDEM using the relationship between elevation and dDEM
idealized_ddem = np.zeros_like(dem)
idealized_ddem[mask] = gradient_model(dem[mask])
# Replace ddem gaps with idealized hypsometric ddem, but only within mask
corrected_ddem = np.where(ddem_mask & mask, idealized_ddem, ddem)
output = np.ma.masked_array(
corrected_ddem,
mask=~np.isfinite(corrected_ddem)
)
assert output is not None
return output
[docs]def local_hypsometric_interpolation(voided_ddem: Union[np.ndarray, np.ma.masked_array],
ref_dem: Union[np.ndarray, np.ma.masked_array],
mask: np.ndarray, min_coverage: float = 0.2,
count_threshold: Optional[int] = 1, nodata: Union[float, int] = -9999,
plot: bool = False) -> np.ma.masked_array:
"""
Interpolate a dDEM using local hypsometric interpolation.
The algorithm loops through each features in the vector file.
The dDEM is assumed to have been created as "voided_ddem = reference_dem - other_dem".
:param voided_ddem: A dDEM with voids (either an array with nans or a masked array).
:param ref_dem: The reference DEM in the dDEM comparison.
:param mask: A raster of same shape as voided_ddem and ref_dem, containing a diferent non-0 pixel value for \
each geometry on which to loop.
:param min_coverage: Optional. The minimum coverage fraction to be considered for interpolation.
:param count_threshold: Optional. A pixel count threshold to exclude during the hypsometric curve fit.
:param nodata: Optional. No data value to be used for the output masked_array.
:param plot: Set to True to display intermediate plots.
:returns: A dDEM with gaps filled by applying a hypsometric interpolation for each geometry in mask, \
for areas filling the min_coverage criterion.
"""
# Remove any unnecessary dimension
orig_shape = voided_ddem.shape
voided_ddem = voided_ddem.squeeze()
ref_dem = ref_dem.squeeze()
mask = mask.squeeze()
# Check that all arrays have same dimensions
assert voided_ddem.shape == ref_dem.shape == mask.shape
# Get ddem array with invalid pixels converted to NaN and mask of invalid pixels
ddem, ddem_mask = xdem.spatial_tools.get_array_and_mask(voided_ddem)
# Get ref_dem array with invalid pixels converted to NaN and mask of invalid pixels
dem, dem_mask = xdem.spatial_tools.get_array_and_mask(ref_dem)
# A mask of inlier values: The union of the mask and the inverted exclusion masks of both rasters.
inlier_mask = (mask != 0) & (~ddem_mask & ~dem_mask)
if np.count_nonzero(inlier_mask) == 0:
warnings.warn("No valid data found within mask, returning copy", UserWarning)
return np.copy(ddem)
if plot:
plt.matshow(inlier_mask)
plt.title("inlier mask")
plt.show()
# List of indexes to loop on
geometry_index = np.unique(mask[mask != 0])
print("Found {:d} geometries".format(len(geometry_index)))
# Get fraction of valid pixels for each geometry
coverage = np.zeros(len(geometry_index))
for k, index in enumerate(geometry_index):
local_inlier_mask = inlier_mask & (mask == index)
total_pixels = np.count_nonzero((mask == index))
valid_pixels = np.count_nonzero(local_inlier_mask)
coverage[k] = valid_pixels/float(total_pixels)
# Filter geometries with too little coverage
valid_geometry_index = geometry_index[coverage >= min_coverage]
print("Found {:d} geometries with sufficient coverage".format(len(valid_geometry_index)))
idealized_ddem = nodata * np.ones_like(dem)
for k, index in enumerate(valid_geometry_index):
# Mask of valid pixel within geometry
local_mask = (mask == index)
local_inlier_mask = inlier_mask & (local_mask)
# Estimate the elevation dependent gradient
gradient = xdem.volume.hypsometric_binning(
ddem[local_mask],
dem[local_mask]
)
# Remove bins with loo low count
filt_gradient = gradient.copy()
if count_threshold > 1:
bins_under_threshold = filt_gradient["count"] < count_threshold
filt_gradient.loc[bins_under_threshold, "value"] = np.nan
# Interpolate missing elevation bins
interpolated_gradient = xdem.volume.interpolate_hypsometric_bins(filt_gradient)
# At least 2 points needed for interp1d, if not skip feature
nvalues = len(interpolated_gradient['value'].values)
if nvalues < 2:
warnings.warn(
"Not enough valid bins for feature with index {:d} -> skipping interpolation".format(index),
UserWarning
)
continue
# Create a model for 2D interpolation
gradient_model = scipy.interpolate.interp1d(
interpolated_gradient.index.mid,
interpolated_gradient['value'].values,
fill_value="extrapolate"
)
if plot:
local_ddem = np.where(local_inlier_mask, ddem, np.nan)
vmax = max(np.abs(np.nanpercentile(local_ddem, [2, 98])))
rowmin, rowmax, colmin, colmax = xdem.spatial_tools.get_valid_extent(mask == index)
fig = plt.figure(figsize=(12, 8))
plt.subplot(121)
plt.imshow((mask == index)[rowmin:rowmax, colmin:colmax], cmap='Greys',
vmin=0, vmax=2, interpolation='none')
plt.imshow(local_ddem[rowmin:rowmax, colmin:colmax], cmap='RdYlBu',
vmin=-vmax, vmax=vmax, interpolation='none')
plt.colorbar()
plt.title("ddem for geometry # {:d}".format(index))
plt.subplot(122)
plt.plot(gradient["value"], gradient.index.mid, label='raw')
plt.plot(interpolated_gradient, gradient.index.mid, label='interpolated', ls='--')
plt.xlabel('ddem')
plt.ylabel('Elevation')
plt.legend()
plt.title("Average ddem per elevation bin")
plt.tight_layout()
plt.show()
# Create an idealized dDEM (only considering the dH gradient)
idealized_ddem[mask == index] = gradient_model(dem[mask == index])
# Measure the difference between the original dDEM and the idealized dDEM
assert ddem.shape == idealized_ddem.shape
ddem_difference = ddem.astype("float32") - idealized_ddem.astype("float32")
ddem_difference[idealized_ddem == nodata] = np.nan
# Spatially interpolate the difference between these two products.
interpolated_ddem_diff = linear_interpolation(np.where(ddem_mask, np.nan, ddem_difference))
interpolated_ddem_diff[np.isnan(interpolated_ddem_diff)] = 0
# Correct the idealized dDEM with the difference to the original dDEM.
corrected_ddem = idealized_ddem + interpolated_ddem_diff
# Set Nans to nodata
corrected_ddem[~np.isfinite(corrected_ddem)] = nodata
output = np.ma.masked_array(
corrected_ddem,
mask=(corrected_ddem == nodata) # mask=((mask != 0) & (ddem_mask | dem_mask))
).reshape(orig_shape)
assert output is not None
return output
[docs]def get_regional_hypsometric_signal(ddem: Union[np.ndarray, np.ma.masked_array],
ref_dem: Union[np.ndarray, np.ma.masked_array],
glacier_index_map: np.ndarray, n_bins: int = 20,
verbose: bool = False, min_coverage: float = 0.05) -> pd.DataFrame:
"""
Get the normalized regional hypsometric elevation change signal, read "the general shape of it".
:param ddem: The dDEM to analyse.
:param ref_dem: A void-free reference DEM.
:param glacier_index_map: An array glacier indices of the same shape as the previous inputs.
:param verbose: Show progress bar.
n_bins = 20 # TODO: This should be an argument.
:param n_bins: The number of elevation bins to subdivide each glacier in.
:returns: A DataFrame of bin statistics, scaled by elevation and elevation change.
"""
# Extract the array and mask representations of the arrays.
ddem_arr, ddem_mask = xdem.spatial_tools.get_array_and_mask(ddem.squeeze())
ref_arr, ref_mask = xdem.spatial_tools.get_array_and_mask(ref_dem.squeeze())
# The reference DEM should be void free
assert np.count_nonzero(ref_mask) == 0, "Reference DEM has voids"
# The unique indices are the unique glaciers.
unique_indices = np.unique(glacier_index_map)
# Create empty (ddem) value and (pixel) count arrays which will be filled iteratively.
values = np.empty((n_bins, unique_indices.shape[0]), dtype=float) + np.nan
counts = np.empty((n_bins, unique_indices.shape[0]), dtype=float) + np.nan
# Start a counter of glaciers that are actually processed.
count = 0
# Loop over each unique glacier.
for i in tqdm(np.unique(glacier_index_map), desc="Finding regional signal", disable=(not verbose)):
# If i ==0, it's assumed to be periglacial.
if i == 0:
continue
# Create a mask representing a particular glacier.
glacier_values = (glacier_index_map == i)
# Stop if the "glacier" is tiny. It might be a cropped glacier outline for example.
if np.count_nonzero(glacier_values) < 10:
continue
# The inlier mask is where that particular glacier is and where nans don't exist.
inlier_mask = glacier_values & ~ddem_mask
# Skip if the coverage is below the threshold
if (np.count_nonzero(inlier_mask) / np.count_nonzero(glacier_values)) < min_coverage:
continue
# Extract only the difference and elevation values that correspond to the glacier.
differences = ddem_arr[inlier_mask]
elevations = ref_arr[inlier_mask]
# Run the hypsometric binning.
try:
bins = hypsometric_binning(differences, elevations, bins=n_bins, kind="count")
except ValueError: # ValueError: zero-size array to reduction operation minimum which has no identity on "zbins=" call
continue
# Min-max scale by elevation.
bins.index = (bins.index.mid - bins.index.left.min()) / (bins.index.right.max() - bins.index.left.min())
# Scale by difference.
bins["value"] = (bins["value"] - np.nanmin(bins["value"])) / \
(np.nanmax(bins["value"]) - np.nanmin(bins["value"]))
# Assign the values and counts to the output array.
values[:, count] = bins["value"]
counts[:, count] = bins["count"]
count += 1
output = pd.DataFrame(
data={
"w_mean": np.nansum(values * counts, axis=1) / np.nansum(counts, axis=1),
"median": np.nanmedian(values, axis=1),
"std": np.nanstd(values, axis=1),
"sigma-1-lower": np.nanpercentile(values, 16, axis=1),
"sigma-1-upper": np.nanpercentile(values, 84, axis=1),
"sigma-2-lower": np.nanpercentile(values, 2.5, axis=1),
"sigma-2-upper": np.nanpercentile(values, 97.5, axis=1),
"count": np.nansum(counts, axis=1).astype(int),
},
index=pd.IntervalIndex.from_breaks(np.linspace(0, 1, n_bins + 1, dtype="float64")),
)
return output
[docs]def norm_regional_hypsometric_interpolation(voided_ddem: Union[np.ndarray, np.ma.masked_array],
ref_dem: Union[np.ndarray, np.ma.masked_array],
glacier_index_map: np.ndarray,
min_coverage: float = 0.1,
regional_signal: Optional[pd.DataFrame] = None,
verbose: bool = False,
min_elevation_range: float = 0.33,
idealized_ddem: bool = False) -> np.ndarray:
"""
Interpolate missing values by scaling the normalized regional hypsometric signal to each glacier separately.
Only missing values are interpolated. The rest of the glacier's values are fixed.
:param voided_ddem: The voided dDEM to fill NaNs in.
:param ref_dem: A void-free reference DEM.
:param glacier_index_map: An array glacier indices of the same shape as the previous inputs.
:param min_coverage: The minimum fractional coverage of a glacier to interpolate. Defaults to 10%.
:param regional_signal: A regional signal is already estimate. Otherwise one will be estimated.
:param verbose: Show progress bars.
:param min_elevation_range: The minimum allowed min/max bin range to scale a signal from.\
Default: 1/3 of the elevation range needs to be present.
:param idealized_ddem: Replace observed glacier values with the hypsometric signal. Good for error assessments.
:raises AssertionError: If `ref_dem` has voids.
:returns: A dDEM where glacier's that fit the min_coverage criterion are interpolated.
"""
# Extract the array and nan parts of the inputs.
ddem_arr, ddem_nans = xdem.spatial_tools.get_array_and_mask(voided_ddem)
ref_arr, ref_nans = xdem.spatial_tools.get_array_and_mask(ref_dem)
# The reference DEM should be void free
assert np.count_nonzero(ref_nans) == 0, "Reference DEM has voids"
# If the regional signal was not given as an argument, find it from the dDEM.
if regional_signal is None:
regional_signal = get_regional_hypsometric_signal(
ddem=ddem_arr,
ref_dem=ref_arr,
glacier_index_map=glacier_index_map,
verbose=verbose
)
# The unique indices are the unique glaciers.
unique_indices = np.unique(glacier_index_map)
# Make a copy of the dDEM which will be filled iteratively.
ddem_filled = ddem_arr.copy()
# Loop over all glaciers and fill the dDEM accordingly.
for i in tqdm(unique_indices, desc="Interpolating dDEM", disable=(not verbose)):
if i == 0: # i==0 is assumed to mean stable ground.
continue
# Create a mask representing a particular glacier.
glacier_values = (glacier_index_map == i)
# The inlier mask is where that particular glacier is and where nans don't exist.
inlier_mask = glacier_values & ~ddem_nans
# If the fractional coverage is smaller than the given threshold, skip the glacier.
if (np.count_nonzero(inlier_mask) / np.count_nonzero(glacier_values)) < min_coverage:
continue
# Extract only the finite difference and elevation values that correspond to the glacier.
differences = ddem_arr[inlier_mask]
elevations = ref_arr[inlier_mask]
# Get the reference elevation min and max
elev_min = ref_arr[glacier_values].min()
elev_max = ref_arr[glacier_values].max()
# Copy the signal
signal = regional_signal["w_mean"].copy()
# Scale the signal elevation midpoints to the glacier elevation range.
midpoints = signal.index.mid
midpoints *= elev_max - elev_min
midpoints += elev_min
step = midpoints[1] - midpoints[0]
# Create an interval structure from the midpoints and the step size.
signal.index = pd.IntervalIndex.from_arrays(left=midpoints - step / 2, right=midpoints + step / 2)
# Find the hypsometric bins of the glacier.
hypsometric_bins = hypsometric_binning(
ddem=differences,
ref_dem=elevations,
bins=np.r_[[signal.index.left[0]], signal.index.right], # This will generate the same steps as the signal.
kind="custom"
)
bin_stds = hypsometric_binning(
ddem=differences,
ref_dem=elevations,
bins=np.r_[[signal.index.left[0]], signal.index.right],
kind="custom",
aggregation_function=np.nanstd
)
# Check which of the bins were non-empty.
non_empty_bins = np.isfinite(hypsometric_bins["value"])
non_empty_range = np.sum(non_empty_bins[non_empty_bins].index.length)
full_range = np.sum(hypsometric_bins.index.length)
if (non_empty_range / full_range) < min_elevation_range:
continue
# A theoretical minimum of 2 bins are needed for the curve fit.
if np.count_nonzero(non_empty_bins) < 2:
continue
# The weights are the squared inverse of the standard deviation of each bin.
bin_weights = bin_stds["value"].values[non_empty_bins] / \
np.sqrt(hypsometric_bins["count"].values[non_empty_bins])
bin_weights[bin_weights == 0.0] = 1e-8 # Avoid divide by zero problems.
# Fit linear coefficients to scale the regional signal to the hypsometric bins properly.
# The inverse of the pixel counts are used as weights, to properly disregard poorly constrained bins.
with warnings.catch_warnings():
# curve_fit will sometimes say "can't estimate covariance". This is okay.
warnings.filterwarnings("ignore", message="covariance")
coeffs = scipy.optimize.curve_fit(
f=lambda x, a, b: a * x + b, # Estimate a linear function "f(x) = ax + b".
xdata=signal.values[non_empty_bins], # The xdata is the normalized regional signal
ydata=hypsometric_bins["value"].values[non_empty_bins], # The ydata is the actual values.
p0=[1, 0], # The initial guess of a and b (doesn't matter too much)
sigma=bin_weights,
)[0]
# Create a linear model from the elevations and the scaled regional signal.
model = scipy.interpolate.interp1d(signal.index.mid, np.poly1d(coeffs)(signal.values), bounds_error=False)
# Find which values to fill using the model (all nans within the glacier extent)
if not idealized_ddem:
values_to_fill = glacier_values & ddem_nans
# If it should be idealized, replace all glacier values with the model
else:
values_to_fill = glacier_values
# Fill the nans using the scaled regional signal.
ddem_filled[values_to_fill] = model(ref_arr[values_to_fill])
return ddem_filled