Source code for lavaflow.tracking

"""Functions for boundary detection and masking.
"""

import logging

from abc import ABC, abstractmethod

import cv2
import numpy as np

from lavaflow.utils.geometry import distance
from lavaflow.utils.opencv import applyMask

logger = logging.getLogger('lavaflow')


# -----------------------------------------------------------------------------
# Abstract class

class SparseFlow(ABC):
    """Generic class for tracking sparse flow.
    """

    @abstractmethod
    def __call__(self, images, boundaries):
        """Extract and track specific feature points from the source image.

        The images and boundaries lists should have the same length and
        should correspond to an ordered sequence of images. The result
        points ``p1`` and ``p2`` come from the first and last images
        respectively.

        Args:
            images (list): sequence of images
            boundaries (list): sequence of boundary contour points from each image

        Returns:
            p1 (np.ndarray): feature points in first image, shaped (N, 1, 2)
            p2 (np.ndarray): feature points in final image, shaped (N, 1, 2)
        """
        pass

# -----------------------------------------------------------------------------
# Sparse optical flow

class LucasKanadeSparseOpticalFlow(SparseFlow):
    """Class for Lucas-Kanade sparse optical flow based tracking.
    """

    def __init__(self, flow_source_geometry, extractor, winSize, maxLevel,
                 maxIters, epsilon, min_displacement, max_displacement, exclude):
        """
        Parameters are documented in examples on the main page (work in progress).

        Args:
            flow_source_geometry (dict): flow source geometry
            extractor (cv2.Feature2D): any class implementing the detect and
                compute methods defined by cv2::Feature2D
            winSize (tuple): size of the search window, e.g. ``(21, 21)``
            maxLevel (int): number of pyramid levels
            maxIters (int): maximum number of iterations
            epsilon (float): minimum convergence required
            min_displacement (float): minimum displacement required
            max_displacement (float): maximum displacement allowed
            exclude (list): list of points to exclude, expressed as
                (point center, radius) pairs
        """
        self.flow_source_geometry = flow_source_geometry
        self.extractor = extractor
        self.winSize = winSize
        self.maxLevel = maxLevel
        self.criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
                         maxIters, epsilon)
        self.min_displacement = min_displacement
        self.max_displacement = max_displacement
        self.exclude = exclude

    def __call__(self, images, boundaries):
        """Extract and track feature points using Lucas-Kanade sparse optical flow.

        Args:
            images (list): sequence of images
            boundaries (list): sequence of boundary contour points from each image

        Returns:
            p1 (np.ndarray): feature points in first image, shaped (N, 2)
            p2 (np.ndarray): feature points in final image, shaped (N, 2)
        """
        # Build a mask for each image from its own boundary contour
        boundary_masks = []
        for i in range(0, len(images)):
            boundary_mask = np.zeros(images[i].shape[:2], dtype="uint8")
            cv2.fillPoly(boundary_mask, [boundaries[i]], (255, 255, 255))
            boundary_masks.append(boundary_mask)

        img1 = applyMask(images[0], boundary_masks[0], -1, 3, 3)
        img2 = applyMask(images[1], boundary_masks[1], -1, 3, 3)

        # Extract feature points from the first image
        kp1 = self.extractor.detect(img1, None)
        p1 = np.array([kp.pt for kp in kp1]).reshape(-1, 1, 2).astype(np.float32)
        pi = p1.copy()

        # Track the points through every consecutive image pair
        for i in range(1, len(images)):
            if len(p1) == 0:
                return np.empty((0, 2)), np.empty((0, 2))

            # Calculate forward optical flow
            p2, st2, err2 = cv2.calcOpticalFlowPyrLK(
                img1, img2, p1, None,
                winSize=self.winSize, maxLevel=self.maxLevel, criteria=self.criteria
            )
            if len(p2) == 0:
                return np.empty((0, 2)), np.empty((0, 2))

            # Calculate backward optical flow (status used as a consistency check)
            p3, st3, err3 = cv2.calcOpticalFlowPyrLK(
                img2, img1, p2, None,
                winSize=self.winSize, maxLevel=self.maxLevel, criteria=self.criteria
            )
            if len(p3) == 0:
                return np.empty((0, 2)), np.empty((0, 2))

            # Remove points that are invalid
            d = np.sqrt(np.sum((p1 - p2).reshape(-1, 2) ** 2, axis=1))
            mask = (
                (st2.flatten() == 1) &
                (st3.flatten() == 1) &
                (d >= self.min_displacement) &
                (d <= self.max_displacement)
                # np.array([cv2.pointPolygonTest(boundaries[i], (x, y), False) > 0 for x, y in p2])
            )

            # Remove points that fall inside an excluded region
            for p, r in self.exclude:
                mask &= np.sqrt(((p1 - p) ** 2).reshape(-1, 2).sum(axis=1)) > r

            # Advance to the next image pair
            pi = pi[mask]
            p1 = p2[mask]
            if i + 1 < len(images):
                img1 = img2
                img2 = applyMask(images[i + 1], boundary_masks[i + 1], -1, 3, 3)

        pf = p1.copy()
        pi = pi.reshape(-1, 2)
        pf = pf.reshape(-1, 2)

        # Pin the flow source itself as an additional correspondence
        if self.flow_source_geometry['type'] == 'circle':
            pi = np.vstack([pi, self.flow_source_geometry['data']['center']])
            pf = np.vstack([pf, self.flow_source_geometry['data']['center']])

        return pi, pf
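
# Usage sketch for LucasKanadeSparseOpticalFlow (illustrative only: the
# extractor choice, the circle geometry, and every numeric value below are
# assumptions for demonstration, not values taken from this library; `images`
# and `boundaries` are caller-supplied lists of frames and OpenCV contours):
#
#     extractor = cv2.GFTTDetector_create(maxCorners=500, qualityLevel=0.01)
#     lk = LucasKanadeSparseOpticalFlow(
#         flow_source_geometry={'type': 'circle', 'data': {'center': (320, 240)}},
#         extractor=extractor,
#         winSize=(21, 21), maxLevel=3, maxIters=30, epsilon=0.01,
#         min_displacement=0.5, max_displacement=50.0, exclude=[],
#     )
#     p1, p2 = lk(images, boundaries)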

# -----------------------------------------------------------------------------
# Boundary flow

class NormalBoundaryFlow(object):
    """Class for tracking boundary motion along boundary normals.
    """

    def __init__(self, boundary_chord, boundary_point_pixel_spacing,
                 boundary_intersection_lookahead):
        """
        Parameters are documented in examples on the main page (work in progress).

        Args:
            boundary_chord (int): number of points to use for determining the
                normal vector to the boundary chord
            boundary_point_pixel_spacing (int): approximate pixel spacing
                between boundary points (used to simplify the boundary)
            boundary_intersection_lookahead (int): number of points to look
                ahead when determining target boundary intersections
        """
        self.boundary_chord = boundary_chord
        self.boundary_point_pixel_spacing = boundary_point_pixel_spacing
        self.boundary_intersection_lookahead = boundary_intersection_lookahead

    def __call__(self, img1, img2, boundary1=None, boundary2=None):
        """Track boundary points by projecting them along boundary normals
        onto the target boundary.

        Args:
            img1 (np.ndarray): source image (not used)
            img2 (np.ndarray): target image (not used)
            boundary1 (np.ndarray): list of boundary contour points from source image
            boundary2 (np.ndarray): list of boundary contour points from target image

        Returns:
            p1 (np.ndarray): feature points in source image, shaped (N, 2)
            p2 (np.ndarray): feature points in target image, shaped (N, 2)
        """
        # TODO: migrate to

        # Extract points and compute center points and normals
        temp1 = boundary1.reshape(-1, 2).astype(float)
        temp2 = boundary2.reshape(-1, 2).astype(float)

        if temp1.shape[0] > 25:  # TODO: improve or parameterise threshold
            boundary_chord = self.boundary_chord
        else:
            boundary_chord = 1

        p1 = (temp1 + np.roll(temp1, 1, axis=0)) / 2
        normals = np.roll(
            np.roll(temp1, -boundary_chord, axis=0) - np.roll(temp1, boundary_chord, axis=0),
            1, axis=1
        ) * np.array([-1, 1])
        normals = normals / np.sqrt((normals ** 2).sum(axis=1)).reshape(-1, 1)

        # Simplify boundary uniformly
        if ((p1.max(axis=0) - p1.min(axis=0)) > (self.boundary_point_pixel_spacing * 4)).any():  # TODO: improve or parameterise threshold
            temp = self.boundary_point_pixel_spacing * (
                np.cumsum(np.sqrt(((temp1 - np.roll(temp1, 1, axis=0)) ** 2).sum(axis=1)))
                / self.boundary_point_pixel_spacing
            ).round(0)
            index = np.flatnonzero((temp - np.roll(temp, 1, axis=0)) > 0)
            p1 = p1[index]
            normals = normals[index]

        # Find intersection with second boundary
        i = np.sqrt(((p1[0] - temp2) ** 2).sum(axis=1)).argmin()
        vectors = np.zeros(p1.shape)
        # vectors[:] = np.nan
        for z, (p, n) in enumerate(zip(p1, normals)):
            for j in range(i - self.boundary_intersection_lookahead,
                           i + self.boundary_intersection_lookahead + 1):
                j = j % temp2.shape[0]
                try:
                    # Solve for the intersection of the normal ray with the
                    # boundary segment (a, b)
                    a, b = temp2[(j, (j + 1) % temp2.shape[0]), :]
                    s, t = np.linalg.solve(
                        np.hstack([n.reshape(-1, 1), (a - b).reshape(-1, 1)]),
                        (a - p).reshape(-1, 1)
                    ).flatten()
                    if 0 <= t <= 1 and s >= -5:  # TODO: investigate why this condition isn't being satisfied
                        q = p + s * n
                        i = j
                        vectors[z] = (q - p) / 12
                        break
                except np.linalg.LinAlgError:
                    pass

        p2 = p1 + vectors

        # Drop duplicate points so each correspondence is unique
        _, i1 = np.unique(p1, axis=0, return_index=True)
        _, i2 = np.unique(p2, axis=0, return_index=True)
        unique = sorted(set(i1).intersection(set(i2)))
        p1 = p1[unique]
        p2 = p2[unique]

        return p1, p2  # TODO: store and visualize normals
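
# Usage sketch for NormalBoundaryFlow (illustrative only; the parameter values
# are assumptions chosen for demonstration, and contour1/contour2 are
# caller-supplied OpenCV-style contours of shape (N, 1, 2); the image
# arguments are unused):
#
#     nbf = NormalBoundaryFlow(boundary_chord=5,
#                              boundary_point_pixel_spacing=10,
#                              boundary_intersection_lookahead=20)
#     p1, p2 = nbf(None, None, boundary1=contour1, boundary2=contour2)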

# -----------------------------------------------------------------------------
# OpenCV feature + descriptor matcher tracking

class FeatureDescriptorMatching(SparseFlow):
    """Class for descriptor based feature matching and tracking.
    """

    def __init__(self, extractor, matcher, ratio, speed_lower_bound, speed_upper_bound):
        """
        Parameters are documented in examples on the main page (work in progress).

        Args:
            extractor (cv2.Feature2D): any class implementing the detect and
                compute methods defined by cv2::Feature2D
            matcher (cv2.DescriptorMatcher): any class implementing the
                knnMatch method defined by cv2::DescriptorMatcher
            ratio (float): distance ratio used to filter candidates
            speed_lower_bound (float): speed must be above lower bound to be accepted
            speed_upper_bound (float): speed must be below upper bound to be accepted
        """
        self.extractor = extractor
        self.matcher = matcher
        self.ratio = ratio
        self.speed_lower_bound = speed_lower_bound
        self.speed_upper_bound = speed_upper_bound

    def __call__(self, img1, img2, boundary1=None, boundary2=None):
        """Compute feature tracking across a pair of images.

        Args:
            img1 (np.ndarray): source image
            img2 (np.ndarray): target image
            boundary1 (np.ndarray): list of boundary contour points from source image
            boundary2 (np.ndarray): list of boundary contour points from target image

        Returns:
            p1 (np.ndarray): feature points in source image, shaped (N, 2)
            p2 (np.ndarray): feature points in target image, shaped (N, 2)
        """
        kp1, des1 = self.extractor.detectAndCompute(img1, None)
        kp2, des2 = self.extractor.detectAndCompute(img2, None)
        matches = self.matcher.knnMatch(des1, des2, k=2)

        p1 = np.empty((len(matches), 2))
        p2 = np.empty((len(matches), 2))
        # Matches start rejected; a match is accepted only if it passes the
        # ratio test and all subsequent filters below
        mask = np.zeros((len(matches),), dtype=bool)
        for i, (a, b) in enumerate(matches):
            # Lowe's ratio test: the best match must be clearly better than
            # the second best
            if a.distance < self.ratio * b.distance:
                x1, y1 = int(kp1[a.queryIdx].pt[0]), int(kp1[a.queryIdx].pt[1])
                x2, y2 = int(kp2[a.trainIdx].pt[0]), int(kp2[a.trainIdx].pt[1])
                speed = distance((x1, y1), (x2, y2))
                angle = np.arctan2((y1 - y2), (x1 - x2))
                if angle < 0:
                    angle += 2 * np.pi
                flag = True
                flag = flag and speed > self.speed_lower_bound
                flag = flag and speed < self.speed_upper_bound
                if boundary1 is not None:
                    flag = flag and cv2.pointPolygonTest(boundary1, (x1, y1), False) > 0
                if boundary2 is not None:
                    flag = flag and cv2.pointPolygonTest(boundary2, (x2, y2), False) > 0
                p1[i] = (x1, y1)
                p2[i] = (x2, y2)
                mask[i] = flag

        return p1[mask], p2[mask]
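
# Usage sketch for FeatureDescriptorMatching (illustrative only; ORB paired
# with a Hamming-distance brute-force matcher is an assumption, as are the
# ratio and speed bounds):
#
#     extractor = cv2.ORB_create(nfeatures=1000)
#     matcher = cv2.BFMatcher(cv2.NORM_HAMMING)
#     fdm = FeatureDescriptorMatching(extractor, matcher, ratio=0.75,
#                                     speed_lower_bound=1.0,
#                                     speed_upper_bound=50.0)
#     p1, p2 = fdm(img1, img2, boundary1=contour1, boundary2=contour2)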

# -----------------------------------------------------------------------------
# Dense optical flow

class FarnebackDenseOpticalFlow(object):
    """Class for Farneback dense optical flow.
    """

    def __init__(self, pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, flags):
        """
        Parameters are documented in examples on the main page (work in progress).

        Args:
            pyr_scale (float): scale between layers
            levels (int): number of layers
            winsize (int): averaging window size
            iterations (int): number of iterations the algorithm performs at each level
            poly_n (int): size of the pixel neighbourhood used for polynomial
                expansion at each pixel
            poly_sigma (float): standard deviation of the Gaussian used for
                smoothing derivatives
            flags (int): operation flags
        """
        self.pyr_scale = pyr_scale
        self.levels = levels
        self.winsize = winsize
        self.iterations = iterations
        self.poly_n = poly_n
        self.poly_sigma = poly_sigma
        self.flags = flags

    def __call__(self, img1, img2):
        """Generate a flow vector field from the source image using Farneback
        dense optical flow.

        Args:
            img1 (np.ndarray): source image
            img2 (np.ndarray): target image

        Returns:
            u (np.ndarray): x component of velocity on the x, y pixel meshgrid
            v (np.ndarray): y component of velocity on the x, y pixel meshgrid
        """
        # Input frame preprocessing
        gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
        gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
        # gray1 = cv2.UMat(gray1)
        # gray2 = cv2.UMat(gray2)

        # Compute optical flow
        flow = cv2.calcOpticalFlowFarneback(
            gray1, gray2, None,
            self.pyr_scale, self.levels, self.winsize,
            self.iterations, self.poly_n, self.poly_sigma, self.flags
        )
        # flow = flow.get()

        u = flow[..., 0]
        v = flow[..., 1]
        return u, v
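
# Usage sketch for FarnebackDenseOpticalFlow (illustrative only; the values
# below mirror common OpenCV defaults and are assumptions, not library
# recommendations; img1/img2 are BGR frames of equal size):
#
#     dense = FarnebackDenseOpticalFlow(pyr_scale=0.5, levels=3, winsize=15,
#                                       iterations=3, poly_n=5, poly_sigma=1.2,
#                                       flags=0)
#     u, v = dense(img1, img2)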