Source code for lavaflow.boundary

"""Functions for boundary detection and masking.
"""

import cv2
import numpy as np

from abc import ABC, abstractmethod
from collections import deque

from IPython import embed
from pubsub import pub
from scipy.ndimage import gaussian_filter1d

import logging
logger = logging.getLogger('lavaflow')


# -----------------------------------------------------------------------------

# Boundary detection common functionality

[docs]class BoundaryDetector(ABC): """Abstract class for boundary detection based on image masking. This class abstracts how the flow source geometry and flow plate geometry are defined. Any specific boundary detection method should extend this class and implement the ``__call__`` method. """ def __init__(self, res, flow_source_geometry, flow_plate_geometry): """ The flow source is used to filter out masks that do not include the flow source, and the flow plate is used to crop the mask down to the area of interest to remove any flow outside the boundary. Two flow source types are currently supported, circle geometry :: { type: 'circle', data: { center: [x, y], radius: r, } } and edge geometry :: edge_flow_source_geometry = { type: 'edge', data: { points: [[x, y], [x, y]], radius: r, } } The flow plate geometry is specified as a generic polygon only :: { type: 'polygon', data: { points: [[x, y], [x, y], ...], } } Args: res (tuple): image size written as ``(w, h)`` flow_source_geometry (dict): flow source geometry flow_plate_geometry (dict): flow plate geometry """ # Flow source geometry self.flow_source = flow_source_geometry['type'] if self.flow_source == 'circle': self.center = flow_source_geometry['data']['center'] self.radius = flow_source_geometry['data']['radius'] self.threshold = np.pi * self.radius ** 2 # threshold = area of circle temp = np.linspace(0, 2 * np.pi, 12, endpoint=False) self.source_boundary = (np.array(self.center) + self.radius * np.vstack([np.cos(temp), np.sin(temp)]).T).reshape(-1, 1, 2).astype(np.int32) elif self.flow_source == 'edge': self.edge = np.array(flow_source_geometry['data']['points']) self.center = np.mean(self.edge, axis=0) v = self.edge[1] - self.edge[0] self.threshold = np.linalg.norm(v) * flow_source_geometry['data']['width'] # threshold = edge length x edge width if v[0] == 0: # vertically aligned self.edge_alignment = 1 temp = np.array([flow_source_geometry['data']['radius'], 0]) self.source_boundary = np.vstack(self.edge - temp, np.flipud(self.edge + temp)).reshape(-1, 1, 2).astype(np.int32) elif v[1] == 0: # horizontally aligned self.edge_alignment = 0 temp = np.array(0, [flow_source_geometry['data']['radius']]) self.source_boundary = np.vstack(self.edge - temp, np.flipud(self.edge + temp)).reshape(-1, 1, 2).astype(np.int32) else: raise Exception('edge flow source points must be horizontally or vertically aligned') else: raise Exception('flow source type not supported') # Flow plate geometry self.flow_plate = flow_plate_geometry['type'] if self.flow_plate == 'polygon': flow_plate_boundary = np.array(flow_plate_geometry['data']['points']).reshape(1, -1, 2) self.flow_plate_mask = np.zeros(tuple(reversed(res))) cv2.fillPoly(self.flow_plate_mask, [flow_plate_boundary], (1, 1, 1)) else: raise Exception('flow plate type not supported')
[docs] @abstractmethod def __call__(self, img): """Map an image to a boundary. Args: img (np.ndarray): image """ pass
# ----------------------------------------------------------------------------- # Boundary detection based on image masking
[docs]class ColorMaskBoundaryDetector(BoundaryDetector): """Class for boundary detection based on color masking. This boundary detection method uses color masks, morphology transformations, and contour matching to find a contour that matches the boundary of the colored region with the largest area that contains the flow source geometry. Note that if the flow source geometry is temporarily occluded the boundary with the largest area will still be selected. """ def __init__(self, res, flow_source_geometry, flow_plate_geometry, color_mask): """ Args: res (tuple): image size written as ``(w, h)`` flow_source_geometry (dict): flow source geometry flow_plate_geometry (dict): flow plate geometry color_mask (ColorMask): color mask (already configured) """ super().__init__(res, flow_source_geometry, flow_plate_geometry) self.color_mask = color_mask
[docs] def __call__(self, img): """Map an image to a boundary. Args: img (np.ndarray): image """ # ------------------------------------------------------------------------- # (a) get color mask # embed(colors='neutral') mask = np.dstack([self.color_mask(img), self.flow_plate_mask]).min(axis=2).astype(np.uint8) # ------------------------------------------------------------------------- # (b) apply morphology to simplify mask mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((3, 3), np.uint8)) mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((2, 2), np.uint8)) mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8)) # ------------------------------------------------------------------------- # (c) compute contours contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) # ------------------------------------------------------------------------- # (d) circle flow source candidate boundary if self.flow_source == 'circle': # TODO: need to improve how contours are filtered to ignore dynamic obstructions (e.g. hands hovering over the source point) if len(contours) > 0: temp = np.array([(i, cv2.pointPolygonTest(contour, self.center, True), cv2.contourArea(contour)) for i, contour in enumerate(contours)]) flag = (temp[:, 1] > 0) & (temp[:, 2] > self.threshold) if flag.any(): i = int(temp[flag, :][np.argmax(temp[flag, 2]), 0]) boundary = contours[i] else: i = np.argmax(temp[:, 2]) if temp[i, 2] > self.threshold: boundary = contours[i] else: return self.source_boundary else: return self.source_boundary # ------------------------------------------------------------------------- # (e) edge flow source candidate boundary elif self.flow_source == 'edge': if self.edge_alignment == 1: # vertically aligned def check_overlap(contour, edge): x = contour[:, :, 0] y = contour[:, :, 1] return int(x.min() < edge[0, 0] < x.max() and edge[:, 1].min() < y.max() and edge[:, 1].max() > y.min()) else: # horizontally aligned def check_overlap(contour, edge): x = contour[:, :, 0] y = contour[:, :, 1] return int(y.min() < edge[1, 1] < y.max() and edge[:, 0].min() < x.max() and edge[:, 0].max() > x.min()) if len(contours) > 0: temp = np.array([(i, cv2.pointPolygonTest(contour, self.center, True), cv2.contourArea(contour), check_overlap(contour, self.edge)) for i, contour in enumerate(contours)]) flag = (temp[:, 1] > 0) & (temp[:, 2] > self.threshold) & (temp[:, 3] == 1) if flag.any(): i = int(temp[flag, :][np.argmax(temp[flag, 2]), 0]) boundary = contours[i] else: i = np.argmax(temp[:, 2]) if temp[i, 2] > self.threshold: boundary = contours[i] else: return self.source_boundary else: return self.source_boundary # ------------------------------------------------------------------------- # (f) otherwise flow source not supported else: raise Exception('flow source not supported') # ------------------------------------------------------------------------- return boundary
# ----------------------------------------------------------------------------- # Boundary smoothing and expanding
[docs]class SmoothBoundary(object): """Class for boundary smoothing. This class smooths a window or sequence of boundaries using anti-aliasing by painting inside the boundaries with transparency and recomputing the boundary based on the overlay. """ def __init__(self, intensity=None, threshold=None): """ Args: intensity (float): intensity to apply to each frame in window, generally ``255 / N`` based on the number of boundaries in the window ``N`` threshold (float): threshold for smooth mask """ self.intensity = intensity self.threshold = threshold
[docs] def __call__(self, boundaries): """Apply smoothing to boundary. Args: boundaries (list): list of np.ndarray boundaries for smoothing Returns: boundary_smoothed (np.ndarray): smoothed boundary """ color = (self.intensity, self.intensity, self.intensity) min_coord = 0 max_coord = 0 for boundary in boundaries: min_coord = np.minimum(min_coord, boundary.min(axis=0)) max_coord = np.maximum(max_coord, boundary.max(axis=0)) res = tuple(reversed((max_coord - min_coord + 2).flatten())) mask = np.zeros(res) for boundary in boundaries: fill = np.zeros(res) cv2.fillPoly(fill, [boundary - (min_coord - 1)], color) mask += fill mask = mask.astype(np.uint8) mask[mask < self.threshold] = 0 contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) temp = np.array([cv2.contourArea(contour) for contour in contours]) if len(temp) > 0: boundary = contours[np.argmax(temp)] return boundary + (min_coord - 1) else: return boundaries[0]
[docs]class ExpandBoundary(object): """Class for boundary expanding. This class takes the union of a window or sequence of boundaries by painting inside the boundaries and recomputing the boundary based on the union of the overlay. """ def __init__(self): """ """ pass
[docs] def __call__(self, boundaries): """Apply expanding to boundary. Args: boundaries (list): list of np.ndarray boundaries for expanding Returns: boundary (np.ndarray): expanded boundary """ color = (1, 1, 1) min_coord = 0 max_coord = 0 for boundary in boundaries: min_coord = np.minimum(min_coord, boundary.min(axis=0)) max_coord = np.maximum(max_coord, boundary.max(axis=0)) res = tuple(reversed((max_coord - min_coord + 2).flatten())) mask = np.zeros(res) for boundary in boundaries: fill = np.zeros(res) cv2.fillPoly(fill, [boundary - (min_coord - 1)], color) mask += fill mask = mask.astype(np.uint8) mask[mask > 0] = 255 contours, _ = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) temp = np.array([cv2.contourArea(contour) for contour in contours]) boundary = contours[np.argmax(temp)] return boundary + (min_coord - 1)
# --------------------------------------------------------------------------------------------------------------------- # TODO: review the following # class BackgroundSubtractionBoundaryDetector(object): # # def __init__(self, res, flow_source_geometry, flow_plate_geometry, backsub_type, bounding_box, skip_frames, threshold=350, boundary_point_pixel_spacing=15, second_boundary_point_pixel_spacing=15, ignoring_lookback=24, ignoring_multiplier=20, ignoring_boundary_chord=1, max_ignoring_expansion=70, min_ignoring_expansion=15, bg_ignored_alpha=0.15, epsilon=0.5, epsilon_b=5): # """Construct class for boundary detection based on image masking. # Args: # res (tuple): image size written as (w, h) # flow_source_geometry (dict): flow source geometry, { type: 'circle', data: { center: [x, y], radius: r } } or { type: 'line', data: { points: [[x, y], [x, y]] } } # flow_plate_geometry (dict): flow plate geometry, { type: 'polygon', data: { points: [[x, y], [x, y], ...] } } # backsub_type (str): 'MOG2' currently only supported # bounding_box (List): list of tuples of coordinates specifying the bounding box # skip_frames (int): number of frames to skip at the start # threshold (int): threshold to use for the boundary detector # boundary_point_pixel_spacing # second_boundary_point_pixel_spacing # ignoring_lookback # ignoring_multiplier # ignoring_boundary_chord # max_ignoring_expansion # min_ignoring_expansion # bg_ignored_alpha # epsilon # epsilon_b # """ # # logger.info(f'initialising {self}') # # self.shape = tuple(reversed(res)) # # self.mask2 = np.zeros(self.shape).astype(np.uint8) # # self.flow_source = flow_source_geometry['type'] # if self.flow_source == 'circle': # self.center = flow_source_geometry['data']['center'] # self.radius = flow_source_geometry['data']['radius'] # self.mask2 = cv2.circle(self.mask2, self.center, self.radius, (255,), -1) # elif self.flow_source == 'edge': # self.edge = np.array(flow_source_geometry['data']['points']) # self.center = np.mean(self.edge, axis=0) # v = self.edge[1] - self.edge[0] # if v[0] == 0: # self.edge_alignment = 1 # vertically aligned # a = self.edge[0] - np.array([10, 0]) # b = self.edge[1] + np.array([10, 0]) # elif v[1] == 0: # self.edge_alignment = 0 # horizontally aligned # a = self.edge[0] - np.array([0, 10]) # b = self.edge[1] + np.array([0, 10]) # else: # raise Exception('edge flow source points must be horizontally or vertically aligned') # self.mask2 = cv2.rectangle(self.mask2, a, b, (255,), -1) # else: # raise Exception('flow source type not supported') # # # self.flow_plate = flow_plate_geometry['type'] # # if self.flow_plate == 'polygon': # # flow_plate_boundary = np.array(flow_plate_geometry['data']['points']).reshape(1, -1, 2) # # self.flow_plate_mask = np.zeros(tuple(reversed(res))) # # cv2.fillPoly(self.flow_plate_mask, [flow_plate_boundary], (1, 1, 1)) # # else: # # raise Exception('flow plate type not supported') # # # Initialise class parameters # self.bounding_box = np.array(bounding_box).reshape(-1, 2) # self.skip_frames = skip_frames # self.num_frames = 0 # # self.boundary_point_pixel_spacing = boundary_point_pixel_spacing # self.second_boundary_point_pixel_spacing = second_boundary_point_pixel_spacing # self.ignoring_lookback = ignoring_lookback # self.ignoring_multiplier = ignoring_multiplier # self.ignoring_boundary_chord = ignoring_boundary_chord # self.max_ignoring_expansion = max_ignoring_expansion # self.min_ignoring_expansion = min_ignoring_expansion # self.bg_ignored_alpha = bg_ignored_alpha # self.epsilon = epsilon # self.epsilon_b = epsilon_b # # # State variables # self.total_mask = np.zeros(self.shape).astype(np.uint8) # self.bg_ignored_area = np.zeros(self.shape, np.float64) # self.blank_frame = cv2.UMat(np.zeros_like(self.total_mask).astype(np.uint8)) # self.contour_history = deque() # # # Bounding box masks # self.mask1 = np.zeros(self.shape).astype(np.uint8) # cv2.fillPoly(self.mask1, [self.bounding_box], (255,)) # # # Initialise the OpenCV background subtraction algorithm # if backsub_type == 'MOG2': # self.back_sub = cv2.createBackgroundSubtractorMOG2(varThreshold=threshold, detectShadows=False) # self.back_sub.setBackgroundRatio(1) # else: # raise Exception('Backsub type not supported!') # # self.kernel_open = np.ones((7, 7), np.uint8) # self.kernel_close = np.ones((4, 4), np.uint8) # self.last_kernel_close = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (40, 40)) # # def get_ignored_region(self, boundary, last_boundary, frame_to_annotate=None): # # Reshape boundaries into 2 column, N row array # temp_cb = boundary.reshape(-1, 2) # temp_lb = last_boundary.reshape(-1, 2) # # # Smooth both raw boundaries slightly # temp_cb = cv2.approxPolyDP(temp_cb, self.epsilon, True).reshape(-1, 2) # temp_lb = cv2.approxPolyDP(temp_lb, self.epsilon, True).reshape(-1, 2) # # # Calculate the normals at each corner on the boundary # normals = np.roll( # (np.roll(temp_lb, -self.ignoring_boundary_chord, axis=0) - np.roll(temp_lb, self.ignoring_boundary_chord, axis=0)), 1, # axis=1) * np.array([-1, 1]) # normals = normals / np.sqrt((normals ** 2).sum(axis=1)).reshape(-1, 1) # # # Remove points such that all points considered are now at least boundary_point_pixel_spacing apart # # on the old boundary # cumspacing = self.boundary_point_pixel_spacing * (np.cumsum( # np.sqrt(((temp_lb - np.roll(temp_lb, 1, axis=0)) ** 2).sum(axis=1))) / self.boundary_point_pixel_spacing).round(0) # index = np.flatnonzero((cumspacing - np.roll(cumspacing, 1, axis=0)) > 0) # temp_lb = temp_lb[index] # normals = normals[index] # # Do same for the current boundary # cumspacing2 = self.second_boundary_point_pixel_spacing * (np.cumsum( # np.sqrt(((temp_cb - np.roll(temp_cb, 1, axis=0)) ** 2).sum(axis=1))) / self.second_boundary_point_pixel_spacing).round(0) # index2 = np.flatnonzero((cumspacing2 - np.roll(cumspacing2, 1, axis=0)) > 0) # temp_cb = temp_cb[index2] # # # Return an empty contour if there are no points on either the old or current boundary # if temp_cb.shape[0] == 0 or temp_lb.shape[0] == 0: # return np.array([(0, 0)]).astype(np.int32) # # # Calculate the point on the current boundary closest to the first point on the old boundary # i = np.sqrt(((temp_lb[0] - temp_cb) ** 2).sum(axis=1)).argmin() # # # Create the new boundary the same shape as the old boundary and an array of vector lengths # # (1D) with same number of rows as the old boundary. # new_boundary = np.empty(temp_lb.shape) # vector_lengths = np.empty(temp_lb.shape[0]) # vector_lengths[:] = np.NaN # new_boundary[:] = np.NaN # # # For each point on the old boundary, consider a range of edges on the new boundary and see # # if the normal at that point intersects the edge; if so then compute the distance between # # the point and the edge along that normal vector. # for idx, (boundary_point, normal) in enumerate(zip(temp_lb, normals)): # for j in range(i - 30, i + 30 + 1): # j = j % temp_cb.shape[0] # # try: # old_pt1, old_pt2 = temp_cb[(j, (j + 1) % temp_cb.shape[0]), :] # normal_component, boundary_component = np.linalg.solve( # np.hstack([normal.reshape(-1, 1), (old_pt1 - old_pt2).reshape(-1, 1)]), # (old_pt1 - boundary_point).reshape(-1, 1) # ).flatten() # # if 0 <= boundary_component <= 1 and normal_component >= 0: # vector_lengths[idx] = np.clip( # normal_component * self.ignoring_multiplier, # self.min_ignoring_expansion, # self.max_ignoring_expansion # ) # # i = j # # break # # except: # # pass # # else: # # Even if no edge intersects, set i to the nearest point to keep the # # analyser on track # i = np.sqrt(((boundary_point - temp_cb) ** 2).sum(axis=1)).argmin() # # # Smooth vector lengths and make new boundary # nans = np.isnan(vector_lengths) # vector_lengths = vector_lengths[~nans].reshape(-1, 1) # new_boundary = new_boundary[~nans].reshape(-1, 2).astype(np.int32) # normals = normals[~nans].reshape(-1, 2) # temp_lb_filtered = temp_lb[~nans].reshape(-1, 2) # # logger.debug(np.reshape(vector_lengths, (1, -1))) # vector_lengths = gaussian_filter1d(vector_lengths.reshape(1, -1), 3, mode='wrap').reshape(-1, 1) # # logger.debug(np.reshape(vector_lengths, (1, -1))) # for i, (boundary_point, normal, vector_length) in enumerate(zip(temp_lb_filtered, normals, vector_lengths)): # new_boundary[i] = (boundary_point + normal * vector_length).astype(np.int32) # # new_boundary_smoothed = np.array([ # gaussian_filter1d(new_boundary[:, 0], 0.5, mode='wrap'), # gaussian_filter1d(new_boundary[:, 1], 0.5, mode='wrap') # ]).T # # logger.debug(new_boundary_smoothed) # # if frame_to_annotate is not None: # # Draw the contour points used # for point in temp_lb: # frame_to_annotate = cv2.circle(frame_to_annotate, point, 4, (255, 255, 255), -1) # # for point in temp_cb: # frame_to_annotate = cv2.circle(frame_to_annotate, point, 4, (0, 0, 0), -1) # # frame_to_annotate = cv2.circle(frame_to_annotate, temp_lb[0], 5, (0, 255, 255), -1) # # # Draw the normal vectors # # for (point, nb_point, normal) in zip(temp_lb_filtered, new_boundary, normals): # # frame_to_annotate = cv2.arrowedLine(frame_to_annotate, point, nb_point, (255, 255, 255), thickness=2) # # frame_to_annotate = cv2.arrowedLine(frame_to_annotate, point, (point + 15 * normal).astype(np.int32), (0, 0, 0), thickness=1) # # return new_boundary_smoothed, frame_to_annotate # # return new_boundary_smoothed # # def __call__(self, frame): # """Map an image to a boundary for the latest image. # """ # # if self.num_frames == 0: # # Initialise boundary detector on the first frame # self.back_sub.apply(cv2.bilateralFilter(frame, 5, 100, 100), None, 1) # self.mask2 = np.zeros(self.shape).astype(np.uint8) # if self.flow_source == 'circle': # self.mask2 = cv2.circle(self.mask2, self.center, self.radius, (255,), -1) # elif self.flow_source == 'edge': # v = self.edge[1] - self.edge[0] # if v[0] == 0: # self.edge_alignment = 1 # vertically aligned # a = self.edge[0] - np.array([10, 0]) # b = self.edge[1] + np.array([10, 0]) # elif v[1] == 0: # self.edge_alignment = 0 # horizontally aligned # a = self.edge[0] - np.array([0, 10]) # b = self.edge[1] + np.array([0, 10]) # else: # raise Exception('edge flow source points must be horizontally or vertically aligned') # self.mask2 = cv2.rectangle(self.mask2, a, b, (255,), -1) # self.mask1 = np.zeros(self.shape).astype(np.uint8) # cv2.fillPoly(self.mask1, [self.bounding_box], (255,)) # self.num_frames += 1 # return np.array([(0, 0)]) # elif self.num_frames < self.skip_frames: # # Give boundary detector some time to stabilise # self.num_frames += 1 # return np.array([(0, 0)]) # else: # self.num_frames += 1 # # Perform some image filtering on the frame # frameFiltered = cv2.bilateralFilter(frame, 5, 100, 100) # # # Create the foreground mask with the OpenCV background subtraction algorithm # fg_mask = cv2.UMat(self.blank_frame) # fg_mask = self.back_sub.apply(frameFiltered, cv2.UMat(fg_mask), 0) # # # Do image processing on the foreground mask # fg_mask = cv2.medianBlur(fg_mask, 11) # fg_mask = cv2.bitwise_and(fg_mask, self.mask1) # _, fg_mask = cv2.threshold(fg_mask, 127, 255, cv2.THRESH_BINARY) # fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, self.kernel_open) # fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, self.last_kernel_close) # # print(fg_mask.get().shape) # # print(self.total_mask.get().shape) # fg_mask = cv2.bitwise_or(fg_mask, self.mask2) # self.total_mask = cv2.bitwise_or(self.total_mask, fg_mask) # fg_mask = self.total_mask # # # Retrieve contour from foreground mask # annotated_frame = cv2.addWeighted(cv2.cvtColor(fg_mask, cv2.COLOR_GRAY2BGR), 0.1, frame, 0.9, 0).get() # annotated_frame = cv2.polylines(annotated_frame, [self.bounding_box], True, (0, 255, 0)) # contours, hierachy = cv2.findContours(fg_mask.get(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) # # main_contour = None # for contour in contours: # if cv2.pointPolygonTest(contour, self.center, False) == 1: # annotated_frame = cv2.polylines(annotated_frame, [contour], True, (0, 0, 255), thickness=3) # main_contour = contour # else: # annotated_frame = cv2.polylines(annotated_frame, [contour], True, (255, 255, 0), thickness=2) # # # Create background-updating mask # if self.num_frames - self.skip_frames < self.ignoring_lookback or main_contour is None: # fg_mask_dilated = cv2.morphologyEx(fg_mask, cv2.MORPH_DILATE, np.ones((50, 50), np.uint8)) # else: # annotated_frame = cv2.polylines(annotated_frame, [self.contour_history[0]], True, (0, 0, 255), thickness=2) # ignoring_boundary, annotated_frame = self.get_ignored_region(main_contour, self.contour_history[0], annotated_frame) # ignoring_boundary_mask = np.zeros_like(fg_mask.get(), np.uint8) # cv2.fillPoly(ignoring_boundary_mask, [ignoring_boundary], (255,)) # # fg_mask_dilated = cv2.morphologyEx(fg_mask, cv2.MORPH_DILATE, np.ones((self.min_ignoring_expansion, self.min_ignoring_expansion), np.uint8)) # fg_mask_dilated = cv2.bitwise_or(fg_mask_dilated, ignoring_boundary_mask) # # # Apply spatial and temporal smoothing to fg_mask_dilated # fg_mask_dilated = cv2.GaussianBlur(fg_mask_dilated.get(), (5, 5), 0) # fg_mask_dilated = cv2.morphologyEx(fg_mask_dilated, cv2.MORPH_CLOSE, np.ones((10, 10), np.uint8)) # self.bg_ignored_area = cv2.addWeighted(self.bg_ignored_area, 1 - self.bg_ignored_alpha, # fg_mask_dilated.astype(np.float64), self.bg_ignored_alpha, 0) # bg_ignored_area_uint8 = self.bg_ignored_area.astype(np.uint8) # _, bg_ignored_area_thres = cv2.threshold(bg_ignored_area_uint8, 127, 255, cv2.THRESH_BINARY) # # # Compute the new background to update MOG2 with # new_background = cv2.bitwise_and(frame, cv2.cvtColor(cv2.bitwise_not(bg_ignored_area_thres), cv2.COLOR_GRAY2BGR)) # old_background = cv2.bitwise_and(self.back_sub.getBackgroundImage(), # cv2.cvtColor(bg_ignored_area_thres, cv2.COLOR_GRAY2BGR)) # updated_background = cv2.bitwise_or(new_background, old_background) # self.back_sub.apply(updated_background, None, 0.02) # # # Draw contours on to the annotated frame # contours, _ = cv2.findContours(fg_mask_dilated, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) # for contour in contours: # annotated_frame = cv2.polylines(annotated_frame, [contour], True, (0, 255, 0), thickness=2) # # # Add new contour and pop old one from queue # self.contour_history.append(main_contour) # if len(self.contour_history) > self.ignoring_lookback: # self.contour_history.popleft() # # return main_contour if main_contour is not None else np.array([(0, 0)])