from __future__ import division, print_function
import logging
from typing import Any, Callable, List, Union
import numpy as np
import open3d as o3
from scipy.optimize import minimize
from . import cost_functions as cf
from . import features as ft
from . import transformation as tf
from .log import log
[docs]class L2DistRegistration(object):
"""L2 distance registration class
This algorithm expresses point clouds as mixture gaussian distributions and
performs registration by minimizing the distance between two distributions.
Args:
source (numpy.ndarray): Source point cloud data.
feature_gen (probreg.features.Feature): Generator of mixture gaussian distribution.
cost_fn (probreg.cost_functions.CostFunction): Cost function to caliculate L2 distance.
sigma (float, optional): Scaling parameter for L2 distance.
delta (float, optional): Annealing parameter for optimization.
use_estimated_sigma (float, optional): If this flag is True,
sigma estimates from the source point cloud.
"""
def __init__(
self,
source: np.ndarray,
feature_gen: ft.Feature,
cost_fn: cf.CostFunction,
sigma: float = 1.0,
delta: float = 0.9,
use_estimated_sigma: bool = True,
):
self._source = source
self._feature_gen = feature_gen
self._cost_fn = cost_fn
self._sigma = sigma
self._delta = delta
self._use_estimated_sigma = use_estimated_sigma
self._callbacks = []
if not self._source is None and self._use_estimated_sigma:
self._estimate_sigma(self._source)
[docs] def set_source(self, source: np.ndarray):
self._source = source
if self._use_estimated_sigma:
self._estimate_sigma(self._source)
[docs] def set_callbacks(self, callbacks):
self._callbacks.extend(callbacks)
def _estimate_sigma(self, data: np.ndarray):
ndata, dim = data.shape
data_hat = data - np.mean(data, axis=0)
self._sigma = np.power(np.linalg.det(np.dot(data_hat.T, data_hat) / (ndata - 1)), 1.0 / (2.0 * dim))
def _annealing(self):
self._sigma *= self._delta
[docs] def optimization_cb(self, x: np.ndarray):
tf_result = self._cost_fn.to_transformation(x)
for c in self._callbacks:
c(tf_result)
[docs] def registration(
self, target: np.ndarray, maxiter: int = 1, tol: float = 1.0e-3, opt_maxiter: int = 50, opt_tol: float = 1.0e-3
) -> tf.Transformation:
f = None
x_ini = self._cost_fn.initial()
for _ in range(maxiter):
self._feature_gen.init()
mu_source, phi_source = self._feature_gen.compute(self._source)
mu_target, phi_target = self._feature_gen.compute(target)
args = (mu_source, phi_source, mu_target, phi_target, self._sigma)
res = minimize(
self._cost_fn,
x_ini,
args=args,
method="BFGS",
jac=True,
tol=opt_tol,
options={"maxiter": opt_maxiter, "disp": log.level == logging.DEBUG},
callback=self.optimization_cb,
)
self._annealing()
self._feature_gen.annealing()
if not f is None and abs(res.fun - f) < tol:
break
f = res.fun
x_ini = res.x
return self._cost_fn.to_transformation(res.x)
[docs]class RigidGMMReg(L2DistRegistration):
def __init__(self, source, sigma=1.0, delta=0.9, n_gmm_components=800, use_estimated_sigma=True):
n_gmm_components = min(n_gmm_components, int(source.shape[0] * 0.8))
super(RigidGMMReg, self).__init__(
source, ft.GMM(n_gmm_components), cf.RigidCostFunction(), sigma, delta, use_estimated_sigma
)
[docs]class TPSGMMReg(L2DistRegistration):
def __init__(
self, source, sigma=1.0, delta=0.9, n_gmm_components=800, alpha=1.0, beta=0.1, use_estimated_sigma=True
):
n_gmm_components = min(n_gmm_components, int(source.shape[0] * 0.8))
super(TPSGMMReg, self).__init__(
source, ft.GMM(n_gmm_components), cf.TPSCostFunction([], alpha, beta), sigma, delta, use_estimated_sigma
)
self._feature_gen.init()
control_pts, _ = self._feature_gen.compute(source)
self._cost_fn._control_pts = control_pts
[docs]class RigidSVR(L2DistRegistration):
def __init__(self, source, sigma=1.0, delta=0.9, gamma=0.5, nu=0.1, use_estimated_sigma=True):
super(RigidSVR, self).__init__(
source,
ft.OneClassSVM(source.shape[1], sigma, gamma, nu),
cf.RigidCostFunction(),
sigma,
delta,
use_estimated_sigma,
)
def _estimate_sigma(self, data):
super(RigidSVR, self)._estimate_sigma(data)
self._feature_gen._sigma = self._sigma
self._feature_gen._gamma = 1.0 / (2.0 * np.square(self._sigma))
[docs]class TPSSVR(L2DistRegistration):
def __init__(self, source, sigma=1.0, delta=0.9, gamma=0.5, nu=0.1, alpha=1.0, beta=0.1, use_estimated_sigma=True):
super(TPSSVR, self).__init__(
source,
ft.OneClassSVM(source.shape[1], sigma, gamma, nu),
cf.TPSCostFunction([], alpha, beta),
sigma,
delta,
use_estimated_sigma,
)
self._feature_gen.init()
control_pts, _ = self._feature_gen.compute(source)
self._cost_fn._control_pts = control_pts
def _estimate_sigma(self, data):
super(TPSSVR, self)._estimate_sigma(data)
self._feature_gen._sigma = self._sigma
self._feature_gen._gamma = 1.0 / (2.0 * np.square(self._sigma))
[docs]def registration_gmmreg(
source: np.ndarray, target: np.ndarray, tf_type_name: str = "rigid", callbacks: List = [], **kargs
):
"""GMMReg.
Args:
source (numpy.ndarray): Source point cloud data.
target (numpy.ndarray): Target point cloud data.
tf_type_name (str, optional): Transformation type('rigid', 'nonrigid')
callback (:obj:`list` of :obj:`function`, optional): Called after each iteration.
`callback(probreg.Transformation)`
Returns:
probreg.Transformation: Transformation from source to target.
"""
cv = lambda x: np.asarray(x.points if isinstance(x, o3.geometry.PointCloud) else x)
if tf_type_name == "rigid":
gmmreg = RigidGMMReg(cv(source), **kargs)
elif tf_type_name == "nonrigid":
gmmreg = TPSGMMReg(cv(source), **kargs)
else:
raise ValueError("Unknown transform type %s" % tf_type_name)
gmmreg.set_callbacks(callbacks)
return gmmreg.registration(cv(target))
[docs]def registration_svr(
source: Union[np.ndarray, o3.geometry.PointCloud],
target: Union[np.ndarray, o3.geometry.PointCloud],
tf_type_name: str = "rigid",
maxiter: int = 1,
tol: float = 1.0e-3,
opt_maxiter: int = 50,
opt_tol: float = 1.0e-3,
callbacks: List[Callable] = [],
**kwargs: Any,
):
"""Support Vector Registration.
Args:
source (numpy.ndarray): Source point cloud data.
target (numpy.ndarray): Target point cloud data.
tf_type_name (str, optional): Transformation type('rigid', 'nonrigid')
maxitr (int, optional): Maximum number of iterations for outer loop.
tol (float, optional): Tolerance for termination of outer loop.
opt_maxitr (int, optional): Maximum number of iterations for inner loop.
opt_tol (float, optional): Tolerance for termination of inner loop.
callback (:obj:`list` of :obj:`function`, optional): Called after each iteration.
`callback(probreg.Transformation)`
Returns:
probreg.Transformation: Transformation from source to target.
"""
cv = lambda x: np.asarray(x.points if isinstance(x, o3.geometry.PointCloud) else x)
if tf_type_name == "rigid":
svr = RigidSVR(cv(source), **kwargs)
elif tf_type_name == "nonrigid":
svr = TPSSVR(cv(source), **kwargs)
else:
raise ValueError("Unknown transform type %s" % tf_type_name)
svr.set_callbacks(callbacks)
return svr.registration(cv(target), maxiter, tol, opt_maxiter, opt_tol)