Waveform Database Software Package (WFDB) for Python 4.1.0

File: <base>/wfdb/processing/basic.py (5,876 bytes)
import numpy as np
import pandas as pd
from scipy import signal

from wfdb.io.annotation import Annotation


def resample_ann(ann_sample, fs, fs_target):
    """
    Compute the new annotation indices.

    Parameters
    ----------
    ann_sample : ndarray
        Array of annotation locations.
    fs : int
        The starting sampling frequency.
    fs_target : int
        The desired sampling frequency.

    Returns
    -------
    ndarray
        Array of resampled annotation locations.

    """
    ratio = fs_target / fs
    return (ratio * ann_sample).astype(np.int64)


def resample_sig(x, fs, fs_target):
    """
    Resample a signal to a different frequency.

    Parameters
    ----------
    x : ndarray
        Array containing the signal.
    fs : int, float
        The original sampling frequency.
    fs_target : int, float
        The target frequency.

    Returns
    -------
    resampled_x : ndarray
        Array of the resampled signal values.
    resampled_t : ndarray
        Array of the resampled signal locations.

    """
    t = np.arange(x.shape[0]).astype("float64")

    if fs == fs_target:
        return x, t

    new_length = int(x.shape[0] * fs_target / fs)
    # Resample the array if NaN values are present
    if np.isnan(x).any():
        x = pd.Series(x.reshape((-1,))).interpolate().values
    resampled_x, resampled_t = signal.resample(x, num=new_length, t=t)
    assert (
        resampled_x.shape == resampled_t.shape
        and resampled_x.shape[0] == new_length
    )
    assert np.all(np.diff(resampled_t) > 0)

    return resampled_x, resampled_t


def resample_singlechan(x, ann, fs, fs_target):
    """
    Resample a single-channel signal with its annotations.

    Parameters
    ----------
    x: ndarray
        The signal array.
    ann : WFDB Annotation
        The WFDB annotation object.
    fs : int, float
        The original frequency.
    fs_target : int, float
        The target frequency.

    Returns
    -------
    resampled_x : ndarray
        Array of the resampled signal values.
    resampled_ann : WFDB Annotation
        Annotation containing resampled annotation locations.

    """
    resampled_x, _ = resample_sig(x, fs, fs_target)
    new_sample = resample_ann(ann.sample, fs, fs_target)

    resampled_ann = Annotation(
        record_name=ann.record_name,
        extension=ann.extension,
        sample=new_sample,
        symbol=ann.symbol,
        subtype=ann.subtype,
        chan=ann.chan,
        num=ann.num,
        aux_note=ann.aux_note,
        fs=fs_target,
    )

    return resampled_x, resampled_ann


def resample_multichan(xs, ann, fs, fs_target, resamp_ann_chan=0):
    """
    Resample multiple channels with their annotations.

    Parameters
    ----------
    xs: ndarray
        The signal array.
    ann : WFDB Annotation
        The WFDB annotation object.
    fs : int, float
        The original frequency.
    fs_target : int, float
        The target frequency.
    resample_ann_channel : int, optional
        The signal channel used to compute new annotation indices.

    Returns
    -------
    ndarray
        Array of the resampled signal values.
    resampled_ann : WFDB Annotation
        Annotation containing resampled annotation locations.

    """
    assert resamp_ann_chan < xs.shape[1]

    lx = []
    for chan in range(xs.shape[1]):
        resampled_x, _ = resample_sig(xs[:, chan], fs, fs_target)
        lx.append(resampled_x)

    new_sample = resample_ann(ann.sample, fs, fs_target)

    resampled_ann = Annotation(
        record_name=ann.record_name,
        extension=ann.extension,
        sample=new_sample,
        symbol=ann.symbol,
        subtype=ann.subtype,
        chan=ann.chan,
        num=ann.num,
        aux_note=ann.aux_note,
        fs=fs_target,
    )

    return np.column_stack(lx), resampled_ann


def normalize_bound(sig, lb=0, ub=1):
    """
    Normalize a signal between the lower and upper bound.

    Parameters
    ----------
    sig : ndarray
        Original signal to be normalized.
    lb : int, float, optional
        Lower bound.
    ub : int, float, optional
        Upper bound.

    Returns
    -------
    ndarray
        Normalized signal.

    """
    mid = ub - (ub - lb) / 2
    min_v = np.min(sig)
    max_v = np.max(sig)
    mid_v = max_v - (max_v - min_v) / 2
    coef = (ub - lb) / (max_v - min_v)
    return sig * coef - (mid_v * coef) + mid


def smooth(sig, window_size):
    """
    Apply a uniform moving average filter to a signal.

    Parameters
    ----------
    sig : ndarray
        The signal to smooth.
    window_size : int
        The width of the moving average filter.

    Returns
    -------
    ndarray
        The convolved input signal with the desired box waveform.

    """
    box = np.ones(window_size) / window_size
    return np.convolve(sig, box, mode="same")


def get_filter_gain(b, a, f_gain, fs):
    """
    Given filter coefficients, return the gain at a particular
    frequency.

    Parameters
    ----------
    b : list
        List of linear filter b coefficients.
    a : list
        List of linear filter a coefficients.
    f_gain : int, float, optional
        The frequency at which to calculate the gain.
    fs : int, float, optional
        The sampling frequency of the system.

    Returns
    -------
    gain : int, float
        The passband gain at the desired frequency.

    """
    # Save the passband gain
    w, h = signal.freqz(b, a)
    w_gain = f_gain * 2 * np.pi / fs

    ind = np.where(w >= w_gain)[0][0]
    gain = abs(h[ind])

    return gain


def normalize(X):
    """
    Scale input vector to unit norm (vector length).

    Parameters
    ----------
    X : ndarray
        The vector to normalize.

    Returns
    -------
    ndarray
        The normalized vector.

    """
    return X / np.linalg.norm(X)