"""DEM coregistration classes and functions."""
from __future__ import annotations
import copy
import concurrent.futures
import json
import os
import subprocess
import tempfile
import warnings
from enum import Enum
from typing import Any, Callable, Optional, Union, Sequence, TypeVar
import fiona
import geoutils as gu
import numpy as np
import rasterio as rio
import rasterio.warp # pylint: disable=unused-import
import rasterio.windows # pylint: disable=unused-import
import scipy
import scipy.interpolate
import scipy.ndimage
import scipy.optimize
import skimage.transform
from rasterio import Affine
from tqdm import trange, tqdm
import pandas as pd
import xdem
try:
import richdem as rd
_has_rd = True
except ImportError:
_has_rd = False
try:
import cv2
_has_cv2 = True
except ImportError:
_has_cv2 = False
try:
from pytransform3d.transform_manager import TransformManager
import pytransform3d.transformations
_HAS_P3D = True
except ImportError:
_HAS_P3D = False
[docs]def filter_by_range(ds: rio.DatasetReader, rangelim: tuple[float, float]):
"""
Function to filter values using a range.
"""
print('Excluding values outside of range: {0:f} to {1:f}'.format(*rangelim))
out = np.ma.masked_outside(ds, *rangelim)
out.set_fill_value(ds.fill_value)
return out
[docs]def filtered_slope(ds_slope, slope_lim=(0.1, 40)):
print("Slope filter: %0.2f - %0.2f" % slope_lim)
print("Initial count: %i" % ds_slope.count())
flt_slope = filter_by_range(ds_slope, slope_lim)
print(flt_slope.count())
return flt_slope
[docs]def apply_xy_shift(ds: rio.DatasetReader, dx: float, dy: float) -> np.ndarray:
"""
Apply horizontal shift to rio dataset using Transform affine matrix
:param ds: DEM
:param dx: dx shift value
:param dy: dy shift value
Returns:
Rio Dataset with updated transform
"""
print("X shift: ", dx)
print("Y shift: ", dy)
# Update geotransform
ds_meta = ds.meta
gt_orig = ds.transform
gt_align = Affine(gt_orig.a, gt_orig.b, gt_orig.c+dx,
gt_orig.d, gt_orig.e, gt_orig.f+dy)
print("Original transform:", gt_orig)
print("Updated transform:", gt_align)
# Update ds Geotransform
ds_align = ds
meta_update = ds.meta.copy()
meta_update({"driver": "GTiff", "height": ds.shape[1],
"width": ds.shape[2], "transform": gt_align, "crs": ds.crs})
# to split this part in two?
with rasterio.open(ds_align, "w", **meta_update) as dest:
dest.write(ds_align)
return ds_align
[docs]def apply_z_shift(ds: rio.DatasetReader, dz: float):
"""
Apply vertical shift to rio dataset using Transform affine matrix
:param ds: DEM
:param dx: dz shift value
"""
src_dem = rio.open(ds)
a = src_dem.read(1)
ds_shift = a + dz
return ds_shift
[docs]def rio_to_rda(ds: rio.DatasetReader) -> rd.rdarray:
"""
Get georeferenced richDEM array from rasterio dataset
:param ds: DEM
:return: DEM
"""
arr = ds.read(1)
rda = rd.rdarray(arr, no_data=ds.get_nodatavals()[0])
rda.geotransform = ds.get_transform()
rda.projection = ds.get_gcps()
return rda
[docs]def get_terrainattr(ds: rio.DatasetReader, attrib='slope_degrees') -> rd.rdarray:
"""
Derive terrain attribute for DEM opened with rasterio. One of "slope_degrees", "slope_percentage", "aspect",
"profile_curvature", "planform_curvature", "curvature" and others (see richDEM documentation)
:param ds: DEM
:param attrib: terrain attribute
:return:
"""
rda = rio_to_rda(ds)
terrattr = rd.TerrainAttribute(rda, attrib=attrib)
return terrattr
[docs]def get_horizontal_shift(elevation_difference: np.ndarray, slope: np.ndarray, aspect: np.ndarray,
min_count: int = 20) -> tuple[float, float, float]:
"""
Calculate the horizontal shift between two DEMs using the method presented in Nuth and Kääb (2011).
:param elevation_difference: The elevation difference (reference_dem - aligned_dem).
:param slope: A slope map with the same shape as elevation_difference (units = pixels?).
:param aspect: An aspect map with the same shape as elevation_difference (units = radians).
:param min_count: The minimum allowed bin size to consider valid.
:raises ValueError: If very few finite values exist to analyse.
:returns: The pixel offsets in easting, northing, and the c_parameter (altitude?).
"""
input_x_values = aspect
with np.errstate(divide="ignore", invalid="ignore"):
input_y_values = elevation_difference / slope
# Remove non-finite values
x_values = input_x_values[np.isfinite(input_x_values) & np.isfinite(input_y_values)]
y_values = input_y_values[np.isfinite(input_x_values) & np.isfinite(input_y_values)]
assert y_values.shape[0] > 0
# Remove outliers
lower_percentile = np.percentile(y_values, 1)
upper_percentile = np.percentile(y_values, 99)
valids = np.where((y_values > lower_percentile) & (y_values < upper_percentile) & (np.abs(y_values) < 200))
x_values = x_values[valids]
y_values = y_values[valids]
# Slice the dataset into appropriate aspect bins
step = np.pi / 36
slice_bounds = np.arange(start=0, stop=2 * np.pi, step=step)
y_medians = np.zeros([len(slice_bounds)])
count = y_medians.copy()
for i, bound in enumerate(slice_bounds):
y_slice = y_values[(bound < x_values) & (x_values < (bound + step))]
if y_slice.shape[0] > 0:
y_medians[i] = np.median(y_slice)
count[i] = y_slice.shape[0]
# Filter out bins with counts below threshold
y_medians = y_medians[count > min_count]
slice_bounds = slice_bounds[count > min_count]
if slice_bounds.shape[0] < 10:
raise ValueError("Less than 10 different cells exist.")
# Make an initial guess of the a, b, and c parameters
initial_guess: tuple[float, float, float] = (3 * np.std(y_medians) / (2 ** 0.5), 0.0, np.mean(y_medians))
def estimate_ys(x_values: np.ndarray, parameters: tuple[float, float, float]) -> np.ndarray:
"""
Estimate y-values from x-values and the current parameters.
y(x) = a * cos(b - x) + c
:param x_values: The x-values to feed the above function.
:param parameters: The a, b, and c parameters to feed the above function
:returns: Estimated y-values with the same shape as the given x-values
"""
return parameters[0] * np.cos(parameters[1] - x_values) + parameters[2]
def residuals(parameters: tuple[float, float, float], y_values: np.ndarray, x_values: np.ndarray):
"""
Get the residuals between the estimated and measured values using the given parameters.
err(x, y) = est_y(x) - y
:param parameters: The a, b, and c parameters to use for the estimation.
:param y_values: The measured y-values.
:param x_values: The measured x-values
:returns: An array of residuals with the same shape as the input arrays.
"""
err = estimate_ys(x_values, parameters) - y_values
return err
# Estimate the a, b, and c parameters with least square minimisation
plsq = scipy.optimize.leastsq(func=residuals, x0=initial_guess, args=(y_medians, slice_bounds), full_output=1)
a_parameter, b_parameter, c_parameter = plsq[0]
# Calculate the easting and northing offsets from the above parameters
east_offset = a_parameter * np.sin(b_parameter)
north_offset = a_parameter * np.cos(b_parameter)
return east_offset, north_offset, c_parameter
[docs]def calculate_slope_and_aspect(dem: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Calculate the slope and aspect of a DEM.
:param dem: A numpy array of elevation values.
:returns: The slope (in pixels??) and aspect (in radians) of the DEM.
"""
# TODO: Figure out why slope is called slope_px. What unit is it in?
# TODO: Change accordingly in the get_horizontal_shift docstring.
# Calculate the gradient of the slope
gradient_y, gradient_x = np.gradient(dem)
slope_px = np.sqrt(gradient_x ** 2 + gradient_y ** 2)
aspect = np.arctan2(-gradient_x, gradient_y)
aspect += np.pi
return slope_px, aspect
[docs]def deramping(elevation_difference, x_coordinates: np.ndarray, y_coordinates: np.ndarray,
degree: int, verbose: bool = False,
metadata: Optional[dict[str, Any]] = None) -> Callable[[np.ndarray, np.ndarray], np.ndarray]:
"""
Calculate a deramping function to account for rotational and non-rigid components of the elevation difference.
:param elevation_difference: The elevation difference array to analyse.
:param x_coordinates: x-coordinates of the above array (must have the same shape as elevation_difference)
:param y_coordinates: y-coordinates of the above array (must have the same shape as elevation_difference)
:param degree: The polynomial degree to estimate the ramp.
:param verbose: Print the least squares optimization progress.
:param metadata: Optional. A metadata dictionary that will be updated with the key "deramp".
:returns: A callable function to estimate the ramp.
"""
#warnings.warn("This function is deprecated in favour of the new Coreg class.", DeprecationWarning)
# Extract only the finite values of the elevation difference and corresponding coordinates.
valid_diffs = elevation_difference[np.isfinite(elevation_difference)]
valid_x_coords = x_coordinates[np.isfinite(elevation_difference)]
valid_y_coords = y_coordinates[np.isfinite(elevation_difference)]
# Randomly subsample the values if there are more than 500,000 of them.
if valid_x_coords.shape[0] > 500_000:
random_indices = np.random.randint(0, valid_x_coords.shape[0] - 1, 500_000)
valid_diffs = valid_diffs[random_indices]
valid_x_coords = valid_x_coords[random_indices]
valid_y_coords = valid_y_coords[random_indices]
# Create a function whose residuals will be attempted to minimise
def estimate_values(x_coordinates: np.ndarray, y_coordinates: np.ndarray,
coefficients: np.ndarray, degree: int) -> np.ndarray:
"""
Estimate values from a 2D-polynomial.
:param x_coordinates: x-coordinates of the difference array (must have the same shape as elevation_difference).
:param y_coordinates: y-coordinates of the difference array (must have the same shape as elevation_difference).
:param coefficients: The coefficients (a, b, c, etc.) of the polynomial.
:param degree: The degree of the polynomial.
:raises ValueError: If the length of the coefficients list is not compatible with the degree.
:returns: The values estimated by the polynomial.
"""
# Check that the coefficient size is correct.
coefficient_size = (degree + 1) * (degree + 2) / 2
if len(coefficients) != coefficient_size:
raise ValueError()
# Do Amaury's black magic to estimate the values.
estimated_values = np.sum([coefficients[k * (k + 1) // 2 + j] * x_coordinates ** (k - j) *
y_coordinates ** j for k in range(degree + 1) for j in range(k + 1)], axis=0)
return estimated_values # type: ignore
# Creat the error function
def residuals(coefficients: np.ndarray, values: np.ndarray, x_coordinates: np.ndarray,
y_coordinates: np.ndarray, degree: int) -> np.ndarray:
"""
Calculate the difference between the estimated and measured values.
:param coefficients: Coefficients for the estimation.
:param values: The measured values.
:param x_coordinates: The x-coordinates of the values.
:param y_coordinates: The y-coordinates of the values.
:param degree: The degree of the polynomial to estimate.
:returns: An array of residuals.
"""
error = estimate_values(x_coordinates, y_coordinates, coefficients, degree) - values
error = error[np.isfinite(error)]
return error
# Run a least-squares minimisation to estimate the correct coefficients.
# TODO: Maybe remove the full_output?
initial_guess = np.zeros(shape=((degree + 1) * (degree + 2) // 2))
if verbose:
print("Deramping...")
coefficients = scipy.optimize.least_squares(
fun=residuals,
x0=initial_guess,
args=(valid_diffs, valid_x_coords, valid_y_coords, degree),
verbose=2 if verbose and degree > 1 else 0
).x
# Generate the return-function which can correctly estimate the ramp
def ramp(x_coordinates: np.ndarray, y_coordinates: np.ndarray) -> np.ndarray:
"""
Get the values of the ramp that corresponds to given coordinates.
:param x_coordinates: x-coordinates of interest.
:param y_coordinates: y-coordinates of interest.
:returns: The estimated ramp offsets.
"""
return estimate_values(x_coordinates, y_coordinates, coefficients, degree)
if metadata is not None:
metadata["deramp"] = {
"coefficients": coefficients,
"nmad": xdem.spatial_tools.nmad(residuals(coefficients, valid_diffs, valid_x_coords, valid_y_coords, degree))
}
# Return the function which can be used later.
return ramp
[docs]def mask_as_array(reference_raster: gu.georaster.Raster, mask: Union[str, gu.geovector.Vector, gu.georaster.Raster]) -> np.ndarray:
"""
Convert a given mask into an array.
:param reference_raster: The raster to use for rasterizing the mask if the mask is a vector.
:param mask: A valid Vector, Raster or a respective filepath to a mask.
:raises: ValueError: If the mask path is invalid.
:raises: TypeError: If the wrong mask type was given.
:returns: The mask as a squeezed array.
"""
# Try to load the mask file if it's a filepath
if isinstance(mask, str):
# First try to load it as a Vector
try:
mask = gu.geovector.Vector(mask)
# If the format is unsopported, try loading as a Raster
except fiona.errors.DriverError:
try:
mask = gu.georaster.Raster(mask)
# If that fails, raise an error
except rio.errors.RasterioIOError:
raise ValueError(f"Mask path not in a supported Raster or Vector format: {mask}")
# At this point, the mask variable is either a Raster or a Vector
# Now, convert the mask into an array by either rasterizing a Vector or by fetching a Raster's data
if isinstance(mask, gu.geovector.Vector):
mask_array = mask.create_mask(reference_raster)
elif isinstance(mask, gu.georaster.Raster):
# The true value is the maximum value in the raster, unless the maximum value is 0 or False
true_value = np.nanmax(mask.data) if not np.nanmax(mask.data) in [0, False] else True
mask_array = (mask.data == true_value).squeeze()
else:
raise TypeError(
f"Mask has invalid type: {type(mask)}. Expected one of: "
f"{[gu.georaster.Raster, gu.geovector.Vector, str, type(None)]}"
)
return mask_array
def _transform_to_bounds_and_res(shape: tuple[int, ...],
transform: rio.transform.Affine) -> tuple[rio.coords.BoundingBox, float]:
"""Get the bounding box and (horizontal) resolution from a transform and the shape of a DEM."""
bounds = rio.coords.BoundingBox(
*rio.transform.array_bounds(shape[0], shape[1], transform=transform))
resolution = (bounds.right - bounds.left) / shape[1]
return bounds, resolution
def _get_x_and_y_coords(shape: tuple[int, ...], transform: rio.transform.Affine):
"""Generate center coordinates from a transform and the shape of a DEM."""
bounds, resolution = _transform_to_bounds_and_res(shape, transform)
x_coords, y_coords = np.meshgrid(
np.linspace(bounds.left + resolution / 2, bounds.right - resolution / 2, num=shape[1]),
np.linspace(bounds.bottom + resolution / 2, bounds.top - resolution / 2, num=shape[0])[::-1]
)
return x_coords, y_coords
CoregType = TypeVar("CoregType", bound="Coreg")
[docs]class Coreg:
"""
Generic Coreg class.
Made to be subclassed.
"""
_fit_called: bool = False # Flag to check if the .fit() method has been called.
_is_affine: Optional[bool] = None
[docs] def __init__(self, meta: Optional[dict[str, Any]] = None, matrix: Optional[np.ndarray] = None):
"""Instantiate a generic Coreg method."""
self._meta: dict[str, Any] = meta or {} # All __init__ functions should instantiate an empty dict.
if matrix is not None:
with warnings.catch_warnings():
# This error is fixed in the upcoming 1.8
warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`")
valid_matrix = pytransform3d.transformations.check_transform(matrix)
self._meta["matrix"] = valid_matrix
[docs] def fit(self, reference_dem: Union[np.ndarray, np.ma.masked_array],
dem_to_be_aligned: Union[np.ndarray, np.ma.masked_array],
inlier_mask: Optional[np.ndarray] = None,
transform: Optional[rio.transform.Affine] = None,
weights: Optional[np.ndarray] = None,
subsample: Union[float, int] = 1.0,
verbose: bool = False):
"""
Estimate the coregistration transform on the given DEMs.
:param reference_dem: 2D array of elevation values acting reference.
:param dem_to_be_aligned: 2D array of elevation values to be aligned.
:param inlier_mask: Optional. 2D boolean array of areas to include in the analysis (inliers=True).
:param transform: Optional. Transform of the reference_dem. Mandatory in some cases.
:param weights: Optional. Per-pixel weights for the coregistration.
:param subsample: Subsample the input to increase performance. <1 is parsed as a fraction. >1 is a pixel count.
:param verbose: Print progress messages to stdout.
"""
if weights is not None:
raise NotImplementedError("Weights have not yet been implemented")
ref_dem, ref_mask = xdem.spatial_tools.get_array_and_mask(reference_dem)
tba_dem, tba_mask = xdem.spatial_tools.get_array_and_mask(dem_to_be_aligned)
# Make sure that the mask has an expected format.
if inlier_mask is not None:
inlier_mask = np.asarray(inlier_mask).squeeze()
assert inlier_mask.dtype == bool, f"Invalid mask dtype: '{inlier_mask.dtype}'. Expected 'bool'"
if np.all(~inlier_mask):
raise ValueError("'inlier_mask' had no inliers.")
ref_dem[~inlier_mask] = np.nan
tba_dem[~inlier_mask] = np.nan
if np.all(ref_mask):
raise ValueError("'reference_dem' had only NaNs")
if np.all(tba_mask):
raise ValueError("'dem_to_be_aligned' had only NaNs")
# If subsample is not equal to one, subsampling should be performed.
if subsample != 1.0:
# The full mask (inliers=True) is the inverse of the above masks and the provided mask.
full_mask = (~ref_mask & ~tba_mask & (np.asarray(inlier_mask) if inlier_mask is not None else True)).squeeze()
# If subsample is less than one, it is parsed as a fraction (e.g. 0.8 => retain 80% of the values)
if subsample < 1.0:
subsample = int(np.count_nonzero(full_mask) * (1 - subsample))
# Randomly pick N inliers in the full_mask where N=subsample
random_falses = np.random.choice(np.argwhere(full_mask.flatten()).squeeze(), int(subsample), replace=False)
# Convert the 1D indices to 2D indices
cols = (random_falses // full_mask.shape[0]).astype(int)
rows = random_falses % full_mask.shape[0]
# Set the N random inliers to be parsed as outliers instead.
full_mask[rows, cols] = False
# Run the associated fitting function
self._fit_func(ref_dem=ref_dem, tba_dem=tba_dem, transform=transform, weights=weights, verbose=verbose)
# Flag that the fitting function has been called.
self._fit_called = True
[docs] def apply(self, dem: Union[np.ndarray, np.ma.masked_array],
transform: rio.transform.Affine) -> Union[np.ndarray, np.ma.masked_array]:
"""
Apply the estimated transform to a DEM.
:param dem: A DEM to apply the transform on.
:param transform: The transform object of the DEM. TODO: Remove??
:returns: The transformed DEM.
"""
if not self._fit_called and self._meta.get("matrix") is None:
raise AssertionError(".fit() does not seem to have been called yet")
# The array to provide the functions will be an ndarray with NaNs for masked out areas.
dem_array, dem_mask = xdem.spatial_tools.get_array_and_mask(dem)
if np.all(dem_mask):
raise ValueError("'dem' had only NaNs")
# See if a _apply_func exists
try:
# Run the associated apply function
applied_dem = self._apply_func(dem_array, transform) # pylint: disable=assignment-from-no-return
# If it doesn't exist, use apply_matrix()
except NotImplementedError:
if self.is_affine: # This only works on it's affine, however.
# Apply the matrix around the centroid (if defined, otherwise just from the center).
applied_dem = apply_matrix(
dem_array,
transform=transform,
matrix=self.to_matrix(),
centroid=self._meta.get("centroid"),
dilate_mask=True
)
else:
raise ValueError("Coreg method is non-rigid but has no implemented _apply_func")
# Return the array in the same format as it was given (ndarray or masked_array)
return np.ma.masked_array(applied_dem, mask=dem.mask) if isinstance(dem, np.ma.masked_array) else applied_dem
[docs] def apply_pts(self, coords: np.ndarray) -> np.ndarray:
"""
Apply the estimated transform to a set of 3D points.
:param coords: A (N, 3) array of X/Y/Z coordinates or one coordinate of shape (3,).
:returns: The transformed coordinates.
"""
if not self._fit_called and self._meta.get("matrix") is None:
raise AssertionError(".fit() does not seem to have been called yet")
# If the coordinates represent just one coordinate
if np.shape(coords) == (3,):
coords = np.reshape(coords, (1, 3))
assert len(np.shape(coords)) == 2 and np.shape(coords)[1] == 3, f"'coords' shape must be (N, 3). Given shape: {np.shape(coords)}"
coords_c = coords.copy()
# See if an _apply_pts_func exists
try:
transformed_points = self._apply_pts_func(coords)
# If it doesn't exist, use opencv's perspectiveTransform
except NotImplementedError:
if self.is_affine: # This only works on it's rigid, however.
# Transform the points (around the centroid if it exists).
if self._meta.get("centroid") is not None:
coords_c -= self._meta["centroid"]
transformed_points = cv2.perspectiveTransform(coords_c.reshape(1, -1, 3), self.to_matrix()).squeeze()
if self._meta.get("centroid") is not None:
transformed_points += self._meta["centroid"]
else:
raise ValueError("Coreg method is non-rigid but has not implemented _apply_pts_func")
return transformed_points
@property
def is_affine(self) -> bool:
"""Check if the transform be explained by a 3D affine transform."""
# _is_affine is found by seeing if to_matrix() raises an error.
# If this hasn't been done yet, it will be None
if self._is_affine is None:
try: # See if to_matrix() raises an error.
self.to_matrix()
self._is_affine = True
except (ValueError, NotImplementedError):
self._is_affine = False
return self._is_affine
[docs] def to_matrix(self) -> np.ndarray:
"""Convert the transform to a 4x4 transformation matrix."""
return self._to_matrix_func()
[docs] def centroid(self) -> Optional[tuple[float, float, float]]:
"""Get the centroid of the coregistration, if defined."""
meta_centroid = self._meta.get("centroid")
if meta_centroid is None:
return None
# Unpack the centroid in case it is in an unexpected format (an array, list or something else).
return (meta_centroid[0], meta_centroid[1], meta_centroid[2])
[docs] def residuals(self, reference_dem: Union[np.ndarray, np.ma.masked_array],
dem_to_be_aligned: Union[np.ndarray, np.ma.masked_array],
inlier_mask: Optional[np.ndarray] = None,
transform: Optional[rio.transform.Affine] = None) -> np.ndarray:
"""
Calculate the residual offsets (the difference) between two DEMs after applying the transformation.
:param reference_dem: 2D array of elevation values acting reference.
:param dem_to_be_aligned: 2D array of elevation values to be aligned.
:param inlier_mask: Optional. 2D boolean array of areas to include in the analysis (inliers=True).
:param transform: Optional. Transform of the reference_dem. Mandatory in some cases.
:returns: A 1D array of finite residuals.
"""
# Use the transform to correct the DEM to be aligned.
aligned_dem = self.apply(dem_to_be_aligned, transform=transform)
# Format the reference DEM
ref_arr, ref_mask = xdem.spatial_tools.get_array_and_mask(reference_dem)
if inlier_mask is None:
inlier_mask = np.ones(ref_arr.shape, dtype=bool)
# Create the full inlier mask (manual inliers plus non-nans)
full_mask = (~ref_mask) & np.isfinite(aligned_dem) & inlier_mask
# Calculate the DEM difference
diff = ref_arr - aligned_dem
# Return the difference values within the full inlier mask
return diff[full_mask]
[docs] def error(self, reference_dem: Union[np.ndarray, np.ma.masked_array],
dem_to_be_aligned: Union[np.ndarray, np.ma.masked_array],
error_type: str | list[str] = "nmad",
inlier_mask: Optional[np.ndarray] = None,
transform: Optional[rio.transform.Affine] = None) -> float | list[float]:
"""
Calculate the error of a coregistration approach.
Choices:
- "nmad": Default. The Normalized Median Absolute Deviation of the residuals.
- "median": The median of the residuals.
- "mean": The mean/average of the residuals
- "std": The standard deviation of the residuals.
- "rms": The root mean square of the residuals.
- "mae": The mean absolute error of the residuals.
- "count": The residual count.
:param reference_dem: 2D array of elevation values acting reference.
:param dem_to_be_aligned: 2D array of elevation values to be aligned.
:param error_type: The type of error meaure to calculate. May be a list of error types.
:param inlier_mask: Optional. 2D boolean array of areas to include in the analysis (inliers=True).
:param transform: Optional. Transform of the reference_dem. Mandatory in some cases.
:returns: The error measure of choice for the residuals.
"""
if isinstance(error_type, str):
error_type = [error_type]
residuals = self.residuals(reference_dem=reference_dem, dem_to_be_aligned=dem_to_be_aligned,
inlier_mask=inlier_mask, transform=transform)
error_functions = {
"nmad": xdem.spatial_tools.nmad,
"median": np.median,
"mean": np.mean,
"std": np.std,
"rms": lambda residuals: np.sqrt(np.mean(np.square(residuals))),
"mae": lambda residuals: np.mean(np.abs(residuals)),
"count": lambda residuals: residuals.size
}
try:
errors = [error_functions[err_type](residuals) for err_type in error_type]
except KeyError as exception:
raise ValueError(
f"Invalid 'error_type'{'s' if len(error_type) > 1 else ''}: "
f"'{error_type}'. Choices: {list(error_functions.keys())}"
) from exception
return errors if len(errors) > 1 else errors[0]
[docs] @classmethod
def from_matrix(cls, matrix: np.ndarray):
"""
Instantiate a generic Coreg class from a transformation matrix.
:param matrix: A 4x4 transformation matrix. Shape must be (4,4).
:raises ValueError: If the matrix is incorrectly formatted.
:returns: The instantiated generic Coreg class.
"""
if np.any(~np.isfinite(matrix)):
raise ValueError(f"Matrix has non-finite values:\n{matrix}")
with warnings.catch_warnings():
# This error is fixed in the upcoming 1.8
warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`")
valid_matrix = pytransform3d.transformations.check_transform(matrix)
return cls(matrix=valid_matrix)
[docs] @classmethod
def from_translation(cls, x_off: float = 0.0, y_off: float = 0.0, z_off: float = 0.0):
"""
Instantiate a generic Coreg class from a X/Y/Z translation.
:param x_off: The offset to apply in the X (west-east) direction.
:param y_off: The offset to apply in the Y (south-north) direction.
:param z_off: The offset to apply in the Z (vertical) direction.
:raises ValueError: If the given translation contained invalid values.
:returns: An instantiated generic Coreg class.
"""
matrix = np.diag(np.ones(4, dtype=float))
matrix[0, 3] = x_off
matrix[1, 3] = y_off
matrix[2, 3] = z_off
return cls.from_matrix(matrix)
[docs] def copy(self: CoregType) -> CoregType:
"""Return an identical copy of the class."""
new_coreg = self.__new__(type(self))
new_coreg.__dict__ = {key: copy.copy(value) for key, value in self.__dict__.items()}
return new_coreg
def __add__(self, other: Coreg) -> CoregPipeline:
"""Return a pipeline consisting of self and the other coreg function."""
if not isinstance(other, Coreg):
raise ValueError(f"Incompatible add type: {type(other)}. Expected 'Coreg' subclass")
return CoregPipeline([self, other])
def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
weights: Optional[np.ndarray], verbose: bool = False):
# FOR DEVELOPERS: This function needs to be implemented.
raise NotImplementedError("This should have been implemented by subclassing")
def _to_matrix_func(self) -> np.ndarray:
# FOR DEVELOPERS: This function needs to be implemented if the `self._meta['matrix']` keyword is not None.
# Try to see if a matrix exists.
meta_matrix = self._meta.get("matrix")
if meta_matrix is not None:
assert meta_matrix.shape == (4, 4), f"Invalid _meta matrix shape. Expected: (4, 4), got {meta_matrix.shape}"
return meta_matrix
raise NotImplementedError("This should be implemented by subclassing")
def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray:
# FOR DEVELOPERS: This function is only needed for non-rigid transforms.
raise NotImplementedError("This should have been implemented by subclassing")
def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray:
# FOR DEVELOPERS: This function is only needed for non-rigid transforms.
raise NotImplementedError("This should have been implemented by subclassing")
[docs]class BiasCorr(Coreg):
"""
DEM bias correction.
Estimates the mean (or median, weighted avg., etc.) offset between two DEMs.
"""
[docs] def __init__(self, bias_func=np.average): # pylint: disable=super-init-not-called
"""
Instantiate a bias correction object.
:param bias_func: The function to use for calculating the bias. Default: (weighted) average.
"""
super().__init__(meta={"bias_func": bias_func})
def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
weights: Optional[np.ndarray], verbose: bool = False):
"""Estimate the bias using the bias_func."""
if verbose:
print("Estimating bias...")
diff = ref_dem - tba_dem
diff = diff[np.isfinite(diff)]
if np.count_nonzero(np.isfinite(diff)) == 0:
raise ValueError("No finite values in bias comparison.")
# Use weights if those were provided.
bias = self._meta["bias_func"](diff) if weights is None \
else self._meta["bias_func"](diff, weights=weights)
if verbose:
print("Bias estimated")
self._meta["bias"] = bias
def _to_matrix_func(self) -> np.ndarray:
"""Convert the bias to a transform matrix."""
empty_matrix = np.diag(np.ones(4, dtype=float))
empty_matrix[2, 3] += self._meta["bias"]
return empty_matrix
[docs]class ICP(Coreg):
"""
Iterative Closest Point DEM coregistration.
Estimates a rigid transform (rotation + translation) between two DEMs.
Requires 'opencv'
See opencv docs for more info: https://docs.opencv.org/master/dc/d9b/classcv_1_1ppf__match__3d_1_1ICP.html
"""
[docs] def __init__(self, max_iterations=100, tolerance=0.05, rejection_scale=2.5, num_levels=6):
"""
Instantiate an ICP coregistration object.
:param max_iterations: The maximum allowed iterations before stopping.
:param tolerance: The residual change threshold after which to stop the iterations.
:param rejection_scale: The threshold (std * rejection_scale) to consider points as outliers.
:param num_levels: Number of octree levels to consider. A higher number is faster but may be more inaccurate.
"""
if not _has_cv2:
raise ValueError("Optional dependency needed. Install 'opencv'")
self.max_iterations = max_iterations
self.tolerance = tolerance
self.rejection_scale = rejection_scale
self.num_levels = num_levels
super().__init__()
def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
weights: Optional[np.ndarray], verbose: bool = False):
"""Estimate the rigid transform from tba_dem to ref_dem."""
if weights is not None:
warnings.warn("ICP was given weights, but does not support it.")
bounds, resolution = _transform_to_bounds_and_res(ref_dem.shape, transform)
points: dict[str, np.ndarray] = {}
# Generate the x and y coordinates for the reference_dem
x_coords, y_coords = _get_x_and_y_coords(ref_dem.shape, transform)
centroid = np.array([np.mean([bounds.left, bounds.right]), np.mean([bounds.bottom, bounds.top]), 0.0])
# Subtract by the bounding coordinates to avoid float32 rounding errors.
x_coords -= centroid[0]
y_coords -= centroid[1]
for key, dem in zip(["ref", "tba"], [ref_dem, tba_dem]):
gradient_x, gradient_y = np.gradient(dem)
normal_east = np.sin(np.arctan(gradient_y / resolution)) * -1
normal_north = np.sin(np.arctan(gradient_x / resolution))
normal_up = 1 - np.linalg.norm([normal_east, normal_north], axis=0)
valid_mask = ~np.isnan(dem) & ~np.isnan(normal_east) & ~np.isnan(normal_north)
point_cloud = np.dstack([
x_coords[valid_mask],
y_coords[valid_mask],
dem[valid_mask],
normal_east[valid_mask],
normal_north[valid_mask],
normal_up[valid_mask]
]).squeeze()
points[key] = point_cloud[~np.any(np.isnan(point_cloud), axis=1)].astype("float32")
icp = cv2.ppf_match_3d_ICP(self.max_iterations, self.tolerance, self.rejection_scale, self.num_levels)
if verbose:
print("Running ICP...")
try:
_, residual, matrix = icp.registerModelToScene(points["tba"], points["ref"])
except cv2.error as exception:
if "(expected: 'n > 0'), where" not in str(exception):
raise exception
raise ValueError(
"Not enough valid points in input data."
f"'reference_dem' had {points['ref'].size} valid points."
f"'dem_to_be_aligned' had {points['tba'].size} valid points."
)
if verbose:
print("ICP finished")
assert residual < 1000, f"ICP coregistration failed: residual={residual}, threshold: 1000"
self._meta["centroid"] = centroid
self._meta["matrix"] = matrix
[docs]class Deramp(Coreg):
"""
Polynomial DEM deramping.
Estimates an n-D polynomial between the difference of two DEMs.
"""
[docs] def __init__(self, degree: int = 1, subsample: Union[int, float] = 5e5):
"""
Instantiate a deramping correction object.
:param degree: The polynomial degree to estimate. degree=0 is a simple bias correction.
:param subsample: Factor for subsampling the input raster for speed-up.
If <= 1, will be considered a fraction of valid pixels to extract.
If > 1 will be considered the number of pixels to extract.
"""
self.degree = degree
self.subsample = subsample
super().__init__()
def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
weights: Optional[np.ndarray], verbose: bool = False):
"""Fit the dDEM between the DEMs to a least squares polynomial equation."""
x_coords, y_coords = _get_x_and_y_coords(ref_dem.shape, transform)
ddem = ref_dem - tba_dem
valid_mask = np.isfinite(ddem)
ddem = ddem[valid_mask]
x_coords = x_coords[valid_mask]
y_coords = y_coords[valid_mask]
# Formulate the 2D polynomial whose coefficients will be solved for.
def poly2d(x_coordinates: np.ndarray, y_coordinates: np.ndarray,
coefficients: np.ndarray) -> np.ndarray:
"""
Estimate values from a 2D-polynomial.
:param x_coordinates: x-coordinates of the difference array (must have the same shape as elevation_difference).
:param y_coordinates: y-coordinates of the difference array (must have the same shape as elevation_difference).
:param coefficients: The coefficients (a, b, c, etc.) of the polynomial.
:param degree: The degree of the polynomial.
:raises ValueError: If the length of the coefficients list is not compatible with the degree.
:returns: The values estimated by the polynomial.
"""
# Check that the coefficient size is correct.
coefficient_size = (self.degree + 1) * (self.degree + 2) / 2
if len(coefficients) != coefficient_size:
raise ValueError()
# Do Amaury's black magic to formulate and calculate the polynomial equation.
estimated_values = np.sum([coefficients[k * (k + 1) // 2 + j] * x_coordinates ** (k - j) *
y_coordinates ** j for k in range(self.degree + 1) for j in range(k + 1)], axis=0)
return estimated_values # type: ignore
def residuals(coefs: np.ndarray, x_coords: np.ndarray, y_coords: np.ndarray, targets: np.ndarray):
return np.median(np.abs(targets - poly2d(x_coords, y_coords, coefs)))
if verbose:
print("Estimating deramp function...")
# reduce number of elements for speed
# Get number of points to extract
max_points = np.size(x_coords)
if (self.subsample <= 1) & (self.subsample >= 0):
npoints = int(self.subsample * max_points)
elif self.subsample > 1:
npoints = int(self.subsample)
else:
raise ValueError("`subsample` must be >= 0")
if max_points > npoints:
indices = np.random.choice(max_points, npoints, replace=False)
x_coords = x_coords[indices]
y_coords = y_coords[indices]
ddem = ddem[indices]
# Optimize polynomial parameters
coefs = scipy.optimize.fmin(
func=residuals,
x0=np.zeros(shape=((self.degree + 1) * (self.degree + 2) // 2)),
args=(x_coords, y_coords, ddem),
disp=verbose
)
self._meta["coefficients"] = coefs
self._meta["func"] = lambda x, y: poly2d(x, y, coefs)
def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray:
"""Apply the deramp function to a DEM."""
x_coords, y_coords = _get_x_and_y_coords(dem.shape, transform)
ramp = self._meta["func"](x_coords, y_coords)
return dem + ramp
def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray:
"""Apply the deramp function to a set of points."""
new_coords = coords.copy()
new_coords[:, 2] += self._meta["func"](new_coords[:, 0], new_coords[:, 1])
return new_coords
def _to_matrix_func(self) -> np.ndarray:
"""Return a transform matrix if possible."""
if self.degree > 1:
raise ValueError(
"Nonlinear deramping degrees cannot be represented as transformation matrices."
f" (max 1, given: {self.degree})")
if self.degree == 1:
raise NotImplementedError("Vertical shift, rotation and horizontal scaling has to be implemented.")
# If degree==0, it's just a bias correction
empty_matrix = np.diag(np.ones(4, dtype=float))
empty_matrix[2, 3] += self._meta["coefficients"][0]
return empty_matrix
[docs]class CoregPipeline(Coreg):
"""
A sequential set of coregistration steps.
"""
[docs] def __init__(self, pipeline: list[Coreg]):
"""
Instantiate a new coregistration pipeline.
:param: Coregistration steps to run in the sequence they are given.
"""
self.pipeline = pipeline
super().__init__()
def __repr__(self):
return f"CoregPipeline: {self.pipeline}"
[docs] def copy(self: CoregType) -> CoregType:
"""Return an identical copy of the class."""
new_coreg = self.__new__(type(self))
new_coreg.__dict__ = {key: copy.copy(value) for key, value in self.__dict__.items() if key != "pipeline"}
new_coreg.pipeline = [step.copy() for step in self.pipeline]
return new_coreg
def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
weights: Optional[np.ndarray], verbose: bool = False):
"""Fit each coregistration step with the previously transformed DEM."""
tba_dem_mod = tba_dem.copy()
for i, coreg in enumerate(self.pipeline):
if verbose:
print(f"Running pipeline step: {i + 1} / {len(self.pipeline)}")
coreg._fit_func(ref_dem, tba_dem_mod, transform=transform, weights=weights, verbose=verbose)
coreg._fit_called = True
tba_dem_mod = coreg.apply(tba_dem_mod, transform)
def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray:
"""Apply the coregistration steps sequentially to a DEM."""
dem_mod = dem.copy()
for coreg in self.pipeline:
dem_mod = coreg.apply(dem_mod, transform)
return dem_mod
def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray:
"""Apply the coregistration steps sequentially to a set of points."""
coords_mod = coords.copy()
for coreg in self.pipeline:
coords_mod = coreg.apply_pts(coords_mod).reshape(coords_mod.shape)
return coords_mod
def _to_matrix_func(self) -> np.ndarray:
"""Try to join the coregistration steps to a single transformation matrix."""
if not _HAS_P3D:
raise ValueError("Optional dependency needed. Install 'pytransform3d'")
transform_mgr = TransformManager()
with warnings.catch_warnings():
# Deprecation warning from pytransform3d. Let's hope that is fixed in the near future.
warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`")
for i, coreg in enumerate(self.pipeline):
new_matrix = coreg.to_matrix()
transform_mgr.add_transform(i, i + 1, new_matrix)
return transform_mgr.get_transform(0, len(self.pipeline))
def __iter__(self):
"""Iterate over the pipeline steps."""
for coreg in self.pipeline:
yield coreg
def __add__(self, other: Union[list[Coreg], Coreg, CoregPipeline]) -> CoregPipeline:
"""Append Coreg(s) or a CoregPipeline to the pipeline."""
if not isinstance(other, Coreg):
other = list(other)
else:
other = [other]
pipelines = self.pipeline + other
return CoregPipeline(pipelines)
[docs]class NuthKaab(Coreg):
"""
Nuth and Kääb (2011) DEM coregistration.
Implemented after the paper:
https://doi.org/10.5194/tc-5-271-2011
"""
[docs] def __init__(self, max_iterations: int = 10, offset_threshold: float = 0.05):
"""
Instantiate a new Nuth and Kääb (2011) coregistration object.
:param max_iterations: The maximum allowed iterations before stopping.
:param offset_threshold: The residual offset threshold after which to stop the iterations.
"""
self.max_iterations = max_iterations
self.offset_threshold = offset_threshold
super().__init__()
def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
weights: Optional[np.ndarray], verbose: bool = False):
"""Estimate the x/y/z offset between two DEMs."""
if verbose:
print("Running Nuth and Kääb (2011) coregistration")
bounds, resolution = _transform_to_bounds_and_res(ref_dem.shape, transform)
# Make a new DEM which will be modified inplace
aligned_dem = tba_dem.copy()
# Calculate slope and aspect maps from the reference DEM
if verbose:
print(" Calculate slope and aspect")
slope, aspect = calculate_slope_and_aspect(ref_dem)
# Make index grids for the east and north dimensions
east_grid = np.arange(ref_dem.shape[1])
north_grid = np.arange(ref_dem.shape[0])
# Make a function to estimate the aligned DEM (used to construct an offset DEM)
elevation_function = scipy.interpolate.RectBivariateSpline(
x=north_grid, y=east_grid, z=np.where(np.isnan(aligned_dem), -9999, aligned_dem), kx=1, ky=1
)
# Make a function to estimate nodata gaps in the aligned DEM (used to fix the estimated offset DEM)
# Use spline degree 1, as higher degrees will create instabilities around 1 and mess up the nodata mask
nodata_function = scipy.interpolate.RectBivariateSpline(
x=north_grid, y=east_grid, z=np.isnan(aligned_dem), kx=1, ky=1
)
# Initialise east and north pixel offset variables (these will be incremented up and down)
offset_east, offset_north, bias = 0.0, 0.0, 0.0
# Calculate initial dDEM statistics
elevation_difference = ref_dem - aligned_dem
bias = np.nanmedian(elevation_difference)
nmad_old = xdem.spatial_tools.nmad(elevation_difference)
if verbose:
print(" Statistics on initial dh:")
print(" Median = {:.2f} - NMAD = {:.2f}".format(bias, nmad_old))
# Iteratively run the analysis until the maximum iterations or until the error gets low enough
if verbose:
print(" Iteratively estimating horizontal shit:")
# If verbose is True, will use progressbar and print additional statements
pbar = trange(self.max_iterations, disable=not verbose, desc=" Progress")
for i in pbar:
# Calculate the elevation difference and the residual (NMAD) between them.
elevation_difference = ref_dem - aligned_dem
bias = np.nanmedian(elevation_difference)
# Correct potential biases
elevation_difference -= bias
# Estimate the horizontal shift from the implementation by Nuth and Kääb (2011)
east_diff, north_diff, _ = get_horizontal_shift( # type: ignore
elevation_difference=elevation_difference,
slope=slope,
aspect=aspect
)
if verbose:
pbar.write(" #{:d} - Offset in pixels : ({:.2f}, {:.2f})".format(i + 1, east_diff, north_diff))
# Increment the offsets with the overall offset
offset_east += east_diff
offset_north += north_diff
# Calculate new elevations from the offset x- and y-coordinates
new_elevation = elevation_function(y=east_grid + offset_east, x=north_grid - offset_north)
# Set NaNs where NaNs were in the original data
new_nans = nodata_function(y=east_grid + offset_east, x=north_grid - offset_north)
new_elevation[new_nans >= 1] = np.nan
# Assign the newly calculated elevations to the aligned_dem
aligned_dem = new_elevation
# Update statistics
elevation_difference = ref_dem - aligned_dem
bias = np.nanmedian(elevation_difference)
nmad_new = xdem.spatial_tools.nmad(elevation_difference)
nmad_gain = (nmad_new - nmad_old) / nmad_old*100
if verbose:
pbar.write(" Median = {:.2f} - NMAD = {:.2f} ==> Gain = {:.2f}%".format(bias, nmad_new, nmad_gain))
# Stop if the NMAD is low and a few iterations have been made
assert ~np.isnan(nmad_new), (offset_east, offset_north)
offset = np.sqrt(east_diff**2 + north_diff**2)
if i > 1 and offset < self.offset_threshold:
if verbose:
pbar.write(f" Last offset was below the residual offset threshold of {self.offset_threshold} -> stopping")
break
nmad_old = nmad_new
# Print final results
if verbose:
print("\n Final offset in pixels (east, north) : ({:f}, {:f})".format(offset_east, offset_north))
print(" Statistics on coregistered dh:")
print(" Median = {:.2f} - NMAD = {:.2f}".format(bias, nmad_new))
self._meta["offset_east_px"] = offset_east
self._meta["offset_north_px"] = offset_north
self._meta["bias"] = bias
self._meta["resolution"] = resolution
def _to_matrix_func(self) -> np.ndarray:
"""Return a transformation matrix from the estimated offsets."""
offset_east = self._meta["offset_east_px"] * self._meta["resolution"]
offset_north = self._meta["offset_north_px"] * self._meta["resolution"]
matrix = np.diag(np.ones(4, dtype=float))
matrix[0, 3] += offset_east
matrix[1, 3] += offset_north
matrix[2, 3] += self._meta["bias"]
return matrix
[docs]def invert_matrix(matrix: np.ndarray) -> np.ndarray:
"""Invert a transformation matrix."""
with warnings.catch_warnings():
# Deprecation warning from pytransform3d. Let's hope that is fixed in the near future.
warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`")
checked_matrix = pytransform3d.transformations.check_matrix(matrix)
# Invert the transform if wanted.
return pytransform3d.transformations.invert_transform(checked_matrix)
[docs]def apply_matrix(dem: np.ndarray, transform: rio.transform.Affine, matrix: np.ndarray, invert: bool = False,
centroid: Optional[tuple[float, float, float]] = None,
resampling: Union[int, str] = "bilinear",
dilate_mask: bool = False) -> np.ndarray:
"""
Apply a 3D transformation matrix to a 2.5D DEM.
The transformation is applied as a value correction using linear deramping, and 2D image warping.
1. Convert the DEM into a point cloud (not for gridding; for estimating the DEM shifts).
2. Transform the point cloud in 3D using the 4x4 matrix.
3. Measure the difference in elevation between the original and transformed points.
4. Estimate a linear deramp from the elevation difference, and apply the correction to the DEM values.
5. Convert the horizontal coordinates of the transformed points to pixel index coordinates.
6. Apply the pixel-wise displacement in 2D using the new pixel coordinates.
7. Apply the same displacement to a nodata-mask to exclude previous and/or new nans.
:param dem: The DEM to transform.
:param transform: The Affine transform object (georeferencing) of the DEM.
:param matrix: A 4x4 transformation matrix to apply to the DEM.
:param invert: Invert the transformation matrix.
:param centroid: The X/Y/Z transformation centroid. Irrelevant for pure translations. Defaults to the midpoint (Z=0)
:param resampling: The resampling method to use. Can be `nearest`, `bilinear`, `cubic` or an integer from 0-5.
:param dilate_mask: Dilate the nan mask to exclude edge pixels that could be wrong.
:returns: The transformed DEM with NaNs as nodata values (replaces a potential mask of the input `dem`).
"""
# Parse the resampling argument given.
if isinstance(resampling, int):
resampling_order = resampling
elif resampling == "cubic":
resampling_order = 3
elif resampling == "bilinear":
resampling_order = 1
elif resampling == "nearest":
resampling_order = 0
else:
raise ValueError(
f"`{resampling}` is not a valid resampling mode."
" Choices: [`nearest`, `bilinear`, `cubic`] or an integer."
)
# Copy the DEM to make sure the original is not modified, and convert it into an ndarray
demc = np.array(dem)
# Check if the matrix only contains a Z correction. In that case, only shift the DEM values by the bias.
empty_matrix = np.diag(np.ones(4, float))
empty_matrix[2, 3] = matrix[2, 3]
if np.mean(np.abs(empty_matrix - matrix)) == 0.0:
return demc + matrix[2, 3]
# Temporary. Should probably be removed.
#demc[demc == -9999] = np.nan
nan_mask = xdem.spatial_tools.get_mask(dem)
assert np.count_nonzero(~nan_mask) > 0, "Given DEM had all nans."
# Create a filled version of the DEM. (skimage doesn't like nans)
filled_dem = np.where(~nan_mask, demc, np.median(demc[~nan_mask]))
# Get the centre coordinates of the DEM pixels.
x_coords, y_coords = _get_x_and_y_coords(demc.shape, transform)
bounds, resolution = _transform_to_bounds_and_res(dem.shape, transform)
# If a centroid was not given, default to the bottom left corner.
if centroid is None:
centroid = (np.mean([bounds.left, bounds.right]), np.mean([bounds.bottom, bounds.top]), 0.0)
else:
assert len(centroid) == 3, f"Expected centroid to be 3D X/Y/Z coordinate. Got shape of {len(centroid)}"
# Shift the coordinates to centre around the centroid.
x_coords -= centroid[0]
y_coords -= centroid[1]
# Create a point cloud of X/Y/Z coordinates
point_cloud = np.dstack((x_coords, y_coords, filled_dem))
# Shift the Z components by the centroid.
point_cloud[:, 2] -= centroid[2]
if invert:
matrix = invert_matrix(matrix)
# Transform the point cloud using the matrix.
transformed_points = cv2.perspectiveTransform(
point_cloud.reshape((1, -1, 3)),
matrix,
).reshape(point_cloud.shape)
# Estimate the vertical difference of old and new point cloud elevations.
deramp = deramping(
(point_cloud[:, :, 2] - transformed_points[:, :, 2])[~nan_mask].flatten(),
point_cloud[:, :, 0][~nan_mask].flatten(),
point_cloud[:, :, 1][~nan_mask].flatten(),
degree=1
)
# Shift the elevation values of the soon-to-be-warped DEM.
filled_dem -= deramp(x_coords, y_coords)
# Create gap-free arrays of x and y coordinates to be converted into index coordinates.
x_inds = rio.fill.fillnodata(transformed_points[:, :, 0].copy(), mask=(~nan_mask).astype("uint8"))
y_inds = rio.fill.fillnodata(transformed_points[:, :, 1].copy(), mask=(~nan_mask).astype("uint8"))
# Divide the coordinates by the resolution to create index coordinates.
x_inds /= resolution
y_inds /= resolution
# Shift the x coords so that bounds.left is equivalent to xindex -0.5
x_inds -= x_coords.min() / resolution
# Shift the y coords so that bounds.top is equivalent to yindex -0.5
y_inds = (y_coords.max() / resolution) - y_inds
# Create a skimage-compatible array of the new index coordinates that the pixels shall have after warping.
inds = np.vstack((y_inds.reshape((1,) + y_inds.shape), x_inds.reshape((1,) + x_inds.shape)))
# Warp the DEM
transformed_dem = skimage.transform.warp(
filled_dem,
inds,
order=resampling_order,
mode="constant",
cval=0,
preserve_range=True
)
# Warp the NaN mask, setting true to all values outside the new frame.
tr_nan_mask = skimage.transform.warp(
nan_mask.astype("uint8"),
inds,
order=resampling_order,
mode="constant",
cval=1,
preserve_range=True
) > 0.25 # Due to different interpolation approaches, everything above 0.25 is assumed to be 1 (True)
if dilate_mask:
tr_nan_mask = scipy.ndimage.morphology.binary_dilation(tr_nan_mask, iterations=resampling_order)
# Apply the transformed nan_mask
transformed_dem[tr_nan_mask] = np.nan
assert np.count_nonzero(~np.isnan(transformed_dem)) > 0, "Transformed DEM has all nans."
return transformed_dem
[docs]class ZScaleCorr(Coreg):
"""
Correct linear or nonlinear elevation scale errors.
Often useful for nadir image DEM correction, where the focal length is slightly miscalculated.
DISCLAIMER: This function may introduce error when correcting non-photogrammetric biases.
See Gardelle et al. (2012) (Figure 2), http://dx.doi.org/10.3189/2012jog11j175, for curvature-related biases.
"""
[docs] def __init__(self, degree=1, bin_count=100):
"""
Instantiate a elevation scale correction object.
:param degree: The polynomial degree to estimate.
:param bin_count: The amount of bins to divide the elevation change in.
"""
self.degree = degree
self.bin_count = bin_count
super().__init__()
def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
weights: Optional[np.ndarray], verbose: bool = False):
"""Estimate the scale difference between the two DEMs."""
ddem = ref_dem - tba_dem
medians = xdem.volume.hypsometric_binning(
ddem=ddem,
ref_dem=tba_dem,
bins=self.bin_count,
kind="count"
)["value"]
coefficients = np.polyfit(medians.index.mid, medians.values, deg=self.degree)
self._meta["coefficients"] = coefficients
def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray:
"""Apply the scaling model to a DEM."""
model = np.poly1d(self._meta["coefficients"])
return dem + model(dem)
def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray:
"""Apply the scaling model to a set of points."""
model = np.poly1d(self._meta["coefficients"])
new_coords = coords.copy()
new_coords[:, 2] += model(new_coords[:, 2])
return new_coords
def _to_matrix_func(self) -> np.ndarray:
"""Convert the transform to a matrix, if possible."""
if self.degree == 0: # If it's just a bias correction.
return self._meta["coefficients"][-1]
elif self.degree < 2:
raise NotImplementedError
else:
raise ValueError("A 2nd degree or higher ZScaleCorr cannot be described as a 4x4 matrix!")
[docs]class BlockwiseCoreg(Coreg):
"""
Block-wise coreg class for nonlinear estimations.
A coreg class of choice is run on an arbitrary subdivision of the raster. When later applying the coregistration,\
the optimal warping is interpolated based on X/Y/Z shifts from the coreg algorithm at the grid points.
E.g. a subdivision of 4 means to divide the DEM in four equally sized parts. These parts are then coregistered\
separately, creating four Coreg.fit results. If the subdivision is not divisible by the raster shape,\
subdivision is made as best as possible to have approximately equal pixel counts.
"""
[docs] def __init__(self, coreg: Coreg | CoregPipeline, subdivision: int, success_threshold: float = 0.8, n_threads: int | None = None):
"""
Instantiate a blockwise coreg object.
:param coreg: An instantiated coreg object to fit in the subdivided DEMs.
:param subdivision: The number of chunks to divide the DEMs in. E.g. 4 means four different transforms.
:param success_threshold: Raise an error if fewer chunks than the fraction failed for any reason.
:param n_threads: The maximum amount of threads to use. Default=auto
"""
if isinstance(coreg, type):
raise ValueError(
"The 'coreg' argument must be an instantiated Coreg subclass. "
"Hint: write e.g. ICP() instead of ICP"
)
self.coreg = coreg
self.subdivision = subdivision
self.success_threshold = success_threshold
self.n_threads = n_threads
super().__init__()
self._meta["coreg_meta"] = []
def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: rio.transform.Affine | None,
weights: np.ndarray | None, verbose: bool = False):
"""Fit the coreg approach for each subdivision."""
groups = self.subdivide_array(tba_dem.shape)
indices = np.unique(groups)
progress_bar = tqdm(total=indices.size, desc="Coregistering chunks", disable=(not verbose))
def coregister(i: int) -> dict[str, Any] | BaseException:
"""Coregister a chunk in a thread-safe way."""
inlier_mask = groups == i
# Find the corresponding slice of the inlier_mask to subset the data
rows, cols = np.where(inlier_mask)
arrayslice = np.s_[rows.min():rows.max() + 1, cols.min(): cols.max() + 1]
# Copy a subset of the two DEMs, the mask, the coreg instance, and make a new subset transform
ref_subset = ref_dem[arrayslice].copy()
tba_subset = tba_dem[arrayslice].copy()
mask_subset = inlier_mask[arrayslice].copy()
west, top = rio.transform.xy(transform, min(rows), min(cols), offset="ul")
transform_subset = rio.transform.from_origin(west, top, transform.a, -transform.e)
coreg = self.coreg.copy()
# Try to run the coregistration. If it fails for any reason, skip it and save the exception.
try:
coreg.fit(
reference_dem=ref_subset,
dem_to_be_aligned=tba_subset,
transform=transform_subset,
inlier_mask=mask_subset
)
except Exception as exception:
return exception
nmad, median = coreg.error(
reference_dem=ref_subset,
dem_to_be_aligned=tba_subset,
error_type=["nmad", "median"],
inlier_mask=mask_subset,
transform=transform_subset
)
meta: dict[str, Any] = {
"i": i,
"transform": transform_subset,
"inlier_count": np.count_nonzero(mask_subset & np.isfinite(ref_subset) & np.isfinite(tba_subset)),
"nmad": nmad,
"median": median
}
# Find the center of the inliers.
inlier_positions = np.argwhere(mask_subset)
mid_row = np.mean(inlier_positions[:, 0]).astype(int)
mid_col = np.mean(inlier_positions[:, 1]).astype(int)
# Find the indices of all finites within the mask
finites = np.argwhere(np.isfinite(tba_subset) & mask_subset)
# Calculate the distance between the approximate center and all finite indices
distances = np.linalg.norm(finites - np.array([mid_row, mid_col]), axis=1)
# Find the index representing the closest finite value to the center.
closest = np.argwhere(distances == distances.min())
# Assign the closest finite value as the representative point
representative_row, representative_col = finites[closest][0][0]
meta["representative_x"], meta["representative_y"] = rio.transform.xy(transform_subset, representative_row, representative_col)
meta["representative_val"] = ref_subset[representative_row, representative_col]
# If the coreg is a pipeline, copy its metadatas to the output meta
if hasattr(coreg, "pipeline"):
meta["pipeline"] = [step._meta.copy() for step in coreg.pipeline]
# Copy all current metadata (except for the alreay existing keys like "i", "min_row", etc, and the "coreg_meta" key)
# This can then be iteratively restored when the apply function should be called.
meta.update({key: value for key, value in coreg._meta.items() if key not in ["coreg_meta"] + list(meta.keys())})
progress_bar.update()
return meta.copy()
with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
results = executor.map(coregister, indices)
exceptions: list[BaseException] = []
for result in results:
if isinstance(result, BaseException):
exceptions.append(result)
else:
self._meta["coreg_meta"].append(result)
progress_bar.close()
# Stop if the success rate was below the threshold
if (len(self._meta["coreg_meta"]) / self.subdivision) <= self.success_threshold:
raise ValueError(
f"Fitting failed for {len(exceptions)} chunks:\n" +
"\n".join(map(str, exceptions[:5])) +
f"\n... and {len(exceptions) - 5} more" if len(exceptions) > 5 else ""
)
# Set the _fit_called parameters (only identical copies of self.coreg have actually been called)
self.coreg._fit_called = True
if hasattr(self.coreg, "pipeline"):
for step in self.coreg.pipeline:
step._fit_called = True
def _restore_metadata(self, meta: dict[str, Any]) -> None:
"""
Given some metadata, set it in the right place.
:param meta: A metadata file to update self._meta
"""
self.coreg._meta.update(meta)
if hasattr(self.coreg, "pipeline") and "pipeline" in meta:
for i, step in enumerate(self.coreg.pipeline):
step._meta.update(meta["pipeline"][i])
[docs] def to_points(self) -> np.ndarray:
"""
Convert the blockwise coregistration matrices to 3D (source -> destination) points.
The returned shape is (N, 3, 2) where the dimensions represent:
0. The point index where N is equal to the amount of subdivisions.
1. The X/Y/Z coordinate of the point.
2. The old/new position of the point.
To acquire the first point's original position: points[0, :, 0]
To acquire the first point's new position: points[0, :, 1]
To acquire the first point's Z difference: points[0, 2, 1] - points[0, 2, 0]
:returns: An array of 3D source -> destionation points.
"""
if len(self._meta["coreg_meta"]) == 0:
raise AssertionError("No coreg results exist. Has '.fit()' been called?")
points = np.empty(shape=(0, 3, 2))
for meta in self._meta["coreg_meta"]:
self._restore_metadata(meta)
#x_coord, y_coord = rio.transform.xy(meta["transform"], meta["representative_row"], meta["representative_col"])
x_coord, y_coord = meta["representative_x"], meta["representative_y"]
old_position = np.reshape([x_coord, y_coord, meta["representative_val"]], (1, 3))
new_position = self.coreg.apply_pts(old_position)
points = np.append(points, np.dstack((old_position, new_position)), axis=0)
return points
[docs] def stats(self) -> pd.DataFrame:
"""
Return statistics for each chunk in the blockwise coregistration.
* center_{x,y,z}: The center coordinate of the chunk in georeferenced units.
* {x,y,z}_off: The calculated offset in georeferenced units.
* inlier_count: The number of pixels that were inliers in the chunk.
* nmad: The NMAD after coregistration.
* median: The bias after coregistration.
:raises ValueError: If no coregistration results exist yet.
:returns: A dataframe of statistics for each chunk.
"""
points = self.to_points()
chunk_meta = {meta["i"]: meta for meta in self._meta["coreg_meta"]}
statistics: list[dict[str, Any]] = []
for i in range(points.shape[0]):
statistics.append(
{
"center_x": points[i, 0, 0],
"center_y": points[i, 1, 0],
"center_z": points[i, 2, 0],
"x_off": points[i, 0, 1] - points[i, 0, 0],
"y_off": points[i, 1, 1] - points[i, 1, 0],
"z_off": points[i, 2, 1] - points[i, 2, 0],
"inlier_count": chunk_meta[i]["inlier_count"],
"nmad": chunk_meta[i]["nmad"],
"median": chunk_meta[i]["median"]
}
)
stats_df = pd.DataFrame(statistics)
stats_df.index.name = "chunk"
return stats_df
[docs] def subdivide_array(self, shape: tuple[int, ...]) -> np.ndarray:
"""
Return the grid subdivision for a given DEM shape.
:param shape: The shape of the input DEM.
:returns: An array of shape 'shape' with 'self.subdivision' unique indices.
"""
if len(shape) == 3 and shape[0] == 1: # Account for (1, row, col) shapes
shape = (shape[1], shape[2])
return xdem.spatial_tools.subdivide_array(shape, count=self.subdivision)
def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray:
points = self.to_points()
bounds, resolution = _transform_to_bounds_and_res(dem.shape, transform)
representative_height = np.nanmean(dem)
edges_source = np.array([
[bounds.left + resolution / 2, bounds.top - resolution / 2, representative_height],
[bounds.right - resolution / 2, bounds.top - resolution / 2, representative_height],
[bounds.left + resolution / 2, bounds.bottom + resolution / 2, representative_height],
[bounds.right - resolution / 2, bounds.bottom + resolution / 2, representative_height]
])
edges_dest = self.apply_pts(edges_source)
edges = np.dstack((edges_source, edges_dest))
all_points = np.append(points, edges, axis=0)
warped_dem = warp_dem(
dem=dem,
transform=transform,
source_coords=all_points[:, :, 0],
destination_coords=all_points[:, :, 1],
resampling="linear"
)
return warped_dem
def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray:
"""Apply the scaling model to a set of points."""
points = self.to_points()
new_coords = coords.copy()
for dim in range(0, 3):
with warnings.catch_warnings():
# ZeroDivisionErrors may happen when the transformation is empty (which is fine)
warnings.filterwarnings("ignore", message="ZeroDivisionError")
model = scipy.interpolate.Rbf(
points[:, 0, 0],
points[:, 1, 0],
points[:, dim, 1] - points[:, dim, 0],
function="linear",
)
new_coords[:, dim] += model(coords[:, 0], coords[:, 1])
return new_coords
[docs]def warp_dem(
dem: np.ndarray,
transform: rio.transform.Affine,
source_coords: np.ndarray,
destination_coords: np.ndarray,
resampling: str = "cubic",
trim_border: bool = True
) -> np.ndarray:
"""
Warp a DEM using a set of source-destination 2D or 3D coordinates.
:param dem: The DEM to warp. Allowed shapes are (1, row, col) or (row, col)
:param transform: The Affine transform of the DEM.
:param source_coords: The source 2D or 3D points. must be X/Y/(Z) coords of shape (N, 2) or (N, 3).
:param destination_coords: The destination 2D or 3D points. Must have the exact same shape as 'source_coords'
:param resampling: The resampling order to use. Choices: ['nearest', 'linear', 'cubic'].
:param trim_border: Remove values outside of the interpolation regime (True) or leave them unmodified (False).
:raises ValueError: If the inputs are poorly formatted.
:raises AssertionError: For unexpected outputs.
:returns: A warped DEM with the same shape as the input.
"""
if source_coords.shape != destination_coords.shape:
raise ValueError(
f"Incompatible shapes: source_coords '({source_coords.shape})' and "
f"destination_coords '({destination_coords.shape})' shapes must be the same"
)
if (len(source_coords.shape) > 2) or (source_coords.shape[1] < 2) or (source_coords.shape[1] > 3):
raise ValueError(
"Invalid coordinate shape. Expected 2D or 3D coordinates of shape (N, 2) or (N, 3). "
f"Got '{source_coords.shape}'"
)
allowed_resampling_strs = ["nearest", "linear", "cubic"]
if resampling not in allowed_resampling_strs:
raise ValueError(f"Resampling type '{resampling}' not understood. Choices: {allowed_resampling_strs}")
dem_arr, dem_mask = xdem.spatial_tools.get_array_and_mask(dem)
bounds, resolution = _transform_to_bounds_and_res(dem_arr.shape, transform)
no_horizontal = np.sum(np.linalg.norm(destination_coords[:, :2] - source_coords[:, :2], axis=1)) < 1e-6
no_vertical = source_coords.shape[1] > 2 and np.sum(np.abs(destination_coords[:, 2] - source_coords[:, 2])) < 1e-6
if no_horizontal and no_vertical:
warnings.warn("No difference between source and destination coordinates. Returning self.")
return dem
source_coords_scaled = source_coords.copy()
destination_coords_scaled = destination_coords.copy()
# Scale the coordinates to index-space
for coords in (source_coords_scaled, destination_coords_scaled):
coords[:, 0] = (
dem_arr.shape[1] *
(coords[:, 0] - bounds.left) /
(bounds.right - bounds.left)
)
coords[:, 1] = (
dem_arr.shape[0] *
(1 - (
coords[:, 1] - bounds.bottom) /
(bounds.top - bounds.bottom)
)
)
# Generate a grid of x and y index coordinates.
grid_y, grid_x = np.mgrid[0:dem_arr.shape[0], 0:dem_arr.shape[1]]
if no_horizontal:
warped = dem_arr.copy()
else:
# Interpolate the sparse source-destination points to a grid.
# (row, col, 0) represents the destination y-coordinates of the pixels.
# (row, col, 1) represents the destination x-coordinates of the pixels.
new_indices = scipy.interpolate.griddata(
source_coords_scaled[:, [1, 0]],
destination_coords_scaled[:, [1, 0]], # Coordinates should be in y/x (not x/y) for some reason..
(grid_y, grid_x),
method="linear"
)
# If the border should not be trimmed, just assign the original indices to the missing values.
if not trim_border:
missing_ys = np.isnan(new_indices[:, :, 0])
missing_xs = np.isnan(new_indices[:, :, 1])
new_indices[:, :, 0][missing_ys] = grid_y[missing_ys]
new_indices[:, :, 1][missing_xs] = grid_x[missing_xs]
order = {"nearest": 0, "linear": 1, "cubic": 3}
with warnings.catch_warnings():
# An skimage warning that will hopefully be fixed soon. (2021-06-08)
warnings.filterwarnings("ignore", message="Passing `np.nan` to mean no clipping in np.clip")
warped = skimage.transform.warp(
image=np.where(dem_mask, -9999.12345, dem_arr),
inverse_map=np.moveaxis(new_indices, 2, 0),
output_shape=dem_arr.shape,
preserve_range=True,
order=order[resampling],
cval=-9999.12345
)
new_mask = skimage.transform.warp(
image=dem_mask,
inverse_map=np.moveaxis(new_indices, 2, 0),
output_shape=dem_arr.shape,
cval=False
) == 1
warped[new_mask | (warped == -9999.12345)] = np.nan
# If the coordinates are 3D (N, 3), apply a Z correction as well.
if not no_vertical:
grid_offsets = scipy.interpolate.griddata(
points=destination_coords_scaled[:, :2],
values=destination_coords_scaled[:, 2] - source_coords_scaled[:, 2],
xi=(grid_x, grid_y),
method=resampling,
fill_value=np.nan
)
if not trim_border:
grid_offsets[np.isnan(grid_offsets)] = np.nanmean(grid_offsets)
warped += grid_offsets
assert not np.all(np.isnan(warped)), "All-NaN output."
return warped.reshape(dem.shape)