# Waveform Database Software Package (WFDB) for Python 3.3.0
#
# File: wfdb/io/tff.py
"""
Module for reading ME6000 .tff format files.

http://www.biomation.com/kin/me6000.htm

"""
import datetime
import os
import struct

import numpy as np


def rdtff(file_name, cut_end=False):
    """
    Read values from a tff file.

    Parameters
    ----------
    file_name : str
        Name of the .tff file to read.
    cut_end : bool, optional
        If True, cuts out the last sample for all channels. This is for
        reading files which appear to terminate with the incorrect
        number of samples (ie. sample not present for all channels).

    Returns
    -------
    signal : ndarray
        A 2d numpy array storing the physical signals from the record.
    fields : dict
        A dictionary containing several key attributes of the read
        record.
    markers : ndarray
        A 1d numpy array storing the marker locations.
    triggers : ndarray
        A 1d numpy array storing the trigger locations.

    Notes
    -----
    Parsing is slow: tff files may contain any number of escape
    sequences interspersed with the signals, and there is no way to
    know the number of samples/escape sequences beforehand, so the
    file has to be consumed a small chunk at a time.

    It is recommended that you convert your tff files to WFDB format.

    """
    file_size = os.path.getsize(file_name)
    with open(file_name, 'rb') as fp:
        # Header first; it tells us how to decode the sample stream.
        fields, file_fields = _rdheader(fp)
        signal, markers, triggers = _rdsignal(
            fp,
            file_size=file_size,
            header_size=file_fields['header_size'],
            n_sig=file_fields['n_sig'],
            bit_width=file_fields['bit_width'],
            is_signed=file_fields['is_signed'],
            cut_end=cut_end,
        )
    return signal, fields, markers, triggers


def _rdheader(fp):
    """
    Read header info of the windaq file.

    Parameters
    ----------
    fp : file IO object
        The input header file to be read.

    Returns
    -------
    fields : dict
        For interpreting the waveforms.
    file_fields : dict
        For reading the signal samples.

    """
    tag = None
    # The '2' tag indicates the end of tags.
    while tag != 2:
        # For each header element, there is a tag indicating data type,
        # followed by the data size, followed by the data itself. 0's
        # pad the content to the nearest 4 bytes. If data_len=0, no pad.
        tag = struct.unpack('>H', fp.read(2))[0]
        data_size = struct.unpack('>H', fp.read(2))[0]
        pad_len = (4 - (data_size % 4)) % 4
        pos = fp.tell()
        # Currently, most tags will be ignored...
        # storage method
        if tag == 1001:
            storage_method = fs = struct.unpack('B', fp.read(1))[0]
            storage_method = {0:'recording', 1:'manual', 2:'online'}[storage_method]
        # fs, unit16
        elif tag == 1003:
            fs = struct.unpack('>H', fp.read(2))[0]
        # sensor type
        elif tag == 1007:
            # Each byte contains information for one channel
            n_sig = data_size
            channel_data = struct.unpack('>%dB' % data_size, fp.read(data_size))
            # The documentation states: "0 : Channel is not used"
            # This means the samples are NOT saved.
            channel_map = ((1, 1, 'emg'),
                           (15, 30, 'goniometer'), (31, 46, 'accelerometer'),
                           (47, 62, 'inclinometer'),
                           (63, 78, 'polar_interface'), (79, 94, 'ecg'),
                           (95, 110, 'torque'), (111, 126, 'gyrometer'),
                           (127, 142, 'sensor'))
            sig_name = []
            # The number range that the data lies between gives the
            # channel
            for data in channel_data:
                # Default case if byte value falls outside of channel map
                base_name = 'unknown'
                # Unused channel
                if data == 0:
                    n_sig -= 1
                    break
                for item in channel_map:
                    if item[0] <= data <= item[1]:
                        base_name = item[2]
                        break
                existing_count = [base_name in name for name in sig_name].count(True)
                sig_name.append('%s_%d' % (base_name, existing_count))
        # Display scale. Probably not useful.
        elif tag == 1009:
            # 100, 500, 1000, 2500, or 8500uV
            display_scale = struct.unpack('>I', fp.read(4))[0]
        # sample format, uint8
        elif tag == 3:
            sample_fmt = struct.unpack('B', fp.read(1))[0]
            is_signed = bool(sample_fmt >> 7)
            # ie. 8 or 16 bits
            bit_width = sample_fmt & 127
        # Measurement start time - seconds from 1.1.1970 UTC
        elif tag == 101:
            n_seconds = struct.unpack('>I', fp.read(4))[0]
            base_datetime = datetime.datetime.utcfromtimestamp(n_seconds)
            base_date = base_datetime.date()
            base_time = base_datetime.time()
        # Measurement start time - minutes from UTC
        elif tag == 102:
            n_minutes = struct.unpack('>h', fp.read(2))[0]
        # Go to the next tag
        fp.seek(pos + data_size + pad_len)
    header_size = fp.tell()
    # For interpreting the waveforms
    fields = {'fs':fs, 'n_sig':n_sig, 'sig_name':sig_name,
              'base_time':base_time, 'base_date':base_date}
    # For reading the signal samples
    file_fields = {'header_size':header_size, 'n_sig':n_sig,
                   'bit_width':bit_width, 'is_signed':is_signed}
    return fields, file_fields


def _rdsignal(fp, file_size, header_size, n_sig, bit_width, is_signed, cut_end):
    """
    Read the signal samples, markers, and triggers.

    Parameters
    ----------
    fp : file IO object
        The input file to be read, opened in binary mode.
    file_size : int
        Size of the file in bytes.
    header_size : int
        Size of the file's header section in bytes.
    n_sig : int
        The number of signals contained in the file.
    bit_width : int
        The number of bits used to represent each sample.
    is_signed : bool
        Whether the samples are signed (True) or not (False).
    cut_end : bool, optional
        If True, enables reading files which appear to terminate with
        an incomplete set of samples (ie. sample not present for all
        channels), by stopping before the final partial frame.
        Enabling this option makes reading slower.

    Returns
    -------
    signal : ndarray
        2d numpy array of the uniformly sampled signal.
    markers : ndarray
        A 1d numpy array storing the marker locations.
    triggers : ndarray
        A 1d numpy array storing the trigger locations.

    """
    # Escape sequences may be interleaved with the samples, so the
    # signal length cannot be determined up front.
    fp.seek(header_size)
    signal_size = file_size - header_size
    byte_width = bit_width // 8
    # Big-endian numpy dtype string, e.g. '>i2' or '>u1'.
    dtype = '>%s%d' % ('i' if is_signed else 'u', byte_width)
    # The maximum possible number of samples given the file size,
    # rounded down to a multiple of n_sig since a sample must be
    # present for every channel.
    max_samples = signal_size // byte_width
    max_samples -= max_samples % n_sig
    # Output buffers
    signal = np.empty(max_samples, dtype=dtype)
    markers = []
    triggers = []
    # Running count of (total, across all channels) samples read
    sample_num = 0

    # Consume one escape sequence or one frame (a sample for every
    # channel) per iteration.
    if cut_end:
        # Stop before a trailing frame that cannot be complete.
        stop_byte = file_size - n_sig * byte_width + 1
        while fp.tell() < stop_byte:
            chunk = fp.read(2)
            sample_num = _get_sample(fp, chunk, n_sig, dtype, signal,
                                     markers, triggers, sample_num)
    else:
        while chunk := fp.read(2):
            sample_num = _get_sample(fp, chunk, n_sig, dtype, signal,
                                     markers, triggers, sample_num)

    # Trim the unused buffer space and shape into channels.
    signal = signal[:sample_num].reshape((-1, n_sig))
    markers = np.array(markers, dtype='int')
    triggers = np.array(triggers, dtype='int')
    return signal, markers, triggers


def _get_sample(fp, chunk, n_sig, dtype, signal, markers, triggers, sample_num):
    """
    Get the total number of samples in the signal.

    Parameters
    ----------
    fp : file IO object
        The input header file to be read.
    chunk : str
        The data currently being processed.
    n_sig : int
        The number of signals contained in the dat file.
    dtype : str
        String numpy dtype used to store the signal of the given
        resolution.
    signal : ndarray
        Tranformed expanded signal into uniform signal.    
    markers : ndarray
        A 1d numpy array storing the marker locations.
    triggers : ndarray
        A 1d numpy array storing the trigger locations.
    sample_num : int
        The total number of samples in the signal.

    Returns
    -------
    sample_num : int
        The total number of samples in the signal.

    """
    tag = struct.unpack('>h', chunk)[0]
    # Escape sequence
    if tag == -32768:
        # Escape sequence structure: int16 marker, uint8 type,
        # uint8 length, uint8 * length data, padding % 2
        escape_type = struct.unpack('B', fp.read(1))[0]
        data_len = struct.unpack('B', fp.read(1))[0]
        # Marker*
        if escape_type == 1:
            # *In manual mode, this could be block start/top time.
            # But we are it is just a single time marker.
            markers.append(sample_num / n_sig)
        # Trigger
        elif escape_type == 2:
            triggers.append(sample_num / n_sig)
        fp.seek(data_len + data_len % 2, 1)
    # Regular samples
    else:
        fp.seek(-2, 1)
        signal[sample_num:sample_num + n_sig] = np.fromfile(
            fp, dtype=dtype, count=n_sig)
        sample_num += n_sig
    return sample_num