"""Filters for smoothing velocity field.
"""
import cv2
import numpy as np
from abc import ABC, abstractmethod
from scipy.signal import butter, filtfilt
from scipy.special import expit
from scipy.ndimage import gaussian_filter1d
from lavaflow.pipes.core import AbstractPipe
import logging
logger = logging.getLogger('lavaflow')
[docs]class AbstractFilter(ABC):
""" Class representing an abstract filter with an apply_filter method.
"""
[docs] @abstractmethod
def apply_filter(self, window):
"""Applies a filter to the window.
Args:
window (np.ndarray): window
Returns:
filtered_window (np.ndarray): window with filter applied
"""
pass
# TODO: review the following
[docs]class ButterworthFilter(AbstractFilter):
"""Class representing a butterworth filter.
"""
def __init__(self, cutoff, fs, order):
"""Constructor.
Args:
cutoff: Cutoff frequency (Hz)
fs: Sampling frequency (Hz)
order: Filter order
Returns:
self (ButterworthFilter): butterworth filter
"""
normal_cutoff = cutoff / (2 * fs)
self.b, self.a = butter(order, normal_cutoff, btype='low', analog=False)
[docs] def apply_filter(self, window):
"""Applies the butterworth filter to a window along axis 0.
Args:
window: The window (usually np.ndarray) to apply the filter to. Can be more than 1D.
Returns:
filtered_window: The window after the butterworth filter is applied to it.
"""
return filtfilt(self.b, self.a, window, axis=0)
[docs]class GaussianFilter(AbstractFilter):
"""Class representing a gaussian filter.
"""
def __init__(self, sigma, truncation=None):
"""Constructor.
Args:
sigma: sigma value to use for the gaussian filter, higher means more blurring.
truncation: point at which the gaussian filter is truncated
Returns:
self (GaussianFilter): gaussian filter
"""
self.sigma = sigma
self.truncation = truncation
[docs] def apply_filter(self, window):
"""Applies the gaussian filter to a window along axis 0.
Args:
window: The window (usually np.ndarray) to apply the filter to. Can be more than 1D.
Returns:
filtered_window: The window after the gaussian filter is applied to it.
"""
if self.truncation is None:
return gaussian_filter1d(window, self.sigma, axis=0)
else:
return gaussian_filter1d(window, self.sigma, axis=0, truncate=self.truncation)
[docs]class DoubleWindowFilterPipe(AbstractPipe):
"""Pipe for double-window filtering.
This is done by taking two overlapping window each of size 2 * window_size and filling
them both up. Then when a window is full, the filter is applied to it and then the
filtered result is saved. The overlap between the two filtered windows is then averaged
together with a sigmoid function and this is kept as the output buffer.
This method means that only two windows of frames needs to be kept in memory and thus
the amount of memory used is bounded; as opposed to a method where the entire sequence
of frames is kept in memory. However, double windowing has a performance and accuracy
penalty as each frame needs to be filtered twice.
"""
def __init__(self, signal_filter: AbstractFilter, input_key: str, output_key: str, window_size: int):
"""Constructor.
Args:
signal_filter (AbstractFilter): The signal filter to filter the data with.
input_key (str): The key to use to retrieve the data from input dictionaries
output_key (str): The key to use to output the data in output dictionaries
window_size (int): The size of the filtering windows
Returns:
self (DoubleWindowFilterPipe): double window filter pipe
"""
# Initialise parameters
self.window_size = window_size
self.window_1 = None
self.window_2 = None
self.input_key = input_key
self.output_key = output_key
self.signal_filter = signal_filter
# Initialise the two windows and their filtered versions
self.window_1 = None
self.window_2 = None
self.filtered_window_1 = None
self.filtered_window_2 = None
self.output_buffer = None
# Initialise weights for averaging windows together
self.weights = None
self.last = None
# Initialise window position counters
self.window_counter_1 = 0
self.window_counter_2 = window_size
self.output_counter = 0
[docs] def initialise(self, window):
"""Initialise the double window filter.
Args:
window: Window of data to initialise the filter with.
"""
if len(window) < 2 * self.window_size:
raise ValueError(f"Window size ({len(window)}) should be at least twice the self.window_size ({self.window_size})")
# Re-initialise counters
self.window_counter_1 = 0
self.window_counter_2 = self.window_size
# Create windows
frame_shape = window[0][self.input_key].shape
self.window_1 = np.zeros((2 * self.window_size, *frame_shape))
self.window_2 = np.zeros((2 * self.window_size, *frame_shape))
# Initialise the weights
t = np.linspace(-6, 6, self.window_size)
self.weights = np.broadcast_to(expit(t), reversed((self.window_size, *frame_shape))).T
# Backfill window_2 with a reflection of the data
for i in range(self.window_size):
self.window_2[self.window_size - i - 1] = window[i][self.input_key]
# Fill window 2 then filter
for i in range(self.window_size):
self.window_1[i] = window[i][self.input_key]
self.window_2[i + self.window_size] = window[i][self.input_key]
self.filtered_window_2 = self.signal_filter.apply_filter(self.window_2)
# Fill window 1 then filter
for i in range(self.window_size):
self.window_1[i + self.window_size] = window[i + self.window_size][self.input_key]
self.window_2[i] = window[i + self.window_size][self.input_key]
self.filtered_window_1 = self.signal_filter.apply_filter(self.window_1)
self.output_buffer = self.filtered_window_1[:self.window_size] * self.weights + self.filtered_window_2[self.window_size:] * (1 - self.weights)
logger.info(f"Initialised {self}, input key = {self.input_key}, output key = {self.output_key}, "
f"window size = {self.window_size}, frame size = {frame_shape}")
[docs] def map(self, window):
"""Applies double-window filtering to a window of values.
Args:
window: Window of values to filter.
Returns:
window: Window of values with the first dictionary in the window now having
a value called self.output_key corresponding to the smoothed version
of that signal.
"""
if self.last is None or not np.all(self.last == window[2 * self.window_size - 2]):
# Either not initialised or skipped frames (so need to re-initialise)
if self.last is None:
logger.info(f"Initialising DoubleWindowFilterPipe on {self.input_key}")
else:
logger.info(f"Frame skip detected, re-initialising DoubleWindowFilterPipe on {self.input_key}")
self.initialise(window)
else:
# Add into the windows
self.window_1[self.window_counter_1] = window[2 * self.window_size - 1][self.input_key]
self.window_2[self.window_counter_2] = window[2 * self.window_size - 1][self.input_key]
self.window_counter_1 += 1
self.window_counter_2 += 1
# If one of the windows is full, perform the filtering and average the two
# filtered windows together, and reset the counters.
if self.window_counter_1 == 2 * self.window_size:
self.filtered_window_1 = self.signal_filter.apply_filter(self.window_1)
self.window_counter_1 = 0
self.output_buffer = self.filtered_window_1[:self.window_size] * self.weights + self.filtered_window_2[self.window_size:] * (1 - self.weights)
self.output_counter = 0
elif self.window_counter_2 == 2 * self.window_size:
self.filtered_window_2 = self.signal_filter.apply_filter(self.window_2)
self.window_counter_2 = 0
self.output_buffer = self.filtered_window_1[self.window_size:] * (1 - self.weights) + self.filtered_window_2[:self.window_size] * self.weights
self.output_counter = 0
# Put the smoothed value into the first dictionary in the window.
self.last = window[2 * self.window_size - 1]
window[0][self.output_key] = self.output_buffer[self.output_counter]
self.output_counter += 1
return window
[docs]class SingleExponentialFilterPipe(AbstractPipe):
"""Pipe for single exponential filtering.
The last filtered value is kept and is updated by
`last = alpha * current + (1 - alpha) * last`
where alpha is a parameter and current is the current value in the window.
This method of filtering is the fastest and requires the least memory since only
the self.last array is kept and only one elementwise array multiplication needs
to be done at each stage; however it is not the most accurate filtering method.
"""
def __init__(self, alpha, input_key, output_key):
"""Constructor.
Args:
alpha (float): alpha value (typically between 0 and 1) for updating filter
input_key (str): The key to use to retrieve the data from input dictionaries
output_key (str): The key to use to output the data in output dictionaries
Returns:
self (SingleExponentialFilterPipe): single exponential filter pipe
"""
self.alpha = alpha
self.input_key = input_key
self.output_key = output_key
self.last = None
[docs] def map(self, new):
"""Maps a new value to its smoothed value.
Args:
new: dict with self.input_key being data to be smoothed
Returns:
new: dict with self.output_key being smoothed data
"""
if self.last is None:
self.last = new[self.input_key]
else:
self.last = self.alpha * new[self.input_key] + (1 - self.alpha) * self.last
new[self.output_key] = self.last.copy()
return new