"""Pipes for improving boundary detection and masking.
"""
import cv2
import numpy as np
from lavaflow.pipes.core import AbstractPipe
import logging
logger = logging.getLogger('lavaflow')
# -----------------------------------------------------------------------------
# Pipes to improve boundary detection over a window
class SmoothBoundaryWindowPipe(AbstractPipe):
    """Smooth a boundary by letting every frame in a window vote on it.

    Each item's boundary polygon (stored under key ``x``) is filled onto a
    shared canvas with a fixed intensity and the fills are summed. Pixels
    whose summed value falls below ``threshold`` are discarded, and the
    largest remaining contour is written to key ``y`` of the middle item.
    """

    def __init__(self, x, y, intensity, threshold):
        """Constructor.

        Args:
            x (str): input key
            y (str): output key
            intensity (float): intensity to apply to each frame in window,
                e.g. 255 / N
            threshold (float): threshold for smooth mask
        """
        self.x = x
        self.y = y
        self.intensity = intensity
        self.threshold = threshold

    def map(self, window):
        """Map.

        Args:
            window (list): window of items; each item is indexed with the
                input key and must hold a point array accepted by
                ``cv2.fillPoly`` (presumably an integer contour as returned
                by ``cv2.findContours`` -- confirm against the producer)

        Returns:
            d (dict): the middle item of the window (index ``len(window) // 2``),
                modified in place with the smooth boundary stored under the
                output key

        Raises:
            IndexError: if ``window`` is an empty list
            ValueError: if nothing survives the threshold, so no contour exists
        """
        polygons = [item[self.x] for item in window]

        # Per-coordinate bounding box over every boundary in the window.
        lower = polygons[0].min(axis=0)
        upper = polygons[0].max(axis=0)
        for polygon in polygons[1:]:
            lower = np.minimum(lower, polygon.min(axis=0))
            upper = np.maximum(upper, polygon.max(axis=0))

        # Work on a canvas cropped to the bounding box. Shifting by
        # ``lower - 1`` and sizing with ``+ 3`` leaves one empty pixel on
        # every side, so filled shapes never touch the canvas edge (the
        # previous ``+ 2`` left no padding on the high side).
        offset = lower - 1
        # Points are (x, y) while array shapes are (rows, cols), hence reversed.
        shape = tuple(reversed((upper - lower + 3).flatten()))
        color = (self.intensity, self.intensity, self.intensity)

        votes = np.zeros(shape)
        for polygon in polygons:
            layer = np.zeros(shape)
            cv2.fillPoly(layer, [polygon - offset], color)
            votes += layer

        # Saturate before the uint8 cast: an accumulated value above 255
        # must stay "fully covered" instead of wrapping around to a small one.
        mask = np.clip(votes, 0, 255).astype(np.uint8)
        mask[mask < self.threshold] = 0

        # OpenCV 3.x returns (image, contours, hierarchy) while 4.x returns
        # (contours, hierarchy); the contours are second-to-last in both.
        contours = cv2.findContours(
            mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)[-2]
        if len(contours) == 0:
            raise ValueError(
                'no contour found: smooth mask is empty after applying '
                'threshold {}'.format(self.threshold))

        # Keep the contour enclosing the largest area (first one wins ties).
        boundary = max(contours, key=cv2.contourArea)

        d = window[len(window) // 2]
        # Translate back from canvas coordinates to the original frame.
        d[self.y] = boundary + offset
        return d
class ExpandBoundaryWindowPipe(AbstractPipe):
    """Expand a boundary to the union of all boundaries in a window.

    Every item's boundary polygon (stored under key ``x``) is filled onto a
    shared canvas; the largest contour of the combined area replaces the
    boundary of the last item in the window.
    """

    def __init__(self, x, margin=10):
        """Constructor.

        Args:
            x (str): boundary key to be expanded
            margin (int): kept on the instance as ``margin``. It was
                previously accepted and silently discarded; ``map`` does
                not apply it, so the output is unaffected by its value.
        """
        self.x = x
        self.margin = margin

    def map(self, window):
        """Map.

        Args:
            window (list): window of items; each item is indexed with the
                boundary key and must hold a point array accepted by
                ``cv2.fillPoly`` (presumably an integer contour as returned
                by ``cv2.findContours`` -- confirm against the producer)

        Returns:
            d (dict): the last item of the window, modified in place with
                the expanded boundary stored under the boundary key

        Raises:
            IndexError: if ``window`` is an empty list
            ValueError: if the combined mask contains no contour
        """
        polygons = [item[self.x] for item in window]

        # Per-coordinate bounding box over every boundary in the window.
        lower = polygons[0].min(axis=0)
        upper = polygons[0].max(axis=0)
        for polygon in polygons[1:]:
            lower = np.minimum(lower, polygon.min(axis=0))
            upper = np.maximum(upper, polygon.max(axis=0))

        # Work on a canvas cropped to the bounding box. Shifting by
        # ``lower - 1`` and sizing with ``+ 3`` leaves one empty pixel on
        # every side, so filled shapes never touch the canvas edge (the
        # previous ``+ 2`` left no padding on the high side).
        offset = lower - 1
        # Points are (x, y) while array shapes are (rows, cols), hence reversed.
        shape = tuple(reversed((upper - lower + 3).flatten()))

        # Paint every polygon straight onto one binary canvas. Unlike summing
        # per-frame fills and casting the total to uint8, this cannot wrap
        # around to zero for heavily overlapped pixels. One fillPoly call per
        # polygon keeps the result a union of the filled areas.
        mask = np.zeros(shape, dtype=np.uint8)
        for polygon in polygons:
            cv2.fillPoly(mask, [polygon - offset], (255, 255, 255))

        # OpenCV 3.x returns (image, contours, hierarchy) while 4.x returns
        # (contours, hierarchy); the contours are second-to-last in both.
        contours = cv2.findContours(
            mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)[-2]
        if len(contours) == 0:
            raise ValueError(
                'no contour found: combined boundary mask is empty')

        # Keep the contour enclosing the largest area (first one wins ties).
        boundary = max(contours, key=cv2.contourArea)

        d = window[-1]
        # NOTE(review): the input key itself is overwritten on the window's
        # last item. If consecutive windows share item objects, later windows
        # will see this already-expanded boundary -- confirm that is intended.
        d[self.x] = boundary + offset
        return d