lavaflow.pipes package

lavaflow.pipes.boundary module

Pipes for improving boundary detection and masking.

class lavaflow.pipes.boundary.ExpandBoundaryWindowPipe(x, margin=10)[source]

Bases: AbstractPipe

Constructor.

Parameters

x (str) – boundary key to be expanded

Returns

image pipe

Return type

self (ImagePipe)

map(window)[source]

Map.

Parameters

window (list) – window of items

Returns

item with expanded smooth boundary

Return type

d (dict)

class lavaflow.pipes.boundary.SmoothBoundaryWindowPipe(x, y, intensity, threshold)[source]

Bases: AbstractPipe

Constructor.

Parameters
  • x (str) – input key

  • y (str) – output key

  • intensity (float) – intensity to apply to each frame in window, e.g. 255 / N

  • threshold (float) – threshold for smooth mask

Returns

image pipe

Return type

self (ImagePipe)

map(window)[source]

Map.

Parameters

window (list) – window of items

Returns

item with smooth boundary applied

Return type

d (dict)

lavaflow.pipes.core module

Core pipe functionality.

class lavaflow.pipes.core.AbstractPipe[source]

Bases: ABC

Pipe object for connecting map and filter type operations.

Subclasses can override map or filter for convenience or __next__ for flexibility.

__iter__()[source]

Return iterator.

Returns

pipe

Return type

self (Pipe)

__next__()[source]

Iterate source iterator.

Returns

result from source iterator

Return type

v (object)

__or__(other)[source]

Connect pipes.

Parameters

other (AbstractPipe) – downstream pipe

Returns

downstream pipe with source set to upstream pipe

Return type

other (AbstractPipe)

filter(v)[source]

Filter which iterator values are mapped.

Parameters

v (object) – iterator value

Returns

flag

Return type

flag (bool)

Assign the pipe source.

Parameters

source (AbstractPipe) – upstream pipe or iterator

map(v)[source]

Map iterator value.

Parameters

v (object) – iterator value

Returns

iterator value with map applied

Return type

v (object)

start()[source]

Code to run before consuming source iterator.

stop()[source]

Code to run after consuming source iterator.

class lavaflow.pipes.core.AbstractSink[source]

Bases: ABC

Sink object for consuming upstream pipe.

Subclasses can override process, init, and post for convenience or __ror__ for flexibility.

init()[source]

Code to run before processing upstream pipe (optional).

post()[source]

Code to run before processing upstream pipe (optional).

process(v)[source]

Process upstream pipe value.

Parameters

v (object) – upstream pipe value

class lavaflow.pipes.core.CollectSink(container=<class 'list'>)[source]

Bases: AbstractSink

Sink for collecting upstream pipe into container.

Constructor.

Parameters

container (type) – container type (default: list)

Returns

sink

Return type

self (CollectSink)

class lavaflow.pipes.core.ConsumeSink[source]

Bases: AbstractSink

Sink for consuming upstream pipe without collecting.

Constructor.

Returns

sink

Return type

self (ConsumeSink)

class lavaflow.pipes.core.DictFuncPipe(f, x, y)[source]

Bases: FuncPipe

Pipe for applying a function to each dictionary in an iterable to map an input key to an output key.

Constructor.

Parameters
  • f (callable|list) – callable function or list of functions to apply in sequence

  • x (str|list) – input key(s)

  • y (str|list) – output key(s)

Returns

pipe

Return type

self (FuncPipe)

class lavaflow.pipes.core.DictPipe(key, constants={})[source]

Bases: AbstractPipe

Pipe for wrapping each item in an iterable with a dictionary.

Constructor.

Parameters
  • key (str|list) – key for obj

  • constants (dict) – constant metadata to insert into the dictionary

Returns

dictionary pipe

Return type

self (DictPipe)

map(v: Any)[source]

Wrap item with a dictionary.

Parameters

v (object) – value to wrap

Returns

dictionary

Return type

d (dict)

class lavaflow.pipes.core.FuncPipe(f)[source]

Bases: AbstractPipe

Pipe for applying a function to each item in an iterable.

Constructor.

Parameters

f (callable|list) – callable function or list of functions to apply in sequence

Returns

pipe

Return type

self (FuncPipe)

map(v)[source]

Apply function to item.

Parameters

v (object) – item

Returns

item with function applied

Return type

v (object)

class lavaflow.pipes.core.PickleWriterPipe(directory, prefix)[source]

Bases: AbstractPipe

Pipe for serializing each item in an iterable to a pickle file in a directory.

Construct image writer pipe.

Parameters
  • directory (str) – output directory

  • prefix (str) – output file prefix

Returns

pipe

Return type

pipe (PickleWriterPipe)

map(v)[source]

Write pickle file to directory.

Parameters

v (object) – iterator value

start()[source]

Initialize counter.

stop()[source]

Log counter.

class lavaflow.pipes.core.Pipe(source=[])[source]

Bases: AbstractPipe

Pipe object for wrapping any iterable.

Constructor.

Parameters

source (iterable) – pipe source

Returns

pipe

Return type

self (Pipe)

class lavaflow.pipes.core.SelectPipe(index=-1)[source]

Bases: AbstractPipe

Pipe for selecting an item out of a rolling window.

Constructor.

Parameters

index (int) – item index

Returns

linear pipe

Return type

self (SelectPipe)

__next__()[source]

Select an item out of a rolling window (ignores filter and map).

Returns

item

Return type

item (object)

class lavaflow.pipes.core.SplitPipe(pipes)[source]

Bases: AbstractPipe

Pipe for splitting a pipe into multiple pipes.

Constructor.

Parameters

pipes (list) – list of pipes

Returns

pipe

Return type

self (SplitPipe)

__next__()[source]

Iterate source iterator.

Returns

result from source iterator

Return type

v (object)

Assign the pipe source.

Parameters

source (Pipe) – upstream pipe or iterator

class lavaflow.pipes.core.WindowDictFuncPipe(f, i, x, j, y, chain_args)[source]

Bases: FuncPipe

Pipe for applying a function to each dictionary in an iterable to map an input key to an output key.

Constructor.

Parameters
  • f (callable|list) – callable function or list of functions to apply in sequence

  • i (int|list) – input index(es)

  • x (str|list) – input key(s)

  • j (int|list) – output index(es)

  • y (str|list) – output key(s)

  • chain_args (bool) – flatten inputs

Returns

pipe

Return type

self (FuncPipe)

class lavaflow.pipes.core.WindowPipe(window_size)[source]

Bases: AbstractPipe

Pipe for collecting a rolling window of an interable.

Constructor.

Parameters

window_size (int) – window size

Returns

window pipe

Return type

self (WindowPipe)

__next__()[source]

Collect a rolling window of an iterable (ignores filter and map).

Returns

list of items from iterable

Return type

window (list)

lavaflow.pipes.filters module

Filters for smoothing velocity field.

class lavaflow.pipes.filters.AbstractFilter[source]

Bases: ABC

Class representing an abstract filter with an apply_filter method.

abstract apply_filter(window)[source]

Applies a filter to the window.

Parameters

window (np.ndarray) – window

Returns

window with filter applied

Return type

filtered_window (np.ndarray)

class lavaflow.pipes.filters.ButterworthFilter(cutoff, fs, order)[source]

Bases: AbstractFilter

Class representing a butterworth filter.

Constructor.

Parameters
  • cutoff – Cutoff frequency (Hz)

  • fs – Sampling frequency (Hz)

  • order – Filter order

Returns

butterworth filter

Return type

self (ButterworthFilter)

apply_filter(window)[source]

Applies the butterworth filter to a window along axis 0.

Parameters

window – The window (usually np.ndarray) to apply the filter to. Can be more than 1D.

Returns

The window after the butterworth filter is applied to it.

Return type

filtered_window

class lavaflow.pipes.filters.DoubleWindowFilterPipe(signal_filter: AbstractFilter, input_key: str, output_key: str, window_size: int)[source]

Bases: AbstractPipe

Pipe for double-window filtering.

This is done by taking two overlapping window each of size 2 * window_size and filling them both up. Then when a window is full, the filter is applied to it and then the filtered result is saved. The overlap between the two filtered windows is then averaged together with a sigmoid function and this is kept as the output buffer.

This method means that only two windows of frames needs to be kept in memory and thus the amount of memory used is bounded; as opposed to a method where the entire sequence of frames is kept in memory. However, double windowing has a performance and accuracy penalty as each frame needs to be filtered twice.

Constructor.

Parameters
  • signal_filter (AbstractFilter) – The signal filter to filter the data with.

  • input_key (str) – The key to use to retrieve the data from input dictionaries

  • output_key (str) – The key to use to output the data in output dictionaries

  • window_size (int) – The size of the filtering windows

Returns

double window filter pipe

Return type

self (DoubleWindowFilterPipe)

initialise(window)[source]

Initialise the double window filter.

Parameters

window – Window of data to initialise the filter with.

map(window)[source]

Applies double-window filtering to a window of values.

Parameters

window – Window of values to filter.

Returns

Window of values with the first dictionary in the window now having

a value called self.output_key corresponding to the smoothed version of that signal.

Return type

window

class lavaflow.pipes.filters.GaussianFilter(sigma, truncation=None)[source]

Bases: AbstractFilter

Class representing a gaussian filter.

Constructor.

Parameters
  • sigma – sigma value to use for the gaussian filter, higher means more blurring.

  • truncation – point at which the gaussian filter is truncated

Returns

gaussian filter

Return type

self (GaussianFilter)

apply_filter(window)[source]

Applies the gaussian filter to a window along axis 0.

Parameters

window – The window (usually np.ndarray) to apply the filter to. Can be more than 1D.

Returns

The window after the gaussian filter is applied to it.

Return type

filtered_window

class lavaflow.pipes.filters.SingleExponentialFilterPipe(alpha, input_key, output_key)[source]

Bases: AbstractPipe

Pipe for single exponential filtering.

The last filtered value is kept and is updated by

last = alpha * current + (1 - alpha) * last

where alpha is a parameter and current is the current value in the window.

This method of filtering is the fastest and requires the least memory since only the self.last array is kept and only one elementwise array multiplication needs to be done at each stage; however it is not the most accurate filtering method.

Constructor.

Parameters
  • alpha (float) – alpha value (typically between 0 and 1) for updating filter

  • input_key (str) – The key to use to retrieve the data from input dictionaries

  • output_key (str) – The key to use to output the data in output dictionaries

Returns

single exponential filter pipe

Return type

self (SingleExponentialFilterPipe)

map(new)[source]

Maps a new value to its smoothed value.

Parameters

new – dict with self.input_key being data to be smoothed

Returns

dict with self.output_key being smoothed data

Return type

new

lavaflow.pipes.io module

Pipes for writing outputs.

class lavaflow.pipes.io.DictImageWriterPipe(directory, prefix, key)[source]

Bases: ImageWriterPipe

Pipe for writing an image keyed from each dictionary in an iterable to a jpg in a directory.

Constructor.

Parameters
  • directory (str) – output directory

  • prefix (str) – output file prefix

  • key (str) – image key

Returns

pipe

Return type

self (FuncPipe)

map(d)[source]

Write image from an input key to directory.

Parameters

d (dict) – dictionary

class lavaflow.pipes.io.DictNumpyWriterPipe(file, key, separators=(',', ';', ':'), **kwargs)[source]

Bases: NumpyWriterPipe

Pipe for serializing an array keyed from each dictionary in an iterable.

Construct video writer pipe.

Parameters
  • file (str) – output file name

  • key (str) – boundary key

  • separators (list|tuple) – separators to use for each axis in order

  • kwargs (dict) – keyword arguments to pass to np.array2string

Returns

pipe

Return type

pipe (VideoWriterPipe)

map(d)[source]

Write image from an input key to directory.

Parameters

d (dict) – dictionary

class lavaflow.pipes.io.DictVideoWriterPipe(file, fps, res, key)[source]

Bases: VideoWriterPipe

Pipe for writing an image keyed from each dictionary in an iterable to a video.

Constructor.

Parameters
  • file (str) – output file name

  • fps (int) – output video frames per second

  • res (tuple) – output video size written as (w, h)

  • key (str) – image key

Returns

pipe

Return type

self (FuncPipe)

map(d)[source]

Write image from an input key to directory.

Parameters

d (dict) – dictionary

class lavaflow.pipes.io.ImageWriterPipe(directory, prefix)[source]

Bases: AbstractPipe

Pipe for writing each image in an iterable to a jpg in a directory.

Construct image writer pipe.

Parameters
  • directory (str) – output directory

  • prefix (str) – output file prefix

Returns

pipe

Return type

pipe (ImageWriterPipe)

map(img)[source]

Write image to directory.

Parameters

img (np.ndarray) – image

start()[source]

Initialize counter.

stop()[source]

Log counter.

class lavaflow.pipes.io.JsonWriterPipe(directory, prefix, include=None, exclude=None, pretty=False)[source]

Bases: AbstractPipe

Pipe for serializing each item in an iterable to a json in a directory.

Construct image writer pipe.

Parameters
  • directory (str) – output directory

  • prefix (str) – output file prefix

  • include (list) – specific keys to include (all other keys are excluded)

  • exclude (list) – specific keys to exclude (ignored if included is specified)

  • pretty (bool) – write pretty json format for readability

Returns

pipe

Return type

pipe (JsonWriterPipe)

map(d)[source]

Write image to directory.

Parameters

d (dict) – dictionary

start()[source]

Initialize counter.

stop()[source]

Log counter.

class lavaflow.pipes.io.JsonpWriterPipe(file, include=None, exclude=None)[source]

Bases: AbstractPipe

Pipe for serializing each item in an iterable to a row in a jsonp.

Construct image writer pipe.

Parameters
  • file (str) – output file name

  • include (list) – specific keys to include (all other keys are excluded)

  • exclude (list) – specific keys to exclude (ignored if included is specified)

Returns

pipe

Return type

pipe (JsonWriterPipe)

map(d)[source]

Write dictionary to directory.

Parameters

d (dict) – dictionary

start()[source]

Open file and initialize counter.

stop()[source]

Close file and log counter.

class lavaflow.pipes.io.NumpyWriterPipe(file, separators=(',', ';', ':'), **kwargs)[source]

Bases: AbstractPipe

Pipe for serializing numpy arrays.

Construct video writer pipe.

Parameters
  • file (str) – output file name

  • separators (list|tuple) – separators to use for each axis in order

  • kwargs (dict) – keyword arguments to pass to np.array2string

Returns

pipe

Return type

pipe (VideoWriterPipe)

map(x)[source]

Write array to file.

Parameters

x (np.ndarray) – numpy array

start()[source]

Open file and initialize counter.

stop()[source]

Close file and log counter.

class lavaflow.pipes.io.VideoWriterPipe(file, fps, res)[source]

Bases: AbstractPipe

Pipe for writing each image in an iterable to a video.

Construct video writer pipe.

Parameters
  • file (str) – output file name

  • fps (int) – output video frames per second

  • res (tuple) – output video size written as (w, h)

Returns

pipe

Return type

pipe (VideoWriterPipe)

map(img)[source]

Write image to video.

Parameters

img (np.ndarray) – image

start()[source]

Open video writer.

stop()[source]

Release video writer.

lavaflow.pipes.tracking module

Pipes for apply feature tracking.

class lavaflow.pipes.tracking.FeatureDescriptorMatchingWindowPipe(feature_tracker, x, y, z)[source]

Bases: AbstractPipe

Pipe for applying a function to each dictionary in an iterable to map an input key to an output key.

Constructor.

Parameters
  • feature_tracker (FeatureDescriptorMatching) – feature descriptor matching

  • x (str) – image key

  • y (str) – boundary key

  • z (str) – output key

Returns

pipe

Return type

self (FuncPipe)

map(window)[source]

Map.

Parameters

window (list) – window of items

Returns

item with feature tracking

Return type

d (dict)

lavaflow.pipes.visualizations module

Pipes for improving boundary detection and masking.

class lavaflow.pipes.visualizations.VisualizeBoundaryPipe(x, boundary_detector, contours)[source]

Bases: AbstractPipe

Constructor.

Parameters
  • x (str) – input key

  • boundary_detector (dict) – boundary detector to draw

  • contours (list) – list of contours to draw

Returns

image pipe

Return type

self (ImagePipe)

map(d)[source]

Apply image processing.

Parameters

d (np.ndarray) – image to be processed

Returns

processed image

Return type

d (np.ndarray)