Source code for probreg.cpd

from __future__ import division, print_function

import abc
from collections import namedtuple
from types import ModuleType
from typing import Any, Callable, Dict, List, Optional, Union

import numpy as np
import open3d as o3
import six

from . import math_utils as mu
from . import transformation as tf
from .log import log

EstepResult = namedtuple("EstepResult", ["pt1", "p1", "px", "n_p"])
MstepResult = namedtuple("MstepResult", ["transformation", "sigma2", "q"])
MstepResult.__doc__ = """Result of Maximization step.

    Attributes:
        transformation (tf.Transformation): Transformation from source to target.
        sigma2 (float): Variance of Gaussian distribution.
        q (float): Result of likelihood.
"""


[docs]@six.add_metaclass(abc.ABCMeta) class CoherentPointDrift: """Coherent Point Drift algorithm. This is an abstract class. Based on this class, it is inherited by rigid, affine, nonrigid classes according to the type of transformation. In this class, Estimation step in EM algorithm is implemented and Maximazation step is implemented in the inherited classes. Args: source (numpy.ndarray, optional): Source point cloud data. use_cuda (bool, optional): Use CUDA. """ def __init__(self, source: Optional[np.ndarray] = None, use_cuda: bool = False) -> None: self._source = source self._tf_type = None self._callbacks = [] if use_cuda: import cupy as cp from . import cupy_utils self.xp = cp self.cupy_utils = cupy_utils self._squared_kernel_sum = cupy_utils.squared_kernel_sum else: self.xp = np self._squared_kernel_sum = mu.squared_kernel_sum
[docs] def set_source(self, source: np.ndarray) -> None: self._source = source
[docs] def set_callbacks(self, callbacks: List[Callable]) -> None: self._callbacks.extend(callbacks)
@abc.abstractmethod def _initialize(self, target: np.ndarray) -> MstepResult: return MstepResult(None, None, None)
[docs] def expectation_step(self, t_source: np.ndarray, target: np.ndarray, sigma2: float, w: float = 0.0) -> EstepResult: """Expectation step for CPD""" assert t_source.ndim == 2 and target.ndim == 2, "source and target must have 2 dimensions." pmat = self.xp.stack([self.xp.sum(self.xp.square(target - ts), axis=1) for ts in t_source]) pmat = self.xp.exp(-pmat / (2.0 * sigma2)) c = (2.0 * np.pi * sigma2) ** (t_source.shape[1] * 0.5) c *= w / (1.0 - w) * t_source.shape[0] / target.shape[0] den = self.xp.sum(pmat, axis=0) den[den == 0] = self.xp.finfo(np.float32).eps den += c pmat = self.xp.divide(pmat, den) pt1 = self.xp.sum(pmat, axis=0) p1 = self.xp.sum(pmat, axis=1) px = self.xp.dot(pmat, target) return EstepResult(pt1, p1, px, np.sum(p1))
[docs] def maximization_step( self, target: np.ndarray, estep_res: EstepResult, sigma2_p: Optional[float] = None ) -> Optional[MstepResult]: return self._maximization_step(self._source, target, estep_res, sigma2_p, xp=self.xp)
@staticmethod @abc.abstractmethod def _maximization_step( source: np.ndarray, target: np.ndarray, estep_res: EstepResult, sigma2_p: Optional[float] = None, xp: ModuleType = np, ) -> Optional[MstepResult]: return None
[docs] def registration(self, target: np.ndarray, w: float = 0.0, maxiter: int = 50, tol: float = 0.001) -> MstepResult: assert not self._tf_type is None, "transformation type is None." res = self._initialize(target) q = res.q for i in range(maxiter): t_source = res.transformation.transform(self._source) estep_res = self.expectation_step(t_source, target, res.sigma2, w) res = self.maximization_step(target, estep_res, res.sigma2) for c in self._callbacks: c(res.transformation) log.debug("Iteration: {}, Criteria: {}".format(i, res.q)) if abs(res.q - q) < tol: break q = res.q return res
[docs]class RigidCPD(CoherentPointDrift): """Coherent Point Drift for rigid transformation. Args: source (numpy.ndarray, optional): Source point cloud data. update_scale (bool, optional): If this flag is True, compute the scale parameter. tf_init_params (dict, optional): Parameters to initialize transformation. use_cuda (bool, optional): Use CUDA. """ def __init__( self, source: Optional[np.ndarray] = None, update_scale: bool = True, tf_init_params: Dict = {}, use_cuda: bool = False, ) -> None: super(RigidCPD, self).__init__(source, use_cuda) self._tf_type = tf.RigidTransformation self._update_scale = update_scale self._tf_init_params = tf_init_params def _initialize(self, target: np.ndarray) -> MstepResult: dim = self._source.shape[1] sigma2 = self._squared_kernel_sum(self._source, target) q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2) if len(self._tf_init_params) == 0: self._tf_init_params = {"rot": self.xp.identity(dim), "t": self.xp.zeros(dim)} if not "xp" in self._tf_init_params: self._tf_init_params["xp"] = self.xp return MstepResult(self._tf_type(**self._tf_init_params), sigma2, q)
[docs] def maximization_step( self, target: np.ndarray, estep_res: EstepResult, sigma2_p: Optional[float] = None ) -> MstepResult: return self._maximization_step(self._source, target, estep_res, sigma2_p, self._update_scale, self.xp)
@staticmethod def _maximization_step( source: np.ndarray, target: np.ndarray, estep_res: EstepResult, sigma2_p: Optional[float] = None, update_scale: bool = True, xp: ModuleType = np, ) -> MstepResult: pt1, p1, px, n_p = estep_res dim = source.shape[1] mu_x = xp.sum(px, axis=0) / n_p mu_y = xp.dot(source.T, p1) / n_p target_hat = target - mu_x source_hat = source - mu_y a = xp.dot(px.T, source_hat) - xp.outer(mu_x, xp.dot(p1.T, source_hat)) u, _, vh = np.linalg.svd(a, full_matrices=True) c = xp.ones(dim) c[-1] = xp.linalg.det(xp.dot(u, vh)) rot = xp.dot(u * c, vh) tr_atr = np.trace(xp.dot(a.T, rot)) tr_yp1y = np.trace(xp.dot(source_hat.T * p1, source_hat)) scale = tr_atr / tr_yp1y if update_scale else 1.0 t = mu_x - scale * xp.dot(rot, mu_y) tr_xp1x = xp.trace(xp.dot(target_hat.T * pt1, target_hat)) if update_scale: sigma2 = (tr_xp1x - scale * tr_atr) / (n_p * dim) else: sigma2 = (tr_xp1x + tr_yp1y - scale * tr_atr) / (n_p * dim) sigma2 = max(sigma2, np.finfo(np.float32).eps) q = (tr_xp1x - 2.0 * scale * tr_atr + (scale ** 2) * tr_yp1y) / (2.0 * sigma2) q += dim * n_p * 0.5 * np.log(sigma2) return MstepResult(tf.RigidTransformation(rot, t, scale, xp=xp), sigma2, q)
[docs]class AffineCPD(CoherentPointDrift): """Coherent Point Drift for affine transformation. Args: source (numpy.ndarray, optional): Source point cloud data. tf_init_params (dict, optional): Parameters to initialize transformation. use_cuda (bool, optional): Use CUDA. """ def __init__(self, source: Optional[np.ndarray] = None, tf_init_params: Dict = {}, use_cuda: bool = False) -> None: super(AffineCPD, self).__init__(source, use_cuda) self._tf_type = tf.AffineTransformation self._tf_init_params = tf_init_params def _initialize(self, target: np.ndarray) -> MstepResult: dim = self._source.shape[1] sigma2 = self._squared_kernel_sum(self._source, target) q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2) if len(self._tf_init_params) == 0: self._tf_init_params = {"b": self.xp.identity(dim), "t": self.xp.zeros(dim)} if not "xp" in self._tf_init_params: self._tf_init_params["xp"] = self.xp return MstepResult(self._tf_type(**self._tf_init_params), sigma2, q) @staticmethod def _maximization_step( source: np.ndarray, target: np.ndarray, estep_res: EstepResult, sigma2_p: Optional[float] = None, xp: ModuleType = np, ) -> MstepResult: pt1, p1, px, n_p = estep_res dim = source.shape[1] mu_x = xp.sum(px, axis=0) / n_p mu_y = xp.dot(source.T, p1) / n_p target_hat = target - mu_x source_hat = source - mu_y a = xp.dot(px.T, source_hat) - xp.outer(mu_x, xp.dot(p1.T, source_hat)) yp1y = xp.dot(source_hat.T * p1, source_hat) b = xp.linalg.solve(yp1y.T, a.T).T t = mu_x - xp.dot(b, mu_y) tr_xp1x = xp.trace(xp.dot(target_hat.T * pt1, target_hat)) tr_xpyb = xp.trace(xp.dot(a, b.T)) sigma2 = (tr_xp1x - tr_xpyb) / (n_p * dim) tr_ab = xp.trace(xp.dot(a, b.T)) sigma2 = max(sigma2, np.finfo(np.float32).eps) q = (tr_xp1x - 2 * tr_ab + tr_xpyb) / (2.0 * sigma2) q += dim * n_p * 0.5 * np.log(sigma2) return MstepResult(tf.AffineTransformation(b, t), sigma2, q)
[docs]class NonRigidCPD(CoherentPointDrift): """Coherent Point Drift for nonrigid transformation. Args: source (numpy.ndarray, optional): Source point cloud data. beta (float, optional): Parameter of RBF kernel. lmd (float, optional): Parameter for regularization term. use_cuda (bool, optional): Use CUDA. """ def __init__( self, source: Optional[np.ndarray] = None, beta: float = 2.0, lmd: float = 2.0, use_cuda: bool = False ) -> None: super(NonRigidCPD, self).__init__(source, use_cuda) self._tf_type = tf.NonRigidTransformation self._beta = beta self._lmd = lmd self._tf_obj = None if not self._source is None: self._tf_obj = self._tf_type(None, self._source, self._beta, self.xp)
[docs] def set_source(self, source: np.ndarray) -> None: self._source = source self._tf_obj = self._tf_type(None, self._source, self._beta)
[docs] def maximization_step( self, target: np.ndarray, estep_res: EstepResult, sigma2_p: Optional[float] = None ) -> MstepResult: return self._maximization_step(self._source, target, estep_res, sigma2_p, self._tf_obj, self._lmd, self.xp)
def _initialize(self, target: np.ndarray) -> MstepResult: dim = self._source.shape[1] sigma2 = self._squared_kernel_sum(self._source, target) q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2) self._tf_obj.w = self.xp.zeros_like(self._source) return MstepResult(self._tf_obj, sigma2, q) @staticmethod def _maximization_step( source: np.ndarray, target: np.ndarray, estep_res: EstepResult, sigma2_p: float, tf_obj: tf.NonRigidTransformation, lmd: float, xp: ModuleType = np, ) -> MstepResult: pt1, p1, px, n_p = estep_res dim = source.shape[1] w = xp.linalg.solve((p1 * tf_obj.g).T + lmd * sigma2_p * xp.identity(source.shape[0]), px - (source.T * p1).T) t = source + xp.dot(tf_obj.g, w) tr_xp1x = xp.trace(xp.dot(target.T * pt1, target)) tr_pxt = xp.trace(xp.dot(px.T, t)) tr_tpt = xp.trace(xp.dot(t.T * p1, t)) sigma2 = (tr_xp1x - 2.0 * tr_pxt + tr_tpt) / (n_p * dim) tf_obj.w = w return MstepResult(tf_obj, sigma2, sigma2)
[docs]class ConstrainedNonRigidCPD(CoherentPointDrift): """ Extended Coherent Point Drift for nonrigid transformation. Like CoherentPointDrift, but allows to add point correspondance constraints See: https://people.mpi-inf.mpg.de/~golyanik/04_DRAFTS/ECPD2016.pdf Args: source (numpy.ndarray, optional): Source point cloud data. beta (float, optional): Parameter of RBF kernel. lmd (float, optional): Parameter for regularization term. alpha (float): Degree of reliability of priors. Approximately between 1e-8 (highly reliable) and 1 (highly unreliable) use_cuda (bool, optional): Use CUDA. idx_source (numpy.ndarray of ints, optional): Indices in source matrix for which a correspondance is known idx_target (numpy.ndarray of ints, optional): Indices in target matrix for which a correspondance is known """ def __init__( self, source: Optional[np.ndarray] = None, beta: float = 2.0, lmd: float = 2.0, alpha: float = 1e-8, use_cuda: bool = False, idx_source: Optional[np.ndarray] = None, idx_target: Optional[np.ndarray] = None, ): super(ConstrainedNonRigidCPD, self).__init__(source, use_cuda) self._tf_type = tf.NonRigidTransformation self._beta = beta self._lmd = lmd self.alpha = alpha self._tf_obj = None self.idx_source, self.idx_target = idx_source, idx_target if not self._source is None: self._tf_obj = self._tf_type(None, self._source, self._beta, self.xp)
[docs] def set_source(self, source: np.ndarray) -> None: self._source = source self._tf_obj = self._tf_type(None, self._source, self._beta)
[docs] def maximization_step( self, target: np.ndarray, estep_res: EstepResult, sigma2_p: Optional[float] = None ) -> MstepResult: return self._maximization_step( self._source, target, estep_res, sigma2_p, self._tf_obj, self._lmd, self.alpha, self.p1_tilde, self.px_tilde, self.xp, )
def _initialize(self, target: np.ndarray) -> MstepResult: dim = self._source.shape[1] sigma2 = self._squared_kernel_sum(self._source, target) q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2) self._tf_obj.w = self.xp.zeros_like(self._source) self.p_tilde = self.xp.zeros((self._source.shape[0], target.shape[0])) if self.idx_source is not None and self.idx_target is not None: self.p_tilde[self.idx_source, self.idx_target] = 1 self.p1_tilde = self.xp.sum(self.p_tilde, axis=1) self.px_tilde = self.xp.dot(self.p_tilde, target) return MstepResult(self._tf_obj, sigma2, q) @staticmethod def _maximization_step( source: np.ndarray, target: np.ndarray, estep_res: EstepResult, sigma2_p: float, tf_obj: tf.NonRigidTransformation, lmd: float, alpha: float, p1_tilde: float, px_tilde: float, xp: ModuleType = np, ) -> MstepResult: pt1, p1, px, n_p = estep_res dim = source.shape[1] w = xp.linalg.solve( (p1 * tf_obj.g).T + sigma2_p / alpha * (p1_tilde * tf_obj.g).T + lmd * sigma2_p * xp.identity(source.shape[0]), px - (source.T * p1).T + sigma2_p / alpha * (px_tilde - (source.T * p1_tilde).T), ) t = source + xp.dot(tf_obj.g, w) tr_xp1x = xp.trace(xp.dot(target.T * pt1, target)) tr_pxt = xp.trace(xp.dot(px.T, t)) tr_tpt = xp.trace(xp.dot(t.T * p1, t)) sigma2 = (tr_xp1x - 2.0 * tr_pxt + tr_tpt) / (n_p * dim) tf_obj.w = w return MstepResult(tf_obj, sigma2, sigma2)
[docs]def registration_cpd( source: Union[np.ndarray, o3.geometry.PointCloud], target: Union[np.ndarray, o3.geometry.PointCloud], tf_type_name: str = "rigid", w: float = 0.0, maxiter: int = 50, tol: float = 0.001, callbacks: List[Callable] = [], use_cuda: bool = False, **kwargs: Any, ) -> MstepResult: """CPD Registraion. Args: source (numpy.ndarray): Source point cloud data. target (numpy.ndarray): Target point cloud data. tf_type_name (str, optional): Transformation type('rigid', 'affine', 'nonrigid', 'nonrigid_constrained') w (float, optional): Weight of the uniform distribution, 0 < `w` < 1. maxitr (int, optional): Maximum number of iterations to EM algorithm. tol (float, optional): Tolerance for termination. callback (:obj:`list` of :obj:`function`, optional): Called after each iteration. `callback(probreg.Transformation)` use_cuda (bool, optional): Use CUDA. Keyword Args: update_scale (bool, optional): If this flag is true and tf_type is rigid transformation, then the scale is treated. The default is true. tf_init_params (dict, optional): Parameters to initialize transformation (for rigid or affine). Returns: MstepResult: Result of the registration (transformation, sigma2, q) """ xp = np if use_cuda: import cupy as cp xp = cp cv = lambda x: xp.asarray(x.points if isinstance(x, o3.geometry.PointCloud) else x) if tf_type_name == "rigid": cpd = RigidCPD(cv(source), use_cuda=use_cuda, **kwargs) elif tf_type_name == "affine": cpd = AffineCPD(cv(source), use_cuda=use_cuda, **kwargs) elif tf_type_name == "nonrigid": cpd = NonRigidCPD(cv(source), use_cuda=use_cuda, **kwargs) elif tf_type_name == "nonrigid_constrained": cpd = ConstrainedNonRigidCPD(cv(source), use_cuda=use_cuda, **kwargs) else: raise ValueError("Unknown transformation type %s" % tf_type_name) cpd.set_callbacks(callbacks) return cpd.registration(cv(target), w, maxiter, tol)