Source code for lavaflow.pipes.io

"""Pipes for writing outputs.
"""

import cv2
import gzip
import json
import numpy as np

from pathlib import Path

from pubsub import pub

from lavaflow.pipes.core import AbstractPipe
from lavaflow.utils.io import NumpyJsonEncoder, NumpyLineEncoder

import logging
logger = logging.getLogger('lavaflow')


# -----------------------------------------------------------------------------

# Pipes for data outputs

class JsonWriterPipe(AbstractPipe):
    """Pipe for serializing each item in an iterable to a json in a directory.

    Every dictionary passed to `map` is written to its own file named
    `<prefix>-<counter>.json` inside `directory`.
    """

    def __init__(self, directory, prefix, include=None, exclude=None, pretty=False):
        """Construct json writer pipe.

        Args:
            directory (str): output directory
            prefix (str): output file prefix
            include (list): specific keys to include (all other keys are excluded)
            exclude (list): specific keys to exclude (applied after include)
            pretty (bool): write pretty json format for readability
        """
        self.directory = Path(directory)
        self.prefix = prefix
        self.include = include if include is not None else []
        # The default must be decided on `exclude` itself: testing `include`
        # here silently dropped the exclude list whenever include was omitted.
        self.exclude = exclude if exclude is not None else []
        self.pretty = pretty

    def start(self):
        """Initialize counter and create the output directory.
        """
        self.counter = 0
        self.directory.mkdir(parents=True, exist_ok=True)

    def map(self, d):
        """Write dictionary to a json file in the directory.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the input dictionary, unchanged
        """
        include = set(self.include)
        exclude = set(self.exclude)
        # Iterate over the dictionary (not a set of keys) so that the keys are
        # written in a deterministic order, namely the order they have in `d`.
        payload = {
            k: v for k, v in d.items()
            if (not include or k in include) and k not in exclude
        }
        if self.pretty:
            dump_kwargs = {'indent': 2}
        else:
            dump_kwargs = {'separators': (',', ':')}
        path = self.directory / f'{self.prefix}-{self.counter:05d}.json'
        with open(path, 'w') as f:
            f.write(json.dumps(payload, cls=NumpyJsonEncoder, **dump_kwargs))
        self.counter += 1
        return d

    def stop(self):
        """Log counter and publish a "log" message naming the directory.
        """
        logger.info(f'{self.counter} json files written successfully')
        pub.sendMessage("log", message=f'{self.directory} saved')
class JsonpWriterPipe(AbstractPipe):
    """Pipe for serializing each item in an iterable to a row in a jsonp.

    Every dictionary passed to `map` becomes one compact json document on its
    own line of the output file.
    """

    def __init__(self, file, include=None, exclude=None):
        """Construct jsonp writer pipe.

        Args:
            file (str): output file name
            include (list): specific keys to include (all other keys are excluded)
            exclude (list): specific keys to exclude (applied after include)
        """
        self.file = Path(file)
        self.include = include if include is not None else []
        # The default must be decided on `exclude` itself: testing `include`
        # here silently dropped the exclude list whenever include was omitted.
        self.exclude = exclude if exclude is not None else []

    def start(self):
        """Open file and initialize counter.
        """
        self.counter = 0
        self.file.parent.mkdir(parents=True, exist_ok=True)
        # The handle stays open across map() calls and is closed in stop().
        self.f = open(self.file, 'w')

    def map(self, d):
        """Write dictionary as one line of the output file.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the input dictionary, unchanged
        """
        include = set(self.include)
        exclude = set(self.exclude)
        # Iterate over the dictionary (not a set of keys) so that the keys are
        # written in a deterministic order, namely the order they have in `d`.
        payload = {
            k: v for k, v in d.items()
            if (not include or k in include) and k not in exclude
        }
        self.f.write(json.dumps(payload, separators=(',', ':'), cls=NumpyJsonEncoder))
        self.f.write("\n")
        self.counter += 1
        return d

    def stop(self):
        """Close file, log counter and publish a "log" message naming the file.
        """
        self.f.close()
        logger.info(f'{self.counter} json lines written successfully')
        pub.sendMessage("log", message=f'{self.file} saved')
# ----------------------------------------------------------------------------- # Pipes for numpy arrays
class NumpyWriterPipe(AbstractPipe):
    """Pipe for serializing numpy arrays.

    Every array passed to `map` is encoded with a `NumpyLineEncoder` and
    written as one line of the output file. The file is gzip-compressed when
    its name ends with ".gz".
    """

    def __init__(self, file, separators=(',', ';', ':'), **kwargs):
        """Construct numpy writer pipe.

        Args:
            file (str|Path): output file name
            separators (list|tuple): separators to use for each axis in order
            kwargs (dict): keyword arguments forwarded to NumpyLineEncoder
        """
        self.file = Path(file)
        self.encoder = NumpyLineEncoder(separators, **kwargs)
        # Test the string form so that a pathlib.Path is accepted as well;
        # calling .endswith() directly on a Path raises AttributeError.
        self.gzipped = str(file).endswith(".gz")

    def start(self):
        """Open file and initialize counter.
        """
        self.counter = 0
        self.file.parent.mkdir(parents=True, exist_ok=True)
        # The gzip stream is opened in binary mode, so map() encodes each line
        # to bytes before writing; the plain file is opened in text mode.
        if self.gzipped:
            self.f = gzip.open(self.file, 'wb')
        else:
            self.f = open(self.file, 'w')

    def map(self, x):
        """Write array to file.

        Args:
            x (np.ndarray): numpy array

        Returns:
            x (np.ndarray): the input array, unchanged
        """
        line = self.encoder.encode(x) + '\n'
        self.f.write(line.encode() if self.gzipped else line)
        self.counter += 1
        return x

    def stop(self):
        """Close file, log counter and publish a "log" message naming the file.
        """
        self.f.close()
        logger.info(f'{self.counter} array lines written successfully')
        pub.sendMessage("log", message=f'{self.file} saved')
class DictNumpyWriterPipe(NumpyWriterPipe):
    """Numpy writer pipe that serializes the array stored under one key of
    each dictionary in an iterable.
    """

    def __init__(self, file, key, separators=(',', ';', ':'), **kwargs):
        """Construct dictionary numpy writer pipe.

        Args:
            file (str): output file name
            key (str): dictionary key holding the array to write
            separators (list|tuple): separators to use for each axis in order
            kwargs (dict): extra keyword arguments handed to the parent constructor
        """
        self.key = key
        super().__init__(file, separators, **kwargs)

    def map(self, d):
        """Write the array found under the configured key to file.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the same dictionary, with the key reassigned to the
                value returned by the parent writer
        """
        array = d[self.key]
        written = super().map(array)
        d[self.key] = written
        return d
# ----------------------------------------------------------------------------- # Pipes for image outputs
class ImageWriterPipe(AbstractPipe):
    """Pipe for writing each image in an iterable to a jpg in a directory.

    Every image passed to `map` is written to its own file named
    `<prefix>-<counter>.jpg` inside `directory`.
    """

    def __init__(self, directory, prefix):
        """Construct image writer pipe.

        Args:
            directory (str): output directory
            prefix (str): output file prefix
        """
        self.directory = Path(directory)
        self.prefix = prefix

    def start(self):
        """Initialize counter and create the output directory.
        """
        self.counter = 0
        self.directory.mkdir(parents=True, exist_ok=True)

    def map(self, img):
        """Write image to directory.

        Args:
            img (np.ndarray): image

        Returns:
            img (np.ndarray): the input image, unchanged
        """
        path = self.directory / f'{self.prefix}-{self.counter:05d}.jpg'
        # cv2.imwrite reports failure through its boolean return value, so
        # surface it in the log rather than dropping the frame silently.
        if not cv2.imwrite(str(path), img):
            logger.warning(f'failed to write image {path}')
        # The counter advances either way so file numbers keep tracking the
        # position of the image in the stream.
        self.counter += 1
        return img

    def stop(self):
        """Log counter and publish a "log" message naming the directory.
        """
        logger.info(f'{self.counter} images written successfully')
        pub.sendMessage("log", message=f'{self.directory} saved')
class DictImageWriterPipe(ImageWriterPipe):
    """Image writer pipe that writes the image stored under one key of each
    dictionary in an iterable to a jpg in a directory.
    """

    def __init__(self, directory, prefix, key):
        """Construct dictionary image writer pipe.

        Args:
            directory (str): output directory
            prefix (str): output file prefix
            key (str): dictionary key holding the image to write
        """
        self.key = key
        super().__init__(directory, prefix)

    def map(self, d):
        """Write the image found under the configured key to the directory.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the same dictionary, with the key reassigned to the
                value returned by the parent writer
        """
        image = d[self.key]
        written = super().map(image)
        d[self.key] = written
        return d
# ----------------------------------------------------------------------------- # Pipes for video outputs
class VideoWriterPipe(AbstractPipe):
    """Pipe for writing each image in an iterable to a video.

    Frames are encoded with the XVID fourcc; frames whose size differs from
    `res` are resized before being written.
    """

    def __init__(self, file, fps, res):
        """Construct video writer pipe.

        Args:
            file (str): output file name
            fps (int): output video frames per second
            res (tuple): output video size written as (w, h)
        """
        self.file = Path(file)
        self.fps = float(fps)
        self.res = tuple(res)

    def start(self):
        """Open video writer and initialize counter.
        """
        self.counter = 0
        self.file.parent.mkdir(parents=True, exist_ok=True)
        self.video_writer = cv2.VideoWriter(
            str(self.file), cv2.VideoWriter_fourcc(*'XVID'), self.fps, self.res)
        # A writer that failed to open does not raise on construction, so
        # make the failure visible in the log instead of leaving it silent.
        if not self.video_writer.isOpened():
            logger.warning(f'could not open video writer for {self.file}')

    def map(self, img):
        """Write image to video.

        Args:
            img (np.ndarray): image

        Returns:
            img (np.ndarray): the frame that was written (resized if its size
                differed from the configured resolution)
        """
        # shape is (h, w, ...); the reversed slice yields (w, h) like self.res.
        size = img.shape[1::-1]
        if size != self.res:
            logger.debug('resizing frame from %s to %s', size, self.res)
            # The interpolation flag must be passed by keyword: positionally
            # the third to fifth arguments of cv2.resize are dst, fx and fy,
            # so INTER_AREA was never used as the interpolation method.
            img = cv2.resize(img, self.res, interpolation=cv2.INTER_AREA)
        self.video_writer.write(img)
        self.counter += 1
        return img

    def stop(self):
        """Release video writer, log counter and publish a "log" message
        naming the file.
        """
        self.video_writer.release()
        logger.info(f'{self.counter} frames written successfully')
        pub.sendMessage("log", message=f'{self.file} saved')
class DictVideoWriterPipe(VideoWriterPipe):
    """Video writer pipe that writes the image stored under one key of each
    dictionary in an iterable to a video.
    """

    def __init__(self, file, fps, res, key):
        """Construct dictionary video writer pipe.

        Args:
            file (str): output file name
            fps (int): output video frames per second
            res (tuple): output video size written as (w, h)
            key (str): dictionary key holding the image to write
        """
        self.key = key
        super().__init__(file, fps, res)

    def map(self, d):
        """Write the image found under the configured key to the video.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the same dictionary, with the key reassigned to the
                frame returned by the parent writer
        """
        frame = d[self.key]
        written = super().map(frame)
        d[self.key] = written
        return d