"""Core pipe functionality.
"""
import itertools
import collections
import pickle
from abc import ABC
from pathlib import Path
from typing import Any
from lavaflow.utils.core import compose, dictify, windowify
import logging
logger = logging.getLogger('lavaflow')
# -----------------------------------------------------------------------------
# Pipes
class AbstractPipe(ABC):
    """Pipe object for connecting map and filter type operations.

    Subclasses can override ``map`` or ``filter`` for convenience or
    ``__next__`` for flexibility. Source values for which ``filter`` returns
    False are skipped: they are neither mapped nor yielded.
    """

    def __iter__(self):
        """Return iterator.

        Returns:
            self (AbstractPipe): pipe
        """
        return self

    def __next__(self):
        """Return the next mapped source value that passes ``filter``.

        ``start`` runs once before the first value is pulled from the source;
        ``stop`` runs once when the source is exhausted.

        Returns:
            v (object): ``map`` applied to the next source value accepted by ``filter``

        Raises:
            StopIteration: when the source iterator is exhausted
        """
        if not hasattr(self, '_started'):
            self.start()
            self._started = True
        while True:
            # Only the pull from the source is guarded: its StopIteration is what
            # signals exhaustion and triggers the stop hook below.
            try:
                v = next(self.source)
            except StopIteration:
                break
            # Rejected values are skipped instead of falling through and
            # yielding None in their place.
            if self.filter(v):
                return self.map(v)
        # Run the stop hook once, even if next() is called again after exhaustion.
        if not hasattr(self, '_stopped'):
            self.stop()
            self._stopped = True
        raise StopIteration

    def __or__(self, other):
        """Connect this pipe to a downstream pipe or sink.

        Args:
            other (AbstractPipe|AbstractSink): downstream pipe or sink

        Returns:
            other (AbstractPipe): downstream pipe with its source set to this pipe,
                or whatever the sink's ``__ror__`` returns when ``other`` is a sink

        Raises:
            TypeError: if ``other`` is neither a pipe nor a sink
        """
        if isinstance(other, AbstractPipe):
            other.link(self)
            return other
        # Pass control to the sink so it consumes this pipe.
        if isinstance(other, AbstractSink):
            return other.__ror__(self)
        raise TypeError(f'{type(other)} not supported')

    def link(self, source):
        """Assign the pipe source.

        Args:
            source (AbstractPipe|iterator): upstream pipe or iterator
        """
        self.source = source

    def start(self):
        """Code to run before consuming source iterator (optional)."""
        pass

    def filter(self, v):
        """Decide whether an iterator value is mapped and yielded.

        Args:
            v (object): iterator value

        Returns:
            flag (bool): True to keep the value, False to skip it
        """
        return True

    def map(self, v):
        """Map iterator value.

        Args:
            v (object): iterator value

        Returns:
            v (object): iterator value with map applied
        """
        return v

    def stop(self):
        """Code to run after consuming source iterator (optional)."""
        pass
class Pipe(AbstractPipe):
    """Pipe object for wrapping any iterable."""

    def __init__(self, source=()):
        """Constructor.

        Args:
            source (iterable): pipe source (default: empty)
        """
        # Immutable default avoids the shared-mutable-default pitfall;
        # iter(()) behaves exactly like iter([]).
        self.source = iter(source)
class WindowPipe(AbstractPipe):
    """Pipe for collecting a rolling window of an iterable.

    Each item produced is a list of the last ``window_size`` source values,
    ordered oldest to newest.
    """

    def __init__(self, window_size):
        """Constructor.

        Args:
            window_size (int): window size (must be >= 1)

        Raises:
            ValueError: if ``window_size`` is less than 1
        """
        # A non-positive size would otherwise fail later with IndexError /
        # ZeroDivisionError, after already consuming a source item.
        if window_size < 1:
            raise ValueError(f'window size must be >= 1, got {window_size}')
        self.window_size = window_size
        # ``self.window`` is used as a circular buffer; once it is full,
        # ``window_pointer`` is the index of the oldest element.
        self.window_pointer = 0
        self.window = []

    def __next__(self):
        """Advance the rolling window by one source value (ignores filter and map).

        Returns:
            window (list): the last ``window_size`` items, oldest first

        Raises:
            ValueError: if the source has fewer than ``window_size`` items when
                the window is first filled
            StopIteration: when the source is exhausted after the window is full
        """
        if len(self.window) < self.window_size:
            try:
                while len(self.window) < self.window_size:
                    self.window.append(next(self.source))
                    self.window_pointer = (self.window_pointer + 1) % self.window_size
            except StopIteration:
                raise ValueError(f'window size ({self.window_size}) > number of elements in source iterator ({len(self.window)})') from None
        else:
            # Overwrite the oldest slot; the next slot becomes the oldest.
            self.window[self.window_pointer] = next(self.source)
            self.window_pointer = (self.window_pointer + 1) % self.window_size
        return self.window[self.window_pointer:self.window_size] + self.window[0:self.window_pointer]
class SelectPipe(AbstractPipe):
    """Pipe that picks one element out of each upstream item (e.g. a rolling window)."""

    def __init__(self, index=-1):
        """Constructor.

        Args:
            index (int): position of the element to pick from each upstream item
                (default: -1, the last element of a sequence)
        """
        self.index = index

    def __next__(self):
        """Return one element of the next upstream item (filter and map are not applied).

        Returns:
            item (object): ``upstream_item[self.index]``
        """
        window = next(self.source)
        picked = window[self.index]
        return picked
class SplitPipe(AbstractPipe):
    """Pipe for splitting a pipe into multiple parallel pipes.

    The upstream source is duplicated with ``itertools.tee`` into one branch per
    sub-pipe. Each item produced is a list holding the next value of every
    sub-pipe, in the order of ``pipes``.
    """

    def __init__(self, pipes):
        """Constructor.

        Args:
            pipes (list): list of pipes fed from the shared upstream source

        Raises:
            ValueError: if ``pipes`` is empty
        """
        # With no branches every item would be [] and iteration would never end.
        if not pipes:
            raise ValueError('SplitPipe requires at least one pipe')
        self.pipes = pipes

    def __next__(self):
        """Return the next combined value accepted by ``filter``.

        Returns:
            v (object): ``map`` applied to a list with the next value of each sub-pipe

        Raises:
            StopIteration: when any sub-pipe is exhausted
        """
        while True:
            # StopIteration from any branch propagates and ends the iteration.
            values = [next(pipe) for pipe in self.pipes]
            # Rejected values are skipped rather than yielded as None.
            if self.filter(values):
                return self.map(values)

    def link(self, source):
        """Assign the pipe source, giving every sub-pipe its own copy of it.

        Args:
            source (AbstractPipe|iterator): upstream pipe or iterator
        """
        self.sources = itertools.tee(source, len(self.pipes))
        for branch, pipe in zip(self.sources, self.pipes):
            # Go through link() so nested pipes (e.g. another SplitPipe)
            # wire up their own sub-pipes correctly.
            pipe.link(branch)
# -----------------------------------------------------------------------------
# Pipes based on dictionary processing
class DictPipe(AbstractPipe):
    """Pipe for wrapping each item in an iterable with a dictionary."""

    def __init__(self, key, constants=None):
        """Constructor.

        Args:
            key (str|list): key under which each item is stored, or a list of keys
                to which the elements of each item are assigned positionally
            constants (dict): constant metadata to insert into every dictionary
                (default: none)
        """
        self.key = key
        # None instead of a shared mutable {} default.
        self.constants = {} if constants is None else constants
        # Number of items wrapped so far; stored under 'counter' in each dictionary.
        self.counter = 0

    def map(self, v: Any):
        """Wrap item with a dictionary.

        Entries are added in this order, later ones overriding earlier ones:
        ``'counter'``, ``self.constants``, then the item value(s).

        Args:
            v (object): value to wrap; when ``self.key`` is a list, ``v`` must be
                an iterable with exactly ``len(self.key)`` elements

        Returns:
            d (dict): dictionary

        Raises:
            ValueError: if ``self.key`` is a list and ``v`` is not iterable or has
                a different number of elements than ``self.key``
        """
        d = {
            'counter': self.counter,
            **self.constants,
        }
        if isinstance(self.key, list):
            message = f"item must have the same length as DictPipe.key = {self.key}"
            if not hasattr(v, '__iter__'):
                raise ValueError(message)
            # Materialize once so generators can be length-checked; zip alone
            # would silently truncate a mismatched item.
            values = tuple(v)
            if len(values) != len(self.key):
                raise ValueError(message)
            d.update(zip(self.key, values))
        else:
            d[self.key] = v
        self.counter += 1
        return d
class FuncPipe(AbstractPipe):
    """Pipe that transforms every item of an iterable with a function."""

    def __init__(self, f):
        """Constructor.

        Args:
            f (callable|list): a callable, or a list of callables that is combined
                into a single function with ``compose`` (applied in sequence)

        Raises:
            Exception: if ``f`` is neither callable nor a list
        """
        if not callable(f) and not isinstance(f, list):
            raise Exception(f'{type(f)} not supported')
        # A callable is used as-is, even if it also happens to be a list.
        self.f = f if callable(f) else compose(f)

    def map(self, v):
        """Apply the stored function to one item.

        Args:
            v (object): item

        Returns:
            result (object): ``self.f(v)``
        """
        func = self.f
        return func(v)
class DictFuncPipe(FuncPipe):
    """Pipe that applies a function to dictionary items, from input key(s) to output key(s)."""

    def __init__(self, f, x, y):
        """Constructor.

        Args:
            f (callable|list): callable function or list of functions to apply in sequence
            x (str|list): input key(s)
            y (str|list): output key(s)
        """
        # FuncPipe validates f and composes a list into one callable; that
        # callable is then adapted by dictify to read x and write y.
        super().__init__(f)
        base = self.f
        self.f = dictify(base, x, y)
class WindowDictFuncPipe(FuncPipe):
    """Pipe that applies a function to windowed dictionary items via ``windowify``.

    NOTE(review): each item is presumably a window (list) of dictionaries, e.g.
    produced by ``WindowPipe`` — inferred from the index/key parameters; confirm
    against ``windowify``.
    """

    def __init__(self, f, i, x, j, y, chain_args):
        """Constructor.

        Args:
            f (callable|list): callable function or list of functions to apply in sequence
            i (int|list): input index(es)
            x (str|list): input key(s)
            j (int|list): output index(es)
            y (str|list): output key(s)
            chain_args (bool): flatten inputs
        """
        # FuncPipe validates/composes f; windowify then adapts the callable.
        super().__init__(f)
        wrapped = windowify(self.f, i, x, j, y, chain_args)
        self.f = wrapped
# -----------------------------------------------------------------------------
# Pipes for outputs
class PickleWriterPipe(AbstractPipe):
    """Pipe for serializing each item in an iterable to a pickle file in a directory.

    Items are passed through unchanged, so the pipe can sit in the middle of a chain.
    """

    def __init__(self, directory, prefix):
        """Construct pickle writer pipe.

        Args:
            directory (str|Path): output directory
            prefix (str): output file prefix
        """
        self.directory = Path(directory)
        self.prefix = prefix

    def start(self):
        """Reset the file counter and create the output directory if needed."""
        self.counter = 0
        self.directory.mkdir(parents=True, exist_ok=True)

    def map(self, v):
        """Write ``v`` to ``<directory>/<prefix>-<counter:05d>.pk`` and pass it through.

        Args:
            v (object): iterator value (must be picklable)

        Returns:
            v (object): the same value, unchanged
        """
        path = self.directory / f'{self.prefix}-{self.counter:05d}.pk'
        # pickle writes bytes, so the file must be opened in binary mode;
        # text mode ('w') raises TypeError on the first dump.
        with open(path, 'wb') as f:
            pickle.dump(v, f)
        self.counter += 1
        return v

    def stop(self):
        """Log how many pickle files were written."""
        logger.info('%d pickle files written successfully', self.counter)
# -----------------------------------------------------------------------------
# Sinks
class AbstractSink(ABC):
    """Sink object that consumes an upstream pipe.

    Subclasses can override ``process``, ``init`` and ``post`` for convenience or
    ``__ror__`` for flexibility.
    """

    def __ror__(self, other):
        """Consume upstream pipe (invoked as ``pipe | sink``).

        Calls ``init`` once, ``process`` for every upstream value, then ``post``.

        Args:
            other (AbstractPipe|iterable): upstream pipe
        """
        self.init()
        for value in other:
            self.process(value)
        self.post()

    def init(self):
        """Code to run before processing upstream pipe (optional)."""
        pass

    def post(self):
        """Code to run after processing upstream pipe (optional)."""
        pass

    def process(self, v):
        """Handle a single upstream value (optional).

        Args:
            v (object): upstream pipe value
        """
        pass
class ConsumeSink(AbstractSink):
    """Sink that drains an upstream pipe and discards every value."""

    def __init__(self):
        """Constructor (the sink holds no state)."""

    def __ror__(self, other):
        """Drain the upstream pipe; ``init``/``process``/``post`` are not called.

        Args:
            other (AbstractPipe|iterable): upstream pipe
        """
        # A zero-length deque pulls every value without storing any of them.
        sink = collections.deque(maxlen=0)
        sink.extend(other)
class CollectSink(AbstractSink):
    """Sink for collecting upstream pipe into a container."""

    def __init__(self, container=list):
        """Constructor.

        Args:
            container (type): container type (default: list); it is called with the
                upstream iterable to build the result, e.g. ``list``, ``tuple``, ``set``
        """
        self.container = container

    def __ror__(self, other):
        """Collect upstream pipe into ``self.container``.

        Args:
            other (AbstractPipe|iterable): upstream pipe

        Returns:
            collection (object): ``self.container(other)`` (a list by default)
        """
        # Honour the configured container; it used to be ignored in favour of list.
        return self.container(other)