"""Pipes for writing outputs.
"""
import cv2
import gzip
import json
import numpy as np
from pathlib import Path
from pubsub import pub
from lavaflow.pipes.core import AbstractPipe
from lavaflow.utils.io import NumpyJsonEncoder, NumpyLineEncoder
import logging
# Module-level logger registered under the 'lavaflow' name; the writer pipes
# below use it in stop() to report how many items they wrote.
logger = logging.getLogger('lavaflow')
# -----------------------------------------------------------------------------
# Pipes for data outputs
class JsonWriterPipe(AbstractPipe):
    """Pipe for serializing each item in an iterable to its own json file.

    Every dictionary passed to :meth:`map` is written to
    ``<directory>/<prefix>-<counter>.json`` where the counter is zero-padded
    to at least five digits and starts at zero on :meth:`start`.
    """

    def __init__(self, directory, prefix, include=None, exclude=None, pretty=False):
        """Construct json writer pipe.

        Args:
            directory (str): output directory
            prefix (str): output file prefix
            include (list): specific keys to include (all other keys are excluded)
            exclude (list): specific keys to exclude (removed after the include
                filter has been applied)
            pretty (bool): write pretty json format for readability

        Returns:
            pipe (JsonWriterPipe): pipe
        """
        self.directory = Path(directory)
        self.prefix = prefix
        self.include = include if include is not None else []
        # The original tested `include is not None` here, which silently
        # discarded `exclude` whenever `include` was left unset.
        self.exclude = exclude if exclude is not None else []
        self.pretty = pretty

    def start(self):
        """Initialize counter and make sure the output directory exists.
        """
        self.counter = 0
        self.directory.mkdir(parents=True, exist_ok=True)

    def map(self, d):
        """Write the selected keys of a dictionary to a json file.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the input dictionary, unchanged
        """
        keys = set(d)
        if self.include:
            keys &= set(self.include)
        if self.exclude:
            keys -= set(self.exclude)
        # Walk `d` rather than the set so that the written key order follows
        # the dictionary's own order instead of arbitrary set ordering.
        payload = {k: d[k] for k in d if k in keys}

        # Serialize before opening the file so an encoder error does not
        # leave an empty file behind.
        if self.pretty:
            text = json.dumps(payload, indent=2, cls=NumpyJsonEncoder)
        else:
            text = json.dumps(payload, separators=(',', ':'), cls=NumpyJsonEncoder)

        with open(self.directory / f'{self.prefix}-{self.counter:05d}.json', 'w') as f:
            f.write(text)
        self.counter += 1
        return d

    def stop(self):
        """Log counter and publish a "log" message naming the directory.
        """
        logger.info(f'{self.counter} json files written successfully')
        pub.sendMessage("log", message=f'{self.directory} saved')
class JsonpWriterPipe(AbstractPipe):
    """Pipe for serializing each item in an iterable to a row in a jsonp file.

    Each dictionary passed to :meth:`map` becomes one compact json document
    followed by a newline in a single output file.
    """

    def __init__(self, file, include=None, exclude=None):
        """Construct json line writer pipe.

        Args:
            file (str): output file name
            include (list): specific keys to include (all other keys are excluded)
            exclude (list): specific keys to exclude (removed after the include
                filter has been applied)

        Returns:
            pipe (JsonpWriterPipe): pipe
        """
        self.file = Path(file)
        self.include = include if include is not None else []
        # The original tested `include is not None` here, which silently
        # discarded `exclude` whenever `include` was left unset.
        self.exclude = exclude if exclude is not None else []

    def start(self):
        """Open file and initialize counter.
        """
        self.counter = 0
        self.file.parent.mkdir(parents=True, exist_ok=True)
        # Kept open across map() calls; closed in stop().
        self.f = open(self.file, 'w')

    def map(self, d):
        """Append the selected keys of a dictionary as one json line.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the input dictionary, unchanged
        """
        keys = set(d)
        if self.include:
            keys &= set(self.include)
        if self.exclude:
            keys -= set(self.exclude)
        # Walk `d` rather than the set so that the written key order follows
        # the dictionary's own order instead of arbitrary set ordering.
        payload = {k: d[k] for k in d if k in keys}

        self.f.write(json.dumps(payload, separators=(',', ':'), cls=NumpyJsonEncoder))
        self.f.write("\n")
        self.counter += 1
        return d

    def stop(self):
        """Close file, log counter and publish a "log" message naming the file.
        """
        self.f.close()
        logger.info(f'{self.counter} json lines written successfully')
        pub.sendMessage("log", message=f'{self.file} saved')
# -----------------------------------------------------------------------------
# Pipes for numpy arrays
class NumpyWriterPipe(AbstractPipe):
    """Pipe for serializing numpy arrays, one encoded array per line.

    The output is gzip-compressed when the file name ends in ``.gz`` and
    written as plain text otherwise.
    """

    def __init__(self, file, separators=(',', ';', ':'), **kwargs):
        """Construct numpy writer pipe.

        Args:
            file (str|Path): output file name
            separators (list|tuple): separators to use for each axis in order
            kwargs (dict): keyword arguments forwarded to NumpyLineEncoder
                (documented upstream as being passed on to np.array2string)

        Returns:
            pipe (NumpyWriterPipe): pipe
        """
        self.file = Path(file)
        self.encoder = NumpyLineEncoder(separators, **kwargs)
        # Go through str() so a pathlib.Path argument works too; the original
        # called file.endswith directly and raised AttributeError on a Path.
        self.gzipped = str(file).endswith(".gz")

    def start(self):
        """Open file and initialize counter.
        """
        self.counter = 0
        self.file.parent.mkdir(parents=True, exist_ok=True)
        if self.gzipped:
            # Binary mode: map() encodes each line to bytes before writing.
            self.f = gzip.open(self.file, 'wb')
        else:
            self.f = open(self.file, 'w')

    def map(self, x):
        """Write array to file as a single line.

        Args:
            x (np.ndarray): numpy array

        Returns:
            x (np.ndarray): the input array, unchanged
        """
        line = self.encoder.encode(x) + '\n'
        self.f.write(line.encode() if self.gzipped else line)
        self.counter += 1
        return x

    def stop(self):
        """Close file, log counter and publish a "log" message naming the file.
        """
        self.f.close()
        logger.info(f'{self.counter} array lines written successfully')
        pub.sendMessage("log", message=f'{self.file} saved')
class DictNumpyWriterPipe(NumpyWriterPipe):
    """Pipe for serializing an array keyed from each dictionary in an iterable.
    """

    def __init__(self, file, key, separators=(',', ';', ':'), **kwargs):
        """Construct keyed numpy writer pipe.

        Args:
            file (str): output file name
            key (str): dictionary key under which the array is stored
            separators (list|tuple): separators to use for each axis in order
            kwargs (dict): keyword arguments forwarded to NumpyWriterPipe

        Returns:
            pipe (DictNumpyWriterPipe): pipe
        """
        super().__init__(file, separators=separators, **kwargs)
        self.key = key

    def map(self, d):
        """Write the array stored under the configured key to file.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the same dictionary, with the key reassigned to the
                value returned by the parent writer
        """
        array = d[self.key]
        written = super().map(array)
        d[self.key] = written
        return d
# -----------------------------------------------------------------------------
# Pipes for image outputs
class ImageWriterPipe(AbstractPipe):
    """Pipe for writing each image in an iterable to a jpg in a directory.

    Files are named ``<prefix>-<counter>.jpg`` with the counter zero-padded
    to at least five digits.
    """

    def __init__(self, directory, prefix):
        """Construct image writer pipe.

        Args:
            directory (str): output directory
            prefix (str): output file prefix

        Returns:
            pipe (ImageWriterPipe): pipe
        """
        self.prefix = prefix
        self.directory = Path(directory)

    def start(self):
        """Create the output directory if needed and reset the counter.
        """
        self.directory.mkdir(parents=True, exist_ok=True)
        self.counter = 0

    def map(self, img):
        """Write image to directory.

        Args:
            img (np.ndarray): image

        Returns:
            img (np.ndarray): the input image, unchanged
        """
        target = self.directory / f'{self.prefix}-{self.counter:05d}.jpg'
        # cv2.imwrite is handed a plain string rather than a Path object.
        cv2.imwrite(str(target), img)
        self.counter += 1
        return img

    def stop(self):
        """Log counter and publish a "log" message naming the directory.
        """
        logger.info(f'{self.counter} images written successfully')
        pub.sendMessage("log", message=f'{self.directory} saved')
class DictImageWriterPipe(ImageWriterPipe):
    """Pipe for writing an image keyed from each dictionary in an iterable to a jpg in a directory.
    """

    def __init__(self, directory, prefix, key):
        """Construct keyed image writer pipe.

        Args:
            directory (str): output directory
            prefix (str): output file prefix
            key (str): image key

        Returns:
            pipe (DictImageWriterPipe): pipe
        """
        super().__init__(directory, prefix)
        self.key = key

    def map(self, d):
        """Write the image stored under the configured key to the directory.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the same dictionary, with the key reassigned to the
                value returned by the parent writer
        """
        image = d[self.key]
        written = super().map(image)
        d[self.key] = written
        return d
# -----------------------------------------------------------------------------
# Pipes for video outputs
class VideoWriterPipe(AbstractPipe):
    """Pipe for writing each image in an iterable to a video.

    Frames whose (w, h) differs from the configured resolution are resized
    before being written.
    """

    def __init__(self, file, fps, res):
        """Construct video writer pipe.

        Args:
            file (str): output file name
            fps (int): output video frames per second
            res (tuple): output video size written as (w, h)

        Returns:
            pipe (VideoWriterPipe): pipe
        """
        self.file = Path(file)
        self.fps = float(fps)
        # Stored as a tuple so it compares equal to the img.shape slice in map().
        self.res = tuple(res)

    def start(self):
        """Open video writer and initialize counter.
        """
        self.counter = 0
        self.file.parent.mkdir(parents=True, exist_ok=True)
        self.video_writer = cv2.VideoWriter(str(self.file), cv2.VideoWriter_fourcc(*'XVID'), self.fps, self.res)

    def map(self, img):
        """Write image to video, resizing it to the output resolution if needed.

        Args:
            img (np.ndarray): image

        Returns:
            img (np.ndarray): the frame that was written (resized when its
                size did not match the configured resolution)
        """
        # shape is (h, w, ...); the reversed slice yields (w, h) to match self.res.
        size = img.shape[1::-1]
        if size != self.res:
            # Replaces two stray print() calls that wrote to stdout per frame.
            logger.debug('resizing frame from %s to %s', size, self.res)
            # interpolation must be passed by keyword: the original positional
            # call `resize(img, res, 0, 0, INTER_AREA)` bound those values to
            # dst, fx and fy, so INTER_AREA was never used as the interpolation.
            img = cv2.resize(img, self.res, interpolation=cv2.INTER_AREA)
        self.video_writer.write(img)
        self.counter += 1
        return img

    def stop(self):
        """Release video writer, log counter and publish a "log" message naming the file.
        """
        self.video_writer.release()
        logger.info(f'{self.counter} frames written successfully')
        pub.sendMessage("log", message=f'{self.file} saved')
class DictVideoWriterPipe(VideoWriterPipe):
    """Pipe for writing an image keyed from each dictionary in an iterable to a video.
    """

    def __init__(self, file, fps, res, key):
        """Construct keyed video writer pipe.

        Args:
            file (str): output file name
            fps (int): output video frames per second
            res (tuple): output video size written as (w, h)
            key (str): image key

        Returns:
            pipe (DictVideoWriterPipe): pipe
        """
        super().__init__(file, fps, res)
        self.key = key

    def map(self, d):
        """Write the image stored under the configured key to the video.

        Args:
            d (dict): dictionary

        Returns:
            d (dict): the same dictionary, with the key reassigned to the
                frame returned by the parent writer
        """
        frame = d[self.key]
        written = super().map(frame)
        d[self.key] = written
        return d