"""Functions for boundary detection and masking.
"""
import cv2
import datetime
import numpy as np
from abc import ABC, abstractmethod
from collections import deque
from scipy.ndimage import gaussian_filter1d
import logging
from lavaflow.pipes.core import Pipe, DictPipe, DictFuncPipe, ConsumeSink
from lavaflow.processing import LensCorrector, Resizer, UnmaskSharpener, \
    PerspectiveTransformer, BrightnessAndContrastAdjuster
from lavaflow.streams import VideoStream
from lavaflow.utils.core import compose
logger = logging.getLogger('lavaflow')
# -----------------------------------------------------------------------------
# Pipeline common functionality
class Pipeline(ABC):
    """Generic pipeline class for streaming images for processing and modeling.
    """

    def __init__(self, config):
        """
        Args:
            config (dict): config dictionary
        """
        self.config = config
        self.timestamp = datetime.datetime.now()
        self.timestamp_formatted = self.timestamp.strftime('%Y%m%d%H%M%S')

    @abstractmethod
    def pipe(self):
        """Construct and return pipe.

        Returns:
            pipe (Pipe): pipe
        """
        pass

    @abstractmethod
    def consume(self):
        """Construct and run pipe for entire input stream and generate output specified in config.
        """
        pass

# -----------------------------------------------------------------------------
# Pipeline with generic image processing
class VideoStreamPipeline(Pipeline):
    """Class for streaming frames of video from a file using iterables to consume less memory.
    """

    visualizations = [
        "raw",
        "raw_transformed",
        "raw_processed",
    ]

    def __init__(self, config):
        """
        Args:
            config (dict): dictionary config, see ``sparse.optical.flow.example``
        """
        super().__init__(config)
        # Modify config
        xmin, ymin = np.array(self.config.video.processing.perspective.dst).min(axis=0)
        xmax, ymax = np.array(self.config.video.processing.perspective.dst).max(axis=0)
        self.config.video.processing.size_transformed = [xmax - xmin, ymax - ymin]
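        # The transformed size is the bounding box of the perspective destination
        # points; e.g. (illustrative values, not from the config) dst =
        # [[0, 0], [800, 0], [800, 600], [0, 600]] yields size_transformed = [800, 600].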
# Helpers
transforms = [
Resizer(self.config.video.processing.size),
LensCorrector(**self.config.video.processing.profile),
PerspectiveTransformer(**self.config.video.processing.perspective),
]
self.transforms = compose(transforms)
processors = [
BrightnessAndContrastAdjuster(
self.config.video.processing.brightness,
self.config.video.processing.contrast,
),
UnmaskSharpener(
self.config.video.processing.sharpness,
self.config.video.processing.radius,
self.config.video.processing.spread,
),
]
self.processors = compose(processors)
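        # ``compose`` collapses each list above into a single per-frame callable.
        # A minimal sketch of such a helper (illustrative only; the actual
        # behaviour and ordering are defined by lavaflow.utils.core.compose):
        #
        #     def compose(funcs):
        #         def composed(frame):
        #             for func in funcs:
        #                 frame = func(frame)
        #             return frame
        #         return composed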

    def get_video_pipe(self, video):
        """Construct and return pipe with video transformation and processing only.

        Args:
            video (dict): video config

        Returns:
            pipe (Pipe): pipe with video transformation and processing only
        """
        pipe = (
            Pipe(VideoStream(**video)) |
            DictPipe(['raw', 'frame']) |
            DictFuncPipe(self.transforms, 'raw', 'raw_transformed') |
            DictFuncPipe(self.processors, 'raw_transformed', 'raw_processed')
        )
        return pipe
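
    # Note: items yielded by the pipe built in get_video_pipe are expected to be
    # dicts keyed by the names in ``visualizations`` ('raw', 'raw_transformed',
    # 'raw_processed'); the exact item structure is defined by DictPipe and
    # DictFuncPipe in lavaflow.pipes.core.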

    def pipe(self):
        """Construct and return pipe.

        Returns:
            pipe (Pipe): pipe
        """
        return self.get_video_pipe(self.config.video.stream)

    def consume(self):
        """Construct and run pipe for entire video stream and generate output specified in config.
        """
        return self.get_video_pipe(self.config.video.stream) | ConsumeSink()
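
    # Piping into ConsumeSink above is assumed to drain the stream so the whole
    # video is processed eagerly; see lavaflow.pipes.core for the actual behaviour.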

    def preview(self, file, index):
        """Construct and return pipe for a single frame of video.

        Args:
            file (str): video file
            index (int): frame index for generating preview

        Returns:
            pipe (Pipe): pipe configured to run for a single frame only
        """
        video = {
            "file": file,
            "frame_seek": index,
            "step_size": 1,
            "max_steps": 1,
        }
        return self.get_video_pipe(video)
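

# Illustrative usage (a sketch, not part of the module; assumes an attribute-style
# nested config exposing the fields accessed above, and a hypothetical input file):
#
#     pipeline = VideoStreamPipeline(config)
#     pipeline.consume()                                # process the whole stream
#     preview_pipe = pipeline.preview("clip.mp4", 100)  # pipe for frame 100 only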