Skip to content

Commit

Permalink
add color cpd
Browse files Browse the repository at this point in the history
  • Loading branch information
neka-nat committed May 10, 2024
1 parent 9c838b7 commit 2e354a2
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 45 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ This package implements several algorithms using stochastic models and provides
* Maximum likelihood when the target or source point cloud is observation data
* [Coherent Point Drift (2010)](https://arxiv.org/pdf/0905.2635.pdf)
* [Extended Coherent Point Drift (2016)](https://ieeexplore.ieee.org/abstract/document/7477719) (add correspondence priors to CPD)
* [Color Coherent Point Drift (2018)](https://arxiv.org/pdf/1802.01516)
* [FilterReg (CVPR2019)](https://arxiv.org/pdf/1811.10136.pdf)
* Variational Bayesian inference
* [Bayesian Coherent Point Drift (2020)](https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=8985307)
Expand Down
3 changes: 1 addition & 2 deletions probreg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from . import (bcpd, callbacks, cpd, filterreg, gmmtree, l2dist_regs, log,
math_utils, transformation)
from . import bcpd, callbacks, cpd, filterreg, gmmtree, l2dist_regs, log, math_utils, transformation
from .version import __version__
6 changes: 3 additions & 3 deletions probreg/bcpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def expectation_step(self, t_source, target, scale, alpha, sigma_mat, sigma2, w=
pmat = np.exp(-pmat / (2.0 * sigma2))
pmat /= (2.0 * np.pi * sigma2) ** (dim * 0.5)
pmat = pmat.T
pmat *= np.exp(-(scale ** 2) / (2 * sigma2) * np.diag(sigma_mat) * dim)
pmat *= np.exp(-(scale**2) / (2 * sigma2) * np.diag(sigma_mat) * dim)
pmat *= (1.0 - w) * alpha
den = w / target.shape[0] + np.sum(pmat, axis=1)
den[den == 0] = np.finfo(np.float32).eps
Expand Down Expand Up @@ -126,7 +126,7 @@ def _maximization_step(source, target, rigid_trans, estep_res, gmat_inv, lmd, k,
nu_d, nu, n_p, px, x_hat = estep_res
dim = source.shape[1]
m = source.shape[0]
s2s2 = rigid_trans.scale ** 2 / (sigma2_p ** 2)
s2s2 = rigid_trans.scale**2 / (sigma2_p**2)
sigma_mat_inv = lmd * gmat_inv + s2s2 * np.diag(nu)
sigma_mat = np.linalg.inv(sigma_mat_inv)
residual = rigid_trans.inverse().transform(x_hat) - source
Expand All @@ -152,7 +152,7 @@ def _maximization_step(source, target, rigid_trans, estep_res, gmat_inv, lmd, k,
s1 = np.dot(target.ravel(), np.kron(nu_d, np.ones(dim)) * target.ravel())
s2 = np.dot(px.ravel(), y_hat.ravel())
s3 = np.dot(y_hat.ravel(), np.kron(nu, np.ones(dim)) * y_hat.ravel())
sigma2 = (s1 - 2.0 * s2 + s3) / (n_p * dim) + scale ** 2 * sigma2_m
sigma2 = (s1 - 2.0 * s2 + s3) / (n_p * dim) + scale**2 * sigma2_m
return MstepResult(tf.CombinedTransformation(rot, t, scale, v_hat), u_hat, sigma_mat, alpha, sigma2)


Expand Down
15 changes: 10 additions & 5 deletions probreg/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def asnumpy(x):
from .transformation import Transformation


class Plot2DCallback(object):
class Plot2DCallback:
"""Display the 2D registration result of each iteration.
Args:
Expand Down Expand Up @@ -62,12 +62,12 @@ def __call__(self, transformation: Transformation) -> None:
self._cnt += 1


class Open3dVisualizerCallback(object):
class Open3dVisualizerCallback:
"""Display the 3D registration result of each iteration.
Args:
source (numpy.ndarray): Source point cloud data.
target (numpy.ndarray): Target point cloud data.
source (open3d.geometry.PointCloud): Source point cloud data.
target (open3d.geometry.PointCloud): Target point cloud data.
save (bool, optional): If this flag is True,
each iteration image is saved in a sequential number.
keep_window (bool, optional): If this flag is True,
Expand All @@ -76,7 +76,12 @@ class Open3dVisualizerCallback(object):
"""

def __init__(
self, source: np.ndarray, target: np.ndarray, save: bool = False, keep_window: bool = True, fov: Any = None
self,
source: o3.geometry.PointCloud,
target: o3.geometry.PointCloud,
save: bool = False,
keep_window: bool = True,
fov: Any = None,
):
self._vis = o3.visualization.Visualizer()
self._vis.create_window()
Expand Down
4 changes: 2 additions & 2 deletions probreg/cost_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ def __call__(self, theta: np.ndarray, *args):
def compute_l2_dist(
mu_source: np.ndarray, phi_source: np.ndarray, mu_target: np.ndarray, phi_target: np.ndarray, sigma: float
):
z = np.power(2.0 * np.pi * sigma ** 2, mu_source.shape[1] * 0.5)
z = np.power(2.0 * np.pi * sigma**2, mu_source.shape[1] * 0.5)
gtrans = gt.GaussTransform(mu_target, np.sqrt(2.0) * sigma)
phi_j_e = gtrans.compute(mu_source, phi_target / z)
phi_mu_j_e = gtrans.compute(mu_source, phi_target * mu_target.T / z).T
g = (phi_source * phi_j_e * mu_source.T - phi_source * phi_mu_j_e.T).T / (2.0 * sigma ** 2)
g = (phi_source * phi_j_e * mu_source.T - phi_source * phi_mu_j_e.T).T / (2.0 * sigma**2)
return -np.dot(phi_source, phi_j_e), g


Expand Down
118 changes: 89 additions & 29 deletions probreg/cpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,18 @@ class CoherentPointDrift:
Args:
source (numpy.ndarray, optional): Source point cloud data.
use_color (bool, optional): Use color information (if available).
use_cuda (bool, optional): Use CUDA.
"""

def __init__(self, source: Optional[np.ndarray] = None, use_cuda: bool = False) -> None:
_N_DIM = 3
_N_COLOR = 3

def __init__(self, source: Optional[np.ndarray] = None, use_color: bool = False, use_cuda: bool = False) -> None:
self._source = source
self._tf_type = None
self._callbacks = []
self._use_color = use_color
if use_cuda:
import cupy as cp
from cupyx.scipy.spatial import distance as cupy_distance
Expand All @@ -68,19 +73,41 @@ def set_callbacks(self, callbacks: List[Callable]) -> None:
def _initialize(self, target: np.ndarray) -> MstepResult:
return MstepResult(None, None, None)

def expectation_step(self, t_source: np.ndarray, target: np.ndarray, sigma2: float, w: float = 0.0) -> EstepResult:
"""Expectation step for CPD"""
assert t_source.ndim == 2 and target.ndim == 2, "source and target must have 2 dimensions."
def _compute_pmat_numerator(self, t_source: np.ndarray, target: np.ndarray, sigma2: float) -> np.ndarray:
pmat = self.distance_module.cdist(t_source, target, "sqeuclidean")
# pmat = self.xp.stack([self.xp.sum(self.xp.square(target - ts), axis=1) for ts in t_source])
pmat = self.xp.exp(-pmat / (2.0 * sigma2))
return pmat

def expectation_step(
self,
t_source: np.ndarray,
target: np.ndarray,
sigma2: float,
sigma2_c: float,
w: float = 0.0,
) -> EstepResult:
"""Expectation step for CPD"""
assert t_source.ndim == 2 and target.ndim == 2, "source and target must have 2 dimensions."
pmat = self._compute_pmat_numerator(t_source[:, : self._N_DIM], target[:, : self._N_DIM], sigma2)
if self._use_color:
pmat_c = self._compute_pmat_numerator(t_source[:, 3:], target[:, 3:], sigma2_c)

c = (2.0 * np.pi * sigma2) ** (t_source.shape[1] * 0.5)
c = (2.0 * np.pi * sigma2) ** (self._N_DIM * 0.5)
c *= w / (1.0 - w) * t_source.shape[0] / target.shape[0]
den = self.xp.sum(pmat, axis=0)
den[den == 0] = self.xp.finfo(np.float32).eps
if self._use_color:
den_c = self.xp.sum(pmat_c, axis=0)
den_c[den_c == 0] = self.xp.finfo(np.float32).eps
den = np.multiply(den, den_c)
o_c = self.t_source.shape[0] * (2 * np.pi * sigma2_c) ** (0.5 * (self._N_DIM + self._N_COLOR - 1))
o_c *= self.xp.exp(-1.0 / self.t_source.shape[0] * self.xp.sum(pmat_c, axis=0).square() / (2.0 * sigma2_c))
den += o_c
c *= (2.0 * np.pi * sigma2_c) ** (self._N_COLOR * 0.5)
den += c

if self._use_color:
pmat = self.xp.multiply(pmat, pmat_c)
pmat = self.xp.divide(pmat, den)
pt1 = self.xp.sum(pmat, axis=0)
p1 = self.xp.sum(pmat, axis=1)
Expand All @@ -90,7 +117,9 @@ def expectation_step(self, t_source: np.ndarray, target: np.ndarray, sigma2: flo
def maximization_step(
self, target: np.ndarray, estep_res: EstepResult, sigma2_p: Optional[float] = None
) -> Optional[MstepResult]:
return self._maximization_step(self._source, target, estep_res, sigma2_p, xp=self.xp)
return self._maximization_step(
self._source[:, : self._N_DIM], target[:, : self._N_DIM], estep_res, sigma2_p, xp=self.xp
)

@staticmethod
@abc.abstractmethod
Expand All @@ -105,11 +134,14 @@ def _maximization_step(

def registration(self, target: np.ndarray, w: float = 0.0, maxiter: int = 50, tol: float = 0.001) -> MstepResult:
assert not self._tf_type is None, "transformation type is None."
res = self._initialize(target)
res = self._initialize(target[:, : self._N_DIM])
sigma2_c = 0.0
if self._use_color:
sigma2_c = self._squared_kernel_sum(self._source[:, self._N_DIM :], target[:, self._N_DIM :])
q = res.q
for i in range(maxiter):
t_source = res.transformation.transform(self._source)
estep_res = self.expectation_step(t_source, target, res.sigma2, w)
estep_res = self.expectation_step(t_source, target, res.sigma2, sigma2_c, w)
res = self.maximization_step(target, estep_res, res.sigma2)
for c in self._callbacks:
c(res.transformation)
Expand All @@ -127,6 +159,7 @@ class RigidCPD(CoherentPointDrift):
source (numpy.ndarray, optional): Source point cloud data.
update_scale (bool, optional): If this flag is True, compute the scale parameter.
tf_init_params (dict, optional): Parameters to initialize transformation.
use_color (bool, optional): Use color information (if available).
use_cuda (bool, optional): Use CUDA.
"""

Expand All @@ -135,15 +168,16 @@ def __init__(
source: Optional[np.ndarray] = None,
update_scale: bool = True,
tf_init_params: Dict = {},
use_color: bool = False,
use_cuda: bool = False,
) -> None:
super(RigidCPD, self).__init__(source, use_cuda)
super(RigidCPD, self).__init__(source, use_color, use_cuda)
self._tf_type = tf.RigidTransformation
self._update_scale = update_scale
self._tf_init_params = tf_init_params

def _initialize(self, target: np.ndarray) -> MstepResult:
dim = self._source.shape[1]
dim = self._N_DIM
sigma2 = self._squared_kernel_sum(self._source, target)
q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2)
if len(self._tf_init_params) == 0:
Expand All @@ -167,7 +201,7 @@ def _maximization_step(
xp: ModuleType = np,
) -> MstepResult:
pt1, p1, px, n_p = estep_res
dim = source.shape[1]
dim = CoherentPointDrift._N_DIM
mu_x = xp.sum(px, axis=0) / n_p
mu_y = xp.dot(source.T, p1) / n_p
target_hat = target - mu_x
Expand All @@ -187,7 +221,7 @@ def _maximization_step(
else:
sigma2 = (tr_xp1x + tr_yp1y - scale * tr_atr) / (n_p * dim)
sigma2 = max(sigma2, np.finfo(np.float32).eps)
q = (tr_xp1x - 2.0 * scale * tr_atr + (scale ** 2) * tr_yp1y) / (2.0 * sigma2)
q = (tr_xp1x - 2.0 * scale * tr_atr + (scale**2) * tr_yp1y) / (2.0 * sigma2)
q += dim * n_p * 0.5 * np.log(sigma2)
return MstepResult(tf.RigidTransformation(rot, t, scale, xp=xp), sigma2, q)

Expand All @@ -198,16 +232,23 @@ class AffineCPD(CoherentPointDrift):
Args:
source (numpy.ndarray, optional): Source point cloud data.
tf_init_params (dict, optional): Parameters to initialize transformation.
use_color (bool, optional): Use color information (if available).
use_cuda (bool, optional): Use CUDA.
"""

def __init__(self, source: Optional[np.ndarray] = None, tf_init_params: Dict = {}, use_cuda: bool = False) -> None:
super(AffineCPD, self).__init__(source, use_cuda)
def __init__(
self,
source: Optional[np.ndarray] = None,
tf_init_params: Dict = {},
use_color: bool = False,
use_cuda: bool = False,
) -> None:
super(AffineCPD, self).__init__(source, use_color, use_cuda)
self._tf_type = tf.AffineTransformation
self._tf_init_params = tf_init_params

def _initialize(self, target: np.ndarray) -> MstepResult:
dim = self._source.shape[1]
dim = self._N_DIM
sigma2 = self._squared_kernel_sum(self._source, target)
q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2)
if len(self._tf_init_params) == 0:
Expand All @@ -225,7 +266,7 @@ def _maximization_step(
xp: ModuleType = np,
) -> MstepResult:
pt1, p1, px, n_p = estep_res
dim = source.shape[1]
dim = CoherentPointDrift._N_DIM
mu_x = xp.sum(px, axis=0) / n_p
mu_y = xp.dot(source.T, p1) / n_p
target_hat = target - mu_x
Expand All @@ -251,13 +292,19 @@ class NonRigidCPD(CoherentPointDrift):
source (numpy.ndarray, optional): Source point cloud data.
beta (float, optional): Parameter of RBF kernel.
lmd (float, optional): Parameter for regularization term.
use_color (bool, optional): Use color information (if available).
use_cuda (bool, optional): Use CUDA.
"""

def __init__(
self, source: Optional[np.ndarray] = None, beta: float = 2.0, lmd: float = 2.0, use_cuda: bool = False
self,
source: Optional[np.ndarray] = None,
beta: float = 2.0,
lmd: float = 2.0,
use_color: bool = False,
use_cuda: bool = False,
) -> None:
super(NonRigidCPD, self).__init__(source, use_cuda)
super(NonRigidCPD, self).__init__(source, use_color, use_cuda)
self._tf_type = tf.NonRigidTransformation
self._beta = beta
self._lmd = lmd
Expand All @@ -275,7 +322,7 @@ def maximization_step(
return self._maximization_step(self._source, target, estep_res, sigma2_p, self._tf_obj, self._lmd, self.xp)

def _initialize(self, target: np.ndarray) -> MstepResult:
dim = self._source.shape[1]
dim = self._N_DIM
sigma2 = self._squared_kernel_sum(self._source, target)
q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2)
self._tf_obj.w = self.xp.zeros_like(self._source)
Expand All @@ -292,7 +339,7 @@ def _maximization_step(
xp: ModuleType = np,
) -> MstepResult:
pt1, p1, px, n_p = estep_res
dim = source.shape[1]
dim = CoherentPointDrift._N_DIM
w = xp.linalg.solve((p1 * tf_obj.g).T + lmd * sigma2_p * xp.identity(source.shape[0]), px - (source.T * p1).T)
t = source + xp.dot(tf_obj.g, w)
tr_xp1x = xp.trace(xp.dot(target.T * pt1, target))
Expand All @@ -316,6 +363,7 @@ class ConstrainedNonRigidCPD(CoherentPointDrift):
alpha (float): Degree of reliability of priors.
Approximately between 1e-8 (highly reliable) and 1 (highly unreliable)
use_cuda (bool, optional): Use CUDA.
use_color (bool, optional): Use color information (if available).
idx_source (numpy.ndarray of ints, optional): Indices in source matrix
for which a correspondance is known
idx_target (numpy.ndarray of ints, optional): Indices in target matrix
Expand All @@ -328,11 +376,12 @@ def __init__(
beta: float = 2.0,
lmd: float = 2.0,
alpha: float = 1e-8,
use_color: bool = False,
use_cuda: bool = False,
idx_source: Optional[np.ndarray] = None,
idx_target: Optional[np.ndarray] = None,
):
super(ConstrainedNonRigidCPD, self).__init__(source, use_cuda)
super(ConstrainedNonRigidCPD, self).__init__(source, use_color, use_cuda)
self._tf_type = tf.NonRigidTransformation
self._beta = beta
self._lmd = lmd
Expand Down Expand Up @@ -363,7 +412,7 @@ def maximization_step(
)

def _initialize(self, target: np.ndarray) -> MstepResult:
dim = self._source.shape[1]
dim = self._N_DIM
sigma2 = self._squared_kernel_sum(self._source, target)
q = 1.0 + target.shape[0] * dim * 0.5 * np.log(sigma2)
self._tf_obj.w = self.xp.zeros_like(self._source)
Expand All @@ -388,7 +437,7 @@ def _maximization_step(
xp: ModuleType = np,
) -> MstepResult:
pt1, p1, px, n_p = estep_res
dim = source.shape[1]
dim = CoherentPointDrift._N_DIM
w = xp.linalg.solve(
(p1 * tf_obj.g).T
+ sigma2_p / alpha * (p1_tilde * tf_obj.g).T
Expand All @@ -412,6 +461,7 @@ def registration_cpd(
maxiter: int = 50,
tol: float = 0.001,
callbacks: List[Callable] = [],
use_color: bool = False,
use_cuda: bool = False,
**kwargs: Any,
) -> MstepResult:
Expand All @@ -426,6 +476,7 @@ def registration_cpd(
tol (float, optional): Tolerance for termination.
callback (:obj:`list` of :obj:`function`, optional): Called after each iteration.
`callback(probreg.Transformation)`
use_color (bool, optional): Use color information (if available).
use_cuda (bool, optional): Use CUDA.
Keyword Args:
Expand All @@ -441,15 +492,24 @@ def registration_cpd(
import cupy as cp

xp = cp
cv = lambda x: xp.asarray(x.points if isinstance(x, o3.geometry.PointCloud) else x)
if use_color:
cv = (
lambda x: xp.c_[xp.asarray(x.points), xp.asarray(x.colors)]
if isinstance(x, o3.geometry.PointCloud)
else xp.asanyarray(x)[:, :6]
)
else:
cv = lambda x: xp.asarray(x.points if isinstance(x, o3.geometry.PointCloud) else x)[
:, : CoherentPointDrift._N_DIM
]
if tf_type_name == "rigid":
cpd = RigidCPD(cv(source), use_cuda=use_cuda, **kwargs)
cpd = RigidCPD(cv(source), use_color=use_color, use_cuda=use_cuda, **kwargs)
elif tf_type_name == "affine":
cpd = AffineCPD(cv(source), use_cuda=use_cuda, **kwargs)
cpd = AffineCPD(cv(source), use_color=use_color, use_cuda=use_cuda, **kwargs)
elif tf_type_name == "nonrigid":
cpd = NonRigidCPD(cv(source), use_cuda=use_cuda, **kwargs)
cpd = NonRigidCPD(cv(source), use_color=use_color, use_cuda=use_cuda, **kwargs)
elif tf_type_name == "nonrigid_constrained":
cpd = ConstrainedNonRigidCPD(cv(source), use_cuda=use_cuda, **kwargs)
cpd = ConstrainedNonRigidCPD(cv(source), use_color=use_color, use_cuda=use_cuda, **kwargs)
else:
raise ValueError("Unknown transformation type %s" % tf_type_name)
cpd.set_callbacks(callbacks)
Expand Down
Loading

0 comments on commit 2e354a2

Please sign in to comment.