"""Functions for boundary detection and masking.
"""
import cv2
import numpy as np
from abc import ABC, abstractmethod
from lavaflow.utils.geometry import distance
import logging
from lavaflow.utils.opencv import applyMask
logger = logging.getLogger('lavaflow')
# -----------------------------------------------------------------------------
# Abstract class
class SparseFlow(ABC):
"""Generic class for tracking sparse flow.
"""
    @abstractmethod
def __call__(self, images, boundaries):
"""Extract and track specific feature points from source image.
The images and boundaries lists should have the same length and should
correspond to an ordered sequence of images. The results points ``p1``
and ``p2`` come from the first and last images respectively
Args:
images (list): sequence of images
boundaries (list): sequence of boundary contour points from each image
Returns:
            p1 (np.ndarray): feature points in first image, shaped (N, 2)
            p2 (np.ndarray): feature points in final image, shaped (N, 2)
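
        Example (a sketch of the calling contract; ``tracker`` stands for any
        concrete subclass instance)::

            p1, p2 = tracker(images, boundaries)
            assert p1.shape == p2.shape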
"""
pass
# -----------------------------------------------------------------------------
# Sparse optical flow
class LucasKanadeSparseOpticalFlow(SparseFlow):
    """Class for Lucas-Kanade sparse optical flow based tracking.
"""
def __init__(self, flow_source_geometry, extractor, winSize, maxLevel, maxIters, epsilon, min_displacement, max_displacement, exclude):
"""
        Parameters are documented in examples on the main page (work in progress).
Args:
flow_source_geometry (dict): flow source geometry
extractor (cv2.Feature2D): any class implementing the detect and compute methods defined by cv2::Feature2D
            winSize (tuple): size of the search window as ``(width, height)``
maxLevel (int): number of pyramid levels
maxIters (int): maximum number of iterations
epsilon (float): minimum convergence required
min_displacement (float): minimum displacement required
max_displacement (float): maximum displacement allowed
exclude (list): list of points to exclude expressed as (point center, radius)
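
        Example (a sketch with illustrative values; the ``cv2.GFTTDetector``
        extractor and the circle geometry shown are assumptions, not the only
        valid choices)::

            tracker = LucasKanadeSparseOpticalFlow(
                flow_source_geometry={'type': 'circle', 'data': {'center': (120, 80)}},
                extractor=cv2.GFTTDetector_create(maxCorners=500),
                winSize=(21, 21),
                maxLevel=3,
                maxIters=30,
                epsilon=0.01,
                min_displacement=1.0,
                max_displacement=50.0,
                exclude=[],
            )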
"""
self.flow_source_geometry = flow_source_geometry
self.extractor = extractor
self.winSize = winSize
self.maxLevel = maxLevel
self.criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, maxIters, epsilon)
self.min_displacement = min_displacement
self.max_displacement = max_displacement
self.exclude = exclude
    def __call__(self, images, boundaries):
        """Extract and track feature points using Lucas-Kanade sparse optical flow.
Args:
images (list): sequence of images
boundaries (list): sequence of boundary contour points from each image
Returns:
            p1 (np.ndarray): feature points in first image, shaped (N, 2)
            p2 (np.ndarray): feature points in final image, shaped (N, 2)
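
        Example (sketch; ``tracker`` built as in the constructor example)::

            p1, p2 = tracker(images, boundaries)
            displacements = np.sqrt(((p2 - p1) ** 2).sum(axis=1))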
"""
# Extract points and run sparse optical flow
boundary_masks = []
for i in range(0, len(images)):
boundary_mask = np.zeros(images[i].shape[:2], dtype="uint8")
            cv2.fillPoly(boundary_mask, [boundaries[i]], 255)
boundary_masks.append(boundary_mask)
img1 = applyMask(images[0], boundary_masks[0], -1, 3, 3)
img2 = applyMask(images[1], boundary_masks[1], -1, 3, 3)
kp1 = self.extractor.detect(img1, None)
p1 = np.array([kp.pt for kp in kp1]).reshape(-1, 1, 2).astype(np.float32)
pi = p1.copy()
        # Track points through each consecutive pair of images
        for i in range(1, len(images)):
if len(p1) == 0:
return np.empty((0, 2)), np.empty((0, 2))
# Calculate optical flow
p2, st2, err2 = cv2.calcOpticalFlowPyrLK(
img1,
img2,
p1,
None,
winSize=self.winSize,
maxLevel=self.maxLevel,
criteria=self.criteria
)
if len(p2) == 0:
return np.empty((0, 2)), np.empty((0, 2))
            # Back-track from img2 to img1 so points with a failed backward status can be dropped
            p3, st3, err3 = cv2.calcOpticalFlowPyrLK(
img2,
img1,
p2,
None,
winSize=self.winSize,
maxLevel=self.maxLevel,
criteria=self.criteria
)
if len(p3) == 0:
return np.empty((0, 2)), np.empty((0, 2))
            # Remove points that are invalid
d = np.sqrt(np.sum((p1 - p2).reshape(-1, 2) ** 2, axis=1))
mask = (
(st2.flatten() == 1) &
(st3.flatten() == 1) &
(d >= self.min_displacement) &
(d <= self.max_displacement)
# np.array([cv2.pointPolygonTest(boundaries[i], (x, y), False) > 0 for x, y in p2])
)
# Remove points that are excluded
for p, r in self.exclude:
mask &= np.sqrt(((p1 - p) ** 2).reshape(-1, 2).sum(axis=1)) > r
            # Advance to the next image pair
            pi = pi[mask]
            p1 = p2[mask]
            if i + 1 < len(images):
                img1 = img2
                img2 = applyMask(images[i + 1], boundary_masks[i + 1], -1, 3, 3)
pf = p1.copy()
pi = pi.reshape(-1, 2)
pf = pf.reshape(-1, 2)
        # Pin the flow source center as a fixed, zero-displacement point
        if self.flow_source_geometry['type'] == 'circle':
pi = np.vstack([pi, self.flow_source_geometry['data']['center']])
pf = np.vstack([pf, self.flow_source_geometry['data']['center']])
return pi, pf
# -----------------------------------------------------------------------------
# Boundary flow
class NormalBoundaryFlow(object):
    """Class for estimating boundary motion along local boundary normals.
    """
def __init__(self, boundary_chord, boundary_point_pixel_spacing, boundary_intersection_lookahead):
"""
        Parameters are documented in examples on the main page (work in progress).
Args:
boundary_chord (int): number of points to use for determining normal vector to boundary chord
boundary_point_pixel_spacing (int): approximate pixel spacing between boundary points (used to simplify boundary)
boundary_intersection_lookahead (int): number of points to look ahead to determine target boundary intersections
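
        Example (a sketch with illustrative values)::

            flow = NormalBoundaryFlow(
                boundary_chord=5,
                boundary_point_pixel_spacing=10,
                boundary_intersection_lookahead=20,
            )
            p1, p2 = flow(img1, img2, boundary1, boundary2)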
"""
self.boundary_chord = boundary_chord
self.boundary_point_pixel_spacing = boundary_point_pixel_spacing
self.boundary_intersection_lookahead = boundary_intersection_lookahead
    def __call__(self, img1, img2, boundary1=None, boundary2=None):
        """Estimate boundary displacement by intersecting normals cast from the source boundary with the target boundary.
Args:
img1 (np.ndarray): source image (not used)
img2 (np.ndarray): target image (not used)
boundary1 (np.ndarray): list of boundary contour points from source image
boundary2 (np.ndarray): list of boundary contour points from target image
Returns:
            p1 (np.ndarray): feature points on source boundary, shaped (N, 2)
            p2 (np.ndarray): feature points on target boundary, shaped (N, 2)
"""
# TODO: migrate to
# Extract points and compute center points and normals
        temp1 = boundary1.reshape(-1, 2).astype(float)
        temp2 = boundary2.reshape(-1, 2).astype(float)
if temp1.shape[0] > 25: # TODO: improve or parameterise threshold
boundary_chord = self.boundary_chord
else:
boundary_chord = 1
        # Midpoints between consecutive boundary points
        p1 = (temp1 + np.roll(temp1, 1, axis=0)) / 2
        # Unit normals perpendicular to the chord spanning ``boundary_chord`` points either side
        normals = np.roll(np.roll(temp1, -boundary_chord, axis=0) - np.roll(temp1, boundary_chord, axis=0), 1, axis=1) * np.array([-1, 1])
        normals = normals / np.sqrt((normals ** 2).sum(axis=1)).reshape(-1, 1)
# Simplify boundary uniformly
if ((p1.max(axis=0) - p1.min(axis=0)) > (self.boundary_point_pixel_spacing * 4)).any(): # TODO: improve or parameterise threshold
            arc_length = np.cumsum(np.sqrt(((temp1 - np.roll(temp1, 1, axis=0)) ** 2).sum(axis=1)))
            temp = self.boundary_point_pixel_spacing * (arc_length / self.boundary_point_pixel_spacing).round(0)
index = np.flatnonzero((temp - np.roll(temp, 1, axis=0)) > 0)
p1 = p1[index]
normals = normals[index]
# Find intersection with second boundary
i = np.sqrt(((p1[0] - temp2) ** 2).sum(axis=1)).argmin()
vectors = np.zeros(p1.shape)
# vectors[:] = np.NaN
for z, (p, n) in enumerate(zip(p1, normals)):
for j in range(i - self.boundary_intersection_lookahead, i + self.boundary_intersection_lookahead + 1):
j = j % temp2.shape[0]
try:
a, b = temp2[(j, (j + 1) % temp2.shape[0]), :]
s, t = np.linalg.solve(np.hstack([n.reshape(-1, 1), (a - b).reshape(-1, 1)]), (a - p).reshape(-1, 1)).flatten()
if t >= 0 and t <= 1 and s >= -5: # TODO: investigate why this condition isn't being satisfied
q = p + s * n
i = j
                        vectors[z] = (q - p) / 12  # TODO: explain or parameterise this scaling factor
break
except np.linalg.LinAlgError:
pass
p2 = p1 + vectors
_, i1 = np.unique(p1, axis=0, return_index=True)
_, i2 = np.unique(p2, axis=0, return_index=True)
unique = sorted(set(i1).intersection(set(i2)))
p1 = p1[unique]
p2 = p2[unique]
return p1, p2 # TODO: store and visualize normals
# -----------------------------------------------------------------------------
# OpenCV feature + descriptor matcher tracking
class FeatureDescriptorMatching(SparseFlow):
"""Class for descriptor based feature matching and tracking.
"""
def __init__(self, extractor, matcher, ratio, speed_lower_bound, speed_upper_bound):
"""
        Parameters are documented in examples on the main page (work in progress).
Args:
extractor (cv2.Feature2D): any class implementing the detect and compute methods defined by cv2::Feature2D
matcher (cv2.DescriptorMatcher): any class implementing the knnMatch method defined by cv2::DescriptorMatcher
ratio (float): distance ratio used to filter candidates
speed_lower_bound (float): speed must be above lower bound to be accepted
speed_upper_bound (float): speed must be below upper bound to be accepted
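
        Example (a sketch; ORB descriptors with a Hamming brute-force matcher
        and Lowe's 0.75 ratio are common choices, not requirements)::

            matching = FeatureDescriptorMatching(
                extractor=cv2.ORB_create(),
                matcher=cv2.BFMatcher(cv2.NORM_HAMMING),
                ratio=0.75,
                speed_lower_bound=1.0,
                speed_upper_bound=50.0,
            )
            p1, p2 = matching(img1, img2, boundary1, boundary2)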
"""
self.extractor = extractor
self.matcher = matcher
self.ratio = ratio
self.speed_lower_bound = speed_lower_bound
self.speed_upper_bound = speed_upper_bound
    def __call__(self, img1, img2, boundary1=None, boundary2=None):
"""Compute feature tracking across a pair of images.
Args:
img1 (np.ndarray): source image
img2 (np.ndarray): target image
boundary1 (np.ndarray): list of boundary contour points from source image
boundary2 (np.ndarray): list of boundary contour points from target image
Returns:
            p1 (np.ndarray): feature points in source image, shaped (N, 2)
            p2 (np.ndarray): feature points in target image, shaped (N, 2)
"""
kp1, des1 = self.extractor.detectAndCompute(img1, None)
kp2, des2 = self.extractor.detectAndCompute(img2, None)
        if des1 is None or des2 is None:  # no features detected in one of the images
            return np.empty((0, 2)), np.empty((0, 2))
        matches = self.matcher.knnMatch(des1, des2, k=2)
        p1 = np.empty((len(matches), 2))
        p2 = np.empty((len(matches), 2))
        mask = np.zeros((len(matches),), dtype=bool)  # start rejected; accepted matches set True below
        for i, m in enumerate(matches):
            if len(m) < 2:  # knnMatch can return fewer than two candidates
                continue
            a, b = m
            if a.distance < self.ratio * b.distance:
x1, y1 = int(kp1[a.queryIdx].pt[0]), int(kp1[a.queryIdx].pt[1])
x2, y2 = int(kp2[a.trainIdx].pt[0]), int(kp2[a.trainIdx].pt[1])
speed = distance((x1, y1), (x2, y2))
                angle = np.arctan2((y1 - y2), (x1 - x2))  # currently unused; kept for potential direction filtering
                if angle < 0:
                    angle += 2 * np.pi
flag = True
flag = flag and speed > self.speed_lower_bound
flag = flag and speed < self.speed_upper_bound
if boundary1 is not None:
flag = flag and cv2.pointPolygonTest(boundary1, (x1, y1), False) > 0
if boundary2 is not None:
flag = flag and cv2.pointPolygonTest(boundary2, (x2, y2), False) > 0
p1[i] = (x1, y1)
p2[i] = (x2, y2)
mask[i] = flag
return p1[mask], p2[mask]
# -----------------------------------------------------------------------------
# Dense optical flow
class FarnebackDenseOpticalFlow(object):
"""Class for Farneback dense optical flow.
"""
def __init__(self, pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, flags):
"""
        Parameters are documented in examples on the main page (work in progress).
Args:
pyr_scale (float): scale between layers
levels (int): number of layers
winsize (int): averaging window size
iterations (int): number of iterations algorithm performs at each level
poly_n (int): size of pixel neighbourhood used for polynomial expansion in each pixel
poly_sigma (float): standard deviation of Gaussian used for smoothing derivatives
            flags (int): operation flags, e.g. ``cv2.OPTFLOW_FARNEBACK_GAUSSIAN``
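
        Example (a sketch using values close to the common OpenCV defaults)::

            flow = FarnebackDenseOpticalFlow(
                pyr_scale=0.5,
                levels=3,
                winsize=15,
                iterations=3,
                poly_n=5,
                poly_sigma=1.2,
                flags=0,
            )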
"""
self.pyr_scale = pyr_scale
self.levels = levels
self.winsize = winsize
self.iterations = iterations
self.poly_n = poly_n
self.poly_sigma = poly_sigma
self.flags = flags
    def __call__(self, img1, img2):
        """Generate a flow vector field between the source and target images using Farneback dense optical flow.
Args:
img1 (np.ndarray): source image
img2 (np.ndarray): target image
Returns:
u (np.ndarray): x component of velocity on x, y pixel meshgrid
v (np.ndarray): y component of velocity on x, y pixel meshgrid
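
        Example (sketch; converting the returned components to speed and
        direction fields)::

            u, v = flow(img1, img2)
            speed = np.sqrt(u ** 2 + v ** 2)
            direction = np.arctan2(v, u)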
"""
# Input frame preprocessing
gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
        # gray1 = cv2.UMat(gray1)  # optionally wrap in cv2.UMat to enable OpenCL acceleration
        # gray2 = cv2.UMat(gray2)
# Compute optical flow
flow = cv2.calcOpticalFlowFarneback(
gray1,
gray2,
None,
self.pyr_scale,
self.levels,
self.winsize,
self.iterations,
self.poly_n,
self.poly_sigma,
self.flags
)
        # flow = flow.get()  # needed to download the result when using cv2.UMat inputs
u = flow[..., 0]
v = flow[..., 1]
return u, v