Source code for lavaflow.pipelines

"""Functions for boundary detection and masking.
"""

import cv2
import datetime
import numpy as np

from abc import ABC, abstractmethod
from collections import deque
from scipy.ndimage import gaussian_filter1d

import logging

from lavaflow.pipes.core import Pipe, DictPipe, DictFuncPipe, ConsumeSink
from lavaflow.processing import LensCorrector, Resizer, UnmaskSharpener, \
  PerspectiveTransformer, BrightnessAndContrastAdjuster
from lavaflow.streams import VideoStream
from lavaflow.utils.core import compose

logger = logging.getLogger('lavaflow')


# -----------------------------------------------------------------------------

# Pipeline common functionality

[docs]class Pipeline(ABC): """Generic pipeline class for streaming images for processing and modeling. """ def __init__(self, config): """ Args: config: config dictionary """ self.config = config self.timestamp = datetime.datetime.now() self.timestamp_formatted = self.timestamp.strftime('%Y%m%d%H%M%S')
[docs] @abstractmethod def pipe(self): """Construct and return pipe. Returns: pipe (Pipe): pipe """ pass
[docs] @abstractmethod def consume(self): """Construct and run pipe for entire input stream and generate output specified in config. """ pass
# ----------------------------------------------------------------------------- # Pipeline with generic image processing
[docs]class VideoStreamPipeline(Pipeline): """Class for streaming frames of video from a file using iterables to consume less memory. """ visualizations = [ "raw", "raw_transformed", "raw_processed", ] def __init__(self, config): """ Args: config (dict): dictionary config, see ``sparse.optical.flow.example`` """ super().__init__(config) # Modify config xmin, ymin = np.array(self.config.video.processing.perspective.dst).min(axis=0) xmax, ymax = np.array(self.config.video.processing.perspective.dst).max(axis=0) self.config.video.processing.size_transformed = [xmax - xmin, ymax - ymin] # Helpers transforms = [ Resizer(self.config.video.processing.size), LensCorrector(**self.config.video.processing.profile), PerspectiveTransformer(**self.config.video.processing.perspective), ] self.transforms = compose(transforms) processors = [ BrightnessAndContrastAdjuster( self.config.video.processing.brightness, self.config.video.processing.contrast, ), UnmaskSharpener( self.config.video.processing.sharpness, self.config.video.processing.radius, self.config.video.processing.spread, ), ] self.processors = compose(processors)
[docs] def get_video_pipe(self, video): """Construct and return pipe with video transformation and processing only. Args: video (dict): video config Returns: pipe (Pipe): pipe with video transformation and processing only """ pipe = ( Pipe(VideoStream(**video)) | DictPipe(['raw', 'frame']) | DictFuncPipe(self.transforms, 'raw', 'raw_transformed') | DictFuncPipe(self.processors, 'raw_transformed', 'raw_processed') ) return pipe
[docs] def pipe(self): """Construct and return pipe. Returns: pipe (Pipe): pipe """ return self.get_video_pipe(self.config.video.stream)
[docs] def consume(self): """Construct and run pipe for entire video stream and generate output specified in config. """ return self.get_video_pipe(self.config.video.stream) | ConsumeSink()
[docs] def preview(self, file, index): """Construct and run pipe for a single frame of video. Args: file (str): video file index (int): frame index for generating preview Returns: pipe (Pipe): pipe configured to run for a single image only """ video = { "file": file, "frame_seek": index, "step_size": 1, "max_steps": 1, } return self.get_video_pipe(video)