Source code for lavaflow.velocity

"""Functions for interpolating a velocity field.
"""

import cv2
import numpy as np
import triangle

from IPython import embed

import logging
logger = logging.getLogger('lavaflow')


# -----------------------------------------------------------------------------

# Sparse flow postprocessing

[docs]class SparseDelaunayTriangulation(object): def __init__(self): """Construct class for finding a contrained delaunay triangulation with an internal and external boundary. """ pass
[docs] def __call__(self, p1, b1, p2, b2): """Interpolate tracked points. Args: p1 (np.ndarray): internal points in source image, shaped (N, 1, 2) b1 (np.ndarray): boundary points in source image, shaped (N, 1, 2) p2 (nd.ndarray): internal points in target image, shaped (N, 1, 2) b2 (nd.ndarray): boundary points in target image, shaped (N, 1, 2) Returns: tri (dict): output from triangle.triangulate internal_boundary (np.ndarray): ordered internal boundary segments external_boundary (np.ndarray): ordered external boundary segments s1 (np.ndarray): steiner points in source image added by triangle.triangulate, shaped (N, 1, 2) s2 (np.ndarray): steiner points in target image added by triangle.triangulate, shaped (N, 1, 2) """ pb1 = np.vstack([p1, b1]) pb2 = np.vstack([p2, b2]) if pb1.shape[0] < 3: tri = { 'vertices': np.empty((0, 2)), 'vertex_markers': np.empty((0, 2), dtype=np.int32), 'triangles': np.empty((0, 3), dtype=np.int32), 'segments': np.empty((0, 2), dtype=np.int32), 'segment_markers': np.empty((0, 1), dtype=np.int32), 'edges': np.empty((0, 2), dtype=np.int32), 'edge_markers': np.empty((0, 1), dtype=np.int32), } internal_boundary = np.empty((0, 2, 2)) external_boundary = np.empty((0, 2, 2)) s1 = np.empty((0, 2)) s2 = np.empty((0, 2)) return tri, internal_boundary, external_boundary, s1, s2 # TODO: check if return empty arrays is okay # p1.shape[0] < 3 or # embed(colors="neutral") # raise ValueError("not enough points to evaluate Delaunay based interpolation grid") # Internal boundary if p1.shape[0] < 3: internal_segments = np.empty((0, 2), dtype=np.int32) else: # Triangulate internal points convex_boundary = triangle.convex_hull(p1) tri = { 'vertices': p1, 'segments': convex_boundary, } tri = triangle.triangulate(tri, 'p') b = b1 t = tri['triangles'] v = tri['vertices'][tri['triangles']] xmin = v[:, :, 0].min(axis=1) xmax = v[:, :, 0].max(axis=1) ymin = v[:, :, 1].min(axis=1) ymax = v[:, :, 1].max(axis=1) bmask = (b[:, 0] > xmin.min()) & (b[:, 0] < xmax.max()) & (b[:, 1] > ymin.min()) & (b[:, 1] < ymax.max()) tmask = np.full(t.shape[0], True, dtype=bool) vmask = np.full(t.shape, False, dtype=bool) # Initialize concave boundary with convex_boundary for edge in convex_boundary: temp = np.isin(t, edge) for k in np.flatnonzero(temp.sum(axis=1) == 2): e = (np.flatnonzero(np.invert(temp[k]))[0] + 1) % 3 vmask[k, e] = True # Check which boundary points are inside triangules to determine concave regions for j, p in enumerate(b): # Skip boundary points outside range if not bmask[j]: continue # Check triangles within range p = p.reshape(-1, 1) pmask = (xmin < p[0]) & (p[0] < xmax) & (ymin < p[1]) & (p[1] < ymax) # strict inequality for i in np.flatnonzero(pmask): a0 = v[i, 0].reshape(-1, 1) a1 = v[i, 1].reshape(-1, 1) a2 = v[i, 2].reshape(-1, 1) # Check if boundary point in triangle using barycentric coordinates coef = np.linalg.solve(np.hstack([a1 - a0, a2 - a0]), p - a0) if coef.sum() < 1 and all((coef > 0) & (coef < 1)): # Recursively check boundary intersection with adjacent triangles history = set() def check_adjacent(i): history.add(i) tmask[i] = False vmask[i] = False sides = [(j + 1) % b.shape[0], (j - 1) % b.shape[0]] edges = [[t[i, e-1], t[i, e]] for e in range(0, 3)] # check each edge of the triangle against each side of the boundary for edge in edges: temp = np.isin(t, edge) for k in set(np.flatnonzero(temp.sum(axis=1) == 2)) - history: if tmask[k]: e = (np.flatnonzero(np.invert(temp[k]))[0] + 1) % 3 vmask[k, e] = True a, c = tri['vertices'][edge] for side in sides: try: r, s = np.linalg.solve(np.hstack([(b[side] - b[j]).reshape(-1, 1), (a - c).reshape(-1, 1)]), (a - b[j]).reshape(-1, 1)).flatten() if 0 <= r <= 1 and 0 <= s <= 1: check_adjacent(k) break except np.linalg.LinAlgError: pass check_adjacent(i) internal_segments = np.array([[t[i, j], t[i, (j + 1) % 3]] for i, j in zip(*np.where(vmask))]) # Triangulate points constrained by concave boundary temp = p1.shape[0] + np.arange(0, b1.shape[0]).reshape(-1, 1) external_segments = np.hstack([np.roll(temp, 1), temp]) tri = { 'vertices': pb1, 'segments': np.vstack([internal_segments, external_segments]), } tri = triangle.triangulate(tri, 'pe') # Manually insert new points by averaging adjacent movements c = pb1.shape[0] d = tri['vertices'].shape[0] pbs1 = tri['vertices'] pbs2 = np.vstack([pb2, np.empty((d - c, 2))]) if d > c: for i in range(c, d): matches = tri['edges'] == i neighbors = tri['edges'][(tri['edges'] < c) & ~matches & matches.any(axis=1, keepdims=True)] pbs2[i, :] = pbs1[i, :] + np.mean(pbs2[neighbors, :] - pbs1[neighbors, :], axis=0) s1 = pbs1[c:d, :] s2 = pbs2[c:d, :] internal_boundary = pbs1[internal_segments] external_boundary = pbs1[external_segments] return tri, internal_boundary, external_boundary, s1, s2 # TODO: could return triangles indices only
# ----------------------------------------------------------------------------- # Sparse interpolation methods
[docs]class SparseInterpolation(object): """Class for interpolating on arbitrary points. """ def __init__(self, x, y): """ Args: x (np.ndarray): x coordinates to interpolate (any shape) y (np.ndarray): y coordinates to interpolate (any shape) """ self.x = x self.y = y self.points = np.vstack([self.x.ravel(), self.y.ravel()]).T
[docs] def __call__(self, tri, p1, b1, s1, p2, b2, s2): """Interpolate tracked points. This works by interpolating the velocities at each vertex of the Delaunay triangulation based on the barycentric coordinates of each point and it's corresponding triangle. Args: tri (dict): output from triangle.triangulate p1 (np.ndarray): internal points in source image, shaped (N, 1, 2) b1 (np.ndarray): boundary points in source image, shaped (N, 1, 2) s1 (np.ndarray): steiner points in source image added by triangle.triangulate, shaped (N, 1, 2) p2 (nd.ndarray): internal points in target image, shaped (N, 1, 2) b2 (nd.ndarray): boundary points in target image, shaped (N, 1, 2) s2 (np.ndarray): steiner points in target image added by triangle.triangulate, shaped (N, 1, 2) Returns: x (np.ndarray): x meshgrid y (np.ndarray): y meshgrid u (np.ndarray): x component of velocity on x, y meshgrid v (np.ndarray): y component of velocity on x, y meshgrid """ # TODO: think about if steiner points should be included pbs1 = np.vstack([p1, b1, s1]) pbs2 = np.vstack([p2, b2, s2]) v1 = pbs1[tri['triangles']] # shaped (# triangles, 3, 2) v2 = pbs2[tri['triangles']] # shaped (# triangles, 3, 2) xmin = v1[:, :, 0].min(axis=1) xmax = v1[:, :, 0].max(axis=1) ymin = v1[:, :, 1].min(axis=1) ymax = v1[:, :, 1].max(axis=1) vecs = np.empty(self.points.shape) vecs[:] = np.NaN for j, p in enumerate(self.points): p = p.reshape(-1, 1) mask = (xmin <= p[0]) & (p[0] <= xmax) & (ymin <= p[1]) & (p[1] <= ymax) # relaxed inequality for i in np.flatnonzero(mask): a0 = v1[i, 0].reshape(-1, 1) a1 = v1[i, 1].reshape(-1, 1) a2 = v1[i, 2].reshape(-1, 1) coef = np.linalg.solve(np.hstack([a1 - a0, a2 - a0]), p - a0) if coef.sum() < 1 and all((coef >= 0) & (coef <= 1)): bary = np.insert(coef, 0, 1 - coef.sum()) vecs[j] = ((v2[i] - v1[i]) * bary.reshape(3, -1)).sum(axis=0) break u = vecs[:, 0].reshape(self.x.shape) v = vecs[:, 1].reshape(self.x.shape) # embed(colors='neutral') return self.x, self.y, u, v
[docs]class SparseInterpolationGrid(SparseInterpolation): """Class for interpolating on a regular grid. """ def __init__(self, x, y, w, h, spacing): """ Args: x (double): x coordinate of top left corner of grid y (double): y coordinate of top left corner of grid w (double): width of grid h (double): height of grid spacing (double): grid spacing for output velocity grid """ hpad = h % spacing / 2 wpad = w % spacing / 2 xrange = np.arange(x + wpad, x + w - wpad, spacing) yrange = np.arange(y + hpad, y + h - hpad, spacing) self.x, self.y = np.meshgrid(xrange, yrange) self.points = np.vstack([self.x.ravel(), self.y.ravel()]).T
[docs]class SparseInterpolationLine(SparseInterpolation): """Class for interpolating on a line. """ def __init__(self, points, spacing): """ Args: points (np.ndarray): line points [[x, y], [x, y]] spacing (double): point spacing for output velocity """ points = np.array(points).reshape(-1, 2) v = points[1] - points[0] s = np.linalg.norm(v) tspacing = spacing / s tpad = 1 % tspacing / 2 trange = np.arange(0 + tpad, 1 - tpad, tspacing) self.points = points[0] + np.outer(trange, v) self.x = self.points[:, 0] self.y = self.points[:, 1]
# ----------------------------------------------------------------------------- # Dense interpolation methods
[docs]class DenseInterpolation(object): """Class for interpolating on a dense velocity grid. """ def __init__(self, x, y, kernel_size=21, sigma=20): """ Parameters are documented in examples on the maing page (work in progress). Args: x (np.ndarray): x coordinates to interpolate (any shape) y (np.ndarray): y coordinates to interpolate (any shape) kernel_size (int): kernel size for Guassian smoothing sigma (double): sigma for Guassian smoothing """ self.x = x self.y = y self.points = np.vstack([self.x.ravel(), self.y.ravel()]).T self.kernel_size = kernel_size self.sigma = sigma
[docs] def __call__(self, u, v): """Interpolate tracked points. Args: u (np.ndarray): x component of velocity on x, y pixel meshgrid v (np.ndarray): y component of velocity on x, y pixel meshgrid Returns: x (np.ndarray): x meshgrid y (np.ndarray): y meshgrid u (np.ndarray): x component of velocity on x, y meshgrid v (np.ndarray): y component of velocity on x, y meshgrid """ u_smoothed = cv2.GaussianBlur(u, (self.kernel_size, self.kernel_size), self.sigma) v_smoothed = cv2.GaussianBlur(v, (self.kernel_size, self.kernel_size), self.sigma) vecs = np.empty(self.points.shape) vecs[:] = np.NaN for j, (xcoord, ycoord) in enumerate(self.points): vecs[j] = [u_smoothed[int(ycoord), int(xcoord)], v_smoothed[int(ycoord), int(xcoord)]] u = vecs[:, 0].reshape(self.x.shape) v = vecs[:, 1].reshape(self.x.shape) return self.x, self.y, u, v
[docs]class DenseInterpolationGrid(DenseInterpolation): """Class for interpolating on a regular grid. """ def __init__(self, x, y, w, h, spacing, kernel_size=21, sigma=20): """ Parameters are documented in examples on the maing page (work in progress). Args: x (double): x coordinate of top left corner of grid y (double): y coordinate of top left corner of grid w (double): width of grid h (double): height of grid spacing (double): grid spacing for output velocity grid kernel_size (int): kernel size for Guassian smoothing sigma (double): sigma for Guassian smoothing """ hpad = h % spacing / 2 wpad = w % spacing / 2 xrange = np.arange(x + wpad, x + w - wpad, spacing) yrange = np.arange(y + hpad, y + h - hpad, spacing) self.x, self.y = np.meshgrid(xrange, yrange) self.points = np.vstack([self.x.ravel(), self.y.ravel()]).T self.kernel_size = kernel_size self.sigma = sigma
[docs]class DenseInterpolationLine(DenseInterpolation): """Class for interpolating on a line. """ def __init__(self, points, spacing, kernel_size=21, sigma=20): """ Parameters are documented in examples on the maing page (work in progress). Args: points (np.ndarray): line points [[x, y], [x, y]] spacing (double): point spacing for output velocity kernel_size (int): kernel size for Guassian smoothing sigma (double): sigma for Guassian smoothing """ points = np.array(points).reshape(-1, 2) v = points[1] - points[0] s = np.linalg.norm(v) tspacing = spacing / s tpad = 1 % tspacing / 2 trange = np.arange(0 + tpad, 1 - tpad, tspacing) self.points = points[0] + np.outer(trange, v) self.x = self.points[:, 0] self.y = self.points[:, 1] self.kernel_size = kernel_size self.sigma = sigma