Source code for lavaflow.pipes.filters

"""Filters for smoothing velocity field.
"""

import cv2
import numpy as np

from abc import ABC, abstractmethod
from scipy.signal import butter, filtfilt
from scipy.special import expit
from scipy.ndimage import gaussian_filter1d

from lavaflow.pipes.core import AbstractPipe

import logging
logger = logging.getLogger('lavaflow')


[docs]class AbstractFilter(ABC): """ Class representing an abstract filter with an apply_filter method. """
[docs] @abstractmethod def apply_filter(self, window): """Applies a filter to the window. Args: window (np.ndarray): window Returns: filtered_window (np.ndarray): window with filter applied """ pass
# TODO: review the following
[docs]class ButterworthFilter(AbstractFilter): """Class representing a butterworth filter. """ def __init__(self, cutoff, fs, order): """Constructor. Args: cutoff: Cutoff frequency (Hz) fs: Sampling frequency (Hz) order: Filter order Returns: self (ButterworthFilter): butterworth filter """ normal_cutoff = cutoff / (2 * fs) self.b, self.a = butter(order, normal_cutoff, btype='low', analog=False)
[docs] def apply_filter(self, window): """Applies the butterworth filter to a window along axis 0. Args: window: The window (usually np.ndarray) to apply the filter to. Can be more than 1D. Returns: filtered_window: The window after the butterworth filter is applied to it. """ return filtfilt(self.b, self.a, window, axis=0)
[docs]class GaussianFilter(AbstractFilter): """Class representing a gaussian filter. """ def __init__(self, sigma, truncation=None): """Constructor. Args: sigma: sigma value to use for the gaussian filter, higher means more blurring. truncation: point at which the gaussian filter is truncated Returns: self (GaussianFilter): gaussian filter """ self.sigma = sigma self.truncation = truncation
[docs] def apply_filter(self, window): """Applies the gaussian filter to a window along axis 0. Args: window: The window (usually np.ndarray) to apply the filter to. Can be more than 1D. Returns: filtered_window: The window after the gaussian filter is applied to it. """ if self.truncation is None: return gaussian_filter1d(window, self.sigma, axis=0) else: return gaussian_filter1d(window, self.sigma, axis=0, truncate=self.truncation)
[docs]class DoubleWindowFilterPipe(AbstractPipe): """Pipe for double-window filtering. This is done by taking two overlapping window each of size 2 * window_size and filling them both up. Then when a window is full, the filter is applied to it and then the filtered result is saved. The overlap between the two filtered windows is then averaged together with a sigmoid function and this is kept as the output buffer. This method means that only two windows of frames needs to be kept in memory and thus the amount of memory used is bounded; as opposed to a method where the entire sequence of frames is kept in memory. However, double windowing has a performance and accuracy penalty as each frame needs to be filtered twice. """ def __init__(self, signal_filter: AbstractFilter, input_key: str, output_key: str, window_size: int): """Constructor. Args: signal_filter (AbstractFilter): The signal filter to filter the data with. input_key (str): The key to use to retrieve the data from input dictionaries output_key (str): The key to use to output the data in output dictionaries window_size (int): The size of the filtering windows Returns: self (DoubleWindowFilterPipe): double window filter pipe """ # Initialise parameters self.window_size = window_size self.window_1 = None self.window_2 = None self.input_key = input_key self.output_key = output_key self.signal_filter = signal_filter # Initialise the two windows and their filtered versions self.window_1 = None self.window_2 = None self.filtered_window_1 = None self.filtered_window_2 = None self.output_buffer = None # Initialise weights for averaging windows together self.weights = None self.last = None # Initialise window position counters self.window_counter_1 = 0 self.window_counter_2 = window_size self.output_counter = 0
[docs] def initialise(self, window): """Initialise the double window filter. Args: window: Window of data to initialise the filter with. """ if len(window) < 2 * self.window_size: raise ValueError(f"Window size ({len(window)}) should be at least twice the self.window_size ({self.window_size})") # Re-initialise counters self.window_counter_1 = 0 self.window_counter_2 = self.window_size # Create windows frame_shape = window[0][self.input_key].shape self.window_1 = np.zeros((2 * self.window_size, *frame_shape)) self.window_2 = np.zeros((2 * self.window_size, *frame_shape)) # Initialise the weights t = np.linspace(-6, 6, self.window_size) self.weights = np.broadcast_to(expit(t), reversed((self.window_size, *frame_shape))).T # Backfill window_2 with a reflection of the data for i in range(self.window_size): self.window_2[self.window_size - i - 1] = window[i][self.input_key] # Fill window 2 then filter for i in range(self.window_size): self.window_1[i] = window[i][self.input_key] self.window_2[i + self.window_size] = window[i][self.input_key] self.filtered_window_2 = self.signal_filter.apply_filter(self.window_2) # Fill window 1 then filter for i in range(self.window_size): self.window_1[i + self.window_size] = window[i + self.window_size][self.input_key] self.window_2[i] = window[i + self.window_size][self.input_key] self.filtered_window_1 = self.signal_filter.apply_filter(self.window_1) self.output_buffer = self.filtered_window_1[:self.window_size] * self.weights + self.filtered_window_2[self.window_size:] * (1 - self.weights) logger.info(f"Initialised {self}, input key = {self.input_key}, output key = {self.output_key}, " f"window size = {self.window_size}, frame size = {frame_shape}")
[docs] def map(self, window): """Applies double-window filtering to a window of values. Args: window: Window of values to filter. Returns: window: Window of values with the first dictionary in the window now having a value called self.output_key corresponding to the smoothed version of that signal. """ if self.last is None or not np.all(self.last == window[2 * self.window_size - 2]): # Either not initialised or skipped frames (so need to re-initialise) if self.last is None: logger.info(f"Initialising DoubleWindowFilterPipe on {self.input_key}") else: logger.info(f"Frame skip detected, re-initialising DoubleWindowFilterPipe on {self.input_key}") self.initialise(window) else: # Add into the windows self.window_1[self.window_counter_1] = window[2 * self.window_size - 1][self.input_key] self.window_2[self.window_counter_2] = window[2 * self.window_size - 1][self.input_key] self.window_counter_1 += 1 self.window_counter_2 += 1 # If one of the windows is full, perform the filtering and average the two # filtered windows together, and reset the counters. if self.window_counter_1 == 2 * self.window_size: self.filtered_window_1 = self.signal_filter.apply_filter(self.window_1) self.window_counter_1 = 0 self.output_buffer = self.filtered_window_1[:self.window_size] * self.weights + self.filtered_window_2[self.window_size:] * (1 - self.weights) self.output_counter = 0 elif self.window_counter_2 == 2 * self.window_size: self.filtered_window_2 = self.signal_filter.apply_filter(self.window_2) self.window_counter_2 = 0 self.output_buffer = self.filtered_window_1[self.window_size:] * (1 - self.weights) + self.filtered_window_2[:self.window_size] * self.weights self.output_counter = 0 # Put the smoothed value into the first dictionary in the window. self.last = window[2 * self.window_size - 1] window[0][self.output_key] = self.output_buffer[self.output_counter] self.output_counter += 1 return window
[docs]class SingleExponentialFilterPipe(AbstractPipe): """Pipe for single exponential filtering. The last filtered value is kept and is updated by `last = alpha * current + (1 - alpha) * last` where alpha is a parameter and current is the current value in the window. This method of filtering is the fastest and requires the least memory since only the self.last array is kept and only one elementwise array multiplication needs to be done at each stage; however it is not the most accurate filtering method. """ def __init__(self, alpha, input_key, output_key): """Constructor. Args: alpha (float): alpha value (typically between 0 and 1) for updating filter input_key (str): The key to use to retrieve the data from input dictionaries output_key (str): The key to use to output the data in output dictionaries Returns: self (SingleExponentialFilterPipe): single exponential filter pipe """ self.alpha = alpha self.input_key = input_key self.output_key = output_key self.last = None
[docs] def map(self, new): """Maps a new value to its smoothed value. Args: new: dict with self.input_key being data to be smoothed Returns: new: dict with self.output_key being smoothed data """ if self.last is None: self.last = new[self.input_key] else: self.last = self.alpha * new[self.input_key] + (1 - self.alpha) * self.last new[self.output_key] = self.last.copy() return new