KINECAL 1.0.2

File: <base>/sway_utils/metrics.py (20,374 bytes)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon May 13 11:46:23 2019

@author: Sean Maudsley-Barton

A set of utilities that help to calculate common sway metrics
"""

import numpy as np
import pandas as pd
import os

import math
import scipy
from scipy import signal
from scipy.spatial import distance
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks

import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
import matplotlib.transforms as transforms

from enum import Enum        

#%%
class SOTTrial(Enum):
    '''
    Enumeration for SOT trial types
    '''
    Eyes_Open_Fixed_Surround_and_Support = 1
    Eyes_Closed_Fixed_Support = 2
    Open_Sway_Referenced_Surrond = 3
    Eyes_Open_Sway_Referenced_Support = 4
    Eyes_Closed_Sway_Referenced_Support = 5
    Eyes_Open_Sway_Referenced_Surround_and_Support = 6


class SwayMetric(Enum):
    ''' 
    Enumeration for sway metric 
    '''
    ALL = 0
    RDIST = 1
    RDIST_ML = 2
    RDIST_AP = 3
    MDIST = 4
    MDIST_ML = 5
    MDIST_AP = 6
    TOTEX = 7
    TOTEX_ML = 8
    TOTEX_AP = 9
    MVELO = 10
    MVELO_ML = 11
    MVELO_AP = 12
    MFREQ = 13
    MFREQ_ML = 14
    MFREQ_AP = 15
    AREA_CE = 16
    FRAC_DIM = 17
    ROMBERG_RATIO = 18
    ROMBERG_RATIO_FOAM = 19
    
    
class DeviceType(Enum):
    ''' 
    Enumeration for sway metric 
    '''
    BALANCE_MASTER = 1
    KINECT = 2

class SwayGroup(Enum):
    '''
    Enumeration for sway groups
    '''
    All = 0
    All_healthy = 1
    All_fallers = 2
    
    Young = 3
    Middle = 4
    Old = 5
    
    Faller_by_history = 6
    Faller_by_history_single = 7
    Faller_by_history_muliple = 8
    Faller_by_miniBEStest = 9
    
    Young_vs_old = 10
    Old_vs_all_fallers = 11
    Old_vs_single_fallers = 12
    Young_vs_all_fallers = 13
    Young_and_Middle = 14
    

SPINEBASE = 0
SPINEMID = 1
NECK = 2
HEAD = 3
SHOULDERLEFT = 4
ELBOWLEFT = 5
WRISTLEFT = 6
HANDLEFT = 7
SHOULDERRIGHT = 8
ELBOWRIGHT = 9
WRISTRIGHT = 10
HANDRIGHT = 11
HIPLEFT = 12
KNEELEFT = 13
ANKLELEFT = 14
FOOTLEFT = 15
HIPRIGHT = 16
KNEERIGHT = 17
ANKLERIGHT = 18
FOOTRIGHT = 19
SPINESHOULDER = 20
HANDTIPLEFT = 21
THUMBLEFT = 22
HANDTIPRIGHT = 23
THUMBRIGHT = 24

X = 0
Y = 1
Z = 2

#%%
def normalise_skeleton(skel_frame, spine_base_joint):
    normalised_skel_frame = np.copy(skel_frame)
    # x = 0
    # y = 1
    # z = 2

    for i in range(skel_frame.shape[0]):
        normalised_skel_frame[i][2] =  str(100*(float(skel_frame[i][2]) - spine_base_joint[X]))
        normalised_skel_frame[i][3] =  str(100*(float(skel_frame[i][3]) - spine_base_joint[Y]))
        normalised_skel_frame[i][4] =  str(100*(float(skel_frame[i][4]) - spine_base_joint[Z]))

    return normalised_skel_frame


def calculate_com(skelFrame):
    _X = 2
    _Y = 3
    _Z = 4

    spine_shoulder = np.stack([float(skelFrame[SPINESHOULDER, _X]), float(skelFrame[SPINESHOULDER, _Y]), float(skelFrame[SPINESHOULDER, _Z])], axis=0)
    spine_base = np.stack([float(skelFrame[SPINEBASE, _X]), float(skelFrame[SPINEBASE, _Y]), float(skelFrame[SPINEBASE, _Z])], axis=0)
    spine_mid = np.stack([float(skelFrame[SPINEMID, _X]), float(skelFrame[SPINEMID, _Y]), float(skelFrame[SPINEMID, _Z])], axis=0)
    hip_left = np.stack([float(skelFrame[HIPLEFT, _X,]), float(skelFrame[HIPLEFT, _Y,]), float(skelFrame[HIPLEFT, _Z])], axis=0)
    hip_right = np.stack([float(skelFrame[HIPRIGHT, _X]), float(skelFrame[HIPRIGHT, _Y]), float(skelFrame[HIPRIGHT, _Z])], axis=0)
    
    xMean = np.mean([spine_mid[0],hip_left[0],hip_right[0]])
    yMean = np.mean([spine_mid[1],hip_left[1],hip_right[1]])
    zMean = np.mean([spine_mid[2],hip_left[2],hip_right[2]])
    
    com = np.stack([xMean,yMean,zMean],axis=0)

    return com.tolist()


def euclidean_distance_between_joints(j1, j2):
    ed = np.sqrt(np.square(j1[X] - j2[X]) + np.square(j1[Y] - j2[Y]) + np.square(j1[Z] - j2[Z]))

    return ed


def cosine_distance_between_joints(j1, j2):
    cd = distance.cosine(j1, j2)
    
    return cd


def normalised_magnitude_between_joints(j, j_ref):
    mag_j = np.linalg.norm(j)
    mag_j_ref = np.linalg.norm(j_ref)
    
    nn = mag_j / mag_j_ref
    
    return nn


def get_joint_XYZ(skel_frame_row):
    x = float(skel_frame_row[2])
    y = float(skel_frame_row[3])
    z = float(skel_frame_row[4])

    return [x, y, z]


def eulidean_distance_2d(j1, j2):
    ed = np.sqrt(np.square(j1[0] - j2[0]) + np.square(j1[1] - j2[1]))
    return ed

def get_angle(j1, j2, j3):
    ba = j1 - j2
    bc = j3 - j2

    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    rad_angle = np.arccos(cosine_angle)
    deg_angle = np.degrees(rad_angle) 

    return deg_angle


def get_AP_angle_between_three_joints_matrix(j1, j2, j3):
    
    l1 =  np.sqrt(np.square(j1[2] - j2[2]))
    l2 =  np.sqrt(np.square(j2[2] - j3[2]))
    
    rad_between_three_joints = np.arccos(l2/l1)
    deg_between_three_joints = np.degrees(rad_between_three_joints) 
    
    return deg_between_three_joints

def get_2d_angle_between_three_joints(j1, j2, j3, plane='SP', deg=True):
    #here
    #AP = [Z, Y]
    #ML = [X, Y]
    
    if plane == 'SP':
        co_plane = [Z, Y] # SI, AP | Sagital Plane
    elif plane == 'FP':
        co_plane = [X, Y] # [X, Y] # ML, SI | Frontal Plane
    elif plane == 'TP':
        co_plane = [X, Z] # [X, Z] # ML, AP | Transverse Plane
    
    a = eulidean_distance_2d([j1[co_plane[0]], j1[co_plane[1]]], [j2[co_plane[0]], j2[co_plane[1]]])
    b = eulidean_distance_2d([j2[co_plane[0]], j2[co_plane[1]]], [j3[co_plane[0]], j3[co_plane[1]]]) #j2, j3)
    c = eulidean_distance_2d([j3[co_plane[0]], j3[co_plane[1]]], [j1[co_plane[0]], j1[co_plane[1]]]) #j3, j1)
    
    angle_C = np.arccos((a**2 + b**2 - c**2) / (2 * a * b))
    
    if deg:
        angle_C = np.degrees(angle_C)
    
    return angle_C


def get_3d_angle_between_three_joints(j1, j2, j3, deg=True):
    SP_angle = get_2d_angle_between_three_joints(j1, j2, j3, plane='SP', deg=deg)
    FP_angle = get_2d_angle_between_three_joints(j1, j2, j3, plane='FP', deg=deg)
    TP_angle = get_2d_angle_between_three_joints(j1, j2, j3, plane='TP', deg=deg)
    
    #eular_angle_3d = np.hstack([x_YZ_euler, y_XZ_euler, z_XY_euler])
    SP_FP_TP_angle_3d = np.hstack([SP_angle, FP_angle, TP_angle])
    return SP_FP_TP_angle_3d

#%%
def get_unique_values(full_list):
    '''
    Gets unique values for a list, but preserves order, unlike numpy.unnique
    '''
    
    unnique_list = []
    
    for item in full_list:
        if item not in unnique_list:
            unnique_list.append(item)
            
    return unnique_list


def eigsorted(cov):
    '''
    Eigenvalues and eigenvectors of the covariance matrix.
    '''
    vals, vecs = np.linalg.eigh(cov)
    order = vals.argsort()[::-1]
    return vals[order], vecs[:, order]

def confidence_ellipse(x, y, ax, n_std=1.96, facecolor='none', return_ellipse=True, **kwargs):
    '''
    Create a plot of the covariance confidence ellipse of `x` and `y`

    Parameters
    ----------
    x, y : array_like, shape (n, )
        Input data.
    ax : matplotlib.axes.Axes
        The axes object to draw the ellipse into.
    n_std : float
        The number of standard deviations to determine the ellipse's radiuses.
        1.96 SD = 95% confidence ellipse
    Returns
    -------
    matplotlib.patches.Ellipse
    Other parameters
    ----------------
    kwargs : `~matplotlib.patches.Patch` properties
    '''
    if x.size != y.size:
         raise ValueError("x and y must be the same size")

    cov = np.cov(x, y)
    
    #eigvals, eigvecs = eigsorted(cov)
    eigvals, eigvecs = np.linalg.eigh(cov)
    
    angle = np.degrees(np.arctan2(*eigvecs[:, 0][::-1]))
    eigvals = [
        [eigvals[0], 0],
        [0, eigvals[1]]
        ]
    axes = 2.4478 * np.sqrt(scipy.linalg.svdvals(eigvals))
    height = axes[0] * 2
    width = axes[1] * 2
    area = np.pi * np.prod(axes)                           

    ellipse = Ellipse(xy=(np.mean(x), np.mean(y)), width=width, height=height,
                      angle=angle, facecolor=facecolor, **kwargs)
    
    if return_ellipse == True:    
        return ax.add_patch(ellipse), height, width, area, angle
    else:
        return height, width, area, angle
    

def filter_signal(ML_path, AP_path=[], CC_path=np.array([]), N=2, fc=10, fs=30):
    '''
    N = order of the filer - usually 1, 2 or 4
    fc = Cut-off frequency of the filter - usually 10 or 6 
    fs = 30
    Wn = fc / (fs / 2) # Normalize the frequency
    '''
    
    Wn = np.pi * fc / (2 * fs) # Normalize the frequency
    
    #b, a = signal.butter(N, Wn, btype='low', fs=fs)
    b, a = signal.butter(N, Wn, btype='low')
    filtered_ML_path = signal.filtfilt(b, a, ML_path)    
    
    if len(AP_path) != 0:
        filtered_AP_path = signal.filtfilt(b, a, AP_path)
    
    if len(CC_path) != 0:
        filtered_CC_path = signal.filtfilt(b, a, CC_path)

    if len(CC_path) != 0:
        return filtered_ML_path, filtered_AP_path, filtered_CC_path
    if len(AP_path) != 0:
        return filtered_ML_path, filtered_AP_path
    else:
        return filtered_ML_path


def calculate_RD(selected_recording,
               deviceType = DeviceType.KINECT,
               rd_path = '',
               part_id = '',
               SOT_trial_type = ''):
    ''' 
        Calulate resultant distance, 
        that is the distance from each point on the raw CoM path to the 
        mean of the time series ad displays RD path and sway area 
        
        Saves AREA_CE image if rd_parh is filled in
        
    '''

    if deviceType == DeviceType.KINECT:
        ML_path = selected_recording['CoGx'].values.astype(float)
        AP_path = selected_recording['CoGz'].values.astype(float)
        ML_path, AP_path = filter_signal(ML_path, AP_path, fc=8)
        
    elif deviceType == DeviceType.BALANCE_MASTER:
        ML_path = selected_recording['CoGx'].values.astype(float)
        AP_path = selected_recording['CoGy'].values.astype(float)

    mean_ML = np.mean(ML_path)
    mean_AP = np.mean(AP_path)

    ML =  np.abs(np.subtract(ML_path, mean_ML))
    AP =  np.abs(np.subtract(AP_path, mean_AP))
    #ML = np.sqrt(np.square(np.subtract(ML_path, mean_ML)))
    #AP = np.sqrt(np.square(np.subtract(AP_path, mean_AP)))
    
    #get hypotinuse of ML_RD and AP_RD
    RD = np.sqrt(np.add(np.square(ML),np.square(AP)))

    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.set_xlim([-8, 8])
    ax.set_ylim([-8, 8])
    
    elp, width, height, AREA_CE = confidence_ellipse(ML_path, AP_path, ax, n_std=1.96, edgecolor='red')
    ax.scatter(ML_path, AP_path, s=3)
    
    AREA_CE = round(AREA_CE, 2)
    
    ax.set_title(str.replace(SOT_trial_type, '-', ' ') + ' ' + part_id + ' CoM 95% CE' + 
                  '\nW:' + str(round(width, 2)) + ' cm' + 
                  ' x ' + 
                  'H:' + str(round(height, 2)) + ' cm' + 
                  ' Ar:' + str(AREA_CE) + ' cm sq')
    
    ax.set_aspect('equal')

    if rd_path != '':
        plt.savefig(rd_path)
    
    plt.show()
        
    fig = plt.figure()
    ax = fig.add_subplot(111)

    elp, width, height, AREA_CE = confidence_ellipse(ML_path, AP_path, ax, n_std=1.96, edgecolor='red')
    ax.scatter(ML_path, AP_path, s=3)
    
    AREA_CE = round(AREA_CE, 2)
    ax.set_title(str.replace(SOT_trial_type, '-', ' ') + ' ' + part_id + ' CoM 95% CE' + 
                 '\nW:' + str(round(width, 2)) + ' cm' + 
                 ' x ' + 
                 'H:' + str(round(height, 2)) + ' cm' + 
                 ' Ar:' + str(AREA_CE) + ' cm sq')
    
    ax.set_aspect('equal')
    
    if rd_path != '':
        detailed_rd_path = rd_path.replace('confidence_ellipse', 'confidence_ellipse_detailed')
        plt.savefig(detailed_rd_path)

    plt.show()

    return ML, AP, RD, AREA_CE


def calculate_RD_3D(selected_recording,
                    deviceType = DeviceType.KINECT,):
    ''' Calulate resultant distance '''

    if deviceType == DeviceType.KINECT:
        ML_path = selected_recording['CoGx'].values.astype(float)
        AP_path = selected_recording['CoGz'].values.astype(float)
        UD_path = selected_recording['CoGy'].values.astype(float)
        ML_path, AP_path = filter_signal(ML_path, AP_path)
    elif deviceType == DeviceType.BALANCE_MASTER:
        ML_path = selected_recording['CoGx'].values.astype(float)
        AP_path = selected_recording['CoGy'].values.astype(float)

    mean_ML = np.mean(ML_path)
    mean_AP = np.mean(AP_path)
    mean_UD = np.mean(UD_path)

    ML_RD = np.sqrt(np.square(np.subtract(ML_path, mean_ML)))
    AP_RD = np.sqrt(np.square(np.subtract(AP_path, mean_AP)))
    UD_RD = np.sqrt(np.square(np.subtract(UD_path, mean_UD)))
    RD = np.sqrt(np.square(np.subtract(ML_path, mean_ML)) + np.square(np.subtract(AP_path, mean_AP)) + np.square(np.subtract(UD_path, mean_UD)))

    return ML_RD, AP_RD, UD_RD, RD


def calculate_TOTEX(ML, AP):

    arr_ML_diff = []
    arr_AP_diff = []
    arr_tot_diff = []

    ML_TOTEX = 0
    AP_TOTEX = 0
    TOTEX = 0

    for step in range(1, len(AP)):
        ML_curr = ML[step]
        AP_curr = AP[step]

        ML_prev = ML[step - 1]
        AP_prev = AP[step - 1]
        
        ML_diff = abs(ML_curr - ML_prev)
        AP_diff = abs(AP_curr - AP_prev)
        #ML_diff = np.sqrt(np.square(ML_curr - ML_prev))
        #AP_diff = np.sqrt(np.square(AP_curr - AP_prev))

        arr_ML_diff.append(ML_diff)
        arr_AP_diff.append(AP_diff)
        
        # get hypotinuse of ML and AP diff
        RD_diff = np.sqrt(np.add(np.square(ML_diff), np.square(AP_diff)))
        arr_tot_diff.append(RD_diff)

    ML_TOTEX = np.sum(arr_ML_diff)
    AP_TOTEX = np.sum(arr_AP_diff)
    TOTEX = np.sum(arr_tot_diff)
    
    N = len(AP)
    d = max(arr_tot_diff)
    FD = np.log(N) / np.log((N * d) / TOTEX)

    return ML_TOTEX, AP_TOTEX, TOTEX, FD
    

def calculate_sway_from_recording(selected_recording,
                                  selected_recording_name,
                                  pID,
                                  age,
                                  sex,
                                  SOT_trial_type,
                                  tNum,
                                  swayMetric = SwayMetric.ALL,
                                  deviceType = DeviceType.KINECT,
                                  impairment_self = 'healthy',
                                  impairment_confedence = 'healthy',
                                  impairment_clinical = 'healthy',
                                  impairment_stats = 'healthy',
                                  dp = -1,
                                  rd_path = '',
                                  start = 0,
                                  end = 600):
    
    '''
    Calculates sway from Kinect or Balance master recordings
    
    '''
    
    cliped_recording = selected_recording[start : end]

    ML, AP, RD, AREA_CE = calculate_RD(cliped_recording, deviceType, rd_path, pID, SOT_trial_type)
    recording_length = len(RD)

    #--mean DIST and rms DIST
    MDIST_ML = np.sum(ML) / recording_length
    MDIST_AP = np.sum(AP) / recording_length
    MDIST = np.sum(RD) / recording_length

    RDIST_ML = np.sqrt(np.sum(np.square(ML) / recording_length))
    RDIST_AP = np.sqrt(np.sum(np.square(AP) / recording_length))
    RDIST = np.sqrt(np.sum(np.square(RD) / recording_length))

    #--Total Excursion - TOTEX
    TOTEX_ML, TOTEX_AP, TOTEX, FD = calculate_TOTEX(ML, AP)

    #--Mean Velocity - MVELO
    if deviceType == DeviceType.KINECT:
        T = recording_length / 30
    elif  deviceType == DeviceType.BALANCE_MASTER:
        T = recording_length / 100

    MVELO_ML = TOTEX_ML / T
    MVELO_AP = TOTEX_AP / T
    MVELO = TOTEX / T

    #--Mean Fequency - MFREQ
    MFREQ_ML = MVELO_ML / (4*(np.sqrt(2 * MDIST_ML)))
    MFREQ_AP = MVELO_AP / (4*(np.sqrt(2 * MDIST_AP)))
    MFREQ = MVELO / (2 * np.pi * MDIST)


    #TOTEX MVELO RDIST
    if swayMetric == SwayMetric.RDIST:
        swayVal = RDIST
    elif swayMetric == SwayMetric.RDIST_ML:
        swayVal = RDIST_ML
    elif swayMetric == SwayMetric.RDIST_AP:
        swayVal = RDIST_AP

    elif swayMetric == SwayMetric.MDIST:
        swayVal = MDIST
    elif swayMetric == SwayMetric.MDIST_ML:
        swayVal = MDIST_ML
    elif swayMetric == SwayMetric.MDIST_AP:
        swayVal = MDIST_AP

    elif swayMetric == SwayMetric.TOTEX:
        swayVal = TOTEX
    elif swayMetric == SwayMetric.TOTEX_ML:
        swayVal = TOTEX_ML
    elif swayMetric == SwayMetric.TOTEX_AP:
        swayVal = TOTEX_AP

    elif swayMetric == SwayMetric.MVELO:
        swayVal = MVELO
    elif swayMetric == SwayMetric.MVELO_ML:
        swayVal = MVELO_ML
    elif swayMetric == SwayMetric.MVELO_AP:
        swayVal = MVELO_AP

    elif swayMetric == SwayMetric.MFREQ:
        swayVal = MFREQ
    elif swayMetric == SwayMetric.MFREQ_ML:
        swayVal = MFREQ_ML
    elif swayMetric == SwayMetric.MVELO_AP:
        swayVal = MFREQ_AP

    elif swayMetric == SwayMetric.AREA_CE:
        swayVal = AREA_CE
        
    elif swayMetric == SwayMetric.FRAC_DIM:
        swayVal = FD

    tmpSway = []

    if dp != -1:
        swayVal = round(swayVal, dp)
    
    if swayMetric == SwayMetric.ALL:
        tmpSway.append([pID, selected_recording_name, tNum, age, sex, 
                        impairment_self, impairment_confedence, 
                        impairment_clinical, impairment_stats,
                        swayMetric.name,
                        RDIST_ML, RDIST_AP, RDIST,
                        MDIST_ML, MDIST_AP, MDIST,
                        TOTEX_ML, TOTEX_AP, TOTEX,
                        MVELO_ML, MVELO_AP, MVELO,
                        MFREQ_ML, MFREQ_AP, MFREQ,
                        AREA_CE])
    else:
        tmpSway.append([pID, selected_recording_name, tNum, age, sex, 
                        impairment_self, impairment_confedence, 
                        impairment_clinical, impairment_stats,
                        swayMetric.name,
                        swayVal])


    return tmpSway

#%% Balance master Utils
def load_balance_master_file(rootDir,
                             participantID,
                             age,
                             kinectTrialType,
                             trialNumber = 1,
                             swayMetric = SwayMetric.RDIST):

    root = ''
    dirs = []
    columns = ''
    arrayOfRows = []
    dfFinal = pd.DataFrame(arrayOfRows)

    for root, dirs, _ in os.walk(rootDir):
        break

    dirs.sort()

    selected_trial = ''
    for dirName in dirs:
        if participantID in dirName:
            print(dirName)
            rootFilePath = os.path.join(root, dirName, 'cm')

            trialRoot = ''
            trialfiles = []
            for trialRoot, _, trialfiles in os.walk(rootFilePath):
                break

            found = False
            for trial in trialfiles:
                #if 'SOT' in trial and str('T'+ str(kinectTrialType.value)) in trial:
                if 'SOT' in trial and str('C'+ str(kinectTrialType.value)) in trial and str('T'+ str(trialNumber)) in trial:
                    print('Collating:', trialRoot + '/' + trial)
                    selected_trial = trial
                    trialFilePath = os.path.join(trialRoot, trial)

                    bMTrial = pd.read_csv(trialFilePath
                                          , sep="\t")


                    i = 0

                    for row in bMTrial[25:].iterrows():
                        parsedRow = row[1][0]
                        arrRow = parsedRow.split('\t')
                        strArrRow = str.split(arrRow[0])

                        if i == 0:
                            #columns = strArrRow
                            columns = ['DP', 'LF', 'RR', 'SH', 'LR', 'RF', 'CoFx', 'CoFy', 'CoGx', 'CoGy']
                        else:
                            arrayOfRows.append(strArrRow)

                        i += 1

                    found = True
                    break

            if found:
                dfFinal = pd.DataFrame(arrayOfRows, columns=columns)
            else:
                print('can not find ', trialRoot + '/' + trial)

            #break


    return dfFinal, selected_trial