Source code for lavaflow.pipes.core

"""Core pipe functionality.
"""

import itertools
import collections
import pickle

from abc import ABC
from pathlib import Path
from typing import Any

from lavaflow.utils.core import compose, dictify, windowify

import logging
# Package-wide logger shared by all pipes and sinks in this module.
logger: logging.Logger = logging.getLogger(name='lavaflow')


# -----------------------------------------------------------------------------

# Pipes

class AbstractPipe(ABC):
    """Pipe object for connecting map and filter type operations.

    Subclasses can override map or filter for convenience or __next__ for
    flexibility. The upstream iterator is stored in ``self.source``; it is
    attached either by a subclass constructor or by ``link`` (which the ``|``
    operator calls).
    """

    def __iter__(self):
        """Return iterator.

        Returns:
            self (AbstractPipe): pipe
        """
        return self

    def __next__(self):
        """Return the next source value that passes filter, with map applied.

        start is called once before the first value is pulled; stop is called
        once when the source iterator is exhausted.

        Returns:
            v (object): mapped value of the next accepted source item

        Raises:
            StopIteration: when the source iterator is exhausted
        """
        if getattr(self, '_stopped', 0):
            # Already exhausted: do not pull again or run stop() a second time.
            raise StopIteration
        if not hasattr(self, '_started'):
            self.start()
            setattr(self, '_started', 1)
        while True:
            try:
                v = next(self.source)
            except StopIteration:
                self.stop()
                setattr(self, '_stopped', 1)
                raise
            # Rejected values are skipped rather than emitted as None.
            if self.filter(v):
                return self.map(v)

    def __or__(self, other):
        """Connect pipes.

        Args:
            other (AbstractPipe|AbstractSink): downstream pipe or sink

        Returns:
            other (AbstractPipe): downstream pipe with source set to this pipe,
            or whatever the sink's ``__ror__`` returns when other is a sink

        Raises:
            Exception: if other is neither a pipe nor a sink
        """
        if isinstance(other, AbstractPipe):
            other.link(self)
            return other
        # Pass control to the sink to consume pipe
        if isinstance(other, AbstractSink):
            return other.__ror__(self)
        raise Exception(f'{type(other)} not supported')

    def link(self, source):
        """Attach an upstream pipe or iterable as this pipe's source.

        Args:
            source (iterable): upstream pipe or any iterable

        Returns:
            self (AbstractPipe): this pipe, for chaining
        """
        # iter() is a no-op for pipes (they return themselves) and turns
        # plain iterables such as lists into iterators usable with next().
        self.source = iter(source)
        return self

    def start(self):
        """Code to run before consuming source iterator.
        """
        pass

    def filter(self, v):
        """Filter which iterator values are mapped.

        Args:
            v (object): iterator value

        Returns:
            flag (bool): True to keep the value, False to skip it
        """
        return True

    def map(self, v):
        """Map iterator value.

        Args:
            v (object): iterator value

        Returns:
            v (object): iterator value with map applied
        """
        return v

    def stop(self):
        """Code to run after consuming source iterator.
        """
        pass
class Pipe(AbstractPipe):
    """Pipe object for wrapping any iterable.
    """

    def __init__(self, source=()):
        """Constructor.

        Args:
            source (iterable): pipe source (default: empty)

        Returns:
            self (Pipe): pipe
        """
        # Immutable default avoids the shared-mutable-default pitfall.
        self.source = iter(source)
class WindowPipe(AbstractPipe):
    """Pipe for collecting a rolling window of an iterable.
    """

    def __init__(self, window_size):
        """Constructor.

        Args:
            window_size (int): window size (must be >= 1)

        Returns:
            self (WindowPipe): window pipe

        Raises:
            ValueError: if window_size is less than 1
        """
        if window_size < 1:
            raise ValueError(f'window size must be >= 1 (got {window_size})')
        self.window_size = window_size
        # Index of the oldest element in the circular buffer.
        self.window_pointer = 0
        self.window = []

    def __next__(self):
        """Collect a rolling window of an iterable (ignores filter and map).

        The first call fills the window with the first window_size items; each
        later call replaces the oldest item with the next source item.

        Returns:
            window (list): new list of the latest items, oldest first

        Raises:
            ValueError: if the source has fewer than window_size items
            StopIteration: when the source is exhausted after the first window
        """
        if len(self.window) < self.window_size:
            try:
                while len(self.window) < self.window_size:
                    self.window.append(next(self.source))
            except StopIteration:
                raise ValueError(
                    f'window size ({self.window_size}) > number of elements in source iterator ({len(self.window)})'
                ) from None
            # A freshly filled buffer is already in order.
            self.window_pointer = 0
        else:
            # Overwrite the oldest slot and advance the circular pointer.
            self.window[self.window_pointer] = next(self.source)
            self.window_pointer = (self.window_pointer + 1) % self.window_size
        p = self.window_pointer
        return self.window[p:] + self.window[:p]
class SelectPipe(AbstractPipe):
    """Pipe that picks one element out of each upstream window.
    """

    def __init__(self, index=-1):
        """Constructor.

        Args:
            index (int): position to pick from each window (default: last)

        Returns:
            self (SelectPipe): select pipe
        """
        self.index = index

    def __next__(self):
        """Pick one element from the next upstream window (filter/map unused).

        Returns:
            item (object): the window element at ``self.index``
        """
        current_window = next(self.source)
        picked = current_window[self.index]
        return picked
class SplitPipe(AbstractPipe):
    """Pipe for splitting a pipe into multiple pipes.

    Each step pulls one value from every pipe in ``pipes`` and emits them
    together as a list.
    """

    def __init__(self, pipes):
        """Constructor.

        Args:
            pipes (list): list of pipes

        Returns:
            self (SplitPipe): pipe
        """
        self.pipes = pipes

    def __next__(self):
        """Pull one value from each pipe and emit the combined list.

        Returns:
            v (list): mapped list of one value per pipe, for the next list
            that passes filter

        Raises:
            StopIteration: when any pipe is exhausted, or when there are no pipes
        """
        if not self.pipes:
            # Nothing to split: end instead of yielding [] forever.
            raise StopIteration
        while True:
            temp = [next(pipe) for pipe in self.pipes]
            # Rejected lists are skipped rather than emitted as None.
            if self.filter(temp):
                return self.map(temp)
# ----------------------------------------------------------------------------- # Pipes based on dictionary processing
class DictPipe(AbstractPipe):
    """Pipe for wrapping each item in an iterable with a dictionary.
    """

    def __init__(self, key, constants=None):
        """Constructor.

        Args:
            key (str|list): key for obj, or list of keys to unpack each item into
            constants (dict): constant metadata to insert into the dictionary
                (default: none)

        Returns:
            self (DictPipe): dictionary pipe
        """
        self.key = key
        # None default avoids a shared mutable default dict.
        self.constants = {} if constants is None else constants
        self.counter = 0

    def map(self, v: Any):
        """Wrap item with a dictionary.

        The dictionary always has a 'counter' entry (0, 1, 2, ...) followed by
        the constants. When key is a list, v is unpacked element-wise onto the
        keys; otherwise v is stored under key.

        Args:
            v (object): value to wrap

        Returns:
            d (dict): dictionary

        Raises:
            ValueError: if key is a list and v is not iterable or its length
                differs from len(key)
        """
        d = {
            'counter': self.counter,
            **self.constants,
        }
        if isinstance(self.key, list):
            message = f"item must have the same length as DictPipe.key = {self.key}"
            try:
                values = tuple(v)
            except TypeError:
                raise ValueError(message) from None
            if len(values) != len(self.key):
                raise ValueError(message)
            d.update(zip(self.key, values))
        else:
            d[self.key] = v
        self.counter += 1
        return d
class FuncPipe(AbstractPipe):
    """Pipe that runs every upstream item through a function.
    """

    def __init__(self, f):
        """Constructor.

        Args:
            f (callable|list): a callable, or a list of callables that is
                combined with ``compose``

        Returns:
            self (FuncPipe): pipe

        Raises:
            Exception: if f is neither callable nor a list
        """
        if not (callable(f) or isinstance(f, list)):
            raise Exception(f'{type(f)} not supported')
        self.f = f if callable(f) else compose(f)

    def map(self, v):
        """Apply the stored function to one item.

        Args:
            v (object): item

        Returns:
            v (object): result of the function applied to the item
        """
        fn = self.f
        return fn(v)
class DictFuncPipe(FuncPipe):
    """Function pipe whose function is wrapped by ``dictify`` with input and
    output keys, for use on dictionary items.
    """

    def __init__(self, f, x, y):
        """Constructor.

        Args:
            f (callable|list): callable function or list of functions to apply in sequence
            x (str|list): input key(s)
            y (str|list): output key(s)

        Returns:
            self (DictFuncPipe): pipe
        """
        FuncPipe.__init__(self, f)
        base_fn = self.f
        self.f = dictify(base_fn, x, y)
class WindowDictFuncPipe(FuncPipe):
    """Function pipe whose function is wrapped by ``windowify``, mapping input
    index/key pairs to output index/key pairs of each item (presumably a
    window of dictionaries from an upstream WindowPipe; confirm against
    windowify).
    """

    def __init__(self, f, i, x, j, y, chain_args):
        """Constructor.

        Args:
            f (callable|list): callable function or list of functions to apply in sequence
            i (int|list): input index(es)
            x (str|list): input key(s)
            j (int|list): output index(es)
            y (str|list): output key(s)
            chain_args (bool): flatten inputs

        Returns:
            self (WindowDictFuncPipe): pipe
        """
        FuncPipe.__init__(self, f)
        base_fn = self.f
        self.f = windowify(base_fn, i, x, j, y, chain_args)
# ----------------------------------------------------------------------------- # Pipes for outputs
class PickleWriterPipe(AbstractPipe):
    """Pipe for serializing each item in an iterable to a pickle file in a directory.

    Items pass through unchanged; file names are ``{prefix}-{counter:05d}.pk``.
    """

    def __init__(self, directory, prefix):
        """Construct pickle writer pipe.

        Args:
            directory (str): output directory
            prefix (str): output file prefix

        Returns:
            pipe (PickleWriterPipe): pipe
        """
        self.directory = Path(directory)
        self.prefix = prefix
        # Defined here too so map/stop work even if start was never called.
        self.counter = 0

    def start(self):
        """Reset counter and create the output directory if needed.
        """
        self.counter = 0
        self.directory.mkdir(parents=True, exist_ok=True)

    def map(self, v):
        """Write pickle file to directory.

        Args:
            v (object): iterator value

        Returns:
            v (object): the same value, unchanged
        """
        path = self.directory / f'{self.prefix}-{self.counter:05d}.pk'
        # pickle produces bytes, so the file must be opened in binary mode.
        with open(path, 'wb') as f:
            pickle.dump(v, f)
        self.counter += 1
        return v

    def stop(self):
        """Log counter.
        """
        logger.info('%d pickle files written successfully', self.counter)
# ----------------------------------------------------------------------------- # Sinks
class AbstractSink(ABC):
    """Sink object that drains an upstream pipe.

    Subclasses override init, process and post for convenience, or __ror__
    for full control.
    """

    def __ror__(self, other):
        """Drain the upstream pipe, handing each value to process.

        init runs before the first value and post runs after the last one.

        Args:
            other (iterable): upstream pipe

        Returns:
            None
        """
        self.init()
        for item in other:
            self.process(item)
        self.post()

    def init(self):
        """Hook run before the upstream pipe is processed (optional).
        """
        return None

    def post(self):
        """Hook run after the upstream pipe has been processed (optional).
        """
        return None

    def process(self, v):
        """Handle a single upstream value.

        Args:
            v (object): upstream pipe value
        """
        return None
class ConsumeSink(AbstractSink):
    """Sink that exhausts an upstream pipe and discards every value.
    """

    def __init__(self):
        """Constructor.

        Returns:
            self (ConsumeSink): sink
        """

    def __ror__(self, other):
        """Exhaust the upstream pipe without storing its values.

        Args:
            other (Pipe): upstream pipe
        """
        # A zero-length deque drains the iterator at C speed, keeping nothing.
        collections.deque(other, 0)
class CollectSink(AbstractSink):
    """Sink for collecting upstream pipe into container.
    """

    def __init__(self, container=list):
        """Constructor.

        Args:
            container (type): container type, or any callable that accepts an
                iterable (default: list)

        Returns:
            self (CollectSink): sink
        """
        self.container = container

    def __ror__(self, other):
        """Collect upstream pipe.

        Args:
            other (Pipe): upstream pipe

        Returns:
            result (object): ``container(other)``; a list by default
        """
        # Honour the configured container instead of always building a list.
        return self.container(other)