Waveform Database Software Package (WFDB) for Python 4.1.0
(5,876 bytes)
import numpy as np
import pandas as pd
from scipy import signal
from wfdb.io.annotation import Annotation
def resample_ann(ann_sample, fs, fs_target):
"""
Compute the new annotation indices.
Parameters
----------
ann_sample : ndarray
Array of annotation locations.
fs : int
The starting sampling frequency.
fs_target : int
The desired sampling frequency.
Returns
-------
ndarray
Array of resampled annotation locations.
"""
ratio = fs_target / fs
return (ratio * ann_sample).astype(np.int64)
def resample_sig(x, fs, fs_target):
"""
Resample a signal to a different frequency.
Parameters
----------
x : ndarray
Array containing the signal.
fs : int, float
The original sampling frequency.
fs_target : int, float
The target frequency.
Returns
-------
resampled_x : ndarray
Array of the resampled signal values.
resampled_t : ndarray
Array of the resampled signal locations.
"""
t = np.arange(x.shape[0]).astype("float64")
if fs == fs_target:
return x, t
new_length = int(x.shape[0] * fs_target / fs)
# Resample the array if NaN values are present
if np.isnan(x).any():
x = pd.Series(x.reshape((-1,))).interpolate().values
resampled_x, resampled_t = signal.resample(x, num=new_length, t=t)
assert (
resampled_x.shape == resampled_t.shape
and resampled_x.shape[0] == new_length
)
assert np.all(np.diff(resampled_t) > 0)
return resampled_x, resampled_t
def resample_singlechan(x, ann, fs, fs_target):
"""
Resample a single-channel signal with its annotations.
Parameters
----------
x: ndarray
The signal array.
ann : WFDB Annotation
The WFDB annotation object.
fs : int, float
The original frequency.
fs_target : int, float
The target frequency.
Returns
-------
resampled_x : ndarray
Array of the resampled signal values.
resampled_ann : WFDB Annotation
Annotation containing resampled annotation locations.
"""
resampled_x, _ = resample_sig(x, fs, fs_target)
new_sample = resample_ann(ann.sample, fs, fs_target)
resampled_ann = Annotation(
record_name=ann.record_name,
extension=ann.extension,
sample=new_sample,
symbol=ann.symbol,
subtype=ann.subtype,
chan=ann.chan,
num=ann.num,
aux_note=ann.aux_note,
fs=fs_target,
)
return resampled_x, resampled_ann
def resample_multichan(xs, ann, fs, fs_target, resamp_ann_chan=0):
"""
Resample multiple channels with their annotations.
Parameters
----------
xs: ndarray
The signal array.
ann : WFDB Annotation
The WFDB annotation object.
fs : int, float
The original frequency.
fs_target : int, float
The target frequency.
resample_ann_channel : int, optional
The signal channel used to compute new annotation indices.
Returns
-------
ndarray
Array of the resampled signal values.
resampled_ann : WFDB Annotation
Annotation containing resampled annotation locations.
"""
assert resamp_ann_chan < xs.shape[1]
lx = []
for chan in range(xs.shape[1]):
resampled_x, _ = resample_sig(xs[:, chan], fs, fs_target)
lx.append(resampled_x)
new_sample = resample_ann(ann.sample, fs, fs_target)
resampled_ann = Annotation(
record_name=ann.record_name,
extension=ann.extension,
sample=new_sample,
symbol=ann.symbol,
subtype=ann.subtype,
chan=ann.chan,
num=ann.num,
aux_note=ann.aux_note,
fs=fs_target,
)
return np.column_stack(lx), resampled_ann
def normalize_bound(sig, lb=0, ub=1):
"""
Normalize a signal between the lower and upper bound.
Parameters
----------
sig : ndarray
Original signal to be normalized.
lb : int, float, optional
Lower bound.
ub : int, float, optional
Upper bound.
Returns
-------
ndarray
Normalized signal.
"""
mid = ub - (ub - lb) / 2
min_v = np.min(sig)
max_v = np.max(sig)
mid_v = max_v - (max_v - min_v) / 2
coef = (ub - lb) / (max_v - min_v)
return sig * coef - (mid_v * coef) + mid
def smooth(sig, window_size):
"""
Apply a uniform moving average filter to a signal.
Parameters
----------
sig : ndarray
The signal to smooth.
window_size : int
The width of the moving average filter.
Returns
-------
ndarray
The convolved input signal with the desired box waveform.
"""
box = np.ones(window_size) / window_size
return np.convolve(sig, box, mode="same")
def get_filter_gain(b, a, f_gain, fs):
"""
Given filter coefficients, return the gain at a particular
frequency.
Parameters
----------
b : list
List of linear filter b coefficients.
a : list
List of linear filter a coefficients.
f_gain : int, float, optional
The frequency at which to calculate the gain.
fs : int, float, optional
The sampling frequency of the system.
Returns
-------
gain : int, float
The passband gain at the desired frequency.
"""
# Save the passband gain
w, h = signal.freqz(b, a)
w_gain = f_gain * 2 * np.pi / fs
ind = np.where(w >= w_gain)[0][0]
gain = abs(h[ind])
return gain
def normalize(X):
"""
Scale input vector to unit norm (vector length).
Parameters
----------
X : ndarray
The vector to normalize.
Returns
-------
ndarray
The normalized vector.
"""
return X / np.linalg.norm(X)