From b7c8edf343adc88bb8698ab9f5cfc3320fd17f67 Mon Sep 17 00:00:00 2001 From: Kenta Tanaka Date: Fri, 10 May 2024 16:10:55 +0900 Subject: [PATCH] add color cpd --- README.md | 1 + probreg/__init__.py | 3 +- probreg/bcpd.py | 6 +- probreg/callbacks.py | 24 +++++--- probreg/cost_functions.py | 4 +- probreg/cpd.py | 118 ++++++++++++++++++++++++++++---------- probreg/features.py | 2 +- probreg/transformation.py | 2 +- pyproject.toml | 4 +- 9 files changed, 116 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index a661839..6a7a3d2 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ This package implements several algorithms using stochastic models and provides * Maximum likelihood when the target or source point cloud is observation data * [Coherent Point Drift (2010)](https://arxiv.org/pdf/0905.2635.pdf) * [Extended Coherent Point Drift (2016)](https://ieeexplore.ieee.org/abstract/document/7477719) (add correspondence priors to CPD) + * [Color Coherent Point Drift (2018)](https://arxiv.org/pdf/1802.01516) * [FilterReg (CVPR2019)](https://arxiv.org/pdf/1811.10136.pdf) * Variational Bayesian inference * [Bayesian Coherent Point Drift (2020)](https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8985307) diff --git a/probreg/__init__.py b/probreg/__init__.py index 85a1296..6283de3 100644 --- a/probreg/__init__.py +++ b/probreg/__init__.py @@ -1,3 +1,2 @@ -from . import (bcpd, callbacks, cpd, filterreg, gmmtree, l2dist_regs, log, - math_utils, transformation) +from . import bcpd, callbacks, cpd, filterreg, gmmtree, l2dist_regs, log, math_utils, transformation from .version import __version__ diff --git a/probreg/bcpd.py b/probreg/bcpd.py index 12ceb83..3fc496b 100644 --- a/probreg/bcpd.py +++ b/probreg/bcpd.py @@ -58,7 +58,7 @@ def expectation_step(self, t_source, target, scale, alpha, sigma_mat, sigma2, w= pmat = np.exp(-pmat / (2.0 * sigma2)) pmat /= (2.0 * np.pi * sigma2) ** (dim * 0.5) pmat = pmat.T - pmat *= np.exp(-(scale ** 2) / (2 * sigma2) * np.diag(sigma_mat) * dim) + pmat *= np.exp(-(scale**2) / (2 * sigma2) * np.diag(sigma_mat) * dim) pmat *= (1.0 - w) * alpha den = w / target.shape[0] + np.sum(pmat, axis=1) den[den == 0] = np.finfo(np.float32).eps @@ -126,7 +126,7 @@ def _maximization_step(source, target, rigid_trans, estep_res, gmat_inv, lmd, k, nu_d, nu, n_p, px, x_hat = estep_res dim = source.shape[1] m = source.shape[0] - s2s2 = rigid_trans.scale ** 2 / (sigma2_p ** 2) + s2s2 = rigid_trans.scale**2 / (sigma2_p**2) sigma_mat_inv = lmd * gmat_inv + s2s2 * np.diag(nu) sigma_mat = np.linalg.inv(sigma_mat_inv) residual = rigid_trans.inverse().transform(x_hat) - source @@ -152,7 +152,7 @@ def _maximization_step(source, target, rigid_trans, estep_res, gmat_inv, lmd, k, s1 = np.dot(target.ravel(), np.kron(nu_d, np.ones(dim)) * target.ravel()) s2 = np.dot(px.ravel(), y_hat.ravel()) s3 = np.dot(y_hat.ravel(), np.kron(nu, np.ones(dim)) * y_hat.ravel()) - sigma2 = (s1 - 2.0 * s2 + s3) / (n_p * dim) + scale ** 2 * sigma2_m + sigma2 = (s1 - 2.0 * s2 + s3) / (n_p * dim) + scale**2 * sigma2_m return MstepResult(tf.CombinedTransformation(rot, t, scale, v_hat), u_hat, sigma_mat, alpha, sigma2) diff --git a/probreg/callbacks.py b/probreg/callbacks.py index 85b0e73..40eb1e9 100644 --- a/probreg/callbacks.py +++ b/probreg/callbacks.py @@ -18,7 +18,7 @@ def asnumpy(x): from .transformation import Transformation -class Plot2DCallback(object): +class Plot2DCallback: """Display the 2D registration result of each iteration. Args: @@ -62,12 +62,12 @@ def __call__(self, transformation: Transformation) -> None: self._cnt += 1 -class Open3dVisualizerCallback(object): +class Open3dVisualizerCallback: """Display the 3D registration result of each iteration. Args: - source (numpy.ndarray): Source point cloud data. - target (numpy.ndarray): Target point cloud data. + source (open3d.geometry.PointCloud): Source point cloud data. + target (open3d.geometry.PointCloud): Target point cloud data. save (bool, optional): If this flag is True, each iteration image is saved in a sequential number. keep_window (bool, optional): If this flag is True, @@ -76,7 +76,12 @@ class Open3dVisualizerCallback(object): """ def __init__( - self, source: np.ndarray, target: np.ndarray, save: bool = False, keep_window: bool = True, fov: Any = None + self, + source: o3.geometry.PointCloud, + target: o3.geometry.PointCloud, + save: bool = False, + keep_window: bool = True, + fov: Any = None, ): self._vis = o3.visualization.Visualizer() self._vis.create_window() @@ -85,9 +90,12 @@ def __init__( self._result = copy.deepcopy(self._source) self._save = save self._keep_window = keep_window - self._source.paint_uniform_color([1, 0, 0]) - self._target.paint_uniform_color([0, 1, 0]) - self._result.paint_uniform_color([0, 0, 1]) + if not self._source.has_colors(): + self._source.paint_uniform_color([1, 0, 0]) + if not self._target.has_colors(): + self._target.paint_uniform_color([0, 1, 0]) + if not self._result.has_colors(): + self._result.paint_uniform_color([0, 0, 1]) self._vis.add_geometry(self._source) self._vis.add_geometry(self._target) self._vis.add_geometry(self._result) diff --git a/probreg/cost_functions.py b/probreg/cost_functions.py index 24944d9..dfde324 100644 --- a/probreg/cost_functions.py +++ b/probreg/cost_functions.py @@ -33,11 +33,11 @@ def __call__(self, theta: np.ndarray, *args): def compute_l2_dist( mu_source: np.ndarray, phi_source: np.ndarray, mu_target: np.ndarray, phi_target: np.ndarray, sigma: float ): - z = np.power(2.0 * np.pi * sigma ** 2, mu_source.shape[1] * 0.5) + z = np.power(2.0 * np.pi * sigma**2, mu_source.shape[1] * 0.5) gtrans = gt.GaussTransform(mu_target, np.sqrt(2.0) * sigma) phi_j_e = gtrans.compute(mu_source, phi_target / z) phi_mu_j_e = gtrans.compute(mu_source, phi_target * mu_target.T / z).T - g = (phi_source * phi_j_e * mu_source.T - phi_source * phi_mu_j_e.T).T / (2.0 * sigma ** 2) + g = (phi_source * phi_j_e * mu_source.T - phi_source * phi_mu_j_e.T).T / (2.0 * sigma**2) return -np.dot(phi_source, phi_j_e), g diff --git a/probreg/cpd.py b/probreg/cpd.py index 67a6967..305f7a5 100644 --- a/probreg/cpd.py +++ b/probreg/cpd.py @@ -36,13 +36,18 @@ class CoherentPointDrift: Args: source (numpy.ndarray, optional): Source point cloud data. + use_color (bool, optional): Use color information (if available). use_cuda (bool, optional): Use CUDA. """ - def __init__(self, source: Optional[np.ndarray] = None, use_cuda: bool = False) -> None: + _N_DIM = 3 + _N_COLOR = 3 + + def __init__(self, source: Optional[np.ndarray] = None, use_color: bool = False, use_cuda: bool = False) -> None: self._source = source self._tf_type = None self._callbacks = [] + self._use_color = use_color if use_cuda: import cupy as cp from cupyx.scipy.spatial import distance as cupy_distance @@ -68,19 +73,41 @@ def set_callbacks(self, callbacks: List[Callable]) -> None: def _initialize(self, target: np.ndarray) -> MstepResult: return MstepResult(None, None, None) - def expectation_step(self, t_source: np.ndarray, target: np.ndarray, sigma2: float, w: float = 0.0) -> EstepResult: - """Expectation step for CPD""" - assert t_source.ndim == 2 and target.ndim == 2, "source and target must have 2 dimensions." + def _compute_pmat_numerator(self, t_source: np.ndarray, target: np.ndarray, sigma2: float) -> np.ndarray: pmat = self.distance_module.cdist(t_source, target, "sqeuclidean") - # pmat = self.xp.stack([self.xp.sum(self.xp.square(target - ts), axis=1) for ts in t_source]) pmat = self.xp.exp(-pmat / (2.0 * sigma2)) + return pmat + + def expectation_step( + self, + t_source: np.ndarray, + target: np.ndarray, + sigma2: float, + sigma2_c: float, + w: float = 0.0, + ) -> EstepResult: + """Expectation step for CPD""" + assert t_source.ndim == 2 and target.ndim == 2, "source and target must have 2 dimensions." + pmat = self._compute_pmat_numerator(t_source[:, : self._N_DIM], target[:, : self._N_DIM], sigma2) + if self._use_color: + pmat_c = self._compute_pmat_numerator(t_source[:, 3:], target[:, 3:], sigma2_c) - c = (2.0 * np.pi * sigma2) ** (t_source.shape[1] * 0.5) + c = (2.0 * np.pi * sigma2) ** (self._N_DIM * 0.5) c *= w / (1.0 - w) * t_source.shape[0] / target.shape[0] den = self.xp.sum(pmat, axis=0) den[den == 0] = self.xp.finfo(np.float32).eps + if self._use_color: + den_c = self.xp.sum(pmat_c, axis=0) + den_c[den_c == 0] = self.xp.finfo(np.float32).eps + den = np.multiply(den, den_c) + o_c = self.t_source.shape[0] * (2 * np.pi * sigma2_c) ** (0.5 * (self._N_DIM + self._N_COLOR - 1)) + o_c *= self.xp.exp(-1.0 / self.t_source.shape[0] * self.xp.sum(pmat_c, axis=0).square() / (2.0 * sigma2_c)) + den += o_c + c *= (2.0 * np.pi * sigma2_c) ** (self._N_COLOR * 0.5) den += c + if self._use_color: + pmat = self.xp.multiply(pmat, pmat_c) pmat = self.xp.divide(pmat, den) pt1 = self.xp.sum(pmat, axis=0) p1 = self.xp.sum(pmat, axis=1) @@ -90,7 +117,9 @@ def expectation_step(self, t_source: np.ndarray, target: np.ndarray, sigma2: flo def maximization_step( self, target: np.ndarray, estep_res: EstepResult, sigma2_p: Optional[float] = None ) -> Optional[MstepResult]: - return self._maximization_step(self._source, target, estep_res, sigma2_p, xp=self.xp) + return self._maximization_step( + self._source[:, : self._N_DIM], target[:, : self._N_DIM], estep_res, sigma2_p, xp=self.xp + ) @staticmethod @abc.abstractmethod @@ -105,11 +134,14 @@ def _maximization_step( def registration(self, target: np.ndarray, w: float = 0.0, maxiter: int = 50, tol: float = 0.001) -> MstepResult: assert not self._tf_type is None, "transformation type is None." - res = self._initialize(target) + res = self._initialize(target[:, : self._N_DIM]) + sigma2_c = 0.0 + if self._use_color: + sigma2_c = self._squared_kernel_sum(self._source[:, self._N_DIM :], target[:, self._N_DIM :]) q = res.q for i in range(maxiter): t_source = res.transformation.transform(self._source) - estep_res = self.expectation_step(t_source, target, res.sigma2, w) + estep_res = self.expectation_step(t_source, target, res.sigma2, sigma2_c, w) res = self.maximization_step(target, estep_res, res.sigma2) for c in self._callbacks: c(res.transformation) @@ -127,6 +159,7 @@ class RigidCPD(CoherentPointDrift): source (numpy.ndarray, optional): Source point cloud data. update_scale (bool, optional): If this flag is True, compute the scale parameter. tf_init_params (dict, optional): Parameters to initialize transformation. + use_color (bool, optional): Use color information (if available). use_cuda (bool, optional): Use CUDA. """ @@ -135,15 +168,16 @@ def __init__( source: Optional[np.ndarray] = None, update_scale: bool = True, tf_init_params: Dict = {}, + use_color: bool = False, use_cuda: bool = False, ) -> None: - super(RigidCPD, self).__init__(source, use_cuda) + super(RigidCPD, self).__init__(source, use_color, use_cuda) self._tf_type = tf.RigidTransformation self._update_scale = update_scale self._tf_init_params = tf_init_params def _initialize(self, target: np.ndarray) -> MstepResult: - dim = self._source.shape[1] + dim = self._N_DIM sigma2 = self._squared_kernel_sum(self._source, target) q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2) if len(self._tf_init_params) == 0: @@ -167,7 +201,7 @@ def _maximization_step( xp: ModuleType = np, ) -> MstepResult: pt1, p1, px, n_p = estep_res - dim = source.shape[1] + dim = CoherentPointDrift._N_DIM mu_x = xp.sum(px, axis=0) / n_p mu_y = xp.dot(source.T, p1) / n_p target_hat = target - mu_x @@ -187,7 +221,7 @@ def _maximization_step( else: sigma2 = (tr_xp1x + tr_yp1y - scale * tr_atr) / (n_p * dim) sigma2 = max(sigma2, np.finfo(np.float32).eps) - q = (tr_xp1x - 2.0 * scale * tr_atr + (scale ** 2) * tr_yp1y) / (2.0 * sigma2) + q = (tr_xp1x - 2.0 * scale * tr_atr + (scale**2) * tr_yp1y) / (2.0 * sigma2) q += dim * n_p * 0.5 * np.log(sigma2) return MstepResult(tf.RigidTransformation(rot, t, scale, xp=xp), sigma2, q) @@ -198,16 +232,23 @@ class AffineCPD(CoherentPointDrift): Args: source (numpy.ndarray, optional): Source point cloud data. tf_init_params (dict, optional): Parameters to initialize transformation. + use_color (bool, optional): Use color information (if available). use_cuda (bool, optional): Use CUDA. """ - def __init__(self, source: Optional[np.ndarray] = None, tf_init_params: Dict = {}, use_cuda: bool = False) -> None: - super(AffineCPD, self).__init__(source, use_cuda) + def __init__( + self, + source: Optional[np.ndarray] = None, + tf_init_params: Dict = {}, + use_color: bool = False, + use_cuda: bool = False, + ) -> None: + super(AffineCPD, self).__init__(source, use_color, use_cuda) self._tf_type = tf.AffineTransformation self._tf_init_params = tf_init_params def _initialize(self, target: np.ndarray) -> MstepResult: - dim = self._source.shape[1] + dim = self._N_DIM sigma2 = self._squared_kernel_sum(self._source, target) q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2) if len(self._tf_init_params) == 0: @@ -225,7 +266,7 @@ def _maximization_step( xp: ModuleType = np, ) -> MstepResult: pt1, p1, px, n_p = estep_res - dim = source.shape[1] + dim = CoherentPointDrift._N_DIM mu_x = xp.sum(px, axis=0) / n_p mu_y = xp.dot(source.T, p1) / n_p target_hat = target - mu_x @@ -251,13 +292,19 @@ class NonRigidCPD(CoherentPointDrift): source (numpy.ndarray, optional): Source point cloud data. beta (float, optional): Parameter of RBF kernel. lmd (float, optional): Parameter for regularization term. + use_color (bool, optional): Use color information (if available). use_cuda (bool, optional): Use CUDA. """ def __init__( - self, source: Optional[np.ndarray] = None, beta: float = 2.0, lmd: float = 2.0, use_cuda: bool = False + self, + source: Optional[np.ndarray] = None, + beta: float = 2.0, + lmd: float = 2.0, + use_color: bool = False, + use_cuda: bool = False, ) -> None: - super(NonRigidCPD, self).__init__(source, use_cuda) + super(NonRigidCPD, self).__init__(source, use_color, use_cuda) self._tf_type = tf.NonRigidTransformation self._beta = beta self._lmd = lmd @@ -275,7 +322,7 @@ def maximization_step( return self._maximization_step(self._source, target, estep_res, sigma2_p, self._tf_obj, self._lmd, self.xp) def _initialize(self, target: np.ndarray) -> MstepResult: - dim = self._source.shape[1] + dim = self._N_DIM sigma2 = self._squared_kernel_sum(self._source, target) q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2) self._tf_obj.w = self.xp.zeros_like(self._source) @@ -292,7 +339,7 @@ def _maximization_step( xp: ModuleType = np, ) -> MstepResult: pt1, p1, px, n_p = estep_res - dim = source.shape[1] + dim = CoherentPointDrift._N_DIM w = xp.linalg.solve((p1 * tf_obj.g).T + lmd * sigma2_p * xp.identity(source.shape[0]), px - (source.T * p1).T) t = source + xp.dot(tf_obj.g, w) tr_xp1x = xp.trace(xp.dot(target.T * pt1, target)) @@ -316,6 +363,7 @@ class ConstrainedNonRigidCPD(CoherentPointDrift): alpha (float): Degree of reliability of priors. Approximately between 1e-8 (highly reliable) and 1 (highly unreliable) use_cuda (bool, optional): Use CUDA. + use_color (bool, optional): Use color information (if available). idx_source (numpy.ndarray of ints, optional): Indices in source matrix for which a correspondance is known idx_target (numpy.ndarray of ints, optional): Indices in target matrix @@ -328,11 +376,12 @@ def __init__( beta: float = 2.0, lmd: float = 2.0, alpha: float = 1e-8, + use_color: bool = False, use_cuda: bool = False, idx_source: Optional[np.ndarray] = None, idx_target: Optional[np.ndarray] = None, ): - super(ConstrainedNonRigidCPD, self).__init__(source, use_cuda) + super(ConstrainedNonRigidCPD, self).__init__(source, use_color, use_cuda) self._tf_type = tf.NonRigidTransformation self._beta = beta self._lmd = lmd @@ -363,7 +412,7 @@ def maximization_step( ) def _initialize(self, target: np.ndarray) -> MstepResult: - dim = self._source.shape[1] + dim = self._N_DIM sigma2 = self._squared_kernel_sum(self._source, target) q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2) self._tf_obj.w = self.xp.zeros_like(self._source) @@ -388,7 +437,7 @@ def _maximization_step( xp: ModuleType = np, ) -> MstepResult: pt1, p1, px, n_p = estep_res - dim = source.shape[1] + dim = CoherentPointDrift._N_DIM w = xp.linalg.solve( (p1 * tf_obj.g).T + sigma2_p / alpha * (p1_tilde * tf_obj.g).T @@ -412,6 +461,7 @@ def registration_cpd( maxiter: int = 50, tol: float = 0.001, callbacks: List[Callable] = [], + use_color: bool = False, use_cuda: bool = False, **kwargs: Any, ) -> MstepResult: @@ -426,6 +476,7 @@ def registration_cpd( tol (float, optional): Tolerance for termination. callback (:obj:`list` of :obj:`function`, optional): Called after each iteration. `callback(probreg.Transformation)` + use_color (bool, optional): Use color information (if available). use_cuda (bool, optional): Use CUDA. Keyword Args: @@ -441,15 +492,24 @@ def registration_cpd( import cupy as cp xp = cp - cv = lambda x: xp.asarray(x.points if isinstance(x, o3.geometry.PointCloud) else x) + if use_color: + cv = ( + lambda x: xp.c_[xp.asarray(x.points), xp.asarray(x.colors)] + if isinstance(x, o3.geometry.PointCloud) + else xp.asanyarray(x)[:, :6] + ) + else: + cv = lambda x: xp.asarray(x.points if isinstance(x, o3.geometry.PointCloud) else x)[ + :, : CoherentPointDrift._N_DIM + ] if tf_type_name == "rigid": - cpd = RigidCPD(cv(source), use_cuda=use_cuda, **kwargs) + cpd = RigidCPD(cv(source), use_color=use_color, use_cuda=use_cuda, **kwargs) elif tf_type_name == "affine": - cpd = AffineCPD(cv(source), use_cuda=use_cuda, **kwargs) + cpd = AffineCPD(cv(source), use_color=use_color, use_cuda=use_cuda, **kwargs) elif tf_type_name == "nonrigid": - cpd = NonRigidCPD(cv(source), use_cuda=use_cuda, **kwargs) + cpd = NonRigidCPD(cv(source), use_color=use_color, use_cuda=use_cuda, **kwargs) elif tf_type_name == "nonrigid_constrained": - cpd = ConstrainedNonRigidCPD(cv(source), use_cuda=use_cuda, **kwargs) + cpd = ConstrainedNonRigidCPD(cv(source), use_color=use_color, use_cuda=use_cuda, **kwargs) else: raise ValueError("Unknown transformation type %s" % tf_type_name) cpd.set_callbacks(callbacks) diff --git a/probreg/features.py b/probreg/features.py index 805fcb0..0769132 100644 --- a/probreg/features.py +++ b/probreg/features.py @@ -93,7 +93,7 @@ def init(self): def compute(self, data: np.ndarray): self._clf.fit(data) - z = np.power(2.0 * np.pi * self._sigma ** 2, self._dim * 0.5) + z = np.power(2.0 * np.pi * self._sigma**2, self._dim * 0.5) return self._clf.support_vectors_, self._clf.dual_coef_[0] * z def annealing(self): diff --git a/probreg/transformation.py b/probreg/transformation.py index ab3e8e3..088ec7b 100644 --- a/probreg/transformation.py +++ b/probreg/transformation.py @@ -23,7 +23,7 @@ def __init__(self, xp=np): def transform(self, points, array_type=o3.utility.Vector3dVector): if isinstance(points, array_type): return array_type(self._transform(self.xp.asarray(points))) - return self._transform(points) + return self.xp.c_[self._transform(points[:, :3]), points[:, 3:]] @abc.abstractmethod def _transform(self, points): diff --git a/pyproject.toml b/pyproject.toml index 6f99176..1978a48 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,14 +20,14 @@ pyyaml = "^6.0" addict = "^2.4.0" pandas = "^2.0.0" -[tool.poetry.dev-dependencies] +[tool.poetry.group.dev.dependencies] Sphinx = "^3.4.3" flake8 = "^3.8.4" sphinx-rtd-theme = "^0.5.1" twine = "^3.3.0" setuptools = "^52.0.0" isort = "^5.9.3" -black = "^21.9b0" +black = "22.3.0" [build-system] requires = ["poetry-core>=1.0.0", "setuptools", "pybind11"]