Waveform Database Software Package (WFDB) for Python 4.1.0
(90,295 bytes)
import math
import os
import sys
import numpy as np
from wfdb.io import download, _coreio, util
# Inclusive limits of a signed 32-bit integer
MAX_I32 = 2**31 - 1
MIN_I32 = -(2**31)

# Formats in which all samples align with integer (power-of-two) boundaries
ALIGNED_FMTS = ["8", "16", "32", "61", "80", "160"]
# Formats in which not all samples align with integer boundaries
UNALIGNED_FMTS = ["212", "310", "311", "24"]
# Formats in which samples are encoded in a variable number of bits
COMPRESSED_FMTS = ["508", "516", "524"]
# Formats which are stored in offset binary form
OFFSET_FMTS = ["80", "160"]
# All WFDB dat formats - https://www.physionet.org/physiotools/wag/signal-5.htm
DAT_FMTS = [*ALIGNED_FMTS, *UNALIGNED_FMTS, *COMPRESSED_FMTS]

# Bytes required to hold each sample (including wasted space) for each
# WFDB dat format. Fractional values belong to the packed formats; the
# compressed formats are listed with 0.
BYTES_PER_SAMPLE = {
    "8": 1,
    "16": 2,
    "24": 3,
    "32": 4,
    "61": 2,
    "80": 1,
    "160": 2,
    "212": 3 / 2,
    "310": 4 / 3,
    "311": 4 / 3,
    "508": 0,
    "516": 0,
    "524": 0,
}

# The bit resolution of each WFDB dat format
BIT_RES = {
    "8": 8,
    "16": 16,
    "24": 24,
    "32": 32,
    "61": 16,
    "80": 8,
    "160": 16,
    "212": 12,
    "310": 10,
    "311": 10,
    "508": 8,
    "516": 16,
    "524": 24,
}

# Numpy dtypes used to load dat files of each format. The compressed
# formats have no entry in this table.
DATA_LOAD_TYPES = {
    "8": "<i1",
    "16": "<i2",
    "24": "<u1",
    "32": "<i4",
    "61": ">i2",
    "80": "<u1",
    "160": "<u2",
    "212": "<u1",
    "310": "<u1",
    "311": "<u1",
}

# Minimum and maximum digital sample values for each of the WFDB dat
# formats, as (lowest, highest) pairs.
SAMPLE_VALUE_RANGE = {
    "80": (-(1 << 7), (1 << 7) - 1),
    "508": (-(1 << 7), (1 << 7) - 1),
    "310": (-(1 << 9), (1 << 9) - 1),
    "311": (-(1 << 9), (1 << 9) - 1),
    "212": (-(1 << 11), (1 << 11) - 1),
    "16": (-(1 << 15), (1 << 15) - 1),
    "61": (-(1 << 15), (1 << 15) - 1),
    "160": (-(1 << 15), (1 << 15) - 1),
    "516": (-(1 << 15), (1 << 15) - 1),
    "24": (-(1 << 23), (1 << 23) - 1),
    "524": (-(1 << 23), (1 << 23) - 1),
    "32": (-(1 << 31), (1 << 31) - 1),
    "8": (-(1 << 31), (1 << 31) - 1),
}

# Digital value used to represent a missing/invalid sample, in each of the
# WFDB dat formats. Format "8" has no such value (None).
INVALID_SAMPLE_VALUE = {
    "80": -(1 << 7),
    "508": -(1 << 7),
    "310": -(1 << 9),
    "311": -(1 << 9),
    "212": -(1 << 11),
    "16": -(1 << 15),
    "61": -(1 << 15),
    "160": -(1 << 15),
    "516": -(1 << 15),
    "24": -(1 << 23),
    "524": -(1 << 23),
    "32": -(1 << 31),
    "8": None,
}
class SignalMixin(object):
"""
Mixin class with signal methods. Inherited by Record class.
Attributes
----------
N/A
"""
def wr_dats(self, expanded, write_dir):
    """
    Validate the digital signal field and write all dat files
    associated with the record.

    Parameters
    ----------
    expanded : bool
        Whether to write the `e_d_signal` attribute (True) or
        the `d_signal` attribute (False).
    write_dir : str
        The directory to write the output file to.

    Returns
    -------
    N/A

    """
    # Nothing to write for a record without signals
    if not self.n_sig:
        return

    # Get all the fields used to write the header. Assuming this
    # method was called through wrsamp, these will have already been
    # checked in wrheader(); the returned values are not used here.
    _rec_fields, _sig_fields = self.get_write_fields()

    # Check the validity of whichever digital signal field is written:
    # the list of arrays e_d_signal, or the uniform d_signal.
    signal_field = "e_d_signal" if expanded else "d_signal"
    self.check_field(signal_field)

    # Check the cohesion of the signal field against the other fields
    # used to write the header. (For historical reasons an empty field
    # list is passed, so none of the optional header fields are checked.)
    self.check_sig_cohesion([], expanded)

    # Write each of the specified dat files
    self.wr_dat_files(expanded=expanded, write_dir=write_dir)
def check_sig_cohesion(self, write_fields, expanded):
    """
    Check the cohesion of the d_signal/e_d_signal field with the other
    fields used to write the record.

    The signal shape is compared with `n_sig` and `sig_len`, every
    channel is checked against the value range of its `fmt`, and the
    `checksum` and `init_value` attributes are verified against the
    signal when they are listed in `write_fields`.

    Parameters
    ----------
    write_fields : list
        All the fields used to write the header.
    expanded : bool
        Whether to check the `e_d_signal` attribute (True) or
        the `d_signal` attribute (False).

    Returns
    -------
    N/A

    Raises
    ------
    ValueError
        If the signal shape, checksum or init_value does not match the
        corresponding attributes.
    IndexError
        If a channel contains values outside the range of its format.

    """
    # Using list of arrays e_d_signal
    if expanded:
        # Set default samps_per_frame. `spf` is the attribute itself,
        # not a copy, so the defaults are stored on the record.
        spf = self.samps_per_frame
        for ch in range(len(spf)):
            if spf[ch] is None:
                spf[ch] = 1

        # Match the actual signal shape against stated length and number of channels
        if self.n_sig != len(self.e_d_signal):
            raise ValueError("n_sig does not match the length of e_d_signal")
        for ch in range(self.n_sig):
            if len(self.e_d_signal[ch]) != spf[ch] * self.sig_len:
                raise ValueError(
                    f"Length of channel {ch} does not match "
                    f"samps_per_frame[{ch}]*sig_len"
                )

        # For each channel (if any), make sure the digital format has no values out of bounds
        for ch in range(self.n_sig):
            fmt = self.fmt[ch]
            dmin, dmax = _digi_bounds(fmt)
            # Reduce inside numpy rather than iterating samples in Python
            chmin = np.min(self.e_d_signal[ch])
            chmax = np.max(self.e_d_signal[ch])
            if (chmin < dmin) or (chmax > dmax):
                raise IndexError(
                    f"Channel {ch} contain values outside allowed range "
                    f"[{dmin}, {dmax}] for fmt {fmt}"
                )

        # Ensure that the checksums and initial value fields match the digital signal (if the fields are present)
        if self.n_sig > 0:
            if "checksum" in write_fields:
                realchecksum = self.calc_checksum(expanded)
                if self.checksum != realchecksum:
                    print(
                        "The actual checksum of e_d_signal is: ",
                        realchecksum,
                    )
                    raise ValueError(
                        "checksum field does not match actual checksum of e_d_signal"
                    )
            if "init_value" in write_fields:
                realinit_value = [
                    self.e_d_signal[ch][0] for ch in range(self.n_sig)
                ]
                if self.init_value != realinit_value:
                    print(
                        "The actual init_value of e_d_signal is: ",
                        realinit_value,
                    )
                    raise ValueError(
                        "init_value field does not match actual init_value of e_d_signal"
                    )

    # Using uniform d_signal
    else:
        # Match the actual signal shape against stated length and number of channels
        if (self.sig_len, self.n_sig) != self.d_signal.shape:
            print("sig_len: ", self.sig_len)
            print("n_sig: ", self.n_sig)
            print("d_signal.shape: ", self.d_signal.shape)
            raise ValueError(
                "sig_len and n_sig do not match shape of d_signal"
            )

        # For each channel (if any), make sure the digital format has no values out of bounds
        for ch in range(self.n_sig):
            fmt = self.fmt[ch]
            dmin, dmax = _digi_bounds(fmt)
            # Reduce inside numpy rather than iterating samples in Python
            chmin = np.min(self.d_signal[:, ch])
            chmax = np.max(self.d_signal[:, ch])
            if (chmin < dmin) or (chmax > dmax):
                raise IndexError(
                    f"Channel {ch} contain values outside allowed range "
                    f"[{dmin}, {dmax}] for fmt {fmt}"
                )

        # Ensure that the checksums and initial value fields match the digital signal (if the fields are present)
        if self.n_sig > 0:
            if "checksum" in write_fields:
                realchecksum = self.calc_checksum()
                if self.checksum != realchecksum:
                    print("The actual checksum of d_signal is: ", realchecksum)
                    raise ValueError(
                        "checksum field does not match actual checksum of d_signal"
                    )
            if "init_value" in write_fields:
                realinit_value = list(self.d_signal[0, :])
                if self.init_value != realinit_value:
                    print(
                        "The actual init_value of d_signal is: ",
                        realinit_value,
                    )
                    raise ValueError(
                        "init_value field does not match actual init_value of d_signal"
                    )
def set_p_features(self, do_dac=False, expanded=False):
    """
    Set `n_sig` and `sig_len` from the shape of the physical signal
    field, optionally producing that field first by dac conversion of
    the digital signal field.

    Parameters
    ----------
    do_dac : bool, optional
        Whether to use the digital signal field to perform dac
        conversion to get the physical signal field beforehand.
    expanded : bool, optional
        Whether to use the `e_p_signal` attribute (True) or
        the `p_signal` attribute (False). If True, the `samps_per_frame`
        attribute is also required.

    Returns
    -------
    N/A

    Notes
    -----
    Regarding dac conversion:
      - fmt, gain, and baseline must all be set in order to perform
        dac.
      - Unlike with adc, there is no way to infer these fields.
      - Using the fmt, gain and baseline fields, dac is performed,
        and (e_)p_signal is set.

    *Developer note: Seems this function will be very infrequently used.
     The set_d_features function seems far more useful.

    """
    if not expanded:
        if do_dac:
            # Validate everything the conversion needs, then convert
            self.check_field("d_signal")
            for required in ("fmt", "adc_gain", "baseline"):
                self.check_field(required, "all")
            self.p_signal = self.dac()

        # The uniform array is (samples, channels)
        self.check_field("p_signal")
        self.sig_len = self.p_signal.shape[0]
        self.n_sig = self.p_signal.shape[1]
        return

    if do_dac:
        # Validate everything the conversion needs, then convert
        self.check_field("e_d_signal")
        for required in ("fmt", "adc_gain", "baseline", "samps_per_frame"):
            self.check_field(required, "all")
        self.e_p_signal = self.dac(expanded)

    # The signal length in frames is derived from the first channel
    self.check_field("e_p_signal", "all")
    first_channel_samples = len(self.e_p_signal[0])
    self.sig_len = int(first_channel_samples / self.samps_per_frame[0])
    self.n_sig = len(self.e_p_signal)
def set_d_features(self, do_adc=False, single_fmt=True, expanded=False):
    """
    Use properties of the digital signal field to set the following
    features: n_sig, sig_len, init_value, checksum, and possibly
    *(fmt, adc_gain, baseline).

    Parameters
    ----------
    do_adc : bool, optional
        Whether to use the physical signal field to perform adc
        conversion to get the digital signal field beforehand.
    single_fmt : bool, optional
        Whether to use a single digital format during adc, if it is
        performed.
    expanded : bool, optional
        Whether to transform the `e_p_signal` attribute (True) or
        the `p_signal` attribute (False).

    Returns
    -------
    N/A

    Raises
    ------
    Exception
        If adc is requested and `fmt` is unset while `adc_gain` or
        `baseline` is set, or if `fmt` is set and exactly one of
        `adc_gain` and `baseline` is set.

    Notes
    -----
    Regarding adc conversion:
      - If fmt is unset:
        - Neither adc_gain nor baseline may be set. If the digital values
          used to store the signal are known, then the file format should
          also be known.
        - The most appropriate fmt for the signals will be calculated and the
          `fmt` attribute will be set. Given that neither `adc_gain` nor
          `baseline` is allowed to be set, optimal values for those fields will
          then be calculated and set as well.
      - If fmt is set:
        - If both adc_gain and baseline are unset, optimal values for those
          fields will be calculated and the fields will be set.
        - If both adc_gain and baseline are set, the function will continue.
        - If only one of adc_gain and baseline are set, this function will
          raise an error. It makes no sense to know only one of those fields.
      - ADC will occur after valid values for fmt, adc_gain, and baseline are
        present, using all three fields.

    """
    if expanded:
        # adc is performed.
        if do_adc:
            self.check_field("e_p_signal", "all")

            # If there is no fmt set, choose it, adc_gain, and baseline
            if self.fmt is None:
                # Make sure that neither adc_gain nor baseline are set
                if self.adc_gain is not None or self.baseline is not None:
                    raise Exception(
                        "If fmt is not set, gain and baseline may not be set either."
                    )
                # Choose appropriate fmts based on estimated signal resolutions.
                res = est_res(self.e_p_signal)
                self.fmt = _wfdb_fmt(res, single_fmt)
                # Calculate and set optimal gain and baseline values, as
                # the non-expanded branch does.
                # NOTE(review): calc_adc_params() reads `p_signal`, not
                # `e_p_signal` - confirm this is what expanded records need.
                self.adc_gain, self.baseline = self.calc_adc_params()
            # If there is a fmt set
            else:
                self.check_field("fmt", "all")
                # Neither field set
                if self.adc_gain is None and self.baseline is None:
                    # Calculate and set optimal gain and baseline values to convert physical signals
                    self.adc_gain, self.baseline = self.calc_adc_params()
                # Exactly one field set
                elif (self.adc_gain is None) ^ (self.baseline is None):
                    raise Exception(
                        "If fmt is set, gain and baseline should both be set or not set."
                    )

            self.check_field("adc_gain", "all")
            self.check_field("baseline", "all")

            # All required fields are present and valid. Perform ADC and
            # store the result in the expanded field that is read below.
            self.e_d_signal = self.adc(expanded)

        # Use e_d_signal to set fields
        self.check_field("e_d_signal", "all")
        self.sig_len = int(
            len(self.e_d_signal[0]) / self.samps_per_frame[0]
        )
        self.n_sig = len(self.e_d_signal)
        self.init_value = [sig[0] for sig in self.e_d_signal]
        self.checksum = self.calc_checksum(expanded)
    else:
        # adc is performed.
        if do_adc:
            self.check_field("p_signal")

            # If there is no fmt set
            if self.fmt is None:
                # Make sure that neither adc_gain nor baseline are set
                if self.adc_gain is not None or self.baseline is not None:
                    raise Exception(
                        "If fmt is not set, gain and baseline may not be set either."
                    )
                # Choose appropriate fmts based on estimated signal resolutions.
                res = est_res(self.p_signal)
                self.fmt = _wfdb_fmt(res, single_fmt)
                # Calculate and set optimal gain and baseline values to convert physical signals
                self.adc_gain, self.baseline = self.calc_adc_params()
            # If there is a fmt set
            else:
                self.check_field("fmt", "all")
                # Neither field set
                if self.adc_gain is None and self.baseline is None:
                    # Calculate and set optimal gain and baseline values to convert physical signals
                    self.adc_gain, self.baseline = self.calc_adc_params()
                # Exactly one field set
                elif (self.adc_gain is None) ^ (self.baseline is None):
                    raise Exception(
                        "If fmt is set, gain and baseline should both be set or not set."
                    )

            self.check_field("adc_gain", "all")
            self.check_field("baseline", "all")

            # All required fields are present and valid. Perform ADC
            self.d_signal = self.adc()

        # Use d_signal to set fields
        self.check_field("d_signal")
        self.sig_len = self.d_signal.shape[0]
        self.n_sig = self.d_signal.shape[1]
        self.init_value = list(self.d_signal[0, :])
        self.checksum = self.calc_checksum()
def adc(self, expanded=False, inplace=False):
    """
    Performs analogue to digital conversion of the physical signal stored
    in p_signal if expanded is False, or e_p_signal if expanded is True.

    Each sample is computed as round(physical * adc_gain + baseline) and
    cast to int64. Samples that were NaN in the physical signal are
    replaced by the per-channel value returned by `_digi_nan(self.fmt)`.

    The p_signal/e_p_signal, fmt, gain, and baseline fields must all be
    valid.

    If inplace is True, the adc will be performed inplace on the variable,
    the d_signal/e_d_signal attribute will be set, and the
    p_signal/e_p_signal field will be set to None.

    Parameters
    ----------
    expanded : bool, optional
        Whether to transform the `e_p_signal` attribute (True) or
        the `p_signal` attribute (False).
    inplace : bool, optional
        Whether to automatically set the object's corresponding
        digital signal attribute and set the physical
        signal attribute to None (True), or to return the converted
        signal as a separate variable without changing the original
        physical signal attribute (False).

    Returns
    -------
    d_signal : ndarray, optional
        The digital conversion of the signal. Either a 2d numpy
        array or a list of 1d numpy arrays. Only returned when
        `inplace` is False; otherwise nothing is returned.

    Examples:
    ---------
    >>> import wfdb
    >>> record = wfdb.rdsamp('sample-data/100')
    >>> d_signal = record.adc()
    >>> record.adc(inplace=True)
    >>> record.dac(inplace=True)

    """
    # The digital NAN values for each channel
    d_nans = _digi_nan(self.fmt)

    # To do: choose the minimum return res needed
    intdtype = "int64"

    # Do inplace conversion and set relevant variables.
    if inplace:
        if expanded:
            for ch in range(self.n_sig):
                # NAN locations for the channel, recorded before the
                # values are overwritten
                ch_nanlocs = np.isnan(self.e_p_signal[ch])
                np.multiply(
                    self.e_p_signal[ch],
                    self.adc_gain[ch],
                    self.e_p_signal[ch],
                )
                np.add(
                    self.e_p_signal[ch],
                    self.baseline[ch],
                    self.e_p_signal[ch],
                )
                np.round(self.e_p_signal[ch], 0, self.e_p_signal[ch])
                self.e_p_signal[ch] = self.e_p_signal[ch].astype(
                    intdtype, copy=False
                )
                self.e_p_signal[ch][ch_nanlocs] = d_nans[ch]
            self.e_d_signal = self.e_p_signal
            self.e_p_signal = None
        else:
            nanlocs = np.isnan(self.p_signal)
            np.multiply(self.p_signal, self.adc_gain, self.p_signal)
            np.add(self.p_signal, self.baseline, self.p_signal)
            np.round(self.p_signal, 0, self.p_signal)
            self.p_signal = self.p_signal.astype(intdtype, copy=False)
            # Write the invalid-sample value wherever the input was NaN,
            # as every other conversion path of this method does.
            if nanlocs.any():
                for ch in range(self.p_signal.shape[1]):
                    if nanlocs[:, ch].any():
                        self.p_signal[nanlocs[:, ch], ch] = d_nans[ch]
            self.d_signal = self.p_signal
            self.p_signal = None

    # Return the variable
    else:
        if expanded:
            d_signal = []
            for ch in range(self.n_sig):
                # NAN locations for the channel
                ch_nanlocs = np.isnan(self.e_p_signal[ch])
                ch_d_signal = self.e_p_signal[ch].copy()
                np.multiply(ch_d_signal, self.adc_gain[ch], ch_d_signal)
                np.add(ch_d_signal, self.baseline[ch], ch_d_signal)
                np.round(ch_d_signal, 0, ch_d_signal)
                ch_d_signal = ch_d_signal.astype(intdtype, copy=False)
                ch_d_signal[ch_nanlocs] = d_nans[ch]
                d_signal.append(ch_d_signal)
        else:
            nanlocs = np.isnan(self.p_signal)
            # Cannot cast dtype to int now because gain is float.
            d_signal = self.p_signal.copy()
            np.multiply(d_signal, self.adc_gain, d_signal)
            np.add(d_signal, self.baseline, d_signal)
            np.round(d_signal, 0, d_signal)
            d_signal = d_signal.astype(intdtype, copy=False)

            if nanlocs.any():
                for ch in range(d_signal.shape[1]):
                    if nanlocs[:, ch].any():
                        d_signal[nanlocs[:, ch], ch] = d_nans[ch]

        return d_signal
def dac(self, expanded=False, return_res=64, inplace=False):
    """
    Convert the digital signal to physical units.

    The source is `d_signal` if expanded is False, or `e_d_signal` if
    expanded is True. Each sample becomes (digital - baseline) / adc_gain,
    and samples equal to the channel's value from `_digi_nan(self.fmt)`
    become NaN.

    The d_signal/e_d_signal, fmt, gain, and baseline fields must all be
    valid.

    If inplace is True, the dac will be performed inplace on the
    variable, the p_signal/e_p_signal attribute will be set, and the
    d_signal/e_d_signal field will be set to None.

    Parameters
    ----------
    expanded : bool, optional
        Whether to transform the `e_d_signal` attribute (True) or
        the `d_signal` attribute (False).
    return_res : int, optional
        Selects the float dtype of the result: 64 gives float64, 32
        gives float32, and any other value gives float16.
    inplace : bool, optional
        Whether to automatically set the object's corresponding
        physical signal attribute and set the digital signal
        attribute to None (True), or to return the converted
        signal as a separate variable without changing the original
        digital signal attribute (False).

    Returns
    -------
    p_signal : ndarray, optional
        The physical conversion of the signal. Either a 2d numpy
        array or a list of 1d numpy arrays. Only returned when
        `inplace` is False.

    Examples
    --------
    >>> import wfdb
    >>> record = wfdb.rdsamp('sample-data/100', physical=False)
    >>> p_signal = record.dac()
    >>> record.dac(inplace=True)
    >>> record.adc(inplace=True)

    """
    # Per-channel digital values that stand for an invalid sample
    invalid_values = _digi_nan(self.fmt)

    # Pick the float dtype of the result
    float_dtype = (
        "float64"
        if return_res == 64
        else ("float32" if return_res == 32 else "float16")
    )

    if inplace and expanded:
        signals = self.e_d_signal
        for ch in range(self.n_sig):
            # Remember the invalid samples before the values change
            invalid = signals[ch] == invalid_values[ch]
            signals[ch] = signals[ch].astype(float_dtype, copy=False)
            np.subtract(signals[ch], self.baseline[ch], out=signals[ch])
            np.divide(signals[ch], self.adc_gain[ch], out=signals[ch])
            signals[ch][invalid] = np.nan
        # Hand the converted list over to the physical attribute
        self.e_p_signal = self.e_d_signal
        self.e_d_signal = None
        return None

    if inplace:
        invalid = self.d_signal == invalid_values
        # Do float conversion immediately to avoid potential under/overflow
        # of efficient int dtype
        self.d_signal = self.d_signal.astype(float_dtype, copy=False)
        np.subtract(self.d_signal, self.baseline, out=self.d_signal)
        np.divide(self.d_signal, self.adc_gain, out=self.d_signal)
        self.d_signal[invalid] = np.nan
        # Hand the converted array over to the physical attribute
        self.p_signal = self.d_signal
        self.d_signal = None
        return None

    if expanded:
        converted = []
        for ch in range(self.n_sig):
            raw = self.e_d_signal[ch]
            invalid = raw == invalid_values[ch]
            physical = raw.astype(float_dtype, copy=False)
            np.subtract(physical, self.baseline[ch], out=physical)
            np.divide(physical, self.adc_gain[ch], out=physical)
            physical[invalid] = np.nan
            converted.append(physical)
        return converted

    invalid = self.d_signal == invalid_values
    physical = self.d_signal.astype(float_dtype, copy=False)
    np.subtract(physical, self.baseline, out=physical)
    np.divide(physical, self.adc_gain, out=physical)
    physical[invalid] = np.nan
    return physical
def calc_adc_params(self):
    """
    Compute appropriate adc_gain and baseline parameters for adc
    conversion, given the physical signal (`p_signal`) and the fmts.

    Parameters
    ----------
    N/A

    Returns
    -------
    adc_gains : list
        List of calculated `adc_gain` values for each channel.
    baselines : list
        List of calculated `baseline` values for each channel.

    Raises
    ------
    ValueError
        If `p_signal` contains an infinite value.

    Notes
    -----
    This is the mapping equation:
        `(digital - baseline) / adc_gain = physical`
        `physical * adc_gain + baseline = digital`

    The original WFDB library stores `baseline` as int32, so the
    calculated baseline is kept within [MIN_I32, MAX_I32].

    This function does carefully deal with overflow for calculated
    int32 `baseline` values, but does not consider over/underflow
    for calculated float `adc_gain` values.

    """
    adc_gains = []
    baselines = []

    if np.isinf(self.p_signal).any():
        raise ValueError("Signal contains inf. Cannot perform adc.")

    # min and max ignoring nans, unless whole channel is NAN.
    # Should suppress warning message.
    minvals = np.nanmin(self.p_signal, axis=0)
    maxvals = np.nanmax(self.p_signal, axis=0)

    for ch in range(np.shape(self.p_signal)[1]):
        # Get the minimum and maximum (valid) storage values
        dmin, dmax = _digi_bounds(self.fmt[ch])
        # add 1 because the lowest value is used to store nans
        dmin = dmin + 1

        pmin = minvals[ch]
        pmax = maxvals[ch]

        # Figure out digital samples used to store physical samples

        # If the entire signal is NAN, gain/baseline won't be used.
        # NaN never compares equal to itself, so it must be detected
        # with isnan rather than `==`.
        if np.isnan(pmin):
            adc_gain = 1
            baseline = 1
        # If the signal is just one value, store one digital value.
        elif pmin == pmax:
            if pmin == 0:
                adc_gain = 1
                baseline = 1
            else:
                # All digital values are +1 or -1. Keep adc_gain > 0
                adc_gain = abs(1 / pmin)
                baseline = 0
        # Regular varied signal case.
        else:
            # The equation is: p = (d - b) / g

            # Approximately, pmax maps to dmax, and pmin maps to
            # dmin. Gradient will be equal to, or close to
            # delta(d) / delta(p), since intercept baseline has
            # to be an integer.

            # Constraint: baseline must be between +/- 2**31
            adc_gain = (dmax - dmin) / (pmax - pmin)
            baseline = dmin - adc_gain * pmin

            # Make adjustments for baseline to be an integer
            # This up/down round logic of baseline is to ensure
            # there is no overshoot of dmax. Now pmax will map
            # to dmax or dmax-1 which is also fine.
            if pmin > 0:
                baseline = int(np.ceil(baseline))
            else:
                baseline = int(np.floor(baseline))

            # After baseline is set, adjust gain correspondingly. Set
            # the gain to map pmin to dmin, and p==0 to baseline.
            # In the case where pmin == 0 and dmin == baseline,
            # adc_gain is already correct. Avoid dividing by 0.
            if dmin != baseline:
                adc_gain = (dmin - baseline) / pmin

        # Remap signal if baseline exceeds boundaries.
        # This may happen if pmax < 0
        if baseline > MAX_I32:
            # pmin maps to dmin, baseline maps to MAX_I32:
            #   pmin * g + MAX_I32 = dmin, with pmin < 0
            # pmax will map to a lower value than before
            adc_gain = (MAX_I32 - dmin) / abs(pmin)
            baseline = MAX_I32
        # This may happen if pmin > 0
        elif baseline < MIN_I32:
            # pmax maps to dmax, baseline maps to MIN_I32
            adc_gain = (dmax - MIN_I32) / pmax
            baseline = MIN_I32

        adc_gains.append(adc_gain)
        baselines.append(baseline)

    return (adc_gains, baselines)
def convert_dtype(self, physical, return_res, smooth_frames):
    """
    Convert the dtype of the signal.

    The signal attribute selected by `physical` and `smooth_frames` is
    replaced by a version with dtype "float<return_res>" or
    "int<return_res>" when its current dtype differs.

    Parameters
    ----------
    physical : bool
        Specifies whether to convert the physical (float) signal in the
        `p_signal`/`e_p_signal` field (True), or the digital (int) signal
        in the `d_signal`/`e_d_signal` field (False).
    return_res : int
        The numpy array dtype of the returned signals. Options are: 64,
        32, 16, and 8, where the value represents the numpy int or float
        dtype. Note that the value cannot be 8 when physical is True
        since there is no float8 format.
    smooth_frames : bool
        Whether the uniform `d_signal` or `p_signal` array is converted
        (True), or every 1d array in the `e_d_signal` or `e_p_signal`
        list (False).

    Returns
    -------
    N/A

    Raises
    ------
    Exception
        If digital samples would have to be converted to an integer
        dtype that cannot hold every value of the current dtype.

    """

    def cast_digital(samples):
        # Return `samples` with the requested integer dtype, refusing
        # any conversion that could overflow or underflow.
        if samples.dtype == return_dtype:
            return samples
        # can_cast handles every source dtype (e.g. uint8), unlike
        # parsing the bit width out of the dtype name.
        if not np.can_cast(samples.dtype, return_dtype, casting="safe"):
            raise Exception(
                "Cannot convert digital samples to lower dtype. Risk of overflow/underflow."
            )
        return samples.astype(return_dtype, copy=False)

    if physical:
        return_dtype = "float" + str(return_res)
        if smooth_frames:
            if self.p_signal.dtype != return_dtype:
                self.p_signal = self.p_signal.astype(
                    return_dtype, copy=False
                )
        else:
            for ch in range(self.n_sig):
                if self.e_p_signal[ch].dtype != return_dtype:
                    self.e_p_signal[ch] = self.e_p_signal[ch].astype(
                        return_dtype, copy=False
                    )
    else:
        return_dtype = "int" + str(return_res)
        if smooth_frames:
            self.d_signal = cast_digital(self.d_signal)
        else:
            for ch in range(self.n_sig):
                self.e_d_signal[ch] = cast_digital(self.e_d_signal[ch])
    return
def calc_checksum(self, expanded=False):
    """
    Calculate the checksum of every channel of the digital signal.

    A channel's checksum is the sum of its samples modulo 65536.

    Parameters
    ----------
    expanded : bool, optional
        Whether to use the `e_d_signal` attribute (True) or
        the `d_signal` attribute (False).

    Returns
    -------
    cs : list
        One integer checksum per channel.

    """
    if not expanded:
        # Sum down the rows so that each column (channel) gets one total
        column_totals = np.sum(self.d_signal, 0) % 65536
        return [int(total) for total in column_totals]

    return [int(np.sum(channel) % 65536) for channel in self.e_d_signal]
def wr_dat_files(self, expanded=False, write_dir=""):
    """
    Write each of the dat files named in the `file_name` attribute.

    Parameters
    ----------
    expanded : bool, optional
        Whether to write the `e_d_signal` attribute (True) or
        the `d_signal` attribute (False).
    write_dir : str, optional
        The directory to write the output file to.

    Returns
    -------
    N/A

    """
    # Get the set of dat files to be written, and
    # the channels to be written to each file.
    file_names, dat_channels = describe_list_indices(self.file_name)

    # The fmt and byte offset of each dat file are taken from the
    # first channel stored in that file.
    fmt_by_file = {}
    offset_by_file = {}
    for fn in file_names:
        first_ch = dat_channels[fn][0]
        fmt_by_file[fn] = self.fmt[first_ch]
        # byte_offset may not be present
        offset_by_file[fn] = (
            0 if self.byte_offset is None else self.byte_offset[first_ch]
        )

    # Write the dat files
    if not expanded:
        uniform_signal = self.d_signal
        for fn in file_names:
            # Columns from the first to the last channel of the file
            first_ch = dat_channels[fn][0]
            last_ch = dat_channels[fn][-1]
            wr_dat_file(
                fn,
                fmt_by_file[fn],
                uniform_signal[:, first_ch : last_ch + 1],
                offset_by_file[fn],
                write_dir=write_dir,
            )
        return

    for fn in file_names:
        file_channels = dat_channels[fn]
        wr_dat_file(
            fn,
            fmt_by_file[fn],
            None,
            offset_by_file[fn],
            True,
            [self.e_d_signal[ch] for ch in file_channels],
            [self.samps_per_frame[ch] for ch in file_channels],
            write_dir=write_dir,
        )
def smooth_frames(self, sigtype="physical"):
    """
    Convert expanded signals with different samples/frame into
    a uniform numpy array.

    For a channel with more than one sample per frame, each output
    sample is the sum of the frame's samples divided by the number of
    samples per frame.

    Parameters
    ----------
    sigtype (default='physical') : str
        Specifies whether to smooth the e_p_signal field ('physical'), or the e_d_signal
        field ('digital').

    Returns
    -------
    signal : ndarray
        Transformed expanded signal into uniform signal.

    Raises
    ------
    ValueError
        If `sigtype` is not 'physical' or 'digital', or if the length of
        a channel is not the signal length times its samples per frame.

    """
    # Work on a copy so the record's samps_per_frame is left untouched
    spf = self.samps_per_frame[:]
    for ch in range(len(spf)):
        if spf[ch] is None:
            spf[ch] = 1

    # The output data type should be the smallest type that can
    # represent any input sample value.  The intermediate data type
    # must be able to represent the sum of spf[ch] sample values.

    if sigtype == "physical":
        expanded_signal = self.e_p_signal
        intermediate_dtype = np.dtype("float64")
        allowed_dtypes = [
            np.dtype("float32"),
            np.dtype("float64"),
        ]
    elif sigtype == "digital":
        expanded_signal = self.e_d_signal
        intermediate_dtype = np.dtype("int64")
        allowed_dtypes = [
            np.dtype("int8"),
            np.dtype("int16"),
            np.dtype("int32"),
            np.dtype("int64"),
        ]
    else:
        raise ValueError("sigtype must be 'physical' or 'digital'")

    n_sig = len(expanded_signal)
    # The signal length in frames is derived from the first channel
    sig_len = len(expanded_signal[0]) // spf[0]
    input_dtypes = set()
    for ch in range(n_sig):
        if len(expanded_signal[ch]) != sig_len * spf[ch]:
            # Report the sample count of the offending channel
            raise ValueError(
                "length mismatch: signal %d has %d samples,"
                " expected %dx%d"
                % (ch, len(expanded_signal[ch]), sig_len, spf[ch])
            )
        input_dtypes.add(expanded_signal[ch].dtype)

    # Take the first allowed dtype that every input dtype can be cast
    # to; if none qualifies the last allowed dtype is used.
    for output_dtype in allowed_dtypes:
        if all(dt <= output_dtype for dt in input_dtypes):
            break

    signal = np.empty((sig_len, n_sig), dtype=output_dtype)

    # Large input arrays will be processed in chunks to avoid the need
    # to allocate a single huge temporary array.
    CHUNK_SIZE = 65536

    for ch in range(n_sig):
        if spf[ch] == 1:
            signal[:, ch] = expanded_signal[ch]
        else:
            # One row per frame, one column per sample within the frame
            frames = expanded_signal[ch].reshape(-1, spf[ch])
            for chunk_start in range(0, sig_len, CHUNK_SIZE):
                chunk_end = chunk_start + CHUNK_SIZE
                signal_sum = np.sum(
                    frames[chunk_start:chunk_end, :],
                    axis=1,
                    dtype=intermediate_dtype,
                )
                signal[chunk_start:chunk_end, ch] = signal_sum / spf[ch]

    return signal
# ------------------- Reading Signals -------------------#
def _rd_segment(
file_name,
dir_name,
pn_dir,
fmt,
n_sig,
sig_len,
byte_offset,
samps_per_frame,
skew,
init_value,
sampfrom,
sampto,
channels,
ignore_skew,
no_file=False,
sig_data=None,
return_res=64,
):
"""
Read the digital samples from a single segment record's associated
dat file(s).
Parameters
----------
file_name : list
The names of the dat files to be read.
dir_name : str
The full directory where the dat file(s) are located, if the dat
file(s) are local.
pn_dir : str
The PhysioNet directory where the dat file(s) are located, if
the dat file(s) are remote.
fmt : list
The formats of the dat files.
n_sig : int
The number of signals contained in the dat file.
sig_len : int
The signal length (per channel) of the dat file.
byte_offset : int
The byte offset of the dat file.
samps_per_frame : list
The samples/frame for each signal of the dat file.
skew : list
The skew for the signals of the dat file.
init_value : list
The initial value for each signal of the dat file.
sampfrom : int
The starting sample number to be read from the signals.
sampto : int
The final sample number to be read from the signals.
ignore_skew : bool
Used when reading records with at least one skewed signal.
Specifies whether to apply the skew to align the signals in the
output variable (False), or to ignore the skew field and load in
all values contained in the dat files unaligned (True).
no_file : bool, optional
Used when using this function with just an array of signal data
and no associated file to read the data from.
sig_data : ndarray, optional
The signal data that would normally be imported using the associated
.dat and .hea files. Should only be used when no_file is set to True.
return_res : int, optional
The numpy array dtype of the returned signals. Options are: 64,
32, 16, and 8, where the value represents the numpy int or float
dtype. Note that the value cannot be 8 when physical is True
since there is no float8 format.
Returns
-------
signals : list
The signals read from the dat file(s). Each signal is returned as a
one-dimensional numpy array.
Notes
-----
'channels', 'sampfrom', 'sampto', and 'ignore_skew' are user desired
input fields. All other parameters are specifications of the segment.
"""
# Check for valid inputs
if no_file and sig_data is None:
raise Exception("signal_dat empty: No signal data provided")
# Avoid changing outer variables
byte_offset = byte_offset[:]
samps_per_frame = samps_per_frame[:]
skew = skew[:]
init_value = init_value[:]
# Set defaults for empty fields
for i in range(n_sig):
if byte_offset[i] == None:
byte_offset[i] = 0
if samps_per_frame[i] == None:
samps_per_frame[i] = 1
if skew[i] == None:
skew[i] = 0
if init_value[i] == None:
init_value[i] = 0
# If skew is to be ignored, set all to 0
if ignore_skew:
skew = [0] * n_sig
# Get the set of dat files, and the
# channels that belong to each file.
file_name, datchannel = describe_list_indices(file_name)
# Some files will not be read depending on input channels.
# Get the the wanted fields only.
w_file_name = [] # one scalar per dat file
w_fmt = {} # one scalar per dat file
w_byte_offset = {} # one scalar per dat file
w_samps_per_frame = {} # one list per dat file
w_skew = {} # one list per dat file
w_init_value = {} # one list per dat file
w_channel = {} # one list per dat file
for fn in file_name:
# intersecting dat channels between the input channels and the channels of the file
idc = [c for c in datchannel[fn] if c in channels]
# There is at least one wanted channel in the dat file
if idc != []:
w_file_name.append(fn)
w_fmt[fn] = fmt[datchannel[fn][0]]
w_byte_offset[fn] = byte_offset[datchannel[fn][0]]
w_samps_per_frame[fn] = [samps_per_frame[c] for c in datchannel[fn]]
w_skew[fn] = [skew[c] for c in datchannel[fn]]
w_init_value[fn] = [init_value[c] for c in datchannel[fn]]
w_channel[fn] = idc
# Wanted dat channels, relative to the dat file itself
r_w_channel = {}
# The channels in the final output array that correspond to the read channels in each dat file
out_dat_channel = {}
for fn in w_channel:
r_w_channel[fn] = [c - min(datchannel[fn]) for c in w_channel[fn]]
out_dat_channel[fn] = [channels.index(c) for c in w_channel[fn]]
# Return each sample in signals with multiple samples/frame, without smoothing.
# Return a list of numpy arrays for each signal.
signals = [None] * len(channels)
for fn in w_file_name:
# Get the list of all signals contained in the dat file
datsignals = _rd_dat_signals(
file_name=fn,
dir_name=dir_name,
pn_dir=pn_dir,
fmt=w_fmt[fn],
n_sig=len(datchannel[fn]),
sig_len=sig_len,
byte_offset=w_byte_offset[fn],
samps_per_frame=w_samps_per_frame[fn],
skew=w_skew[fn],
init_value=w_init_value[fn],
sampfrom=sampfrom,
sampto=sampto,
no_file=no_file,
sig_data=sig_data,
)
# Copy over the wanted signals
for cn in range(len(out_dat_channel[fn])):
signals[out_dat_channel[fn][cn]] = datsignals[r_w_channel[fn][cn]]
return signals
def _rd_dat_signals(
    file_name,
    dir_name,
    pn_dir,
    fmt,
    n_sig,
    sig_len,
    byte_offset,
    samps_per_frame,
    skew,
    init_value,
    sampfrom,
    sampto,
    no_file=False,
    sig_data=None,
):
    """
    Read all signals from a WFDB dat file.

    The requested frame range is loaded (from a local file, a remote
    file, a compressed file, or the supplied `sig_data`), converted into
    digital sample values according to `fmt`, split into one array per
    signal, and finally shifted according to `skew`.

    Parameters
    ----------
    file_name : str
        The name of the dat file.
    dir_name : str
        The full directory where the dat file(s) are located, if the dat
        file(s) are local.
    pn_dir : str
        The PhysioNet directory where the dat file(s) are located, if
        the dat file(s) are remote.
    fmt : str
        The format of the dat file.
    n_sig : int
        The number of signals contained in the dat file.
    sig_len : int
        The signal length (per channel) of the dat file.
    byte_offset : int
        The byte offset of the dat file. For compressed formats this
        value is passed on as the sample offset instead.
    samps_per_frame : list
        The samples/frame for each signal of the dat file.
    skew : list
        The skew for the signals of the dat file.
    init_value : list
        The initial value for each signal of the dat file. Only used
        for format 8.
    sampfrom : int
        The starting sample number to be read from the signals.
    sampto : int
        The final sample number to be read from the signals.
    no_file : bool, optional
        Used when using this function with just an array of signal data
        and no associated file to read the data from.
    sig_data : ndarray, optional
        The signal data that would normally be imported using the associated
        .dat and .hea files. Should only be used when no_file is set to True.

    Returns
    -------
    signal : list
        The signals read from the dat file. Each signal is returned as a
        one-dimensional numpy array holding
        `samps_per_frame[ch] * (sampto - sampfrom)` samples.

    Raises
    ------
    Exception
        If `no_file` is True but no `sig_data` was provided.

    Notes
    -----
    `sampfrom` and `sampto` describe the range requested by the user.
    The remaining file-related parameters are specifications of the
    segment.

    """
    # Check for valid inputs
    if no_file and sig_data is None:
        raise Exception("signal_dat empty: No signal data provided")

    # Total number of samples per frame
    tsamps_per_frame = sum(samps_per_frame)
    # The signal length to read (per channel)
    read_len = sampto - sampfrom

    # Calculate parameters used to read and process the dat file
    (
        start_byte,
        n_read_samples,
        block_floor_samples,
        extra_flat_samples,
        nan_replace,
    ) = _dat_read_params(
        fmt, sig_len, byte_offset, skew, tsamps_per_frame, sampfrom, sampto
    )

    # Number of bytes to be read from the dat file
    total_read_bytes = _required_byte_num("read", fmt, n_read_samples)

    # Total samples to be processed in intermediate step. Includes extra
    # padded samples beyond dat file
    total_process_samples = n_read_samples + extra_flat_samples

    # Total number of bytes to be processed in intermediate step.
    total_process_bytes = _required_byte_num("read", fmt, total_process_samples)

    # Get the intermediate bytes or samples to process. Bit of a
    # discrepancy. Recall special formats load uint8 bytes, other formats
    # already load samples.

    # Read values from dat file. Append bytes/samples if needed.
    if no_file:
        # The caller supplied the raw data directly; nothing is read.
        data_to_read = sig_data
    elif fmt in COMPRESSED_FMTS:
        # Compressed files are addressed by frame rather than by byte,
        # so the header offset is handed over as a sample offset.
        data_to_read = _rd_compressed_file(
            file_name=file_name,
            dir_name=dir_name,
            pn_dir=pn_dir,
            fmt=fmt,
            sample_offset=byte_offset,
            n_sig=n_sig,
            samps_per_frame=samps_per_frame,
            start_frame=sampfrom,
            end_frame=sampto,
        )
    else:
        data_to_read = _rd_dat_file(
            file_name, dir_name, pn_dir, fmt, start_byte, n_read_samples
        )

    # Pad with zeros when the skew asks for samples beyond the end of
    # the file, so that the later reshape into frames works.
    # NOTE(review): DATA_LOAD_TYPES has no entries for the compressed
    # formats, so the lookups below would raise KeyError if a compressed
    # file ever needs padding - confirm whether that combination can
    # occur.
    if extra_flat_samples:
        if fmt in UNALIGNED_FMTS:
            # Extra number of bytes to append onto the bytes read from
            # the dat file.
            n_extra_bytes = total_process_bytes - total_read_bytes

            sig_data = np.concatenate(
                (
                    data_to_read,
                    np.zeros(
                        n_extra_bytes, dtype=np.dtype(DATA_LOAD_TYPES[fmt])
                    ),
                )
            )
        else:
            sig_data = np.concatenate(
                (
                    data_to_read,
                    np.zeros(
                        extra_flat_samples, dtype=np.dtype(DATA_LOAD_TYPES[fmt])
                    ),
                )
            )
    else:
        sig_data = data_to_read

    # Finish processing the read data into proper samples if not already

    # For unaligned fmts, turn the uint8 blocks into actual samples
    if fmt in UNALIGNED_FMTS:
        sig_data = _blocks_to_samples(sig_data, total_process_samples, fmt)
        # Remove extra leading sample read within the byte block if any
        if block_floor_samples:
            sig_data = sig_data[block_floor_samples:]

    # Adjust samples values for byte offset formats. The subtraction is
    # done in a wider signed type before narrowing to the final dtype.
    if fmt in OFFSET_FMTS:
        if fmt == "80":
            sig_data = (sig_data.astype("int16") - 128).astype("int8")
        elif fmt == "160":
            sig_data = (sig_data.astype("int32") - 32768).astype("int16")

    # For format 8, convert sample differences to absolute samples.  Note
    # that if sampfrom is not 0, the results will be wrong, since we can't
    # know the starting value without reading the entire record from the
    # beginning - an inherent limitation of the format, and the use of
    # format 8 is discouraged for this reason!  However, the following is
    # consistent with the behavior of the WFDB library: the initial value
    # specified by the header file is used as the starting sample value,
    # regardless of where in the record we begin reading.  Therefore, the
    # following should give the same results as rdsamp.
    if fmt == "8":
        # One row per frame, one column per flat sample within the frame
        dif_frames = sig_data.reshape(-1, tsamps_per_frame)
        abs_frames = np.empty(dif_frames.shape, dtype="int32")
        ch_start = 0
        for ch in range(n_sig):
            ch_end = ch_start + samps_per_frame[ch]
            # Extract sample differences as a 2D array
            ch_dif_signal = dif_frames[:, ch_start:ch_end]
            # Convert to a 1D array of absolute samples
            ch_abs_signal = ch_dif_signal.cumsum(dtype=abs_frames.dtype)
            ch_abs_signal += init_value[ch]
            # Transfer to the output array
            ch_abs_signal = ch_abs_signal.reshape(ch_dif_signal.shape)
            abs_frames[:, ch_start:ch_end] = ch_abs_signal
            ch_start = ch_end
        sig_data = abs_frames.reshape(-1)

    # At this point, dtype of sig_data is the minimum integer format
    # required for storing the final digital samples.

    # List of 1d numpy arrays
    signal = []
    # Transfer over samples: each signal owns a contiguous run of
    # samps_per_frame[ch] columns within every frame.
    sig_frames = sig_data.reshape(-1, tsamps_per_frame)
    ch_start = 0
    for ch in range(n_sig):
        ch_end = ch_start + samps_per_frame[ch]
        ch_signal = sig_frames[:, ch_start:ch_end].reshape(-1)
        signal.append(ch_signal)
        ch_start = ch_end
    # Skew the signal
    signal = _skew_sig(
        signal, skew, n_sig, read_len, fmt, nan_replace, samps_per_frame
    )

    # Integrity check of signal shape after reading
    _check_sig_dims(signal, read_len, n_sig, samps_per_frame)

    return signal
def _dat_read_params(
    fmt, sig_len, byte_offset, skew, tsamps_per_frame, sampfrom, sampto
):
    """
    Work out how a dat file has to be read in order to obtain a given
    frame range, taking its layout and the signal skews into account.

    Parameters
    ----------
    fmt : str
        The format of the dat file.
    sig_len : int
        The signal length (per channel) of the dat file.
    byte_offset : int
        The byte offset of the dat file.
    skew : list
        The skew for the signals of the dat file.
    tsamps_per_frame : int
        The total samples/frame for all channels of the dat file.
    sampfrom : int
        The starting sample number to be read from the signals.
    sampto : int
        The final sample number to be read from the signals.

    Returns
    -------
    start_byte : int
        The byte at which reading starts. For the block-packed formats
        (212, 310, 311) this is always the first byte of a block.
    n_read_samples : int
        The number of flat samples to read from the dat file.
    block_floor_samples : int
        The number of leading samples that are read only because
        reading has to start on a block boundary. They must be
        discarded after decoding.
    extra_flat_samples : int
        The number of flat samples requested (because of skew) beyond
        what the file contains.
    nan_replace : list
        For each signal, the number of frames at its end that fall
        beyond the file because of that signal's skew.

    Examples
    --------
    sig_len=100, t = 4 (total samples/frame), skew = [0, 2, 4, 5]

    sampfrom=0, sampto=100 --> n_read_samples = 100*t,
        extra_flat_samples = 5*t, nan_replace = [0, 2, 4, 5]
    sampfrom=50, sampto=100 --> n_read_samples = 50*t,
        extra_flat_samples = 5*t, nan_replace = [0, 2, 4, 5]
    sampfrom=0, sampto=50 --> n_read_samples = 55*t,
        extra_flat_samples = 0, nan_replace = [0, 0, 0, 0]
    sampfrom=95, sampto=99 --> n_read_samples = 5*t,
        extra_flat_samples = 4*t, nan_replace = [0, 1, 3, 4]

    """
    # One past the last frame needed once the largest skew is applied
    last_frame = sampto + max(skew)

    if last_frame > sig_len:
        # The skew reaches past the end of the file: read up to the end
        # and remember how many flat samples are missing.
        end_flat_sample = sig_len * tsamps_per_frame
        extra_flat_samples = (last_frame - sig_len) * tsamps_per_frame
    else:
        end_flat_sample = last_frame * tsamps_per_frame
        extra_flat_samples = 0

    # Flat index of the first wanted sample (all channels interleaved)
    first_flat_sample = sampfrom * tsamps_per_frame

    # Block-packed formats store 2 samples per 3 bytes (212) or 3
    # samples per 4 bytes (310, 311); reading must begin at the start
    # of a block, so step back to the block boundary if necessary.
    samples_per_block = {"212": 2, "310": 3, "311": 3}.get(fmt)
    if samples_per_block is None:
        block_floor_samples = 0
    else:
        block_floor_samples = first_flat_sample % samples_per_block
        first_flat_sample -= block_floor_samples

    start_byte = byte_offset + int(first_flat_sample * BYTES_PER_SAMPLE[fmt])
    n_read_samples = end_flat_sample - first_flat_sample

    # Frames of each signal that lie beyond the file due to its skew
    nan_replace = [max(0, sampto + s - sig_len) for s in skew]

    return (
        start_byte,
        n_read_samples,
        block_floor_samples,
        extra_flat_samples,
        nan_replace,
    )
def _required_byte_num(mode, fmt, n_samp):
    """
    Compute the number of signal bytes that hold `n_samp` samples of a
    dat file, for either reading or writing.

    Parameters
    ----------
    mode : str
        Whether the file is to be read or written: 'read' or 'write'.
    fmt : str
        The WFDB dat format.
    n_samp : int
        The number of samples wanted.

    Returns
    -------
    n_bytes : int
        The number of bytes required to read or write the file.

    Notes
    -----
    Reading and writing need the same number of bytes in most cases.
    The exception is format 311 when two samples are left over after
    the last complete group of three: reading needs only the bytes that
    are actually used, whereas writing pads to a whole 4-byte block.

    """
    # Two samples share three bytes
    if fmt == "212":
        return int(math.ceil(n_samp * 1.5))

    # Byte-aligned (and compressed) formats: fixed size per sample
    if fmt not in ("310", "311"):
        return int(n_samp * BYTES_PER_SAMPLE[fmt])

    # Three samples share four bytes
    exact_bytes = n_samp * 4 / 3
    two_left_over = n_samp % 3 == 2
    # Format 310 always needs the complete block for two trailing
    # samples; format 311 only when writing (required by the WFDB C
    # library).
    if two_left_over and (fmt == "310" or mode != "read"):
        return int(util.upround(exact_bytes, 4))
    return int(math.ceil(exact_bytes))
def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
    """
    Load raw data from a local or remote dat file into a 1d numpy
    array.

    This is the lowest level dat reading function (along with
    `_stream_dat` which this function may call), and is called by
    `_rd_dat_signals`.

    Parameters
    ----------
    file_name : str
        The name of the dat file.
    dir_name : str
        The full directory where the dat file(s) are located, if the dat
        file(s) are local.
    pn_dir : str
        The PhysioNet directory where the dat file(s) are located, if
        the dat file(s) are remote. None means the file is local.
    fmt : str
        The format of the dat file.
    start_byte : int
        The starting byte number to read from.
    n_samp : int
        The total number of samples to read. It does not have to
        correspond to a whole number of blocks for the packed formats.

    Returns
    -------
    sig_data : ndarray
        The data read from the dat file. The dtype is the one listed
        for `fmt` in DATA_LOAD_TYPES: byte aligned formats are loaded
        directly as samples, unaligned formats as uint8 bytes that
        still have to be decoded.

    """
    # Local files are read with np.fromfile, which counts elements of
    # the load dtype; remote files are streamed, which counts bytes.
    if fmt in ("212", "310", "311"):
        # Bit-packed formats are loaded byte by byte
        byte_count = _required_byte_num("read", fmt, n_samp)
        element_count = byte_count
    elif fmt == "24":
        # Three uint8 elements per sample
        byte_count = n_samp * 3
        element_count = byte_count
    else:
        element_count = n_samp
        byte_count = n_samp * BYTES_PER_SAMPLE[fmt]

    # Stream dat file from Physionet
    if pn_dir is not None:
        load_dtype = np.dtype(DATA_LOAD_TYPES[fmt])
        return download._stream_dat(
            file_name, pn_dir, byte_count, start_byte, load_dtype
        )

    # Local dat file
    with open(os.path.join(dir_name, file_name), "rb") as dat_file:
        dat_file.seek(start_byte)
        return np.fromfile(
            dat_file,
            dtype=np.dtype(DATA_LOAD_TYPES[fmt]),
            count=element_count,
        )
def _blocks_to_samples(sig_data, n_samp, fmt):
    """
    Convert uint8 blocks into signal samples for unaligned dat formats.

    Parameters
    ----------
    sig_data : ndarray
        The uint8 data blocks.
    n_samp : int
        The number of samples contained in the bytes. Not used for
        format 24, where it follows from the number of bytes.
    fmt : str
        The format of the dat file: '212', '310', '311' or '24'.

    Returns
    -------
    sig : ndarray
        The numpy array of digital samples (int16 for the 10 and 12 bit
        formats, int32 for format 24).

    Raises
    ------
    ValueError
        If `fmt` is not one of the unaligned formats.
    NotImplementedError
        For format 24 on a platform that is neither little nor big
        endian.

    """
    if fmt == "212":
        # Easier to process when dealing with whole blocks
        if n_samp % 2:
            n_samp += 1
            added_samps = 1
            sig_data = np.append(sig_data, np.zeros(1, dtype="uint8"))
        else:
            added_samps = 0

        sig_data = sig_data.astype("int16")
        sig = np.zeros(n_samp, dtype="int16")

        # One sample pair is stored in one byte triplet.

        # Even numbered samples: first byte plus the low nibble of the
        # second byte
        sig[0::2] = sig_data[0::3] + 256 * np.bitwise_and(sig_data[1::3], 0x0F)
        # Odd numbered samples (len(sig) always > 1 due to processing of
        # whole blocks): third byte plus the high nibble of the second
        # byte
        sig[1::2] = sig_data[2::3] + 256 * np.bitwise_and(
            sig_data[1::3] >> 4, 0x0F
        )

        # Remove trailing sample read within the byte block if
        # originally odd sampled
        if added_samps:
            sig = sig[:-added_samps]

        # Loaded values as unsigned. Convert to 2's complement form:
        # values > 2^11-1 are negative.
        sig[sig > 2047] -= 4096

    elif fmt == "310":
        sig_data = sig_data.astype("int16")
        sig = np.zeros(n_samp, dtype="int16")

        # The last block may be incomplete, so each of the three sample
        # positions can have a different count.
        n_first = len(sig[0::3])
        n_second = len(sig[1::3])
        n_third = len(sig[2::3])

        # One sample triplet is stored in one byte quartet
        # First sample is 7 msb of first byte and 3 lsb of second byte.
        sig[0::3] = (sig_data[0::4] >> 1)[:n_first] + 128 * np.bitwise_and(
            sig_data[1::4], 0x07
        )[:n_first]
        # Second sample is 7 msb of third byte and 3 lsb of fourth byte
        sig[1::3] = (sig_data[2::4] >> 1)[:n_second] + 128 * np.bitwise_and(
            sig_data[3::4], 0x07
        )[:n_second]
        # Third sample is 5 msb of second byte and 5 msb of fourth byte
        sig[2::3] = (
            np.bitwise_and((sig_data[1::4] >> 3), 0x1F)[:n_third]
            + 32 * np.bitwise_and(sig_data[3::4] >> 3, 0x1F)[:n_third]
        )

        # Loaded values as unsigned. Convert to 2's complement form:
        # values > 2^9-1 are negative.
        sig[sig > 511] -= 1024

    elif fmt == "311":
        sig_data = sig_data.astype("int16")
        sig = np.zeros(n_samp, dtype="int16")

        # The last block may be incomplete, so each of the three sample
        # positions can have a different count.
        n_first = len(sig[0::3])
        n_second = len(sig[1::3])
        n_third = len(sig[2::3])

        # One sample triplet is stored in one byte quartet
        # First sample is first byte and 2 lsb of second byte.
        sig[0::3] = (
            sig_data[0::4][:n_first]
            + 256 * np.bitwise_and(sig_data[1::4], 0x03)[:n_first]
        )
        # Second sample is 6 msb of second byte and 4 lsb of third byte
        sig[1::3] = (sig_data[1::4] >> 2)[:n_second] + 64 * np.bitwise_and(
            sig_data[2::4], 0x0F
        )[:n_second]
        # Third sample is 4 msb of third byte and 6 lsb of fourth byte.
        # Only six bits of the fourth byte belong to the sample; its two
        # top bits are unused and must be masked out, otherwise a set
        # unused bit corrupts the sign conversion below.
        sig[2::3] = (sig_data[2::4] >> 4)[:n_third] + 16 * np.bitwise_and(
            sig_data[3::4], 0x3F
        )[:n_third]

        # Loaded values as unsigned. Convert to 2's complement form.
        # Values > 2^9-1 are negative.
        sig[sig > 511] -= 1024

    elif fmt == "24":
        # The following is equivalent to:
        #   sig = (sig_data[2::3].view('int8').astype('int32') * 65536
        #          + sig_data[1::3].astype('uint16') * 256
        #          + sig_data[0::3])

        # Treat the high byte as signed and shift it by 16 bits.
        sig = np.left_shift(sig_data[2::3].view("int8"), 16, dtype="int32")

        # Directly copy the low and middle bytes.
        if sys.byteorder == "little":
            sig.view("uint8")[0::4] = sig_data[0::3]
            sig.view("uint8")[1::4] = sig_data[1::3]
        elif sys.byteorder == "big":
            sig.view("uint8")[3::4] = sig_data[0::3]
            sig.view("uint8")[2::4] = sig_data[1::3]
        else:
            raise NotImplementedError

    else:
        raise ValueError(f"fmt {fmt!r} is not an unaligned dat format")

    return sig
def _rd_compressed_file(
    file_name,
    dir_name,
    pn_dir,
    fmt,
    sample_offset,
    n_sig,
    samps_per_frame,
    start_frame,
    end_frame,
):
    """
    Read data from a compressed file into a 1D numpy array.

    Parameters
    ----------
    file_name : str
        The name of the signal file.
    dir_name : str
        The full directory where the signal file is located, if local.
        This argument is ignored if `pn_dir` is not None.
    pn_dir : str or None
        The PhysioNet database directory where the signal file is located.
    fmt : str
        The format code of the signal file.
    sample_offset : int
        The sample number in the signal file corresponding to sample 0 of
        the WFDB record.
    n_sig : int
        The number of signals in the file.
    samps_per_frame : list
        The number of samples per frame for each signal in the file.
    start_frame : int
        The starting frame number to read.
    end_frame : int
        The ending frame number to read.

    Returns
    -------
    signal : ndarray
        The data read from the signal file.  This is a one-dimensional
        array in the same order the samples would be stored in a binary
        signal file; `signal[(i*n_sig+j)*samps_per_frame[0]+k]` is sample
        number `i*samps_per_frame[0]+k` of signal `j`.

    Raises
    ------
    ValueError
        If the signals do not all have the same number of samples per
        frame, if the file does not start with the FLAC signature, if
        the stream has an unknown subtype, if its resolution exceeds
        what `fmt` allows, or if its channel count differs from `n_sig`.

    Notes
    -----
    Converting the output array into "dat file order" here is inefficient,
    but necessary to match the behavior of _rd_dat_file.  It would be
    better to reorganize _rd_dat_signals to make the reshaping unnecessary.

    """
    # Imported here rather than at module level, so the package is only
    # needed when a compressed signal file is actually read.
    import soundfile

    if any(spf != samps_per_frame[0] for spf in samps_per_frame):
        raise ValueError(
            "All channels in a FLAC signal file must have the same "
            "sampling rate and samples per frame"
        )

    # Only local files are addressed relative to dir_name
    if pn_dir is None:
        file_name = os.path.join(dir_name, file_name)

    with _coreio._open_file(pn_dir, file_name, "rb") as fp:
        # Check the magic number before handing the file to soundfile,
        # then rewind so that soundfile sees the file from the start.
        signature = fp.read(4)
        if signature != b"fLaC":
            raise ValueError(f"{fp.name} is not a FLAC file")
        fp.seek(0)

        with soundfile.SoundFile(fp) as sf:
            # Determine the actual resolution of the FLAC stream and the
            # data type we will use when reading it.  Note that soundfile
            # doesn't support int8.
            if sf.subtype == "PCM_S8":
                format_bits = 8
                read_dtype = "int16"
            elif sf.subtype == "PCM_16":
                format_bits = 16
                read_dtype = "int16"
            elif sf.subtype == "PCM_24":
                format_bits = 24
                read_dtype = "int32"
            else:
                raise ValueError(f"unknown subtype in {fp.name} ({sf.subtype})")

            # Format codes 508/516/524 encode the maximum resolution in
            # their last digits.
            max_bits = int(fmt) - 500
            if format_bits > max_bits:
                raise ValueError(
                    f"wrong resolution in {fp.name} "
                    f"({format_bits}, expected <= {max_bits})"
                )

            if sf.channels != n_sig:
                raise ValueError(
                    f"wrong number of channels in {fp.name} "
                    f"({sf.channels}, expected {n_sig})"
                )

            # Read the samples.
            start_samp = start_frame * samps_per_frame[0]
            end_samp = end_frame * samps_per_frame[0]
            sf.seek(start_samp + sample_offset)

            # We could do this:
            #  sig_data = sf.read(end_samp - start_samp, dtype=read_dtype)
            # However, sf.read fails for huge blocks (over 2**24 total
            # samples) due to a bug in libsndfile:
            # https://github.com/libsndfile/libsndfile/issues/431
            # So read the data in chunks instead.
            n_samp = end_samp - start_samp
            sig_data = np.empty((n_samp, n_sig), dtype=read_dtype)
            CHUNK_SIZE = 1024 * 1024
            for chunk_start in range(0, n_samp, CHUNK_SIZE):
                chunk_end = chunk_start + CHUNK_SIZE
                chunk_data = sf.read(out=sig_data[chunk_start:chunk_end])
                samples_read = chunk_data.shape[0]
                # A short chunk is either the final (partial) chunk or
                # the end of the file; keep only what was actually read.
                if samples_read != CHUNK_SIZE:
                    sig_data = sig_data[: chunk_start + samples_read]
                    break

            # If we read an 8-bit stream as int16 or a 24-bit stream as
            # int32, soundfile shifts each sample left by 8 bits.  We
            # want to undo this shift (and, in the case of 8-bit data,
            # convert to an int8 array.)
            if format_bits == 8:
                # np.right_shift(sig_data, 8, dtype='int8') doesn't work.
                # This seems wrong, but the numpy documentation is unclear.
                sig_data2 = np.empty(sig_data.shape, dtype="int8")
                sig_data = np.right_shift(sig_data, 8, out=sig_data2)
            elif format_bits == 24:
                # Shift 32-bit array in-place.
                np.right_shift(sig_data, 8, out=sig_data)

    # Suppose we have 3 channels and 2 samples per frame.  The array
    # returned by sf.read looks like this:
    #
    #         channel 0   channel 1    channel 2
    # time 0   [0,0]       [0,1]        [0,2]
    # time 1   [1,0]       [1,1]        [1,2]
    # time 2   [2,0]       [2,1]        [2,2]
    # time 3   [3,0]       [3,1]        [3,2]
    #
    # We reshape this first into the following:
    #
    #         channel 0   channel 1    channel 2
    # time 0   [0,0,0]     [0,0,1]      [0,0,2]
    # time 1   [0,1,0]     [0,1,1]      [0,1,2]
    # time 2   [1,0,0]     [1,0,1]      [1,0,2]
    # time 3   [1,1,0]     [1,1,1]      [1,1,2]
    #
    # Then we transpose axes 1 and 2:
    #
    #         channel 0   channel 1    channel 2
    # time 0   [0,0,0]     [0,1,0]      [0,2,0]
    # time 1   [0,0,1]     [0,1,1]      [0,2,1]
    # time 2   [1,0,0]     [1,1,0]      [1,2,0]
    # time 3   [1,0,1]     [1,1,1]      [1,2,1]
    #
    # Then when we reshape the array to 1D, the result is in dat file
    # order:
    #
    #         channel 0   channel 1    channel 2
    # time 0   [0]         [2]          [4]
    # time 1   [1]         [3]          [5]
    # time 2   [6]         [8]          [10]
    # time 3   [7]         [9]          [11]

    sig_data = sig_data.reshape(-1, samps_per_frame[0], n_sig)
    sig_data = sig_data.transpose(0, 2, 1)
    return sig_data.reshape(-1)
def _skew_sig(
    sig, skew, n_sig, read_len, fmt, nan_replace, samps_per_frame=None
):
    """
    Skew the signal, insert invalid-sample markers, and shave off the
    end of the array if needed.

    Each channel with a positive skew is shifted towards the start by
    that many frames, every channel is trimmed to `read_len` frames, and
    the trailing frames that lie beyond the end of the dat file are
    overwritten with the digital NAN value of the format. If no channel
    has a positive skew, `sig` is returned untouched.

    Parameters
    ----------
    sig : ndarray, list
        The original signal: either a 2d array (samples x channels), or
        a list of 1d arrays for expanded (multiple samples/frame)
        signals. The data is modified in place.
    skew : list
        Number of frames to skew for each signal.
    n_sig : int
        The number of signals.
    read_len : int
        The number of frames to keep: calculated by `sampto - sampfrom`.
    fmt : str
        The format of the dat file.
    nan_replace : list
        For each signal, the number of frames at its end to replace
        with the digital NAN value.
    samps_per_frame : list, optional
        The number of samples per frame of each channel. Required when
        `sig` is a list.

    Returns
    -------
    sig : ndarray, list
        The new skewed and trimmed signal.

    Notes
    -----
    `fmt` is just for the correct NAN value.
    `samps_per_frame` is only used for skewing expanded signals.

    """
    if max(skew) > 0:
        # Expanded frame samples. List of arrays.
        if isinstance(sig, list):
            for ch in range(n_sig):
                spf = samps_per_frame[ch]
                ch_len = read_len * spf

                # Shift the channel samples. The source slice is bounded
                # explicitly: the input holds read_len + max(skew)
                # frames, so an open-ended slice is too long for any
                # channel whose skew is smaller than the maximum.
                if skew[ch] > 0:
                    src_start = skew[ch] * spf
                    sig[ch][:ch_len] = sig[ch][src_start : src_start + ch_len]

                # Shave off the extra signal length at the end
                sig[ch] = sig[ch][:ch_len]

                # Insert nans where skewed signal overran dat file.
                # nan_replace counts frames, and each frame contributes
                # spf samples of this channel.
                if nan_replace[ch] > 0:
                    sig[ch][-nan_replace[ch] * spf :] = _digi_nan(fmt)
        # Uniform array
        else:
            # Shift the channel samples (bounded source slice, see above)
            for ch in range(n_sig):
                if skew[ch] > 0:
                    sig[:read_len, ch] = sig[
                        skew[ch] : skew[ch] + read_len, ch
                    ]
            # Shave off the extra signal length at the end
            sig = sig[:read_len, :]

            # Insert nans where skewed signal overran dat file
            for ch in range(n_sig):
                if nan_replace[ch] > 0:
                    sig[-nan_replace[ch] :, ch] = _digi_nan(fmt)

    return sig
def _check_sig_dims(sig, read_len, n_sig, samps_per_frame):
    """
    Verify that a freshly read signal has the expected dimensions.

    Parameters
    ----------
    sig : ndarray, list
        The signal that was read: a 2d array, or a list with one 1d
        array per channel.
    read_len : int
        The signal length to read per channel. Calculated
        by `sampto - sampfrom`.
    n_sig : int
        The number of signals.
    samps_per_frame : list
        The number of samples per frame of each channel. Only used when
        `sig` is a list.

    Returns
    -------
    N/A

    Raises
    ------
    ValueError
        If the dimensions of `sig` are not the expected ones.

    """
    if isinstance(sig, np.ndarray):
        # Uniform signal: one row per sample, one column per channel
        dims_ok = sig.shape == (read_len, n_sig)
    else:
        # Expanded signal: one array per channel, each holding
        # samps_per_frame[ch] samples for every frame read
        dims_ok = len(sig) == n_sig and all(
            len(sig[ch]) == samps_per_frame[ch] * read_len
            for ch in range(n_sig)
        )

    if not dims_ok:
        raise ValueError("Samples were not loaded correctly")
# ------------------- /Reading Signals -------------------#
def _digi_bounds(fmt):
    """
    Look up the smallest and largest digital sample value of a WFDB
    dat format.

    Parameters
    ----------
    fmt : str, list
        The WFDB dat format, or a list of them.

    Returns
    -------
    tuple (int, int), list
        The min and max WFDB digital value of the format, or a list of
        such pairs (one per entry) if `fmt` is a list.

    """
    if not isinstance(fmt, list):
        return SAMPLE_VALUE_RANGE[fmt]

    # Resolve every entry of the list individually
    return list(map(_digi_bounds, fmt))
def _digi_nan(fmt):
    """
    Look up the digital value that a WFDB dat format uses to store a
    missing/invalid (NAN) sample.

    Parameters
    ----------
    fmt : str, list
        The WFDB dat format, or a list of them.

    Returns
    -------
    int, list
        The WFDB digital NAN value of the format, or a list of such
        values (one per entry) if `fmt` is a list.

    """
    if not isinstance(fmt, list):
        return INVALID_SAMPLE_VALUE[fmt]

    # Resolve every entry of the list individually
    return list(map(_digi_nan, fmt))
def est_res(signals):
    """
    Estimate the resolution of each signal in a multi-channel signal in
    bits. Maximum of 32 bits.

    The number of quantization levels of a channel is estimated as its
    value range divided by the smallest step between two distinct
    values; the resolution is the smallest number of bits that can
    represent that many levels.

    Parameters
    ----------
    signals : ndarray, list
        A 2d numpy array representing a uniform multichannel signal, or
        a list of 1d numpy arrays representing multiple channels of
        signals with different numbers of samples per frame. A 1d array
        is treated as a single channel.

    Returns
    -------
    res : list
        A list of estimated integer resolutions for each channel. A
        channel with fewer than two distinct values (flat or empty) has
        resolution 0.

    """
    # Explicit 64-bit integers: 2**32 does not fit the default integer
    # type on every platform.
    res_levels = np.power(2, np.arange(0, 33, dtype="int64"))

    # Expanded sample signals. List of numpy arrays
    if isinstance(signals, list):
        channels = signals
    # Uniform numpy array
    elif signals.ndim == 1:
        channels = [signals]
    else:
        channels = [signals[:, ch] for ch in range(signals.shape[1])]

    res = []
    for channel in channels:
        # Sorted distinct values of the channel
        levels = np.unique(channel)

        # A flat (or empty) signal has no increments to measure.
        if levels.size < 2:
            res.append(0)
            continue

        # Widen integer data so that neither the differences nor the
        # range can overflow a narrow dtype such as int16.
        if levels.dtype.kind in "iu":
            levels = levels.astype("int64")

        # Estimate the number of steps as the range divided by the
        # minimum increment.
        min_inc = min(np.diff(levels))
        if min_inc == 0:
            res.append(0)
            continue

        nlevels = 1 + (levels[-1] - levels[0]) / min_inc
        if nlevels >= res_levels[-1]:
            res.append(32)
        else:
            res.append(int(np.where(res_levels >= nlevels)[0][0]))

    return res
def _wfdb_fmt(bit_res, single_fmt=True):
    """
    Choose the WFDB format(s) best suited to encode signals of the
    given resolution(s).

    Parameters
    ----------
    bit_res : int, list
        The resolution of the signal, or a list of resolutions, in bits.
    single_fmt : bool, optional
        Only relevant for a list: whether every signal should get the
        format required by the signal with the highest resolution.

    Returns
    -------
    fmt : str, list
        The most suitable WFDB format(s) used to encode the signal(s).

    """
    if isinstance(bit_res, list):
        if single_fmt:
            # Every signal gets the format of the most demanding one
            widest = max(bit_res)
            bit_res = [widest for _ in bit_res]
        return list(map(_wfdb_fmt, bit_res))

    # Formats ordered by the largest resolution they can hold
    for max_bits, fmt in ((8, "80"), (12, "212"), (16, "16"), (24, "24")):
        if bit_res <= max_bits:
            return fmt
    return "32"
def _fmt_res(fmt, max_res=False):
    """
    Look up the bit resolution of WFDB dat format(s) in the BIT_RES
    dictionary, with support for lists of formats.

    Parameters
    ----------
    fmt : str
        The WFDB format. Can be a list of valid fmts. If it is a list,
        and `max_res` is True, the list may contain None.
    max_res : bool, optional
        If given a list of fmts, whether to return the highest
        resolution.

    Returns
    -------
    bit_res : int, list
        The resolution(s) of the dat format(s) in bits.

    """
    # A single format
    if not isinstance(fmt, list):
        return BIT_RES[fmt]

    # Highest resolution among the formats; None entries are skipped
    if max_res:
        return np.max([_fmt_res(f) for f in fmt if f is not None])

    # One resolution per format
    return [_fmt_res(f) for f in fmt]
def _np_dtype(bit_res, discrete):
    """
    Name the smallest numpy dtype able to store a signal of the given
    bit resolution.

    Parameters
    ----------
    bit_res : int
        The bit resolution. Values above 64 are treated as 64.
    discrete : bool
        Whether the dtype is to be int or float.

    Returns
    -------
    dtype : str
        String numpy dtype used to store the signal of the given
        resolution.

    """
    capped_res = min(bit_res, 64)

    # Narrowest standard width that holds the resolution
    np_res = next(
        (width for width in (8, 16, 32, 64) if capped_res <= width), 64
    )

    if discrete:
        return f"int{np_res}"

    # No float8 dtype
    return f"float{max(np_res, 16)}"
def _samples_to_le_bytes(d_signal, n_bytes):
    """
    Split each sample of an int64 array into its `n_bytes` lowest bytes
    (two's complement, least significant byte first) and return them as
    a flat uint8 array in which the bytes of one sample are adjacent.
    """
    # Shifting and masking a signed 64-bit value exposes its two's
    # complement bit pattern, so negative samples need no special case.
    byte_planes = [(d_signal >> (8 * i)) & 255 for i in range(n_bytes)]
    return np.stack(byte_planes, axis=-1).reshape(-1).astype("uint8")


def wr_dat_file(
    file_name,
    fmt,
    d_signal,
    byte_offset,
    expanded=False,
    e_d_signal=None,
    samps_per_frame=None,
    write_dir="",
):
    """
    Write a dat file. The samples are converted into individual bytes
    before writing, to avoid endianness issues.

    Parameters
    ----------
    file_name : str
        Name of the dat file.
    fmt : str
        WFDB fmt of the dat file. Supported: '80', '212', '16', '24',
        '32', '508', '516' and '524'.
    d_signal : ndarray
        The digital conversion of the signal, as a 2d numpy array. Not
        used if `expanded` is True. The array itself is not modified.
    byte_offset : int
        The byte offset of the dat file: the number of empty bytes
        written ahead of the samples. Ignored for the FLAC formats.
    expanded : bool, optional
        Whether to transform the `e_d_signal` attribute (True) or
        the `d_signal` attribute (False).
    e_d_signal : ndarray, optional
        The expanded digital conversion of the signal, as a list of 1d
        numpy arrays.
    samps_per_frame : list, optional
        The samples/frame for each signal of the dat file. Required if
        `expanded` is True.
    write_dir : str, optional
        The directory to write the output file to.

    Returns
    -------
    N/A

    Raises
    ------
    ValueError
        If `fmt` is not supported, if the expanded signals are
        inconsistent with `samps_per_frame`, or if the signals cannot
        be stored in a single FLAC file.

    """
    file_path = os.path.join(write_dir, file_name)

    # Combine list of arrays into single array
    if expanded:
        n_sig = len(e_d_signal)
        if len(samps_per_frame) != n_sig:
            raise ValueError("mismatch between samps_per_frame and e_d_signal")

        sig_len = len(e_d_signal[0]) // samps_per_frame[0]
        for sig, spf in zip(e_d_signal, samps_per_frame):
            if len(sig) != sig_len * spf:
                raise ValueError("mismatch in lengths of expanded signals")

        # Effectively create MxN signal, with extra frame samples acting
        # like extra channels
        d_signal = np.zeros((sig_len, sum(samps_per_frame)), dtype="int64")
        # Counter for channel number
        expand_ch = 0
        for ch in range(n_sig):
            spf = samps_per_frame[ch]
            for framenum in range(spf):
                d_signal[:, expand_ch] = e_d_signal[ch][framenum::spf]
                expand_ch = expand_ch + 1
    else:
        # Work on an int64 copy. This keeps the caller's array intact
        # and guarantees that the offsets added below (up to 2**12)
        # cannot overflow a narrow input dtype such as int8 or int16.
        d_signal = d_signal.astype("int64")

        # Non-expanded format always has 1 sample per frame
        n_sig = d_signal.shape[1]
        samps_per_frame = [1] * n_sig

    if fmt == "80":
        # Convert to 8 bit offset binary form, concatenated into 1D
        b_write = (d_signal + 128).reshape(-1).astype("uint8")

    elif fmt == "212":
        # Each sample is represented by a 12 bit two's complement
        # amplitude. The first sample is obtained from the 12 least
        # significant bits of the first byte pair (stored least
        # significant byte first). The second sample is formed from the
        # 4 remaining bits of the first byte pair (which are the 4 high
        # bits of the 12-bit sample) and the next byte (which contains
        # the remaining 8 bits of the second sample). The process is
        # repeated for each successive pair of samples.

        # convert to 12 bit two's complement
        d_signal[d_signal < 0] = d_signal[d_signal < 0] + 4096

        # Concatenate into 1D
        d_signal = d_signal.reshape(-1)

        n_samp = len(d_signal)
        # use this for byte processing
        processn_samp = n_samp

        # Odd numbered number of samples. Fill in extra blank for
        # following byte calculation.
        if processn_samp % 2:
            d_signal = np.concatenate([d_signal, np.array([0])])
            processn_samp += 1

        # The individual bytes to write
        b_write = np.zeros([int(1.5 * processn_samp)], dtype="uint8")

        # Fill in the byte triplets

        # Triplet 1 from lowest 8 bits of sample 1
        b_write[0::3] = d_signal[0::2] & 255
        # Triplet 2 from highest 4 bits of samples 1 (lower) and 2 (upper)
        b_write[1::3] = ((d_signal[0::2] & 3840) >> 8) + (
            (d_signal[1::2] & 3840) >> 4
        )
        # Triplet 3 from lowest 8 bits of sample 2
        b_write[2::3] = d_signal[1::2] & 255

        # If we added an extra sample for byte calculation, remove the
        # last byte (don't write)
        if n_samp % 2:
            b_write = b_write[:-1]

    elif fmt == "16":
        # 16 bit two's complement, least significant byte first
        b_write = _samples_to_le_bytes(d_signal, 2)

    elif fmt == "24":
        # 24 bit two's complement, least significant byte first
        b_write = _samples_to_le_bytes(d_signal, 3)

    elif fmt == "32":
        # 32 bit two's complement, least significant byte first
        b_write = _samples_to_le_bytes(d_signal, 4)

    elif fmt in ("508", "516", "524"):
        # Imported here so the package is only needed for FLAC output
        import soundfile

        if any(spf != samps_per_frame[0] for spf in samps_per_frame):
            raise ValueError(
                "All channels in a FLAC signal file must have the same "
                "sampling rate and samples per frame"
            )
        if n_sig > 8:
            raise ValueError(
                "A single FLAC signal file cannot contain more than 8 channels"
            )

        # Turn frame-major columns into one row per sample and one
        # column per channel, which is the layout soundfile expects.
        d_signal = d_signal.reshape(-1, n_sig, samps_per_frame[0])
        d_signal = d_signal.transpose(0, 2, 1)
        d_signal = d_signal.reshape(-1, n_sig)

        # soundfile has no 8 or 24 bit sample types; such data is passed
        # in the upper bits of the next wider integer type.
        if fmt == "508":
            d_signal = d_signal.astype("int16")
            np.left_shift(d_signal, 8, out=d_signal)
            subtype = "PCM_S8"
        elif fmt == "516":
            d_signal = d_signal.astype("int16")
            subtype = "PCM_16"
        elif fmt == "524":
            d_signal = d_signal.astype("int32")
            np.left_shift(d_signal, 8, out=d_signal)
            subtype = "PCM_24"
        else:
            raise ValueError(f"unknown format ({fmt})")

        sf = soundfile.SoundFile(
            file_path,
            mode="w",
            samplerate=96000,
            channels=n_sig,
            subtype=subtype,
            format="FLAC",
        )
        with sf:
            sf.write(d_signal)
            return

    else:
        raise ValueError(
            "This library currently only supports writing the "
            "following formats: 80, 212, 16, 24, 32, 508, 516, 524"
        )

    # Byte offset in the file
    if byte_offset is not None and byte_offset > 0:
        print(
            "Writing file "
            + file_name
            + " with "
            + str(byte_offset)
            + " empty leading bytes"
        )
        b_write = np.append(np.zeros(byte_offset, dtype="uint8"), b_write)

    # Write the bytes to the file
    with open(file_path, "wb") as f:
        b_write.tofile(f)
def describe_list_indices(full_list):
    """
    Describe the indices of the given list.

    Groups the positions of a sequence by the value found at each
    position, while remembering the order in which distinct values were
    first encountered.

    Parameters
    ----------
    full_list : list
        The list of items to describe. Items must be hashable, since
        they are used as dictionary keys.

    Returns
    -------
    unique_elements : list
        A list of the unique elements of the list, in the order in which
        they first appear.
    element_indices : dict
        A dictionary of lists for each unique element, giving all the
        indices in which they appear in the original list.

    Raises
    ------
    TypeError
        If an item of `full_list` is not hashable.

    """
    unique_elements = []
    element_indices = {}

    for i, item in enumerate(full_list):
        # Test membership against the dict rather than the list of unique
        # elements: both hold exactly the same items, but the dict lookup
        # is constant time, keeping the whole pass linear in the length
        # of the input.
        if item in element_indices:
            # previously seen item
            element_indices[item].append(i)
        else:
            # new item
            unique_elements.append(item)
            element_indices[item] = [i]

    return unique_elements, element_indices
def _infer_sig_len(
    file_name, fmt, tsamps_per_frame, byte_offset, dir_name, pn_dir=None
):
    """
    Work out how many frames a dat file holds, based on its size.

    Parameters
    ----------
    file_name : str
        Name of the dat file.
    fmt : str
        WFDB fmt of the dat file. Used as a key into `BYTES_PER_SAMPLE`.
    tsamps_per_frame : int
        Total number of samples per frame contained in the dat file.
    byte_offset : int or None
        Number of leading bytes in the file to exclude from the
        calculation. None is treated as zero.
    dir_name : str
        The full directory where the dat file(s) are located, if the dat
        file(s) are local. Only used when `pn_dir` is None.
    pn_dir : str, optional
        The PhysioNet directory where the dat file(s) are located, if
        the dat file(s) are remote. When given, the file size is
        obtained remotely instead of from the local file system.

    Returns
    -------
    sig_len : int
        The length of the signal file in frames, truncated towards zero.

    Notes
    -----
    The result satisfies, up to truncation:

    sig_len * tsamps_per_frame * bytes_per_sample == file_size - byte_offset

    Formats whose `BYTES_PER_SAMPLE` entry is 0 cannot be handled by
    this calculation, as the division below would be by zero.

    """
    # Measure the whole file, either remotely or on the local disk
    if pn_dir is not None:
        total_bytes = download._remote_file_size(
            file_name=file_name, pn_dir=pn_dir
        )
    else:
        total_bytes = os.path.getsize(os.path.join(dir_name, file_name))

    # Discard any leading bytes that precede the sample data
    skipped_bytes = 0 if byte_offset is None else byte_offset
    payload_bytes = total_bytes - skipped_bytes

    # Bytes occupied by one frame; may be fractional for unaligned formats
    bytes_per_frame = BYTES_PER_SAMPLE[fmt] * tsamps_per_frame

    return int(payload_bytes / bytes_per_frame)