Waveform Database Software Package (WFDB) for Python 4.1.0

File: <base>/wfdb/io/annotation.py (109,544 bytes)
import copy
import numpy as np
import os
import pandas as pd
import re
import posixpath
import sys

from wfdb.io import download
from wfdb.io import _header
from wfdb.io import record


class Annotation(object):
    """
    The class representing WFDB annotations.

    Annotation objects can be created using the initializer, or by reading a
    WFDB annotation file with `rdann`.

    The attributes of the Annotation object give information about the
    annotation as specified by:
    https://www.physionet.org/physiotools/wag/annot-5.htm

    Call `show_ann_labels()` to see the list of standard annotation codes. Any
    text used to label annotations that are not one of these codes should go in
    the 'aux_note' field rather than the 'sym' field.

    The current annotation values organized as such:

        AnnotationLabel(label_store (or subtype), symbol (or aux_note), short_description, description)

    where the associated values are:

        ann_labels = [
            AnnotationLabel(0, " ", 'NOTANN', 'Not an actual annotation'),
            AnnotationLabel(1, "N", 'NORMAL', 'Normal beat'),
            AnnotationLabel(2, "L", 'LBBB', 'Left bundle branch block beat'),
            AnnotationLabel(3, "R", 'RBBB', 'Right bundle branch block beat'),
            AnnotationLabel(4, "a", 'ABERR', 'Aberrated atrial premature beat'),
            AnnotationLabel(5, "V", 'PVC', 'Premature ventricular contraction'),
            AnnotationLabel(6, "F", 'FUSION', 'Fusion of ventricular and normal beat'),
            AnnotationLabel(7, "J", 'NPC', 'Nodal (junctional) premature beat'),
            AnnotationLabel(8, "A", 'APC', 'Atrial premature contraction'),
            AnnotationLabel(9, "S", 'SVPB', 'Premature or ectopic supraventricular beat'),
            AnnotationLabel(10, "E", 'VESC', 'Ventricular escape beat'),
            AnnotationLabel(11, "j", 'NESC', 'Nodal (junctional) escape beat'),
            AnnotationLabel(12, "/", 'PACE', 'Paced beat'),
            AnnotationLabel(13, "Q", 'UNKNOWN', 'Unclassifiable beat'),
            AnnotationLabel(14, "~", 'NOISE', 'Signal quality change'),
            AnnotationLabel(16, "|", 'ARFCT',  'Isolated QRS-like artifact'),
            AnnotationLabel(18, "s", 'STCH',  'ST change'),
            AnnotationLabel(19, "T", 'TCH',  'T-wave change'),
            AnnotationLabel(20, "*", 'SYSTOLE',  'Systole'),
            AnnotationLabel(21, "D", 'DIASTOLE',  'Diastole'),
            AnnotationLabel(22, '"', 'NOTE',  'Comment annotation'),
            AnnotationLabel(23, "=", 'MEASURE',  'Measurement annotation'),
            AnnotationLabel(24, "p", 'PWAVE',  'P-wave peak'),
            AnnotationLabel(25, "B", 'BBB',  'Left or right bundle branch block'),
            AnnotationLabel(26, "^", 'PACESP',  'Non-conducted pacer spike'),
            AnnotationLabel(27, "t", 'TWAVE',  'T-wave peak'),
            AnnotationLabel(28, "+", 'RHYTHM',  'Rhythm change'),
            AnnotationLabel(29, "u", 'UWAVE',  'U-wave peak'),
            AnnotationLabel(30, "?", 'LEARN',  'Learning'),
            AnnotationLabel(31, "!", 'FLWAV',  'Ventricular flutter wave'),
            AnnotationLabel(32, "[", 'VFON',  'Start of ventricular flutter/fibrillation'),
            AnnotationLabel(33, "]", 'VFOFF',  'End of ventricular flutter/fibrillation'),
            AnnotationLabel(34, "e", 'AESC',  'Atrial escape beat'),
            AnnotationLabel(35, "n", 'SVESC',  'Supraventricular escape beat'),
            AnnotationLabel(36, "@", 'LINK',  'Link to external data (aux_note contains URL)'),
            AnnotationLabel(37, "x", 'NAPC',  'Non-conducted P-wave (blocked APB)'),
            AnnotationLabel(38, "f", 'PFUS',  'Fusion of paced and normal beat'),
            AnnotationLabel(39, "(", 'WFON',  'Waveform onset'),
            AnnotationLabel(40, ")", 'WFOFF',  'Waveform end'),
            AnnotationLabel(41, "r", 'RONT',  'R-on-T premature ventricular contraction')
        ]

    The current annotation classes are organized as such:

        AnnotationClass(extension, description, human_reviewed)

    where the associated values are:

        ann_classes = [
            AnnotationClass('atr', 'Reference ECG annotations', True),
            AnnotationClass('blh', 'Human reviewed beat labels', True),
            AnnotationClass('blm', 'Machine beat labels', False),
            AnnotationClass('alh', 'Human reviewed alarms', True),
            AnnotationClass('alm', 'Machine alarms', False),
            AnnotationClass('qrsc', 'Human reviewed QRS detections', True),
            AnnotationClass('qrs', 'Machine QRS detections', False),
            AnnotationClass('bph', 'Human reviewed BP beat detections', True),
            AnnotationClass('bpm', 'Machine BP beat detections', False)
        ]

    Attributes
    ----------
    record_name : str
        The base file name (without extension) of the record that the
        annotation is associated with.
    extension : str
        The file extension of the file the annotation is stored in.
    sample : ndarray
        A numpy array containing the annotation locations in samples relative to
        the beginning of the record.
    symbol : list, numpy array, optional
        The symbols used to display the annotation labels. List or numpy array.
        If this field is present, `label_store` must not be present.
    subtype : ndarray, optional
        A numpy array containing the marked class/category of each annotation.
    chan : ndarray, optional
        A numpy array containing the signal channel associated with each
        annotation.
    num : ndarray, optional
        A numpy array containing the labelled annotation number for each
        annotation.
    aux_note : list, optional
        A list containing the auxiliary information string (or None for
        annotations without notes) for each annotation.
    fs : int, float, optional
        The sampling frequency of the record.
    label_store : ndarray, optional
        The integer values used to store/encode each annotation label.
    description : list, optional
        A list containing the descriptive string of each annotation label.
    custom_labels : pandas dataframe, optional
        The custom annotation labels defined in the annotation file. Maps
        the relationship between the three label fields. The data type is a
        pandas DataFrame with three columns:
        ['label_store', 'symbol', 'description'].
    contained_labels : pandas dataframe, optional
        The unique labels contained in this annotation. Same structure as
        `custom_labels`.

    Examples
    --------
    >>> ann1 = wfdb.Annotation(record_name='rec1', extension='atr',
    ...                        sample=np.array([10, 20, 400]),
    ...                        symbol=['N', 'N', '['],
    ...                        aux_note=[None, None, 'Serious Vfib'])

    """

    def __init__(
        self,
        record_name,
        extension,
        sample,
        symbol=None,
        subtype=None,
        chan=None,
        num=None,
        aux_note=None,
        fs=None,
        label_store=None,
        description=None,
        custom_labels=None,
        contained_labels=None,
    ):

        self.record_name = record_name
        self.extension = extension

        self.sample = sample
        self.symbol = symbol

        self.subtype = subtype
        self.chan = chan
        self.num = num
        self.aux_note = aux_note
        self.fs = fs

        self.label_store = label_store
        self.description = description

        self.custom_labels = custom_labels
        self.contained_labels = contained_labels

        self.ann_len = len(self.sample)

        # __label_map__: (storevalue, symbol, description) hidden attribute

    def __eq__(self, other):
        """
        Equal comparison operator for objects of this type.

        Parameters
        ----------
        other : object
            The object that is being compared to self.

        Returns
        -------
        bool
            Determines if the objects are equal (True) or not equal (False).

        """
        att1 = self.__dict__
        att2 = other.__dict__

        if set(att1.keys()) != set(att2.keys()):
            print("keyset")
            return False

        for k in att1.keys():
            v1 = att1[k]
            v2 = att2[k]

            if type(v1) != type(v2):
                print(k)
                return False

            if isinstance(v1, np.ndarray):
                if not np.array_equal(v1, v2):
                    print(k)
                    return False
            elif isinstance(v1, pd.DataFrame):
                if not v1.equals(v2):
                    print(k)
                    return False
            else:
                if v1 != v2:
                    print(k)
                    return False

        return True

    def apply_range(self, sampfrom=0, sampto=None):
        """
        Filter the annotation attributes to keep only items between the
        desired sample values.

        Parameters
        ----------
        sampfrom : int, optional
            The minimum sample number for annotations to be returned.
        sampto : int, optional
            The maximum sample number for annotations to be returned.

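        Examples
        --------
        A minimal sketch with hypothetical values (assumes ``import wfdb``
        and ``import numpy as np``; the fields indexed below must all be
        populated, as they are after reading a full annotation):

        >>> ann = wfdb.Annotation(record_name='rec1', extension='atr',
        ...                       sample=np.array([100, 500, 900]),
        ...                       label_store=np.array([1, 1, 5]),
        ...                       subtype=np.array([0, 0, 0]),
        ...                       chan=np.array([0, 0, 0]),
        ...                       num=np.array([0, 0, 0]),
        ...                       aux_note=['', '', ''])
        >>> ann.apply_range(sampfrom=200, sampto=800)
        >>> ann.sample
        array([500])
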
        """
        sampto = sampto or self.sample[-1]

        kept_inds = np.intersect1d(
            np.where(self.sample >= sampfrom), np.where(self.sample <= sampto)
        )

        for field in ["sample", "label_store", "subtype", "chan", "num"]:
            setattr(self, field, getattr(self, field)[kept_inds])

        self.aux_note = [self.aux_note[i] for i in kept_inds]

        self.ann_len = len(self.sample)

    def wrann(self, write_fs=False, write_dir=""):
        """
        Write a WFDB annotation file from this object.

        Parameters
        ----------
        write_fs : bool, optional
            Whether to write the `fs` attribute to the file.
        write_dir : str, optional
            The output directory in which the header is written.

        Returns
        -------
        N/A

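        Examples
        --------
        A minimal sketch with hypothetical values (assumes ``import wfdb``
        and ``import numpy as np``; writes 'rec1.atr' to the current
        directory):

        >>> ann = wfdb.Annotation(record_name='rec1', extension='atr',
        ...                       sample=np.array([10, 20, 400]),
        ...                       symbol=['N', 'N', 'V'], fs=360)
        >>> ann.wrann(write_fs=True)
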
        """
        for field in ["record_name", "extension"]:
            if getattr(self, field) is None:
                raise Exception(
                    "Missing required field for writing annotation file: ",
                    field,
                )

        present_label_fields = self.get_label_fields()
        if not present_label_fields:
            raise Exception(
                "At least one annotation label field is required to write the annotation: ",
                ann_label_fields,
            )

        # Check the validity of individual fields
        self.check_fields()

        # Standardize the format of the custom_labels field
        self.standardize_custom_labels()

        # Create the label map used in this annotation
        self.create_label_map()

        # Check the cohesion of fields
        self.check_field_cohesion(present_label_fields)

        # Calculate the label_store field if necessary
        if "label_store" not in present_label_fields:
            self.convert_label_attribute(
                source_field=present_label_fields[0], target_field="label_store"
            )

        # Calculate the symbol field if necessary
        if "symbol" not in present_label_fields:
            self.convert_label_attribute(
                source_field=present_label_fields[0], target_field="symbol"
            )

        # Write the header file using the specified fields
        self.wr_ann_file(write_fs=write_fs, write_dir=write_dir)

        return

    def get_label_fields(self):
        """
        Get the present label fields in the object.

        Parameters
        ----------
        N/A

        Returns
        -------
        present_label_fields : list
            All of the present label fields in the object.

        """
        present_label_fields = []
        for field in ann_label_fields:
            if getattr(self, field) is not None:
                present_label_fields.append(field)

        return present_label_fields

    def check_fields(self):
        """
        Check the set fields of the annotation object.

        Parameters
        ----------
        N/A

        Returns
        -------
        N/A

        """
        # Check all set fields
        for field in ALLOWED_TYPES:
            if getattr(self, field) is not None:
                # Check the type of the field's elements
                self.check_field(field)
        return

    def check_field(self, field):
        """
        Check a particular annotation field.

        Parameters
        ----------
        field : str
            The annotation field to be checked.

        Returns
        -------
        N/A

        """
        item = getattr(self, field)

        if not isinstance(item, ALLOWED_TYPES[field]):
            raise TypeError(
                "The " + field + " field must be one of the following types:",
                ALLOWED_TYPES[field],
            )

        # Numerical integer annotation fields: sample, label_store, sub,
        # chan, num
        if ALLOWED_TYPES[field] == (np.ndarray):
            record.check_np_array(
                item=item,
                field_name=field,
                ndim=1,
                parent_class=np.integer,
                channel_num=None,
            )

        # Field specific checks
        if field == "record_name":
            if bool(re.search(r"[^-\w]", self.record_name)):
                raise ValueError(
                    "record_name must only comprise of letters, digits, hyphens, and underscores."
                )
        elif field == "extension":
            if bool(re.search("[^a-zA-Z]", self.extension)):
                raise ValueError("extension must only comprise of letters.")
        elif field == "fs":
            if self.fs <= 0:
                raise ValueError("The fs field must be a non-negative number")
        elif field == "custom_labels":
            # The role of this section is just to check the
            # elements of this item, without utilizing
            # any other fields. No format conversion
            # or value lookups, etc., are done.

            # Check the structure of the subelements
            if isinstance(item, pd.DataFrame):
                column_names = list(item)
                if "symbol" in column_names and "description" in column_names:
                    if "label_store" in column_names:
                        label_store = list(item["label_store"].values)
                    else:
                        label_store = None
                    symbol = item["symbol"].values
                    description = item["description"].values
                else:
                    raise ValueError(
                        "".join(
                            [
                                "If the "
                                + field
                                + " field is pandas dataframe, its columns",
                                " must be one of the following:\n-[label_store, symbol, description]",
                                "\n-[symbol, description]",
                            ]
                        )
                    )
            else:
                if set([len(i) for i in item]) == {2}:
                    label_store = None
                    symbol = [i[0] for i in item]
                    description = [i[1] for i in item]
                elif set([len(i) for i in item]) == {3}:
                    label_store = [i[0] for i in item]
                    symbol = [i[1] for i in item]
                    description = [i[2] for i in item]
                else:
                    raise ValueError(
                        "".join(
                            [
                                "If the "
                                + field
                                + " field is an array-like object, its subelements",
                                " must be one of the following:\n- tuple triplets storing: ",
                                "(label_store, symbol, description)\n- tuple pairs storing: ",
                                "(symbol, description)",
                            ]
                        )
                    )

            # Check the values of the subelements
            if label_store:
                if len(item) != len(set(label_store)):
                    raise ValueError(
                        "The label_store values of the "
                        + field
                        + " field must be unique"
                    )

                if min(label_store) < 1 or max(label_store) > 49:
                    raise ValueError(
                        "The label_store values of the custom_labels field must be between 1 and 49"
                    )

            if len(item) != len(set(symbol)):
                raise ValueError(
                    "The symbol values of the "
                    + field
                    + " field must be unique"
                )

            for i in range(len(item)):
                if label_store:
                    if not hasattr(label_store[i], "__index__"):
                        raise TypeError(
                            "The label_store values of the "
                            + field
                            + " field must be integer-like"
                        )

                if not isinstance(symbol[i], str_types) or len(
                    symbol[i]
                ) not in [
                    1,
                    2,
                    3,
                ]:
                    raise ValueError(
                        "The symbol values of the "
                        + field
                        + " field must be strings of length 1 to 3"
                    )

                if bool(re.search("[ \t\n\r\f\v]", symbol[i])):
                    raise ValueError(
                        "The symbol values of the "
                        + field
                        + " field must not contain whitespace characters"
                    )

                if not isinstance(description[i], str_types):
                    raise TypeError(
                        "The description values of the "
                        + field
                        + " field must be strings"
                    )

                # Would be good to enforce this, but existing garbage annotations have tabs and newlines...
                # if bool(re.search('[\t\n\r\f\v]', description[i])):
                #    raise ValueError('The description values of the '+field+' field must not contain tabs or newlines')

        # The string fields
        elif field in ["symbol", "description", "aux_note"]:
            uniq_elements = set(item)

            for e in uniq_elements:
                if not isinstance(e, str_types):
                    raise TypeError(
                        "Subelements of the " + field + " field must be strings"
                    )

            if field == "symbol":
                for e in uniq_elements:
                    if len(e) not in [1, 2, 3]:
                        raise ValueError(
                            "Subelements of the "
                            + field
                            + " field must be strings of length 1 to 3"
                        )
                    if bool(re.search("[ \t\n\r\f\v]", e)):
                        raise ValueError(
                            "Subelements of the "
                            + field
                            + " field may not contain whitespace characters"
                        )
            else:
                for e in uniq_elements:
                    if bool(re.search("[\t\n\r\f\v]", e)):
                        raise ValueError(
                            "Subelements of the "
                            + field
                            + " field must not contain tabs or newlines"
                        )

        elif field == "sample":
            if len(self.sample) == 1:
                sampdiffs = np.array([self.sample[0]])
            elif len(self.sample) > 1:
                sampdiffs = np.concatenate(
                    ([self.sample[0]], np.diff(self.sample))
                )
            else:
                raise ValueError(
                    "The 'sample' field must be a numpy array with length greater than 0"
                )
            if min(self.sample) < 0:
                raise ValueError(
                    "The 'sample' field must only contain non-negative integers"
                )
            if min(sampdiffs) < 0:
                raise ValueError(
                    "The 'sample' field must contain monotonically increasing sample numbers"
                )

        elif field == "label_store":
            if min(item) < 1 or max(item) > 49:
                raise ValueError(
                    "The label_store values must be between 1 and 49"
                )

        # The C WFDB library stores num/sub/chan as chars.
        elif field == "subtype":
            # signed character
            if min(self.subtype) < -128 or max(self.subtype) > 127:
                raise ValueError(
                    "The 'subtype' field must only contain integers from -128 to 127"
                )
        elif field == "chan":
            # unsigned character
            if min(self.chan) < 0 or max(self.chan) > 255:
                raise ValueError(
                    "The 'chan' field must only contain non-negative integers up to 255"
                )
        elif field == "num":
            # signed character
            if min(self.num) < 0 or max(self.num) > 127:
                raise ValueError(
                    "The 'num' field must only contain non-negative integers up to 127"
                )

        return

    def check_field_cohesion(self, present_label_fields):
        """
        Check that the content and structure of different fields are consistent
        with one another.

        Parameters
        ----------
        present_label_fields : list
            All of the present label fields in the object.

        Returns
        -------
        N/A

        """
        # Ensure all written annotation fields have the same length
        nannots = len(self.sample)

        for field in [
            "sample",
            "num",
            "subtype",
            "chan",
            "aux_note",
        ] + present_label_fields:
            if getattr(self, field) is not None:
                if len(getattr(self, field)) != nannots:
                    raise ValueError(
                        "The lengths of the 'sample' and '"
                        + field
                        + "' fields do not match"
                    )

        # Ensure all label fields are defined by the label map. This has to be checked because
        # it is possible the user defined (or lack of) custom_labels does not capture all the
        # labels present.
        for field in present_label_fields:
            defined_values = self.__label_map__[field].values

            if set(getattr(self, field)) - set(defined_values) != set():
                raise ValueError(
                    "\n".join(
                        [
                            "\nThe "
                            + field
                            + " field contains elements not encoded in the stardard WFDB annotation labels, or this object's custom_labels field",
                            "- To see the standard WFDB annotation labels, call: show_ann_labels()",
                            "- To transfer non-encoded symbol items into the aux_note field, call: self.sym_to_aux()",
                            "- To define custom labels, set the custom_labels field as a list of tuple triplets with format: (label_store, symbol, description)",
                        ]
                    )
                )

        return

    def standardize_custom_labels(self):
        """
        Set the custom_labels field of the object to a standardized format:
        3 column pandas df with ann_label_fields as columns.

        Does nothing if there are no custom labels defined.
        Does nothing if custom_labels is already a df with all 3 columns.

        If custom_labels is an iterable of pairs/triplets, this
        function will convert it into a df.

        If the label_store attribute is not already defined, this
        function will automatically choose values by trying to use:
        1. The undefined store values from the standard WFDB annotation
           label map.
        2. The unused label store values. This is extracted by finding the
           set of all labels contained in this annotation object and seeing
           which symbols/descriptions are not used.

        If there are more custom labels defined than available label store
        values, even after applying condition 2 above, this function will
        raise an error.

        This function must work when called as a standalone.

        Parameters
        ----------
        N/A

        Returns
        -------
        N/A

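        Examples
        --------
        A minimal sketch with hypothetical values (assumes ``import wfdb``
        and ``import numpy as np``; since no label_store value is given,
        one of the unused store values is assigned automatically):

        >>> ann = wfdb.Annotation(record_name='rec1', extension='atr',
        ...                       sample=np.array([100]), symbol=['#'],
        ...                       custom_labels=[('#', 'Custom marker')])
        >>> ann.standardize_custom_labels()
        >>> list(ann.custom_labels.columns)
        ['label_store', 'symbol', 'description']
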
        """
        custom_labels = self.custom_labels

        if custom_labels is None:
            return

        self.check_field("custom_labels")

        # Convert to dataframe if not already
        if not isinstance(custom_labels, pd.DataFrame):
            if len(self.custom_labels[0]) == 2:
                symbol = self.get_custom_label_attribute("symbol")
                description = self.get_custom_label_attribute("description")
                custom_labels = pd.DataFrame(
                    {"symbol": symbol, "description": description}
                )
            else:
                label_store = self.get_custom_label_attribute("label_store")
                symbol = self.get_custom_label_attribute("symbol")
                description = self.get_custom_label_attribute("description")
                custom_labels = pd.DataFrame(
                    {
                        "label_store": label_store,
                        "symbol": symbol,
                        "description": description,
                    }
                )

        # Assign label_store values to the custom labels if not defined
        if "label_store" not in list(custom_labels):
            undefined_label_stores = self.get_undefined_label_stores()

            if len(custom_labels) > len(undefined_label_stores):
                available_label_stores = self.get_available_label_stores()
            else:
                available_label_stores = undefined_label_stores

            n_custom_labels = custom_labels.shape[0]

            if n_custom_labels > len(available_label_stores):
                raise ValueError(
                    "There are more custom_label definitions than storage values available for them."
                )

            custom_labels["label_store"] = available_label_stores[
                :n_custom_labels
            ]

        custom_labels.set_index(
            custom_labels["label_store"].values, inplace=True
        )
        custom_labels = custom_labels[list(ann_label_fields)]

        self.custom_labels = custom_labels

        return

    def get_undefined_label_stores(self):
        """
        Get the label_store values not defined in the
        standard WFDB annotation labels.

        Parameters
        ----------
        N/A

        Returns
        -------
        list
            The label_store values not found in WFDB annotation labels.

        """
        return list(set(range(50)) - set(ann_label_table["label_store"]))

    def get_available_label_stores(self, usefield="tryall"):
        """
        Get the label store values that may be used
        for writing this annotation.

        Available store values include:
        - the undefined values in the standard WFDB labels
        - the store values not used in the current
          annotation object.
        - the store values whose standard WFDB symbols/descriptions
          match those of the custom labels (if custom_labels exists)

        Parameters
        ----------
        usefield : str, optional
            If 'usefield' is explicitly specified, the function will use that
            field to figure out available label stores. If 'usefield'
            is set to 'tryall', the function will choose one of the contained
            attributes by checking availability in the order: label_store, symbol, description.

        Returns
        -------
        available_label_stores : set
            The available store values used for writing the annotation.

        """
        # Figure out which field to use to get available labels stores.
        if usefield == "tryall":
            if self.label_store is not None:
                usefield = "label_store"
            elif self.symbol is not None:
                usefield = "symbol"
            elif self.description is not None:
                usefield = "description"
            else:
                raise ValueError(
                    "No label fields are defined. At least one of the following is required: ",
                    ann_label_fields,
                )
            return self.get_available_label_stores(usefield=usefield)
        # Use the explicitly stated field to get available stores.
        else:
            # If usefield == 'label_store', there are slightly fewer/different steps
            # compared to if it were another option

            contained_field = getattr(self, usefield)

            # Get the unused label_store values
            if usefield == "label_store":
                unused_label_stores = set(
                    ann_label_table["label_store"].values
                ) - set(contained_field)
            else:
                # The label_store values from the standard WFDB annotation
                # labels whose symbols/descriptions are not contained in
                # this annotation
                unused_field = set(ann_label_table[usefield].values) - set(
                    contained_field
                )
                unused_label_stores = ann_label_table.loc[
                    ann_label_table[usefield].isin(unused_field), "label_store"
                ].values

            # Get the standard WFDB label_store values overwritten by the
            # custom_labels if any
            if self.custom_labels is not None:
                custom_field = set(self.get_custom_label_attribute(usefield))
                if usefield == "label_store":
                    overwritten_label_stores = set(custom_field).intersection(
                        set(ann_label_table["label_store"])
                    )
                else:
                    overwritten_fields = set(custom_field).intersection(
                        set(ann_label_table[usefield])
                    )
                    overwritten_label_stores = ann_label_table.loc[
                        ann_label_table[usefield].isin(overwritten_fields),
                        "label_store",
                    ].values
            else:
                overwritten_label_stores = set()

            # The undefined values in the standard WFDB labels
            undefined_label_stores = self.get_undefined_label_stores()
            # Final available label stores = undefined + unused + overwritten
            available_label_stores = (
                set(undefined_label_stores)
                .union(set(unused_label_stores))
                .union(overwritten_label_stores)
            )

            return available_label_stores

    def get_custom_label_attribute(self, attribute):
        """
        Get a list of the custom_labels attribute i.e. label_store,
        symbol, or description. The custom_labels variable could be in
        a number of formats.

        Parameters
        ----------
        attribute : str
            The selected attribute to generate the list.

        Returns
        -------
        a : list
            All of the custom_labels attributes.

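        Examples
        --------
        A minimal sketch with hypothetical values (assumes ``import wfdb``
        and ``import numpy as np``; custom_labels is given as
        (symbol, description) pairs, so only those two attributes are
        available):

        >>> ann = wfdb.Annotation(record_name='rec1', extension='atr',
        ...                       sample=np.array([100]), symbol=['#'],
        ...                       custom_labels=[('#', 'Custom marker')])
        >>> ann.get_custom_label_attribute('description')
        ['Custom marker']
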
        """
        if attribute not in ann_label_fields:
            raise ValueError("Invalid attribute specified")

        if isinstance(self.custom_labels, pd.DataFrame):
            if "label_store" not in list(self.custom_labels):
                raise ValueError("label_store not defined in custom_labels")
            a = list(self.custom_labels[attribute].values)
        else:
            if len(self.custom_labels[0]) == 2:
                if attribute == "label_store":
                    raise ValueError("label_store not defined in custom_labels")
                elif attribute == "symbol":
                    a = [l[0] for l in self.custom_labels]
                elif attribute == "description":
                    a = [l[1] for l in self.custom_labels]
            else:
                if attribute == "label_store":
                    a = [l[0] for l in self.custom_labels]
                elif attribute == "symbol":
                    a = [l[1] for l in self.custom_labels]
                elif attribute == "description":
                    a = [l[2] for l in self.custom_labels]

        return a

    def create_label_map(self, inplace=True):
        """
        Creates mapping df based on ann_label_table and self.custom_labels. Table
        composed of entire WFDB standard annotation table, overwritten/appended
        with custom_labels if any. Sets __label_map__ attribute, or returns value.

        Parameters
        ----------
        inplace : bool, optional
            Determines whether to add the label map to the current
            object (True) or as a return variable (False).

        Returns
        -------
        label_map : pandas DataFrame
            Mapping based on ann_label_table and self.custom_labels.

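        Examples
        --------
        A minimal sketch with hypothetical values (assumes ``import wfdb``
        and ``import numpy as np``; the custom definition replaces the
        standard label with store value 41):

        >>> ann = wfdb.Annotation(record_name='rec1', extension='atr',
        ...                       sample=np.array([100]), symbol=['N'],
        ...                       custom_labels=[(41, '#', 'Custom marker')])
        >>> label_map = ann.create_label_map(inplace=False)
        >>> label_map.loc[41, 'symbol']
        '#'
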
        """
        label_map = ann_label_table.copy()

        if self.custom_labels is not None:
            self.standardize_custom_labels()
            for i in self.custom_labels.index:
                label_map.loc[i] = self.custom_labels.loc[i]

        if inplace:
            self.__label_map__ = label_map
        else:
            return label_map

    def wr_ann_file(self, write_fs, write_dir=""):
        """
        Calculate the bytes used to encode an annotation set and
        write them to an annotation file.

        Parameters
        ----------
        write_fs : bool
            Whether to write the `fs` attribute to the file.
        write_dir : str, optional
            The output directory in which the header is written.

        Returns
        -------
        N/A

        """
        # Calculate the fs bytes to write if present and desired to write
        if write_fs:
            fs_bytes = self.calc_fs_bytes()
        else:
            fs_bytes = []
        # Calculate the custom_labels bytes to write if present
        cl_bytes = self.calc_cl_bytes()
        # Calculate the core field bytes to write
        core_bytes = self.calc_core_bytes()

        # Mark the end of the special annotation types if needed
        if fs_bytes == [] and cl_bytes == []:
            end_special_bytes = []
        else:
            end_special_bytes = [0, 236, 255, 255, 255, 255, 1, 0]

        # Write the file
        with open(
            os.path.join(write_dir, self.record_name + "." + self.extension),
            "wb",
        ) as f:
            # Combine all bytes to write: fs (if any), custom annotations (if any), main content, file terminator
            np.concatenate(
                (
                    fs_bytes,
                    cl_bytes,
                    end_special_bytes,
                    core_bytes,
                    np.array([0, 0]),
                )
            ).astype("u1").tofile(f)

        return

    def calc_fs_bytes(self):
        """
        Calculate the bytes written to the annotation file for the fs field.

        Parameters
        ----------
        N/A

        Returns
        -------
        list, ndarray
            All of the bytes to be written to the annotation file.

        """
        if self.fs is None:
            return []

        # Initial indicators of encoding fs
        data_bytes = [
            0,
            88,
            0,
            252,
            35,
            35,
            32,
            116,
            105,
            109,
            101,
            32,
            114,
            101,
            115,
            111,
            108,
            117,
            116,
            105,
            111,
            110,
            58,
            32,
        ]

        # Check if fs is close enough to int
        if isinstance(self.fs, float):
            if round(self.fs, 8) == float(int(self.fs)):
                self.fs = int(self.fs)

        fschars = str(self.fs)
        ndigits = len(fschars)

        for i in range(ndigits):
            data_bytes.append(ord(fschars[i]))

        # Fill in the aux_note length
        data_bytes[2] = ndigits + 20

        # odd number of digits
        if ndigits % 2:
            data_bytes.append(0)

        return np.array(data_bytes).astype("u1")

    def calc_cl_bytes(self):
        """
        Calculate the bytes written to the annotation file for the
        custom_labels field.

        Parameters
        ----------
        N/A

        Returns
        -------
        list, ndarray
            All of the bytes to be written to the annotation file.

        """
        if self.custom_labels is None:
            return []

        # The start wrapper: '0 NOTE length aux_note ## annotation type definitions'
        headbytes = [
            0,
            88,
            30,
            252,
            35,
            35,
            32,
            97,
            110,
            110,
            111,
            116,
            97,
            116,
            105,
            111,
            110,
            32,
            116,
            121,
            112,
            101,
            32,
            100,
            101,
            102,
            105,
            110,
            105,
            116,
            105,
            111,
            110,
            115,
        ]

        # The end wrapper: '0 NOTE length aux_note ## end of definitions' followed by SKIP -1, +1
        tailbytes = [
            0,
            88,
            21,
            252,
            35,
            35,
            32,
            101,
            110,
            100,
            32,
            111,
            102,
            32,
            100,
            101,
            102,
            105,
            110,
            105,
            116,
            105,
            111,
            110,
            115,
            0,
        ]

        custom_bytes = []

        for i in self.custom_labels.index:
            custom_bytes += custom_triplet_bytes(
                list(self.custom_labels.loc[i, list(ann_label_fields)])
            )

        # writecontent = []
        # for i in range(len(self.custom_labels)):
        #     writecontent.append([freenumbers[i],list(custom_labels.keys())[i],list(custom_labels.values())[i]])

        # custombytes = [customcode2bytes(triplet) for triplet in writecontent]
        # custombytes = [item for sublist in custombytes for item in sublist]

        return np.array(headbytes + custom_bytes + tailbytes).astype("u1")

    def calc_core_bytes(self):
        """
        Convert all used annotation fields into bytes to write.

        Parameters
        ----------
        N/A

        Returns
        -------
        list, ndarray
            All of the bytes to be written to the annotation file.

        """
        # The difference sample to write
        if len(self.sample) == 1:
            sampdiff = np.array([self.sample[0]])
        else:
            sampdiff = np.concatenate(([self.sample[0]], np.diff(self.sample)))

        # Create a copy of the annotation object with a
        # compact version of fields to write
        compact_annotation = copy.deepcopy(self)
        compact_annotation.compact_fields()

        # The optional fields to be written. Write if they are not None or all empty
        extra_write_fields = []

        for field in ["num", "subtype", "chan", "aux_note"]:
            if not isblank(getattr(compact_annotation, field)):
                extra_write_fields.append(field)

        data_bytes = []

        # Allow use of custom labels
        label_table = ann_label_table
        if self.custom_labels is not None:
            label_table = pd.concat(
                [label_table, self.custom_labels], ignore_index=True
            )

        # Generate typecodes from annotation label table
        typecodes = {
            label_table.iloc[i]["symbol"]: label_table.iloc[i]["label_store"]
            for i in range(len(label_table))
        }

        # Iterate across all fields one index at a time
        for i in range(len(sampdiff)):

            # Process the samp (difference) and sym items
            data_bytes.append(
                field2bytes(
                    "samptype", [sampdiff[i], self.symbol[i]], typecodes
                )
            )

            # Process the extra optional fields
            for field in extra_write_fields:
                value = getattr(compact_annotation, field)[i]
                if value is not None:
                    data_bytes.append(field2bytes(field, value, typecodes))

        # Flatten and convert to correct format
        data_bytes = np.array(
            [item for sublist in data_bytes for item in sublist]
        ).astype("u1")

        return data_bytes

    def compact_fields(self):
        """
        Compact all of the object's fields so that the written
        annotation file uses as few bytes as possible.

        Parameters
        ----------
        N/A

        Returns
        -------
        N/A

        """
        # Number of annotations
        nannots = len(self.sample)

        # Chan and num carry over previous fields. Get lists of as few
        # elements to write as possible
        self.chan = compact_carry_field(self.chan)
        self.num = compact_carry_field(self.num)

        # Elements of 0 (default) do not need to be written for subtype.
        # num and sub are signed in original c package...
        if self.subtype is not None:
            if isinstance(self.subtype, list):
                for i in range(nannots):
                    if self.subtype[i] == 0:
                        self.subtype[i] = None
                if np.array_equal(self.subtype, [None] * nannots):
                    self.subtype = None
            else:
                zero_inds = np.where(self.subtype == 0)[0]
                if len(zero_inds) == nannots:
                    self.subtype = None
                else:
                    self.subtype = list(self.subtype)
                    for i in zero_inds:
                        self.subtype[i] = None

        # Empty aux_note strings are not written
        if self.aux_note is not None:
            for i in range(nannots):
                if self.aux_note[i] == "":
                    self.aux_note[i] = None
            if np.array_equal(self.aux_note, [None] * nannots):
                self.aux_note = None

    def sym_to_aux(self):
        """
        Move non-encoded symbol elements into the aux_note field.

        Parameters
        ----------
        N/A

        Returns
        -------
        N/A

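        Examples
        --------
        A minimal sketch with hypothetical values (assumes ``import wfdb``
        and ``import numpy as np``; '@@' is not a standard WFDB symbol, so
        it is moved into aux_note and replaced with the NOTE symbol '"'):

        >>> ann = wfdb.Annotation(record_name='rec1', extension='atr',
        ...                       sample=np.array([100, 200]),
        ...                       symbol=['N', '@@'])
        >>> ann.sym_to_aux()
        >>> ann.symbol
        ['N', '"']
        >>> ann.aux_note
        ['', '@@']
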
        """
        self.check_field("symbol")

        # Non-encoded symbols
        label_table_map = self.create_label_map(inplace=False)
        external_syms = set(self.symbol) - set(label_table_map["symbol"].values)

        if external_syms == set():
            return

        if self.aux_note is None:
            self.aux_note = [""] * len(self.sample)

        for ext in external_syms:
            for i in [i for i, x in enumerate(self.symbol) if x == ext]:
                if not self.aux_note[i]:
                    self.aux_note[i] = self.symbol[i]
                else:
                    self.aux_note[i] = self.symbol[i] + " " + self.aux_note[i]
                self.symbol[i] = '"'
        return

    def get_contained_labels(self, inplace=True):
        """
        Get the set of unique labels contained in this annotation.
        Returns a pandas dataframe or sets the contained_labels
        attribute of the object. Requires the label_store field to be set.

        Function will try to use attributes contained in the order:
        1. label_store
        2. symbol
        3. description

        This function should also be called to summarize information about an
        annotation after it has been read. It should not be used as a helper
        function by anything other than `rdann`.

        Parameters
        ----------
        inplace : bool, optional
            Determines whether to add the label map to the current
            object (True) or as a return variable (False).

        Returns
        -------
        contained_labels : pandas DataFrame
            Mapping based on ann_label_table and self.custom_labels.

        """
        if self.custom_labels is not None:
            self.check_field("custom_labels")

        # Create the label map
        label_map = ann_label_table.copy()

        # Convert the tuple triplets into a pandas dataframe if needed
        if isinstance(self.custom_labels, (list, tuple)):
            custom_labels = label_triplets_to_df(self.custom_labels)
        elif isinstance(self.custom_labels, pd.DataFrame):
            # Set the index just in case it doesn't already match the label_store
            self.custom_labels.set_index(
                self.custom_labels["label_store"].values, inplace=True
            )
            custom_labels = self.custom_labels
        else:
            custom_labels = None

        # Merge the standard WFDB labels with the custom labels.
        # custom labels values overwrite standard WFDB if overlap.
        if custom_labels is not None:
            for i in custom_labels.index:
                label_map.loc[i] = custom_labels.loc[i]
            # This doesn't work...
            # label_map.loc[custom_labels.index] = custom_labels.loc[custom_labels.index]

        # Get the labels using one of the features
        if self.label_store is not None:
            index_vals = set(self.label_store)
            reset_index = False
            counts = np.unique(self.label_store, return_counts=True)
        elif self.symbol is not None:
            index_vals = set(self.symbol)
            label_map.set_index(label_map["symbol"].values, inplace=True)
            reset_index = True
            counts = np.unique(self.symbol, return_counts=True)
        elif self.description is not None:
            index_vals = set(self.description)
            label_map.set_index(label_map["description"].values, inplace=True)
            reset_index = True
            counts = np.unique(self.description, return_counts=True)
        else:
            raise Exception("No annotation labels contained in object")

        contained_labels = label_map.loc[index_vals, :]

        # Add the counts
        for i in range(len(counts[0])):
            contained_labels.loc[counts[0][i], "n_occurrences"] = counts[1][i]
        contained_labels["n_occurrences"] = pd.to_numeric(
            contained_labels["n_occurrences"], downcast="integer"
        )

        if reset_index:
            contained_labels.set_index(
                contained_labels["label_store"].values, inplace=True
            )

        if inplace:
            self.contained_labels = contained_labels
            return
        else:
            return contained_labels

    def set_label_elements(self, wanted_label_elements):
        """
        Set one or more label elements based on at least one of the others.

        Parameters
        ----------
        wanted_label_elements : list
            All of the desired label elements.

        Returns
        -------
        N/A

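        Examples
        --------
        A minimal sketch with hypothetical values (assumes ``import wfdb``
        and ``import numpy as np``; derives the symbol field from
        label_store, then removes the other label fields):

        >>> ann = wfdb.Annotation(record_name='rec1', extension='atr',
        ...                       sample=np.array([100, 200]),
        ...                       label_store=np.array([1, 5]))
        >>> ann.set_label_elements('symbol')
        >>> ann.symbol
        ['N', 'V']
        >>> ann.label_store is None
        True
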
        """
        if isinstance(wanted_label_elements, str):
            wanted_label_elements = [wanted_label_elements]

        # Figure out which desired label elements are missing
        missing_elements = [
            e for e in wanted_label_elements if getattr(self, e) is None
        ]

        contained_elements = [
            e for e in ann_label_fields if getattr(self, e) is not None
        ]

        if not contained_elements:
            raise Exception("No annotation labels contained in object")

        for e in missing_elements:
            self.convert_label_attribute(contained_elements[0], e)

        unwanted_label_elements = list(
            set(ann_label_fields) - set(wanted_label_elements)
        )

        self.rm_attributes(unwanted_label_elements)

        return

    def rm_attributes(self, attributes):
        """
        Remove attributes from object.

        Parameters
        ----------
        attributes : list
            All of the desired attributes to remove.

        Returns
        -------
        N/A

        """
        if isinstance(attributes, str):
            attributes = [attributes]
        for a in attributes:
            setattr(self, a, None)
        return

    def convert_label_attribute(
        self, source_field, target_field, inplace=True, overwrite=True
    ):
        """
        Convert one label attribute (label_store, symbol, or description) to
        another. Creates mapping df on the fly based on ann_label_table and
        self.custom_labels.

        Parameters
        ----------
        source_field : str
            The label attribute to be converted.
        target_field : str
            The label attribute that will be converted to.
        inplace : bool, optional
            Determines whether to add the label map to the current
            object (True) or as a return variable (False).
        overwrite : bool, optional
            If True, performs conversion and replaces target field attribute
            even if the target attribute already has a value. If False, does
            not perform conversion in the aforementioned case. Set to
            True (do conversion) if inplace=False.

        Returns
        -------
        target_item : list
            All of the desired target fields.

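        Examples
        --------
        A minimal sketch with hypothetical values (assumes ``import wfdb``
        and ``import numpy as np``; converts standard store values into
        their display symbols):

        >>> ann = wfdb.Annotation(record_name='rec1', extension='atr',
        ...                       sample=np.array([100, 200]),
        ...                       label_store=np.array([1, 5]))
        >>> ann.convert_label_attribute('label_store', 'symbol',
        ...                             inplace=False)
        ['N', 'V']
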
        """
        if inplace and not overwrite:
            if getattr(self, target_field) is not None:
                return

        label_map = self.create_label_map(inplace=False)
        label_map.set_index(label_map[source_field].values, inplace=True)

        try:
            target_item = label_map.loc[
                getattr(self, source_field), target_field
            ].values
        except KeyError:
            target_item = label_map.reindex(
                index=getattr(self, source_field), columns=[target_field]
            ).values.flatten()

        if target_field != "label_store":
            # Should already be int64 dtype if target is label_store
            target_item = list(target_item)

        if inplace:
            setattr(self, target_field, target_item)
        else:
            return target_item


def label_triplets_to_df(triplets):
    """
    Get a pandas DataFrame from the tuple triplets used to define annotation
    labels.

    Parameters
    ----------
    triplets : list
        Should come in the form: (label_store, symbol, description).

    Returns
    -------
    label_df : pandas DataFrame
        Captures all of the tuple triplets used to define annotation labels.

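    Examples
    --------
    A minimal sketch with a hypothetical custom label:

    >>> df = label_triplets_to_df([(42, '#', 'Custom marker')])
    >>> df.loc[42, 'symbol']
    '#'
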
    """
    label_df = pd.DataFrame(
        {
            "label_store": np.array([t[0] for t in triplets], dtype="int"),
            "symbol": [t[1] for t in triplets],
            "description": [t[2] for t in triplets],
        }
    )

    label_df.set_index(label_df["label_store"].values, inplace=True)
    label_df = label_df[list(ann_label_fields)]

    return label_df


def custom_triplet_bytes(custom_triplet):
    """
    Convert triplet of [label_store, symbol, description] into bytes
    for defining custom labels in the annotation file.

    Parameters
    ----------
    custom_triplet : list
        Triplet of [label_store, symbol, description].

    Returns
    -------
    annbytes : list
        All the bytes converted from the triplet.

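    Examples
    --------
    A minimal sketch with a hypothetical custom label. The aux string
    written is '42 # Custom marker' (18 characters); the first two words
    encode a NOTE annotation with zero sample difference, then the aux
    length byte and the AUX indicator byte (63*4 = 252):

    >>> custom_triplet_bytes([42, '#', 'Custom marker'])[:4]
    [0, 88, 18, 252]
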
    """
    # Structure: 0, NOTE, len(aux_note), aux_note, codenumber, space, codesymbol, space, description, (0 null if necessary)
    # Remember, the aux_note string is 'number(s)<space><symbol><space><description>'
    annbytes = (
        [0, 88, len(custom_triplet[2]) + 3 + len(str(custom_triplet[0])), 252]
        + [ord(c) for c in str(custom_triplet[0])]
        + [32]
        + [ord(custom_triplet[1])]
        + [32]
        + [ord(c) for c in custom_triplet[2]]
    )

    if len(annbytes) % 2:
        annbytes.append(0)

    return annbytes


def isblank(x):
    """
    Tests whether the item is blank.

    Parameters
    ----------
    x : ndarray, list
        The item to be checked.

    Returns
    -------
    bool
        Determines if the item was blank (True) or not (False).

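    Examples
    --------
    A minimal usage sketch:

    >>> isblank(None)
    True
    >>> isblank([None, None, None])
    True
    >>> isblank([None, 3])
    False
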
    """
    if x is None:
        return True
    elif isinstance(x, list):
        if set(x) == set([None]):
            return True
    return False


def compact_carry_field(full_field):
    """
    Return the compact list version of a list/array of an annotation
    field that has previous values carried over (chan or num).
    - The first value is 0 by default; it is only written if it differs.
    - Subsequent values are only written when they differ from the
      previous value.

    Parameters
    ----------
    full_field : list, ndarray
        The annotation field (chan or num) to be converted to a compact list.

    Returns
    -------
    compact_field : list
        Compact list version of the annotation fields.

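    Examples
    --------
    A minimal sketch (only values that differ from the implicit previous
    value, which starts at 0, are kept; a field of all defaults compacts
    to None):

    >>> compact_carry_field([0, 0, 1, 1, 0])
    [None, None, 1, None, 0]
    >>> compact_carry_field([0, 0, 0]) is None
    True
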
    """
    # Keep in mind that the field may already be compact or None
    if full_field is None:
        return None

    # List of same length. Place None where element
    # does not need to be written
    compact_field = [None] * len(full_field)

    prev_field = 0

    for i in range(len(full_field)):
        current_field = full_field[i]
        if current_field != prev_field:
            compact_field[i] = current_field
            prev_field = current_field

    # May further simplify
    if np.array_equal(compact_field, [None] * len(full_field)):
        compact_field = None

    return compact_field


def field2bytes(field, value, typecodes):
    """
    Convert an annotation field into bytes to write.

    Parameters
    ----------
    field : str
        The annotation field of the value to be converted to bytes.
    value : list
        The value to be converted to bytes.
    typecodes : dict
        The mapping between each annotation label and its corresponding typecode.

    Returns
    -------
    data_bytes : list, ndarray
        All of the bytes to be written to the annotation file.

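    Examples
    --------
    A minimal sketch (typecode 1 is the standard code for a normal beat;
    the low byte holds the sample difference and the high byte holds the
    type code shifted left by two bits):

    >>> field2bytes('samptype', [10, 'N'], {'N': 1})
    [10, 4]
    >>> field2bytes('num', 3, {})
    [3, 240]
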
    """
    data_bytes = []

    # samp and sym bytes come together
    if field == "samptype":
        # Numerical value encoding annotation symbol
        typecode = typecodes[value[1]]
        # sample difference
        sd = value[0]

        data_bytes = []

        # Add SKIP element(s) if the sample difference is too large to
        # be stored in the annotation type word.
        #
        # Each SKIP element consists of three words (6 bytes):
        #  - Bytes 0-1 contain the SKIP indicator (59 << 10)
        #  - Bytes 2-3 contain the high 16 bits of the sample difference
        #  - Bytes 4-5 contain the low 16 bits of the sample difference
        # If the total difference exceeds 2**31 - 1, multiple skips must
        # be used.
        while sd > 1023:
            n = min(sd, 0x7FFFFFFF)
            data_bytes += [
                0,
                59 << 2,
                (n >> 16) & 255,
                (n >> 24) & 255,
                (n >> 0) & 255,
                (n >> 8) & 255,
            ]
            sd -= n

        # Annotation type itself is stored as a single word:
        #  - bits 0 to 9 store the sample difference (0 to 1023)
        #  - bits 10 to 15 store the type code
        data_bytes += [sd & 255, ((sd & 768) >> 8) + 4 * typecode]

    elif field == "num":
        # First byte stores num
        # second byte stores 60*4 indicator
        data_bytes = [value, 240]
    elif field == "subtype":
        # First byte stores subtype
        # second byte stores 61*4 indicator
        data_bytes = [value, 244]
    elif field == "chan":
        # First byte stores chan
        # second byte stores 62*4 indicator
        data_bytes = [value, 248]
    elif field == "aux_note":
        # - First byte stores length of aux_note field
        # - Second byte stores 63*4 indicator
        # - Then store the aux_note string characters
        data_bytes = [len(value), 252] + [ord(i) for i in value]
        # Zero pad odd length aux_note strings
        if len(value) % 2:
            data_bytes.append(0)

    return data_bytes


def wrann(
    record_name,
    extension,
    sample,
    symbol=None,
    subtype=None,
    chan=None,
    num=None,
    aux_note=None,
    label_store=None,
    fs=None,
    custom_labels=None,
    write_dir="",
):
    """
    Write a WFDB annotation file.

    Specify at least the following:

    - The record name of the WFDB record (record_name).
    - The annotation file extension (extension).
    - The annotation locations in samples relative to the beginning of
      the record (sample).
    - Either the numerical values used to store the labels
      (`label_store`), or more commonly, the display symbols of each
      label (`symbol`).

    Parameters
    ----------
    record_name : str
        The string name of the WFDB record to be written (without any file
        extensions).
    extension : str
        The string annotation file extension.
    sample : ndarray
        A numpy array containing the annotation locations in samples relative to
        the beginning of the record.
    symbol : list, numpy array, optional
        The symbols used to display the annotation labels. List or numpy array.
        If this field is present, `label_store` must not be present.
    subtype : ndarray, optional
        A numpy array containing the marked class/category of each annotation.
    chan : ndarray, optional
        A numpy array containing the signal channel associated with each
        annotation.
    num : ndarray, optional
        A numpy array containing the labelled annotation number for each
        annotation.
    aux_note : list, optional
        A list containing the auxiliary information string (or None for
        annotations without notes) for each annotation.
    label_store : ndarray, optional
        A numpy array containing the integer values used to store the
        annotation labels. If this field is present, `symbol` must not be
        present.
    fs : int, float, optional
        The numerical sampling frequency of the record to be written to the file.
    custom_labels : pandas dataframe, optional
        The map of custom defined annotation labels used for this annotation, in
        addition to the standard WFDB annotation labels. Custom labels are
        defined by two or three fields:

        - The integer values used to store custom annotation labels in the file
          (optional).
        - Their short display symbols
        - Their long descriptions.

        This input argument may come in four formats:

        1. A pandas.DataFrame object with columns:
           ['label_store', 'symbol', 'description']
        2. A pandas.DataFrame object with columns: ['symbol', 'description']
           If this option is chosen, label_store values are automatically chosen.
        3. A list or tuple of tuple triplets, with triplet elements
           representing: (label_store, symbol, description).
        4. A list or tuple of tuple pairs, with pair elements representing:
           (symbol, description). If this option is chosen, label_store values
           are automatically chosen.

        If the `label_store` field is given for this function, and
        `custom_labels` is defined, `custom_labels` must contain `label_store`
        in its mapping. ie. it must come in format 1 or 3 above.
    write_dir : str, optional
        The directory in which to write the annotation file.

    Returns
    -------
    N/A

    Notes
    -----
    This is a gateway function, written as a simple way to write WFDB annotation
    files without needing to explicitly create an Annotation object. You may also
    create an Annotation object, manually set its attributes, and call its
    `wrann` instance method.

    Each annotation stored in a WFDB annotation file contains a sample field and
    a label field. All other fields may or may not be present.

    Examples
    --------
    >>> # Read an annotation as an Annotation object
    >>> annotation = wfdb.rdann('b001', 'atr', pn_dir='cebsdb')
    >>> # Write a copy of the annotation file
    >>> wfdb.wrann('b001', 'cpy', annotation.sample, annotation.symbol)
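
    >>> # A hedged sketch of also writing custom labels (format 2 above:
    >>> # symbol and description only, so label_store values are assigned
    >>> # automatically). The symbol 'z' and extension 'cst' are hypothetical.
    >>> import pandas as pd
    >>> custom = pd.DataFrame({'symbol': ['z'],
    ...                        'description': ['Hypothetical custom event']})
    >>> wfdb.wrann('b001', 'cst', annotation.sample, annotation.symbol,
    ...            custom_labels=custom)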

    """
    # Create Annotation object
    annotation = Annotation(
        record_name=record_name,
        extension=extension,
        sample=sample,
        symbol=symbol,
        subtype=subtype,
        chan=chan,
        num=num,
        aux_note=aux_note,
        label_store=label_store,
        fs=fs,
        custom_labels=custom_labels,
    )

    # Find out which input field describes the labels
    if symbol is None:
        if label_store is None:
            raise Exception(
                "Either the 'symbol' field or the 'label_store' field must be set"
            )
    else:
        if label_store is None:
            annotation.sym_to_aux()
        else:
            raise Exception(
                "Only one of the 'symbol' and 'label_store' fields may be input, for describing annotation labels"
            )

    # Perform field checks and write the annotation file
    annotation.wrann(write_fs=True, write_dir=write_dir)


def show_ann_labels():
    """
    Display the standard WFDB annotation label mapping.

    Parameters
    ----------
    N/A

    Returns
    -------
    N/A

    Examples
    --------
    >>> show_ann_labels()

    """
    print(ann_label_table)


def show_ann_classes():
    """
    Display the standard WFDB annotation classes.

    Parameters
    ----------
    N/A

    Returns
    -------
    N/A

    Examples
    --------
    >>> show_ann_classes()

    """
    print(ann_class_table)


# todo: return as df option?
def rdann(
    record_name,
    extension,
    sampfrom=0,
    sampto=None,
    shift_samps=False,
    pn_dir=None,
    return_label_elements=["symbol"],
    summarize_labels=False,
):
    """
    Read a WFDB annotation file record_name.extension and return an
    Annotation object.

    Parameters
    ----------
    record_name : str
        The record name of the WFDB annotation file. ie. for file '100.atr',
        record_name='100'.
    extension : str
        The annotator extension of the annotation file. ie. for file
        '100.atr', extension='atr'.
    sampfrom : int, optional
        The minimum sample number for annotations to be returned.
    sampto : int, optional
        The maximum sample number for annotations to be returned.
    shift_samps : bool, optional
        Specifies whether to return the sample indices relative to `sampfrom`
        (True), or sample 0 (False).
    pn_dir : str, optional
        Option used to stream data from Physionet. The PhysioNet database
        directory from which to find the required annotation file. eg. For
        record '100' in 'http://physionet.org/content/mitdb': pn_dir='mitdb'.
    return_label_elements : list, optional
        The label elements that are to be returned from reading the annotation
        file. A list with at least one of the following options: 'symbol',
        'label_store', 'description'.
    summarize_labels : bool, optional
        If True, assign a summary table of the set of annotation labels
        contained in the file to the 'contained_labels' attribute of the
        returned object. This table will contain the columns:
        ['label_store', 'symbol', 'description', 'n_occurrences'].

    Returns
    -------
    annotation : Annotation
        The Annotation object. Call help(wfdb.Annotation) for the attribute
        descriptions.

    Notes
    -----
    For every annotation sample, the annotation file explicitly stores the
    'sample' and 'symbol' fields, but not necessarily the others. When reading
    annotation files using this function, fields which are not stored in the
    file will either take their default values of 0 or None, or will be carried
    over from their previous values if any.

    Examples
    --------
    >>> ann = wfdb.rdann('sample-data/100', 'atr', sampto=300000)

    """
    if (pn_dir is not None) and ("." not in pn_dir):
        dir_list = pn_dir.split("/")
        pn_dir = posixpath.join(
            dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
        )

    return_label_elements = check_read_inputs(
        sampfrom, sampto, return_label_elements
    )

    # Read the file in byte pairs
    filebytes = load_byte_pairs(record_name, extension, pn_dir)

    # Get WFDB annotation fields from the file bytes
    (sample, label_store, subtype, chan, num, aux_note) = proc_ann_bytes(
        filebytes, sampto
    )

    # Get the indices of annotations that hold definition information about
    # the entire annotation file, and other empty annotations to be removed.
    potential_definition_inds, rm_inds = get_special_inds(
        sample, label_store, aux_note
    )

    # Try to extract information describing the annotation file
    (fs, custom_labels) = interpret_defintion_annotations(
        potential_definition_inds, aux_note
    )

    # Remove annotations that do not store actual sample and label information
    (sample, label_store, subtype, chan, num, aux_note) = rm_empty_indices(
        rm_inds, sample, label_store, subtype, chan, num, aux_note
    )

    # Convert lists to numpy arrays dtype='int'
    (label_store, subtype, chan, num) = lists_to_int_arrays(
        label_store, subtype, chan, num
    )

    # Convert sample numbers to a numpy array of 'int64'
    sample = np.array(sample, dtype="int64")

    # Try to get fs from the header file if it is not contained in the
    # annotation file
    if fs is None:
        try:
            rec = record.rdheader(record_name, pn_dir)
            fs = rec.fs
        except:
            pass

    # Create the annotation object
    annotation = Annotation(
        record_name=os.path.split(record_name)[1],
        extension=extension,
        sample=sample,
        label_store=label_store,
        subtype=subtype,
        chan=chan,
        num=num,
        aux_note=aux_note,
        fs=fs,
        custom_labels=custom_labels,
    )

    # Apply the desired index range
    if sampfrom > 0 and sampto is not None:
        annotation.apply_range(sampfrom=sampfrom, sampto=sampto)

    # If specified, obtain annotation samples relative to the starting
    # index
    if shift_samps and len(sample) > 0 and sampfrom:
        annotation.sample = annotation.sample - sampfrom

    # Get the set of unique label definitions contained in this
    # annotation
    if summarize_labels:
        annotation.get_contained_labels(inplace=True)

    # Set/unset the desired label values
    annotation.set_label_elements(return_label_elements)

    return annotation


def check_read_inputs(sampfrom, sampto, return_label_elements):
    """
    Check if all the read inputs are valid.

    Parameters
    ----------
    sampfrom : int
        The minimum sample number for annotations to be returned.
    sampto : int
        The maximum sample number for annotations to be returned.
    return_label_elements : list, str
        The label elements that are to be returned from reading the annotation
        file. A list with at least one of the following options: 'symbol',
        'label_store', 'description'.

    Returns
    -------
    return_label_elements : list
        The label elements that are to be returned from reading the annotation
        file. A list with at least one of the following options: 'symbol',
        'label_store', 'description'.
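
    Examples
    --------
    An illustrative sketch (a single string is normalized to a list):

    >>> check_read_inputs(0, 300000, 'symbol')
    ['symbol']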

    """
    if sampto and sampto <= sampfrom:
        raise ValueError("sampto must be greater than sampfrom")
    if sampfrom < 0:
        raise ValueError("sampfrom must be a non-negative integer")

    if isinstance(return_label_elements, str):
        return_label_elements = [return_label_elements]

    if set.union(set(ann_label_fields), set(return_label_elements)) != set(
        ann_label_fields
    ):
        raise ValueError(
            "return_label_elements must be a list containing one or more of the following elements:",
            ann_label_fields,
        )

    return return_label_elements


def load_byte_pairs(record_name, extension, pn_dir):
    """
    Load the annotation file 1 byte at a time and arrange in pairs.

    Parameters
    ----------
    record_name : str
        The record name of the WFDB annotation file. ie. for file '100.atr',
        record_name='100'.
    extension : str
        The annotator extension of the annotation file. ie. for file
        '100.atr', extension='atr'.
    pn_dir : str
        Option used to stream data from Physionet. The PhysioNet database
        directory from which to find the required annotation file. eg. For
        record '100' in 'http://physionet.org/content/mitdb': pn_dir='mitdb'.

    Returns
    -------
    filebytes : ndarray
        The input filestream converted to an Nx2 array of unsigned bytes.

    """
    # local file
    if pn_dir is None:
        with open(record_name + "." + extension, "rb") as f:
            filebytes = np.fromfile(f, "<u1").reshape([-1, 2])
    # PhysioNet file
    else:
        filebytes = download._stream_annotation(
            record_name + "." + extension, pn_dir
        ).reshape([-1, 2])

    return filebytes


def proc_ann_bytes(filebytes, sampto):
    """
    Get regular annotation fields from the annotation bytes.

    Parameters
    ----------
    filebytes : ndarray
        The input filestream converted to an Nx2 array of unsigned bytes.
    sampto : int
        The maximum sample number for annotations to be returned.

    Returns
    -------
    sample : ndarray
        A numpy array containing the annotation locations in samples relative to
        the beginning of the record.
    label_store : ndarray
        A numpy array containing the integer values used to store the
        annotation labels. If this field is present, `symbol` must not be
        present.
    subtype : ndarray
        A numpy array containing the marked class/category of each annotation.
    chan : ndarray
        A numpy array containing the signal channel associated with each
        annotation.
    num : ndarray
        A numpy array containing the labelled annotation number for each
        annotation.
    aux_note : list
        A list containing the auxiliary information string (or None for
        annotations without notes) for each annotation.
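
    Examples
    --------
    A minimal sketch using a hand-built byte array (two NORMAL beats with
    sample differences of 8 and 10, followed by the end-of-file pair):

    >>> import numpy as np
    >>> filebytes = np.array([[8, 4], [10, 4], [0, 0]], dtype=np.uint8)
    >>> sample, label_store, *_ = proc_ann_bytes(filebytes, None)
    >>> sample
    [8, 18]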

    """
    # Base annotation fields
    sample, label_store, subtype, chan, num, aux_note = [], [], [], [], [], []

    # Indexing Variables

    # Total number of samples from the beginning of the record.
    # Annotation bytes only store the sample difference.
    sample_total = 0
    # Byte pair index
    bpi = 0

    # Process annotations. Iterate across byte pairs.
    # Sequence for one ann is:
    # - SKIP pair (if any)
    # - samp + sym pair
    # - other pairs (if any)
    # The last byte pair of the file is 0 indicating eof.
    while bpi < filebytes.shape[0] - 1:

        # Get the sample and label_store fields of the current annotation
        sample_diff, current_label_store, bpi = proc_core_fields(filebytes, bpi)
        sample_total = sample_total + sample_diff
        sample.append(sample_total)
        label_store.append(current_label_store)

        # Process any other fields belonging to this annotation

        # Flags that specify whether the extra fields need to be updated
        update = {"subtype": True, "chan": True, "num": True, "aux_note": True}
        # Get the next label store value - it may indicate additional
        # fields for this annotation, or the values of the next annotation.
        current_label_store = filebytes[bpi, 1] >> 2

        while current_label_store > 59:
            subtype, chan, num, aux_note, update, bpi = proc_extra_field(
                current_label_store,
                filebytes,
                bpi,
                subtype,
                chan,
                num,
                aux_note,
                update,
            )

            current_label_store = filebytes[bpi, 1] >> 2

        # Set defaults or carry over previous values if necessary
        subtype, chan, num, aux_note = update_extra_fields(
            subtype, chan, num, aux_note, update
        )

        if sampto and sampto < sample_total:
            sample, label_store, subtype, chan, num, aux_note = rm_last(
                sample, label_store, subtype, chan, num, aux_note
            )
            break

    return sample, label_store, subtype, chan, num, aux_note


def proc_core_fields(filebytes, bpi):
    """
    Get the sample difference and label store fields of the current annotation.

    Parameters
    ----------
    filebytes : ndarray
        The input filestream converted to an Nx2 array of unsigned bytes.
    bpi : int
        The index to start the conversion.

    Returns
    -------
    sample_diff : int
        The sample difference of the current annotation relative to the
        previous annotation.
    label_store : int
        The integer value used to store the label of the current
        annotation.
    bpi : int
        The byte pair index, advanced past the core fields just read.
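
    Examples
    --------
    A minimal sketch decoding a single hand-built byte pair (sample
    difference 8, type code 1, i.e. NORMAL):

    >>> import numpy as np
    >>> filebytes = np.array([[8, 4]], dtype=np.uint8)
    >>> sample_diff, label_store, bpi = proc_core_fields(filebytes, 0)
    >>> int(sample_diff), int(label_store), bpi
    (8, 1, 1)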

    """
    sample_diff = 0

    # The current byte pair will contain either the actual d_sample + annotation store value,
    # or 0 + SKIP.
    while filebytes[bpi, 1] >> 2 == 59:
        # 4 bytes storing dt
        skip_diff = (
            (int(filebytes[bpi + 1, 0]) << 16)
            + (int(filebytes[bpi + 1, 1]) << 24)
            + (int(filebytes[bpi + 2, 0]) << 0)
            + (int(filebytes[bpi + 2, 1]) << 8)
        )

        # Data type is long integer (stored in two's complement). Range -2**31 to 2**31 - 1
        if skip_diff > 2147483647:
            skip_diff = skip_diff - 4294967296

        sample_diff += skip_diff
        bpi = bpi + 3

    # Not a skip - it is the actual sample number + annotation type store value
    label_store = filebytes[bpi, 1] >> 2
    sample_diff += int(filebytes[bpi, 0] + 256 * (filebytes[bpi, 1] & 3))
    bpi = bpi + 1

    return sample_diff, label_store, bpi


def proc_extra_field(
    label_store, filebytes, bpi, subtype, chan, num, aux_note, update
):
    """
    Process extra fields belonging to the current annotation. Potentially
    updated fields: subtype, chan, num, aux_note.

    Parameters
    ----------
    label_store : int
        The label store value of the current byte pair, indicating which
        extra field (if any) is stored.
    filebytes : ndarray
        The input filestream converted to an Nx2 array of unsigned bytes.
    bpi : int
        The index to start the conversion.
    subtype : ndarray
        A numpy array containing the marked class/category of each annotation.
    chan : ndarray
        A numpy array containing the signal channel associated with each
        annotation.
    num : ndarray
        A numpy array containing the labelled annotation number for each
        annotation.
    aux_note : list
        A list containing the auxiliary information string (or None for
        annotations without notes) for each annotation.
    update : dict
        The container of updated fields.

    Returns
    -------
    subtype : ndarray
        A numpy array containing the marked class/category of each annotation.
    chan : ndarray
        A numpy array containing the signal channel associated with each
        annotation.
    num : ndarray
        A numpy array containing the labelled annotation number for each
        annotation.
    aux_note : list
        A list containing the auxiliary information string (or None for
        annotations without notes) for each annotation.
    update : dict
        The container of updated fields.
    bpi : int
        The byte pair index, advanced past the extra field just read.
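
    Examples
    --------
    A minimal sketch decoding a hand-built 'chan' byte pair (chan value 1
    followed by the 62*4 indicator):

    >>> import numpy as np
    >>> filebytes = np.array([[1, 248]], dtype=np.uint8)
    >>> update = {"subtype": True, "chan": True, "num": True, "aux_note": True}
    >>> subtype, chan, num, aux_note, update, bpi = proc_extra_field(
    ...     62, filebytes, 0, [], [], [], [], update)
    >>> int(chan[0]), update["chan"], bpi
    (1, False, 1)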

    """
    # aux_note and sub are reset between annotations. chan and num copy over
    # previous value if missing.

    # SUB
    if label_store == 61:
        # sub is interpreted as signed char.
        subtype.append(filebytes[bpi, 0].astype("i1"))
        update["subtype"] = False
        bpi = bpi + 1
    # CHAN
    elif label_store == 62:
        # chan is interpreted as un_signed char
        chan.append(filebytes[bpi, 0])
        update["chan"] = False
        bpi = bpi + 1
    # NUM
    elif label_store == 60:
        # num is interpreted as signed char
        num.append(filebytes[bpi, 0].astype("i1"))
        update["num"] = False
        bpi = bpi + 1
    # aux_note
    elif label_store == 63:
        # Length of the aux_note string (stored in one byte, so at most 255).
        # The remaining bits of the second byte are not checked.
        aux_notelen = filebytes[bpi, 0]
        aux_notebytes = filebytes[
            bpi + 1 : bpi + 1 + int(np.ceil(aux_notelen / 2.0)), :
        ].flatten()
        if aux_notelen & 1:
            aux_notebytes = aux_notebytes[:-1]
        # The aux_note string
        aux_note.append("".join([chr(char) for char in aux_notebytes]))
        update["aux_note"] = False
        bpi = bpi + 1 + int(np.ceil(aux_notelen / 2.0))

    return subtype, chan, num, aux_note, update, bpi


def update_extra_fields(subtype, chan, num, aux_note, update):
    """
    Update the fields for which the current annotation did not
    provide a value.

    - aux_note and sub are set to default values if missing.
    - chan and num copy over previous value if missing.

    Parameters
    ----------
    subtype : ndarray
        A numpy array containing the marked class/category of each annotation.
    chan : ndarray
        A numpy array containing the signal channel associated with each
        annotation.
    num : ndarray
        A numpy array containing the labelled annotation number for each
        annotation.
    aux_note : list
        A list containing the auxiliary information string (or None for
        annotations without notes) for each annotation.
    update : dict
        The container of updated fields.

    Returns
    -------
    subtype : ndarray
        A numpy array containing the marked class/category of each annotation.
    chan : ndarray
        A numpy array containing the signal channel associated with each
        annotation.
    num : ndarray
        A numpy array containing the labelled annotation number for each
        annotation.
    aux_note : list
        A list containing the auxiliary information string (or None for
        annotations without notes) for each annotation.
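
    Examples
    --------
    An illustrative sketch where no extra fields were provided for the
    first annotation, so all defaults are filled in:

    >>> update = {"subtype": True, "chan": True, "num": True, "aux_note": True}
    >>> update_extra_fields([], [], [], [], update)
    ([0], [0], [0], [''])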

    """
    if update["subtype"]:
        subtype.append(0)

    if update["chan"]:
        if chan == []:
            chan.append(0)
        else:
            chan.append(chan[-1])
    if update["num"]:
        if num == []:
            num.append(0)
        else:
            num.append(num[-1])

    if update["aux_note"]:
        aux_note.append("")

    return subtype, chan, num, aux_note


rx_fs = re.compile(r"## time resolution: (?P<fs>\d+\.?\d*)")
rx_custom_label = re.compile(
    r"(?P<label_store>\d+) (?P<symbol>\S+) (?P<description>.+)"
)


def get_special_inds(sample, label_store, aux_note):
    """
    Get the indices of annotations that hold definition information about
    the entire annotation file, and other empty annotations to be removed.

    Note: There is no need to deal with SKIP annotations (label_store=59)
          which were already dealt with in proc_core_fields and hence not
          included here.

    Parameters
    ----------
    sample : ndarray
        A numpy array containing the annotation locations in samples relative to
        the beginning of the record.
    label_store : ndarray
        A numpy array containing the integer values used to store the
        annotation labels. If this field is present, `symbol` must not be
        present.
    aux_note : list
        A list containing the auxiliary information string (or None for
        annotations without notes) for each annotation.

    Returns
    -------
    potential_definition_inds : set
        The indices of annotations that may contain definition information
        for the entire file (fs or custom labels).
    rm_inds : set
        The indices to be removed.
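
    Examples
    --------
    An illustrative sketch: a NOTE annotation at sample 0 (a potential
    definition) followed by a NORMAL beat:

    >>> import numpy as np
    >>> sample = np.array([0, 150])
    >>> label_store = np.array([22, 1])
    >>> aux_note = ['## time resolution: 360', '']
    >>> potential, rm = get_special_inds(sample, label_store, aux_note)
    >>> sorted(int(i) for i in potential), sorted(int(i) for i in rm)
    ([0], [0])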

    """
    s0_inds = np.where(sample == np.int64(0))[0]
    note_inds = np.where(label_store == np.int64(22))[0]

    # sample = 0 with aux_note means there should be an fs or custom label definition.
    # Either way, they are to be removed.
    potential_definition_inds = set(s0_inds).intersection(note_inds)

    # Other indices which are not actual annotations.
    notann_inds = np.where(label_store == np.int64(0))[0]

    rm_inds = potential_definition_inds.union(set(notann_inds))

    return potential_definition_inds, rm_inds


def interpret_defintion_annotations(potential_definition_inds, aux_note):
    """
    Try to extract annotation definition information from annotation notes.
    Information that may be contained:
    - fs - sample=0, label_store=22, aux_note='## time resolution: XXX'.
    - custom annotation label definitions.

    Parameters
    ----------
    potential_definition_inds : set
        The indices of annotations that may contain definition information
        for the entire file (fs or custom labels).
    aux_note : list
        A list containing the auxiliary information string (or None for
        annotations without notes) for each annotation.

    Returns
    -------
    fs : int, float
        The sampling frequency of the record.
    custom_labels : pandas dataframe
        The custom annotation labels defined in the annotation file. Maps
        the relationship between the three label fields. The data type is a
        pandas DataFrame with three columns:
        ['label_store', 'symbol', 'description'].
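
    Examples
    --------
    An illustrative sketch extracting the sampling frequency from a
    definition note:

    >>> interpret_defintion_annotations({0}, ['## time resolution: 360'])
    (360, None)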

    """
    fs = None
    custom_labels = []

    if len(potential_definition_inds) > 0:
        i = 0
        while i < len(potential_definition_inds):
            if aux_note[i].startswith("## "):
                if not fs:
                    search_fs = rx_fs.findall(aux_note[i])
                    if search_fs:
                        fs = float(search_fs[0])
                        if round(fs, 8) == float(int(fs)):
                            fs = int(fs)
                        i += 1
                        continue
                if aux_note[i] == "## annotation type definitions":
                    i += 1
                    while aux_note[i] != "## end of definitions":
                        (
                            label_store,
                            symbol,
                            description,
                        ) = rx_custom_label.findall(aux_note[i])[0]
                        custom_labels.append(
                            (int(label_store), symbol, description)
                        )
                        i += 1
                    i += 1
            else:
                i += 1

    if not custom_labels:
        custom_labels = None

    return fs, custom_labels


def rm_empty_indices(*args):
    """
    Remove unwanted list indices.

    Parameters
    ----------
    args : tuple
        First argument is the list of indices to remove. Other elements
        are the lists to trim.

    Returns
    -------
    list
        The trimmed lists, in the same order as the inputs.
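
    Examples
    --------
    An illustrative sketch removing index 1 from two parallel lists:

    >>> rm_empty_indices({1}, [10, 20, 30], ['a', 'b', 'c'])
    [[10, 30], ['a', 'c']]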

    """
    rm_inds = args[0]

    if not rm_inds:
        return args[1:]

    keep_inds = [i for i in range(len(args[1])) if i not in rm_inds]

    return [[a[i] for i in keep_inds] for a in args[1:]]


def lists_to_int_arrays(*args):
    """
    Convert lists to numpy int arrays.

    Parameters
    ----------
    args : tuple
        Any number of lists to be converted.

    Returns
    -------
    list
        The input lists converted to numpy int arrays, in the same order
        as the inputs.

    """
    return [np.array(a, dtype="int") for a in args]


def rm_last(*args):
    """
    Remove the last index from each list.

    Parameters
    ----------
    args : tuple
        Any number of lists to be trimmed.

    Returns
    -------
    list
        The trimmed input lists.

    """
    if len(args) == 1:
        return args[0][:-1]
    else:
        return [a[:-1] for a in args]


def mrgann(
    ann_file1,
    ann_file2,
    out_file_name="merged_ann.atr",
    merge_method="combine",
    chan1=-1,
    chan2=-1,
    start_ann=0,
    end_ann="e",
    record_only=True,
    verbose=False,
):
    """
    This function reads a pair of annotation files (specified by `ann_file1`
    and `ann_file2`) for the specified record and writes a third annotation
    file (specified by `out_file_name`) for the same record. The header (.hea)
    file should be included in the same directory as each annotation file so
    that the sampling rate can be read. Typical applications of `mrgann`
    include combining annotation files that apply to different signals within
    a multi-signal record, and replacing a segment of an annotation file with
    annotations from another file. For example, setting 'merge_method' to
    'combine' will simply blindly merge the annotation files for the specified
    'start_ann' and 'end_ann' range while setting 'merge_method' to 'replace1'
    will replace the contents of the first file with the second in that
    specified range. Setting 'merge_method' to 'replace2' will replace the
    contents of the second file with the first in that specified range.

    Parameters
    ----------
    ann_file1 : string
        The file path of the first annotation file (with extension included).
    ann_file2 : string
        The file path of the second annotation file (with extension included).
    out_file_name : string, optional
        The name of the output file (with extension included). The
        default is 'merged_ann.atr'.
    merge_method : string, optional
        The method used to merge the two annotation files. The default is
        'combine' which simply combines the two files along every attribute;
        duplicates will be preserved. The other options are 'replace1' which
        replaces attributes of the first annotation file with attributes of
        the second for the desired time range, 'replace2' which does the
        same thing except switched (first file replaces second), and 'delete'
        which deletes all of the annotations in the desired time range.
    chan1 : int, optional
        Sets the value of `chan` for the first annotation file. The default is
        -1 which means to keep it the same.
    chan2 : int, optional
        Sets the value of `chan` for the second annotation file. The default
        is -1 which means to keep it the same.
    start_ann : float, int, string, optional
        The location (sample, time, etc.) to start the annotation filtering.
        If float, it will be interpreted as time in seconds. If int, it will
        be interpreted as sample number. If string, it will be interpreted
        as time formatted in HH:MM:SS format (the same as that in `wfdbtime`).
        The default is 0 to represent sample number 0. A value of 0.0 would
        represent 0 seconds instead.
    end_ann : float, int, string, optional
        The location (sample, time, etc.) to stop the annotation filtering.
        If float, it will be interpreted as time in seconds. If int, it will
        be interpreted as sample number. If string, it will be interpreted
        as time formatted in HH:MM:SS format (the same as that in `wfdbtime`).
        The default is 'e' to represent the end of the annotation.
    record_only : bool, optional
        Whether to only return the annotation information (True) or not
        (False). If False, this function will generate a WFDB-formatted
        annotation file. If True, it will return the object returned if that
        file was read with `rdann`.
    verbose : bool, optional
        Whether to print all the information read about each annotation file
        and the methodology for merging them (True) or not (False).

    Returns
    -------
    Annotation, optional
        If 'record_only' is set to True, return the new WFDB-formatted
        annotation object, the same as the output of `rdann`. Otherwise,
        create the WFDB-formatted annotation file and return nothing.
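
    Examples
    --------
    A hedged sketch (the annotation files '100.man' and '100.atr' and the
    matching header file are hypothetical and assumed to exist in the
    working directory):

    >>> # Blindly combine two annotation files for record '100' and return
    >>> # the result as an Annotation object without writing a file
    >>> ann = mrgann('100.man', '100.atr', merge_method='combine',
    ...              record_only=True)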

    """
    ann1 = rdann(ann_file1.split(".")[0], ann_file1.split(".")[1])
    ann2 = rdann(ann_file2.split(".")[0], ann_file2.split(".")[1])
    if ann1.fs != ann2.fs:
        raise Exception(
            "Annotation sample rates do not match up: samples "
            "can be aligned but final sample rate can not be "
            "determined"
        )
    # Apply the channel mapping if desired
    if chan1 != -1:
        if chan1 < -1:
            raise Exception("Invalid value for `chan1`: must be >= 0")
        ann1.chan = np.array([chan1] * ann1.ann_len)
    if chan2 != -1:
        if chan2 < -1:
            raise Exception("Invalid value for `chan2`: must be >= 0")
        ann2.chan = np.array([chan2] * ann2.ann_len)

    if start_ann == "e":
        raise Exception("Start time can not be set to the end of the record")
    if end_ann == 0:
        raise Exception("End time can not be set to the start of the record")

    samples = []
    for i, time in enumerate([start_ann, end_ann]):
        if time == "e":
            # End of annotation, set end sample to largest int, roughly
            sample = sys.maxsize
        else:
            if type(time) is int:
                # Sample number
                sample = time
            elif type(time) is float:
                # Time in seconds
                sample = int(time * ann1.fs)
            else:
                # HH:MM:SS format, loosely
                time_split = [t if t != "" else "0" for t in time.split(":")]
                if len(time_split) == 1:
                    seconds = float(time) % 60
                    minutes = int(float(time) // 60)
                    hours = int(float(time) // 60 // 60)
                elif len(time_split) == 2:
                    seconds = float(time_split[1])
                    minutes = int(time_split[0])
                    hours = 0
                elif len(time_split) == 3:
                    seconds = float(time_split[2])
                    minutes = int(time_split[1])
                    hours = int(time_split[0])
                if seconds >= 60:
                    raise Exception("Seconds not in correct format")
                if minutes >= 60:
                    raise Exception("Minutes not in correct format")
                total_seconds = hours * 60 * 60 + minutes * 60 + seconds
                if (i == 1) and (total_seconds == 0):
                    raise Exception(
                        "End time can not be set to the start of " "the record"
                    )
                sample = int(total_seconds * ann1.fs)
                if sample > max([max(ann1.sample), max(ann2.sample)]):
                    if i == 0:
                        raise Exception(
                            "Start time can not be set to the "
                            "end of the record"
                        )
                    else:
                        print(
                            "'end_ann' greater than the highest "
                            "annotation... reverting to the highest "
                            "annotation"
                        )
        samples.append(sample)
    start_sample = samples[0]
    end_sample = samples[1]
    if verbose:
        print(f"Start sample: {start_sample}, end sample: {end_sample}")

    if (merge_method == "combine") or (merge_method == "delete"):
        if verbose:
            print("Combining the two files together")
        # The sample should never be empty but others can (though they
        # shouldn't be)
        both_sample = np.concatenate([ann1.sample, ann2.sample]).astype(
            np.int64
        )
        # Generate a list of sorted indices then sort the array
        sort_indices = np.argsort(both_sample)
        both_sample = np.sort(both_sample)
        # Find where to filter the array
        if merge_method == "combine":
            sample_range = (both_sample >= start_sample) & (
                both_sample <= end_sample
            )
        if merge_method == "delete":
            sample_range = (both_sample < start_sample) | (
                both_sample > end_sample
            )
        index_range = np.where(sample_range)[0]
        both_sample = both_sample[sample_range]
        # Combine both annotation attributes
        ann_attr = {}
        blank_array = np.array([], dtype=np.int64)
        for cat in [
            "chan",
            "num",
            "subtype",
            "label_store",
            "symbol",
            "aux_note",
        ]:
            ann1_cat = ann1.__dict__[cat]
            ann2_cat = ann2.__dict__[cat]
            if cat in ["symbol", "aux_note"]:
                ann1_cat = ann1_cat if ann1_cat is not None else []
                ann2_cat = ann2_cat if ann2_cat is not None else []
                temp_cat = ann1_cat
                temp_cat.extend(ann2_cat)
                if len(temp_cat) == 0:
                    ann_attr[cat] = None
                else:
                    temp_cat = [temp_cat[i] for i in sort_indices]
                    ann_attr[cat] = [temp_cat[i] for i in index_range]
            else:
                ann1_cat = ann1_cat if ann1_cat is not None else blank_array
                ann2_cat = ann2_cat if ann2_cat is not None else blank_array
                temp_cat = np.concatenate([ann1_cat, ann2_cat]).astype(np.int64)
                if temp_cat.shape[0] == 0:
                    ann_attr[cat] = None
                else:
                    temp_cat = np.array([temp_cat[i] for i in sort_indices])
                    ann_attr[cat] = np.array([temp_cat[i] for i in index_range])

    elif (merge_method == "replace1") or (merge_method == "replace2"):
        if merge_method == "replace1":
            if verbose:
                print(
                    "Replacing the contents of the first file with the "
                    "contents of the second"
                )
            keep_ann = ann2
            remove_ann = ann1
        elif merge_method == "replace2":
            if verbose:
                print(
                    "Replacing the contents of the second file with the "
                    "contents of the first"
                )
            keep_ann = ann1
            remove_ann = ann2
        # Find where to filter the first array
        keep_sample_range = (keep_ann.sample >= start_sample) & (
            keep_ann.sample <= end_sample
        )
        keep_index_range = np.where(keep_sample_range)[0]
        # Find where to filter the second array
        remove_sample_range = (remove_ann.sample < start_sample) | (
            remove_ann.sample > end_sample
        )
        remove_index_range = np.where(remove_sample_range)[0]
        # The sample should never be empty but others can (though they
        # shouldn't be)
        keep_ann_sample = keep_ann.sample[keep_index_range]
        remove_ann_sample = remove_ann.sample[remove_index_range]
        both_sample = np.concatenate(
            [keep_ann_sample, remove_ann_sample]
        ).astype(np.int64)
        # Generate a list of sorted indices then sort the array
        sort_indices = np.argsort(both_sample)
        both_sample = np.sort(both_sample)
        # Combine both annotation attributes
        ann_attr = {}
        blank_array = np.array([], dtype=np.int64)
        for cat in [
            "chan",
            "num",
            "subtype",
            "label_store",
            "symbol",
            "aux_note",
        ]:
            keep_cat = keep_ann.__dict__[cat]
            remove_cat = remove_ann.__dict__[cat]
            if cat in ["symbol", "aux_note"]:
                keep_cat = (
                    [keep_cat[i] for i in keep_index_range]
                    if keep_cat is not None
                    else []
                )
                remove_cat = (
                    [remove_cat[i] for i in remove_index_range]
                    if remove_cat is not None
                    else []
                )
                temp_cat = keep_cat
                temp_cat.extend(remove_cat)
                if len(temp_cat) == 0:
                    ann_attr[cat] = None
                else:
                    ann_attr[cat] = [temp_cat[i] for i in sort_indices]
            else:
                keep_cat = (
                    np.array([keep_cat[i] for i in keep_index_range])
                    if keep_cat is not None
                    else blank_array
                )
                remove_cat = (
                    np.array([remove_cat[i] for i in remove_index_range])
                    if remove_cat is not None
                    else blank_array
                )
                temp_cat = np.concatenate([keep_cat, remove_cat]).astype(
                    np.int64
                )
                if temp_cat.shape[0] == 0:
                    ann_attr[cat] = None
                else:
                    ann_attr[cat] = np.array(
                        [temp_cat[i] for i in sort_indices]
                    )
    else:
        raise Exception(
            "Invalid value for 'merge_method': options are "
            "'combine', 'replace1', and 'replace2'"
        )

    if record_only:
        if verbose:
            print("Returning Annotation object")
        return Annotation(
            record_name=out_file_name.split(".")[0],
            extension=out_file_name.split(".")[1],
            sample=both_sample,
            symbol=ann_attr["symbol"],
            subtype=ann_attr["subtype"],
            chan=ann_attr["chan"],
            num=ann_attr["num"],
            aux_note=ann_attr["aux_note"],
            label_store=ann_attr["label_store"],
            fs=ann1.fs,
        )
    else:
        if verbose:
            print(f"Creating annotation file called: {out_file_name}")
        wrann(
            out_file_name.split(".")[0],
            out_file_name.split(".")[1],
            sample=both_sample,
            symbol=ann_attr["symbol"],
            subtype=ann_attr["subtype"],
            chan=ann_attr["chan"],
            num=ann_attr["num"],
            aux_note=ann_attr["aux_note"],
            label_store=ann_attr["label_store"],
            fs=ann1.fs,
        )


def format_ann_from_df(df_in):
    """
    Parameters
    ----------
    df_in : Pandas dataframe
        Contains all the information needed to create WFDB-formatted
        annotations. Of the form:
            onset,duration,description
            onset_1,duration_1,description_1
            onset_2,duration_2,description_2
            ...,...,...

    Returns
    -------
    Pandas dataframe
        The WFDB-formatted annotation dataframe, sorted by onset.
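
    Examples
    --------
    An illustrative sketch: a zero-duration annotation stays as-is, while
    an annotation with a nonzero duration is split into '(' and ')'
    annotations:

    >>> import pandas as pd
    >>> df_in = pd.DataFrame({'onset': [5, 12], 'duration': [0, 3],
    ...                       'description': ['N', 'AFIB']})
    >>> list(format_ann_from_df(df_in)['description'])
    ['N', '(AFIB', 'AFIB)']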

    """
    # Create two separate dataframes for the start and end annotation
    # then remove them from the original
    df_start = df_in[df_in["duration"] > 0].copy()
    df_end = df_in[df_in["duration"] > 0].copy()
    df_trunc = df_in[df_in["duration"] == 0]
    # Append parentheses at the start for annotation start and end for
    # annotation end
    df_start["description"] = "(" + df_start["description"].astype(str)
    df_end["description"] = df_end["description"].astype(str) + ")"
    # Add the duration time to the onset for the end annotation to convert
    # to single time annotations only
    df_end["onset"] = df_end["onset"] + df_end["duration"]
    # Concatenate all of the dataframes
    df_out = pd.concat([df_trunc, df_start, df_end], ignore_index=True)
    # Make sure the sorting is correct
    df_out["col_index"] = df_out.index
    return df_out.sort_values(["onset", "col_index"])


## ------------- Annotation Field Specifications ------------- ##


"""
WFDB field specifications for each field. The indexes are the field
names.

Notes
-----
In the original WFDB package, certain fields have default values, but
not all of them. Some attributes need to be present for core
functionality, ie. baseline, whereas others are not essential, yet have
defaults, ie. base_time.

This inconsistency has likely resulted in the generation of incorrect
files and general confusion. This library aims to make explicit whether
certain fields are present in the file, by setting their values to None
if they are not written, unless the fields are essential, in which case
an actual default value will be set.

The read vs write default values are different for 2 reasons:
1. We want to force the user to be explicit with certain important
   fields when writing WFDB records fields, without affecting
   existing WFDB headers when reading.
2. Certain unimportant fields may be dependencies of other
   important fields. When writing, we want to fill in defaults
   so that the user doesn't need to. But when reading, it should
   be clear that the fields are missing.

"""
# Allowed types of each Annotation object attribute.
ALLOWED_TYPES = {
    "record_name": (str),
    "extension": (str),
    "sample": (np.ndarray,),
    "symbol": (list, np.ndarray),
    "subtype": (np.ndarray,),
    "chan": (np.ndarray,),
    "num": (np.ndarray,),
    "aux_note": (list, np.ndarray),
    "fs": _header.float_types,
    "label_store": (np.ndarray,),
    "description": (list, np.ndarray),
    "custom_labels": (pd.DataFrame, list, tuple),
    "contained_labels": (pd.DataFrame, list, tuple),
}

str_types = (str, np.str_)

# Elements of the annotation label
ann_label_fields = ("label_store", "symbol", "description")


class AnnotationClass(object):
    """
    Describes the annotations.

    Attributes
    ----------
    extension : str
        The file extension of the annotation.
    description : str
        The description provided with the annotation.
    human_reviewed : bool
        Whether the annotation was human-reviewed (True) or not (False).

    """

    def __init__(self, extension, description, human_reviewed):

        self.extension = extension
        self.description = description
        self.human_reviewed = human_reviewed


ann_classes = [
    AnnotationClass("atr", "Reference ECG annotations", True),
    AnnotationClass("blh", "Human reviewed beat labels", True),
    AnnotationClass("blm", "Machine beat labels", False),
    AnnotationClass("alh", "Human reviewed alarms", True),
    AnnotationClass("alm", "Machine alarms", False),
    AnnotationClass("qrsc", "Human reviewed QRS detections", True),
    AnnotationClass("qrs", "Machine QRS detections", False),
    AnnotationClass("bph", "Human reviewed BP beat detections", True),
    AnnotationClass("bpm", "Machine BP beat detections", False),
    # AnnotationClass('alh', 'Human reviewed BP alarms', True),
    # AnnotationClass('alm', 'Machine BP alarms', False),
    # separate ECG and other signal category alarms?
    # Can we use signum to determine the channel it was triggered off?
    # ppg alarms?
    # eeg alarms?
]

ann_class_table = pd.DataFrame(
    {
        "extension": [ac.extension for ac in ann_classes],
        "description": [ac.description for ac in ann_classes],
        "human_reviewed": [ac.human_reviewed for ac in ann_classes],
    }
)
ann_class_table.set_index(ann_class_table["extension"].values, inplace=True)
ann_class_table = ann_class_table[
    ["extension", "description", "human_reviewed"]
]

# Individual annotation labels
class AnnotationLabel(object):
    """
    Describes the individual annotation labels.

    Attributes
    ----------
    label_store : int
        The value used to store the labels.
    symbol : str
        The shortened version of the annotation label abbreviation.
    short_description : str
        The shortened version of the description provided with the annotation.
    description : str
        The description provided with the annotation.

    """

    def __init__(self, label_store, symbol, short_description, description):
        self.label_store = label_store
        self.symbol = symbol
        self.short_description = short_description
        self.description = description

    def __str__(self):
        return (
            str(self.label_store)
            + ", "
            + str(self.symbol)
            + ", "
            + str(self.short_description)
            + ", "
            + str(self.description)
        )


is_qrs = [
    False,
    True,
    True,
    True,
    True,
    True,
    True,
    True,
    True,
    True,  # 0 - 9
    True,
    True,
    True,
    True,
    False,
    False,
    False,
    False,
    False,
    False,  # 10 - 19
    False,
    False,
    False,
    False,
    False,
    True,
    False,
    False,
    False,
    False,  # 20 - 29
    True,
    True,
    False,
    False,
    True,
    True,
    False,
    False,
    True,
    False,  # 30 - 39
    False,
    True,
    False,
    False,
    False,
    False,
    False,
    False,
    False,
    False,  # 40 - 49
]

ann_labels = [
    AnnotationLabel(0, " ", "NOTANN", "Not an actual annotation"),
    AnnotationLabel(1, "N", "NORMAL", "Normal beat"),
    AnnotationLabel(2, "L", "LBBB", "Left bundle branch block beat"),
    AnnotationLabel(3, "R", "RBBB", "Right bundle branch block beat"),
    AnnotationLabel(4, "a", "ABERR", "Aberrated atrial premature beat"),
    AnnotationLabel(5, "V", "PVC", "Premature ventricular contraction"),
    AnnotationLabel(6, "F", "FUSION", "Fusion of ventricular and normal beat"),
    AnnotationLabel(7, "J", "NPC", "Nodal (junctional) premature beat"),
    AnnotationLabel(8, "A", "APC", "Atrial premature contraction"),
    AnnotationLabel(
        9, "S", "SVPB", "Premature or ectopic supraventricular beat"
    ),
    AnnotationLabel(10, "E", "VESC", "Ventricular escape beat"),
    AnnotationLabel(11, "j", "NESC", "Nodal (junctional) escape beat"),
    AnnotationLabel(12, "/", "PACE", "Paced beat"),
    AnnotationLabel(13, "Q", "UNKNOWN", "Unclassifiable beat"),
    AnnotationLabel(14, "~", "NOISE", "Signal quality change"),
    # AnnotationLabel(15, None, None, None),
    AnnotationLabel(16, "|", "ARFCT", "Isolated QRS-like artifact"),
    # AnnotationLabel(17, None, None, None),
    AnnotationLabel(18, "s", "STCH", "ST change"),
    AnnotationLabel(19, "T", "TCH", "T-wave change"),
    AnnotationLabel(20, "*", "SYSTOLE", "Systole"),
    AnnotationLabel(21, "D", "DIASTOLE", "Diastole"),
    AnnotationLabel(22, '"', "NOTE", "Comment annotation"),
    AnnotationLabel(23, "=", "MEASURE", "Measurement annotation"),
    AnnotationLabel(24, "p", "PWAVE", "P-wave peak"),
    AnnotationLabel(25, "B", "BBB", "Left or right bundle branch block"),
    AnnotationLabel(26, "^", "PACESP", "Non-conducted pacer spike"),
    AnnotationLabel(27, "t", "TWAVE", "T-wave peak"),
    AnnotationLabel(28, "+", "RHYTHM", "Rhythm change"),
    AnnotationLabel(29, "u", "UWAVE", "U-wave peak"),
    AnnotationLabel(30, "?", "LEARN", "Learning"),
    AnnotationLabel(31, "!", "FLWAV", "Ventricular flutter wave"),
    AnnotationLabel(
        32, "[", "VFON", "Start of ventricular flutter/fibrillation"
    ),
    AnnotationLabel(
        33, "]", "VFOFF", "End of ventricular flutter/fibrillation"
    ),
    AnnotationLabel(34, "e", "AESC", "Atrial escape beat"),
    AnnotationLabel(35, "n", "SVESC", "Supraventricular escape beat"),
    AnnotationLabel(
        36, "@", "LINK", "Link to external data (aux_note contains URL)"
    ),
    AnnotationLabel(37, "x", "NAPC", "Non-conducted P-wave (blocked APB)"),
    AnnotationLabel(38, "f", "PFUS", "Fusion of paced and normal beat"),
    AnnotationLabel(39, "(", "WFON", "Waveform onset"),
    AnnotationLabel(40, ")", "WFOFF", "Waveform end"),
    AnnotationLabel(
        41, "r", "RONT", "R-on-T premature ventricular contraction"
    ),
    # AnnotationLabel(42, None, None, None),
    # AnnotationLabel(43, None, None, None),
    # AnnotationLabel(44, None, None, None),
    # AnnotationLabel(45, None, None, None),
    # AnnotationLabel(46, None, None, None),
    # AnnotationLabel(47, None, None, None),
    # AnnotationLabel(48, None, None, None),
    # AnnotationLabel(49, None, None, None),
]

ann_label_table = pd.DataFrame(
    {
        "label_store": np.array(
            [al.label_store for al in ann_labels], dtype="int"
        ),
        "symbol": [al.symbol for al in ann_labels],
        "description": [al.description for al in ann_labels],
    }
)
ann_label_table.set_index(ann_label_table["label_store"].values, inplace=True)
ann_label_table = ann_label_table[["label_store", "symbol", "description"]]