Waveform Database Software Package (WFDB) for Python 4.1.0
(12,271 bytes)
import datetime
import os
import struct
import numpy as np
from wfdb.io.record import rdrecord
def wfdb_to_mat(
record_name, pn_dir=None, sampfrom=0, sampto=None, channels=None
):
"""
This program converts the signals of any PhysioNet record (or one in any
compatible format) into a .mat file that can be read directly using any version
of Matlab, and a short text file containing information about the signals
(names, gains, baselines, units, sampling frequency, and start time/date if
known). If the input record name is REC, the output files are RECm.mat and
RECm.hea. The output files can also be read by any WFDB application as record
RECm.
This program does not convert annotation files; for that task, 'rdann' is
recommended.
The output .mat file contains a single matrix named `val` containing raw
(unshifted, unscaled) samples from the selected record. Using various options,
you can select any time interval within a record, or any subset of the signals,
which can be rearranged as desired within the rows of the matrix. Since .mat
files are written in column-major order (i.e., all of column n precedes all of
column n+1), each vector of samples is written as a column rather than as a
row, so that the column number in the .mat file equals the sample number in the
input record (minus however many samples were skipped at the beginning of the
record, as specified using the `start_time` option). If this seems odd, transpose
your matrix after reading it!
This program writes version 5 MAT-file format output files, as documented in
http://www.mathworks.com/access/helpdesk/help/pdf_doc/matlab/matfile_format.pdf
The samples are written as 32-bit signed integers (mattype=20 below) in
little-endian format if the record contains any format 24 or format 32 signals,
as 8-bit unsigned integers (mattype=50) if the record contains only format 80
signals, or as 16-bit signed integers in little-endian format (mattype=30)
otherwise.
The maximum size of the output variable is 2^31 bytes. `wfdb2mat` from versions
10.5.24 and earlier of the original WFDB software package writes version 4 MAT-
files which have the additional constraint of 100,000,000 elements per variable.
The output files (recordm.mat + recordm.hea) are still WFDB-compatible, given
the .hea file constructed by this program.
Parameters
----------
record_name : str
The name of the input WFDB record to be read.
pn_dir : str, optional
Option used to stream data from Physionet. The Physionet
database directory from which to find the required record files.
eg. For record '100' in 'http://physionet.org/content/mitdb'
pn_dir='mitdb'.
sampfrom : int, optional
The starting sample number to read for all channels.
sampto : int, 'end', optional
The sample number at which to stop reading for all channels.
Reads the entire duration by default.
channels : list, optional
List of integer indices specifying the channels to be read.
Reads all channels by default.
Returns
-------
N/A
Notes
-----
The entire file is composed of:
Bytes 0 - 127: descriptive text
Bytes 128 - 131: master tag (data type = matrix)
Bytes 132 - 135: master tag (data size)
Bytes 136 - 151: array flags (4 byte tag with data type, 4 byte
tag with subelement size, 8 bytes of content)
Bytes 152 - 167: array dimension (4 byte tag with data type, 4
byte tag with subelement size, 8 bytes of content)
Bytes 168 - 183: array name (4 byte tag with data type, 4 byte
tag with subelement size, 8 bytes of content)
Bytes 184 - ...: array content (4 byte tag with data type, 4 byte
tag with subelement size, ... bytes of content)
Examples
--------
>>> wfdb2mat('100', pn_dir='pwave')
The output file name is 100m.mat and 100m.hea
"""
record = rdrecord(
record_name, pn_dir=pn_dir, sampfrom=sampfrom, sampto=sampto
)
record_name_out = record_name.split(os.sep)[-1].replace("-", "_") + "m"
# Some variables describing the format of the .mat file
field_version = 256 # 0x0100 or 256
endian_indicator = b"IM" # little endian
master_type = 14 # matrix
sub1_type = 6 # UINT32
sub2_type = 5 # INT32
sub3_type = 1 # INT8
sub1_class = 6 # double precision array
# Determine if we can write 8-bit unsigned samples, or if 16 or 32 bits
# are needed per sample
bytes_per_element = 1
for i in range(record.n_sig):
if record.adc_res[i] > 0:
if record.adc_res[i] > 16:
bytes_per_element = 4
elif (record.adc_res[i] > 8) and (bytes_per_element < 2):
bytes_per_element = 2
else:
# adc_res not specified.. try to guess from format
if (record.fmt[i] == "24") or (record.fmt[i] == "32"):
bytes_per_element = 4
elif (record.fmt[i] != "80") and (bytes_per_element < 2):
bytes_per_element = 2
if bytes_per_element == 1:
sub4_type = 2 # MAT8
out_type = "<u1" # np.uint8
wfdb_type = "80" # Offset binary form (80)
offset = 128 # Offset between sample values and the raw
# byte/word values as interpreted by Matlab/Octave
elif bytes_per_element == 2:
sub4_type = 3 # MAT16
out_type = "<i2" # np.int16
wfdb_type = "16" # Align with byte boundary (16)
offset = 0 # Offset between sample values and the raw
# byte/word values as interpreted by Matlab/Octave
else:
sub4_type = 5 # MAT32
out_type = "<i4" # np.int32
wfdb_type = "32" # Align with byte boundary (32)
offset = 0 # Offset between sample values and the raw
# byte/word values as interpreted by Matlab/Octave
# Ensure the signal size does not exceed the 2^31 byte limit
max_length = int((2**31) / bytes_per_element / record.n_sig)
if sampto is None:
sampto = record.p_signal.shape[0]
desired_length = sampto - sampfrom
# Snip record
if desired_length > max_length:
raise Exception("Can't write .mat file: data size exceeds 2GB limit")
# Bytes of actual data
bytes_of_data = bytes_per_element * record.n_sig * desired_length
# This is the remaining number of bytes that don't fit into integer
# multiple of 8: i.e. if 18 bytes, bytes_remain = 2, from 17 to 18
bytes_remain = bytes_of_data % 8
# master_bytes = (8 + 8) + (8 + 8) + (8 + 8) + (8 + bytes_of_data) + padding
# Must be integer multiple 8
if bytes_remain == 0:
master_bytes = bytes_of_data + 56
else:
master_bytes = bytes_of_data + 64 - (bytes_remain)
# Start writing the file
output_file = record_name_out + ".mat"
with open(output_file, "wb") as f:
# Descriptive text (124 bytes)
f.write(struct.pack("<124s", b"MATLAB 5.0"))
# Version (2 bytes)
f.write(struct.pack("<H", field_version))
# Endian indicator (2 bytes)
f.write(struct.pack("<2s", endian_indicator))
# Master tag data type (4 bytes)
f.write(struct.pack("<I", master_type))
# Master tag number of bytes (4 bytes)
# Number of bytes of data element
# = (8 + 8) + (8 + 8) + (8 + 8) + (8 + bytes_of_data)
# = 56 + bytes_of_data
f.write(struct.pack("<I", master_bytes))
# Matrix data has 4 subelements (5 if imaginary):
# Array flags, dimensions array, array name, real part
# Each subelement has its own subtag, and subdata
# Subelement 1: Array flags
# Subtag 1: data type (4 bytes)
f.write(struct.pack("<I", sub1_type))
# Subtag 1: number of bytes (4 bytes)
f.write(struct.pack("<I", 8))
# Value class indication the MATLAB data type (8 bytes)
f.write(struct.pack("<Q", sub1_class))
# Subelement 2: Rows and columns
# Subtag 2: data type (4 bytes)
f.write(struct.pack("<I", sub2_type))
# Subtag 2: number of bytes (4 bytes)
f.write(struct.pack("<I", 8))
# Number of signals (4 bytes)
f.write(struct.pack("<I", record.n_sig))
# Number of rows (4 bytes)
f.write(struct.pack("<I", desired_length))
# Subelement 3: Array name
# Subtag 3: data type (4 bytes)
f.write(struct.pack("<I", sub3_type))
# Subtag 3: number of bytes (4 bytes)
f.write(struct.pack("<I", 3))
# Subtag 3: name of the array (8 bytes)
f.write(struct.pack("<8s", b"val"))
# Subelement 4: Signal data
# Subtag 4: data type (4 bytes)
f.write(struct.pack("<I", sub4_type))
# Subtag 4: number of bytes (4 bytes)
f.write(struct.pack("<I", bytes_of_data))
# Total size of everything before actual data:
# 128 byte header
# + 8 byte master tag
# + 56 byte subelements (48 byte default + 8 byte name)
# = 192
# Copy the selected data into the .mat file
out_data = (
record.p_signal * record.adc_gain
+ record.baseline
- record.adc_zero
)
# Cast the data to the correct type base on the bytes_per_element
out_data = np.around(out_data).astype(out_type)
# out_data should be [r1c1, r1c2, r2c1, r2c2, etc.]
out_data = out_data.flatten()
out_fmt = "<%sh" % len(out_data)
f.write(struct.pack(out_fmt, *out_data))
# Display some useful information
if record.base_time is None:
if record.base_date is None:
datetime_string = "[None]"
else:
datetime_string = "[{}]".format(
record.base_date.strftime("%d/%m/%Y")
)
else:
if record.base_date is None:
datetime_string = "[{}]".format(
record.base_time.strftime("%H:%M:%S.%f")
)
else:
datetime_string = "[{} {}]".format(
record.base_time.strftime("%H:%M:%S.%f"),
record.base_date.strftime("%d/%m/%Y"),
)
print("Source: record {}\t\tStart: {}".format(record_name, datetime_string))
print(
"val has {} rows (signals) and {} columns (samples/signal)".format(
record.n_sig, desired_length
)
)
duration_string = str(
datetime.timedelta(seconds=desired_length / record.fs)
)
print("Duration: {}".format(duration_string))
print(
"Sampling frequency: {} Hz\tSampling interval: {} sec".format(
record.fs, 1 / record.fs
)
)
print(
"{:<7}{:<20}{:<17}{:<10}{:<10}".format(
"Row", "Signal", "Gain", "Base", "Units"
)
)
record.sig_name = [s.replace(" ", "_") for s in record.sig_name]
for i in range(record.n_sig):
print(
"{:<7}{:<20}{:<17}{:<10}{:<10}".format(
i,
record.sig_name[i],
record.adc_gain[i],
record.baseline[i] - record.adc_zero[i] + offset,
record.units[i],
)
)
# Modify the record file to reflect the new data
num_channels = record.n_sig if (channels is None) else len(channels)
record.record_name = record_name_out
record.n_sig = num_channels
record.samps_per_frame = num_channels * [1]
record.file_name = num_channels * [output_file]
record.fmt = num_channels * [wfdb_type]
record.byte_offset = num_channels * [192]
record.baseline = [
b - record.adc_zero[i] for i, b in enumerate(record.baseline)
]
record.adc_zero = num_channels * [0]
record.init_value = out_data[: record.n_sig].tolist()
# Write the header file RECm.hea
record.wrheader()
# Append the following lines to create a signature
with open(record_name_out + ".hea", "a") as f:
f.write("#Creator: wfdb2mat\n")
f.write("#Source: record {}\n".format(record_name))