Source code for lavaflow.pipes.boundary

"""Pipes for improving boundary detection and masking.
"""

import cv2
import numpy as np

from lavaflow.pipes.core import AbstractPipe

import logging
logger = logging.getLogger('lavaflow')


# -----------------------------------------------------------------------------

# Pipes to improve boundary detection over a window

[docs]class SmoothBoundaryWindowPipe(AbstractPipe): def __init__(self, x, y, intensity, threshold): """Constructor. Args: x (str): input key y (str): output key intensity (float): intensity to apply to each frame in window, e.g. 255 / N threshold (float): threshold for smooth mask Returns: self (ImagePipe): image pipe """ self.x = x self.y = y self.intensity = intensity self.threshold = threshold
[docs] def map(self, window): """Map. Args: window (list): window of items Returns: d (dict): item with smooth boundary applied """ N = len(window) i = int(N / 2) color = (self.intensity, self.intensity, self.intensity) resmin = window[0][self.x].min(axis=0) resmax = window[0][self.x].max(axis=0) for d in window: resmin = np.minimum(resmin, d[self.x].min(axis=0)) resmax = np.maximum(resmax, d[self.x].max(axis=0)) res = tuple(reversed((resmax - resmin + 2).flatten())) mask = np.zeros(res) for d in window: fill = np.zeros(res) cv2.fillPoly(fill, [d[self.x] - (resmin - 1)], color) mask += fill mask = mask.astype(np.uint8) mask[mask < self.threshold] = 0 contours, hierarchy = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) temp = np.array([cv2.contourArea(contour) for contour in contours]) boundary = contours[np.argmax(temp)] d = window[i] d[self.y] = boundary + (resmin - 1) return d
[docs]class ExpandBoundaryWindowPipe(AbstractPipe): def __init__(self, x, margin=10): """Constructor. Args: x (str): boundary key to be expanded Returns: self (ImagePipe): image pipe """ self.x = x
[docs] def map(self, window): """Map. Args: window (list): window of items Returns: d (dict): item with expanded smooth boundary """ color = (1, 1, 1) resmin = window[0][self.x].min(axis=0) resmax = window[0][self.x].max(axis=0) for d in window: resmin = np.minimum(resmin, d[self.x].min(axis=0)) resmax = np.maximum(resmax, d[self.x].max(axis=0)) res = tuple(reversed((resmax - resmin + 2).flatten())) mask = np.zeros(res) for d in window: fill = np.zeros(res) cv2.fillPoly(fill, [d[self.x] - (resmin - 1)], color) mask += fill mask = mask.astype(np.uint8) mask[mask > 0] = 255 contours, hierarchy = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) temp = np.array([cv2.contourArea(contour) for contour in contours]) boundary = contours[np.argmax(temp)] d = window[-1] d[self.x] = boundary + (resmin - 1) return d