"""Functions for interpolating a velocity field.
"""
import cv2
import numpy as np
import triangle
from IPython import embed
import logging
logger = logging.getLogger('lavaflow')
# -----------------------------------------------------------------------------
# Sparse flow postprocessing
[docs]class SparseDelaunayTriangulation(object):
def __init__(self):
"""Construct class for finding a contrained delaunay triangulation with an internal and external boundary.
"""
pass
[docs] def __call__(self, p1, b1, p2, b2):
"""Interpolate tracked points.
Args:
p1 (np.ndarray): internal points in source image, shaped (N, 1, 2)
b1 (np.ndarray): boundary points in source image, shaped (N, 1, 2)
p2 (nd.ndarray): internal points in target image, shaped (N, 1, 2)
b2 (nd.ndarray): boundary points in target image, shaped (N, 1, 2)
Returns:
tri (dict): output from triangle.triangulate
internal_boundary (np.ndarray): ordered internal boundary segments
external_boundary (np.ndarray): ordered external boundary segments
s1 (np.ndarray): steiner points in source image added by triangle.triangulate, shaped (N, 1, 2)
s2 (np.ndarray): steiner points in target image added by triangle.triangulate, shaped (N, 1, 2)
"""
pb1 = np.vstack([p1, b1])
pb2 = np.vstack([p2, b2])
if pb1.shape[0] < 3:
tri = {
'vertices': np.empty((0, 2)),
'vertex_markers': np.empty((0, 2), dtype=np.int32),
'triangles': np.empty((0, 3), dtype=np.int32),
'segments': np.empty((0, 2), dtype=np.int32),
'segment_markers': np.empty((0, 1), dtype=np.int32),
'edges': np.empty((0, 2), dtype=np.int32),
'edge_markers': np.empty((0, 1), dtype=np.int32),
}
internal_boundary = np.empty((0, 2, 2))
external_boundary = np.empty((0, 2, 2))
s1 = np.empty((0, 2))
s2 = np.empty((0, 2))
return tri, internal_boundary, external_boundary, s1, s2 # TODO: check if return empty arrays is okay
# p1.shape[0] < 3 or
# embed(colors="neutral")
# raise ValueError("not enough points to evaluate Delaunay based interpolation grid")
# Internal boundary
if p1.shape[0] < 3:
internal_segments = np.empty((0, 2), dtype=np.int32)
else:
# Triangulate internal points
convex_boundary = triangle.convex_hull(p1)
tri = {
'vertices': p1,
'segments': convex_boundary,
}
tri = triangle.triangulate(tri, 'p')
b = b1
t = tri['triangles']
v = tri['vertices'][tri['triangles']]
xmin = v[:, :, 0].min(axis=1)
xmax = v[:, :, 0].max(axis=1)
ymin = v[:, :, 1].min(axis=1)
ymax = v[:, :, 1].max(axis=1)
bmask = (b[:, 0] > xmin.min()) & (b[:, 0] < xmax.max()) & (b[:, 1] > ymin.min()) & (b[:, 1] < ymax.max())
tmask = np.full(t.shape[0], True, dtype=bool)
vmask = np.full(t.shape, False, dtype=bool)
# Initialize concave boundary with convex_boundary
for edge in convex_boundary:
temp = np.isin(t, edge)
for k in np.flatnonzero(temp.sum(axis=1) == 2):
e = (np.flatnonzero(np.invert(temp[k]))[0] + 1) % 3
vmask[k, e] = True
# Check which boundary points are inside triangules to determine concave regions
for j, p in enumerate(b):
# Skip boundary points outside range
if not bmask[j]:
continue
# Check triangles within range
p = p.reshape(-1, 1)
pmask = (xmin < p[0]) & (p[0] < xmax) & (ymin < p[1]) & (p[1] < ymax) # strict inequality
for i in np.flatnonzero(pmask):
a0 = v[i, 0].reshape(-1, 1)
a1 = v[i, 1].reshape(-1, 1)
a2 = v[i, 2].reshape(-1, 1)
# Check if boundary point in triangle using barycentric coordinates
coef = np.linalg.solve(np.hstack([a1 - a0, a2 - a0]), p - a0)
if coef.sum() < 1 and all((coef > 0) & (coef < 1)):
# Recursively check boundary intersection with adjacent triangles
history = set()
def check_adjacent(i):
history.add(i)
tmask[i] = False
vmask[i] = False
sides = [(j + 1) % b.shape[0], (j - 1) % b.shape[0]]
edges = [[t[i, e-1], t[i, e]] for e in range(0, 3)]
# check each edge of the triangle against each side of the boundary
for edge in edges:
temp = np.isin(t, edge)
for k in set(np.flatnonzero(temp.sum(axis=1) == 2)) - history:
if tmask[k]:
e = (np.flatnonzero(np.invert(temp[k]))[0] + 1) % 3
vmask[k, e] = True
a, c = tri['vertices'][edge]
for side in sides:
try:
r, s = np.linalg.solve(np.hstack([(b[side] - b[j]).reshape(-1, 1), (a - c).reshape(-1, 1)]), (a - b[j]).reshape(-1, 1)).flatten()
if 0 <= r <= 1 and 0 <= s <= 1:
check_adjacent(k)
break
except np.linalg.LinAlgError:
pass
check_adjacent(i)
internal_segments = np.array([[t[i, j], t[i, (j + 1) % 3]] for i, j in zip(*np.where(vmask))])
# Triangulate points constrained by concave boundary
temp = p1.shape[0] + np.arange(0, b1.shape[0]).reshape(-1, 1)
external_segments = np.hstack([np.roll(temp, 1), temp])
tri = {
'vertices': pb1,
'segments': np.vstack([internal_segments, external_segments]),
}
tri = triangle.triangulate(tri, 'pe')
# Manually insert new points by averaging adjacent movements
c = pb1.shape[0]
d = tri['vertices'].shape[0]
pbs1 = tri['vertices']
pbs2 = np.vstack([pb2, np.empty((d - c, 2))])
if d > c:
for i in range(c, d):
matches = tri['edges'] == i
neighbors = tri['edges'][(tri['edges'] < c) & ~matches & matches.any(axis=1, keepdims=True)]
pbs2[i, :] = pbs1[i, :] + np.mean(pbs2[neighbors, :] - pbs1[neighbors, :], axis=0)
s1 = pbs1[c:d, :]
s2 = pbs2[c:d, :]
internal_boundary = pbs1[internal_segments]
external_boundary = pbs1[external_segments]
return tri, internal_boundary, external_boundary, s1, s2 # TODO: could return triangles indices only
# -----------------------------------------------------------------------------
# Sparse interpolation methods
[docs]class SparseInterpolation(object):
"""Class for interpolating on arbitrary points.
"""
def __init__(self, x, y):
"""
Args:
x (np.ndarray): x coordinates to interpolate (any shape)
y (np.ndarray): y coordinates to interpolate (any shape)
"""
self.x = x
self.y = y
self.points = np.vstack([self.x.ravel(), self.y.ravel()]).T
[docs] def __call__(self, tri, p1, b1, s1, p2, b2, s2):
"""Interpolate tracked points.
This works by interpolating the velocities at each vertex of the Delaunay
triangulation based on the barycentric coordinates of each point and it's
corresponding triangle.
Args:
tri (dict): output from triangle.triangulate
p1 (np.ndarray): internal points in source image, shaped (N, 1, 2)
b1 (np.ndarray): boundary points in source image, shaped (N, 1, 2)
s1 (np.ndarray): steiner points in source image added by triangle.triangulate, shaped (N, 1, 2)
p2 (nd.ndarray): internal points in target image, shaped (N, 1, 2)
b2 (nd.ndarray): boundary points in target image, shaped (N, 1, 2)
s2 (np.ndarray): steiner points in target image added by triangle.triangulate, shaped (N, 1, 2)
Returns:
x (np.ndarray): x meshgrid
y (np.ndarray): y meshgrid
u (np.ndarray): x component of velocity on x, y meshgrid
v (np.ndarray): y component of velocity on x, y meshgrid
"""
# TODO: think about if steiner points should be included
pbs1 = np.vstack([p1, b1, s1])
pbs2 = np.vstack([p2, b2, s2])
v1 = pbs1[tri['triangles']] # shaped (# triangles, 3, 2)
v2 = pbs2[tri['triangles']] # shaped (# triangles, 3, 2)
xmin = v1[:, :, 0].min(axis=1)
xmax = v1[:, :, 0].max(axis=1)
ymin = v1[:, :, 1].min(axis=1)
ymax = v1[:, :, 1].max(axis=1)
vecs = np.empty(self.points.shape)
vecs[:] = np.NaN
for j, p in enumerate(self.points):
p = p.reshape(-1, 1)
mask = (xmin <= p[0]) & (p[0] <= xmax) & (ymin <= p[1]) & (p[1] <= ymax) # relaxed inequality
for i in np.flatnonzero(mask):
a0 = v1[i, 0].reshape(-1, 1)
a1 = v1[i, 1].reshape(-1, 1)
a2 = v1[i, 2].reshape(-1, 1)
coef = np.linalg.solve(np.hstack([a1 - a0, a2 - a0]), p - a0)
if coef.sum() < 1 and all((coef >= 0) & (coef <= 1)):
bary = np.insert(coef, 0, 1 - coef.sum())
vecs[j] = ((v2[i] - v1[i]) * bary.reshape(3, -1)).sum(axis=0)
break
u = vecs[:, 0].reshape(self.x.shape)
v = vecs[:, 1].reshape(self.x.shape)
# embed(colors='neutral')
return self.x, self.y, u, v
[docs]class SparseInterpolationGrid(SparseInterpolation):
"""Class for interpolating on a regular grid.
"""
def __init__(self, x, y, w, h, spacing):
"""
Args:
x (double): x coordinate of top left corner of grid
y (double): y coordinate of top left corner of grid
w (double): width of grid
h (double): height of grid
spacing (double): grid spacing for output velocity grid
"""
hpad = h % spacing / 2
wpad = w % spacing / 2
xrange = np.arange(x + wpad, x + w - wpad, spacing)
yrange = np.arange(y + hpad, y + h - hpad, spacing)
self.x, self.y = np.meshgrid(xrange, yrange)
self.points = np.vstack([self.x.ravel(), self.y.ravel()]).T
[docs]class SparseInterpolationLine(SparseInterpolation):
"""Class for interpolating on a line.
"""
def __init__(self, points, spacing):
"""
Args:
points (np.ndarray): line points [[x, y], [x, y]]
spacing (double): point spacing for output velocity
"""
points = np.array(points).reshape(-1, 2)
v = points[1] - points[0]
s = np.linalg.norm(v)
tspacing = spacing / s
tpad = 1 % tspacing / 2
trange = np.arange(0 + tpad, 1 - tpad, tspacing)
self.points = points[0] + np.outer(trange, v)
self.x = self.points[:, 0]
self.y = self.points[:, 1]
# -----------------------------------------------------------------------------
# Dense interpolation methods
[docs]class DenseInterpolation(object):
"""Class for interpolating on a dense velocity grid.
"""
def __init__(self, x, y, kernel_size=21, sigma=20):
"""
Parameters are documented in examples on the maing page (work in progress).
Args:
x (np.ndarray): x coordinates to interpolate (any shape)
y (np.ndarray): y coordinates to interpolate (any shape)
kernel_size (int): kernel size for Guassian smoothing
sigma (double): sigma for Guassian smoothing
"""
self.x = x
self.y = y
self.points = np.vstack([self.x.ravel(), self.y.ravel()]).T
self.kernel_size = kernel_size
self.sigma = sigma
[docs] def __call__(self, u, v):
"""Interpolate tracked points.
Args:
u (np.ndarray): x component of velocity on x, y pixel meshgrid
v (np.ndarray): y component of velocity on x, y pixel meshgrid
Returns:
x (np.ndarray): x meshgrid
y (np.ndarray): y meshgrid
u (np.ndarray): x component of velocity on x, y meshgrid
v (np.ndarray): y component of velocity on x, y meshgrid
"""
u_smoothed = cv2.GaussianBlur(u, (self.kernel_size, self.kernel_size), self.sigma)
v_smoothed = cv2.GaussianBlur(v, (self.kernel_size, self.kernel_size), self.sigma)
vecs = np.empty(self.points.shape)
vecs[:] = np.NaN
for j, (xcoord, ycoord) in enumerate(self.points):
vecs[j] = [u_smoothed[int(ycoord), int(xcoord)], v_smoothed[int(ycoord), int(xcoord)]]
u = vecs[:, 0].reshape(self.x.shape)
v = vecs[:, 1].reshape(self.x.shape)
return self.x, self.y, u, v
[docs]class DenseInterpolationGrid(DenseInterpolation):
"""Class for interpolating on a regular grid.
"""
def __init__(self, x, y, w, h, spacing, kernel_size=21, sigma=20):
"""
Parameters are documented in examples on the maing page (work in progress).
Args:
x (double): x coordinate of top left corner of grid
y (double): y coordinate of top left corner of grid
w (double): width of grid
h (double): height of grid
spacing (double): grid spacing for output velocity grid
kernel_size (int): kernel size for Guassian smoothing
sigma (double): sigma for Guassian smoothing
"""
hpad = h % spacing / 2
wpad = w % spacing / 2
xrange = np.arange(x + wpad, x + w - wpad, spacing)
yrange = np.arange(y + hpad, y + h - hpad, spacing)
self.x, self.y = np.meshgrid(xrange, yrange)
self.points = np.vstack([self.x.ravel(), self.y.ravel()]).T
self.kernel_size = kernel_size
self.sigma = sigma
[docs]class DenseInterpolationLine(DenseInterpolation):
"""Class for interpolating on a line.
"""
def __init__(self, points, spacing, kernel_size=21, sigma=20):
"""
Parameters are documented in examples on the maing page (work in progress).
Args:
points (np.ndarray): line points [[x, y], [x, y]]
spacing (double): point spacing for output velocity
kernel_size (int): kernel size for Guassian smoothing
sigma (double): sigma for Guassian smoothing
"""
points = np.array(points).reshape(-1, 2)
v = points[1] - points[0]
s = np.linalg.norm(v)
tspacing = spacing / s
tpad = 1 % tspacing / 2
trange = np.arange(0 + tpad, 1 - tpad, tspacing)
self.points = points[0] + np.outer(trange, v)
self.x = self.points[:, 0]
self.y = self.points[:, 1]
self.kernel_size = kernel_size
self.sigma = sigma