-
Rhino authored
Scaled X data, and used a slightly ChatGPT-adjusted version of nested cross-validation. Didn't check whether it all makes sense, but it seems okay. For some reason I cannot get the sklearnex extension to work.
script_grid_search.py 21.36 KiB
# Basics
import numpy as np
import os
import math
# Plotting
import matplotlib.pyplot as plt
import ipympl
# Data processing
import pandas as pd
import awkward as ak
# ML
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.svm import SVC
# Misc
from pathlib import Path
import pyarrow as pa
import urllib.request
from itertools import chain, combinations
# Imports the user score information
user_data = pd.read_csv(
    r"data/scores_WtG_PrePost.csv",
    delimiter=",",
    usecols=["User", "Pre score", "Post score", "Difference", "Group cat"],
)
# Defines my directory with the user data
user_dir = r'data/with ET'
# Drop users without a group category (NaN in "Group cat").
# drop=True keeps the old index from leaking in as an extra column
# (plain reset_index() would add "index"/"level_0" columns that end up
# as fields of array_user below).
user_data = user_data.dropna(subset=["Group cat"])
user_data = user_data.reset_index(drop=True)
# Drop users that have no data directory on disk.
missing_dir_rows = [
    i for i, user in enumerate(user_data["User"])
    if not os.path.isdir(user_dir + '/' + user)
]
user_data = user_data.drop(missing_dir_rows)
user_data = user_data.reset_index(drop=True)
# Convert to awkward array (one record field per DataFrame column)
array_user = ak.zip(dict(user_data))
# Map each user to their "graph01-ET_planning" recording files.
file_names = {}
for user in user_data["User"]:
    listing = os.listdir(user_dir + '/' + user)
    file_names[user] = [name for name in listing if "graph01-ET_planning" in name]
# Read each CSV file for one user, stored for each attempt.
_GAZE_COLUMNS = ["eyeDataTimestamp", "gazePointAOI_target_x", "gazePointAOI_target_y"]

def _read_gaze_csv(user, file_name):
    # One recording file -> DataFrame with just the gaze columns we use.
    return pd.read_csv(
        user_dir + '/' + user + '/' + file_name,
        delimiter=" ",
        usecols=_GAZE_COLUMNS,
    )

df_attempt1 = []
df_attempt2 = []
attempt2_mask = []
for user in user_data['User']:
    files = file_names[user]
    if len(files) == 2:
        attempt2_mask.append(True)
        df_attempt1.append(_read_gaze_csv(user, files[0]))
        df_attempt2.append(_read_gaze_csv(user, files[1]))
    elif len(files) == 1:
        # Only one attempt recorded: reuse it as a stand-in for attempt 2.
        attempt2_mask.append(False)
        df_attempt1.append(_read_gaze_csv(user, files[0]))
        df_attempt2.append(_read_gaze_csv(user, files[0]))
# Append a per-sample time delta column to every attempt DataFrame.
# The last row has no successor, so it is filled with the mean delta
# (same convention as before, but computed with np.diff instead of a
# per-element indexing loop).
for attempt in [df_attempt1, df_attempt2]:
    for df in attempt:
        timestamps = df["eyeDataTimestamp"].to_numpy()
        deltas = np.diff(timestamps)
        # Guard: a single-sample recording has no deltas; use 0 instead of
        # the NaN that np.mean([]) would produce.
        fill_value = deltas.mean() if len(deltas) > 0 else 0
        df["deltaTimestamp"] = np.append(deltas, fill_value)
# Convert the per-attempt DataFrames to awkward Arrays.
array_attempt1 = [ak.Array(dict(df)) for df in df_attempt1]
array_attempt2 = [ak.Array(dict(df)) for df in df_attempt2]
# Adds a list of arrays in a new column to an array
def add_column(ak_array, arrays, col_name):
    """Return a copy of *ak_array* with *arrays* attached as field *col_name*.

    One record is built per entry: all existing fields are copied and the
    i-th element of *arrays* is stored under *col_name*.  Zip semantics:
    the result is truncated to the shorter of the two inputs.
    """
    records = []
    for i, (_, new_value) in enumerate(zip(ak_array, arrays)):
        record = {field: ak_array[field][i] for field in ak_array.fields}
        record[col_name] = new_value
        records.append(record)
    return ak.Array(records)
# Creates arrays with labels for all 3 categories
def create_labels(array_user):
    """Derive skill-category labels from pre-score and score difference.

    Category rule: "Expert" if pre score == 2 and difference == 0,
    "Bad" if the difference is <= 0, otherwise "Good".

    Returns a 4-tuple of ak.Arrays aligned with *array_user*:
    (string labels, expert one-hot, good one-hot, bad one-hot).
    """
    labels_str = []
    labels_int_expert = []
    labels_int_good = []
    labels_int_bad = []
    for pre_score, diff in zip(array_user["Pre score"], array_user["Difference"]):
        if pre_score == 2 and diff == 0:
            category = "Expert"
        elif diff <= 0:
            category = "Bad"
        else:
            category = "Good"
        labels_str.append(category)
        labels_int_expert.append(int(category == "Expert"))
        labels_int_good.append(int(category == "Good"))
        labels_int_bad.append(int(category == "Bad"))
    return (
        ak.Array(labels_str),
        ak.Array(labels_int_expert),
        ak.Array(labels_int_good),
        ak.Array(labels_int_bad),
    )
labels_str, labels_int_expert, labels_int_good, labels_int_bad = create_labels(array_user)
# Attach both attempts, the attempt-2 mask, and all label variants
# to the per-user array.
array_data = add_column(array_user, array_attempt1, 'Attempt1')
array_data = add_column(array_data, array_attempt2, 'Attempt2')
array_data["Attempt 2 Mask"] = ak.Array(attempt2_mask)
for field_name, label_array in [
    ('Labels Str', labels_str),
    ('Labels Expert', labels_int_expert),
    ('Labels Good', labels_int_good),
    ('Labels Bad', labels_int_bad),
]:
    array_data[field_name] = label_array
def minmax(data):
    """Get the min and max of a non-empty iterable in O(n) time and constant space.

    The previous version sliced ``data[1:]``, copying the whole input
    (so not constant space) and required an indexable sequence; this
    version works on any iterable in a single pass.

    Raises ValueError if *data* is empty.
    """
    it = iter(data)
    try:
        first = next(it)
    except StopIteration:
        raise ValueError("minmax() arg is an empty iterable") from None
    minValue = first
    maxValue = first
    for d in it:
        if d < minValue:
            minValue = d
        elif d > maxValue:
            maxValue = d
    return (minValue, maxValue)
# Get Range of field of view
def get_minmax(array_data):
    """Return (min_x, max_x, min_y, max_y) over every user's gaze samples.

    Attempt 2 is only included for users whose "Attempt 2 Mask" is set
    (otherwise it is a duplicate of attempt 1).
    """
    x_extremes = []
    y_extremes = []
    for i in range(len(array_data["User"])):
        attempt_fields = ["Attempt1"]
        if array_data["Attempt 2 Mask"][i]:
            attempt_fields.append("Attempt2")
        for field in attempt_fields:
            lo, hi = minmax(array_data[field][i]["gazePointAOI_target_x"])
            x_extremes.extend([lo, hi])
            lo, hi = minmax(array_data[field][i]["gazePointAOI_target_y"])
            y_extremes.extend([lo, hi])
    min_x, max_x = minmax(x_extremes)
    min_y, max_y = minmax(y_extremes)
    return min_x, max_x, min_y, max_y
min_x, max_x, min_y, max_y = get_minmax(array_data)
# AOI grid resolution: 10 rows (y) by 15 columns (x).
number_of_y_aios = 10
number_of_x_aios = 15
# Evenly spaced grid lines spanning the observed gaze range (endpoints included).
y_mesh = np.linspace(min_y, max_y, number_of_y_aios, endpoint=True)
x_mesh = np.linspace(min_x, max_x, number_of_x_aios, endpoint=True)
def get_aoi(x_coordinate, y_coordinate):
    """Given the x and y, find the AOI; return two indexes corresponding to the x and y position of the AOI."""
    def _snap(coordinate, mesh):
        # Index of the nearest grid line; step one cell up when the point
        # lies above that line, so the index names the cell the point is in.
        index = np.argmin(np.abs(coordinate - mesh))
        if coordinate - mesh[index] > 0:
            index += 1
        return index
    return _snap(x_coordinate, x_mesh), _snap(y_coordinate, y_mesh)
# Slice each attempt into sliding windows of 21 gaze samples with a stride of
# 14 (7-sample overlap), duplicating the owning user's labels for every window.
windows = []
windows_labels_int = {"Expert": [], "Good": [], "Bad":[], "Str": []}
windows_delta_t = []
for i, user in enumerate(array_data["User"]):
    # Number of complete 21-sample windows that fit into attempt 1 at stride 14.
    for j in range(int((len(array_data["Attempt1"][i]["gazePointAOI_target_x"])-21)/14)):
        try:
            windows.append({"gazePointAOI_target_x": array_data["Attempt1"][i]["gazePointAOI_target_x"][j*14:(j*14 + 21)], "gazePointAOI_target_y": array_data["Attempt1"][i]["gazePointAOI_target_y"][j*14:(j*14 + 21)]})
            windows_labels_int["Expert"].append(array_data["Labels Expert"][i])
            windows_labels_int["Good"].append(array_data["Labels Good"][i])
            windows_labels_int["Bad"].append(array_data["Labels Bad"][i])
            windows_delta_t.append(array_data["Attempt1"][i]["deltaTimestamp"][j*14:(j*14 + 21)])
            windows_labels_int["Str"].append(array_data["Labels Str"][i])
        except IndexError:
            # Fallback: take the final 21 samples instead.
            # NOTE(review): awkward/pandas slicing does not raise IndexError
            # for out-of-range bounds, so this branch probably never fires —
            # confirm before relying on it.
            windows.append({"gazePointAOI_target_x": array_data["Attempt1"][i]["gazePointAOI_target_x"][-21:], "gazePointAOI_target_y": array_data["Attempt1"][i]["gazePointAOI_target_y"][-21:]})
            windows_labels_int["Expert"].append(array_data["Labels Expert"][i])
            windows_labels_int["Good"].append(array_data["Labels Good"][i])
            windows_labels_int["Bad"].append(array_data["Labels Bad"][i])
            windows_delta_t.append(array_data["Attempt1"][i]["deltaTimestamp"][-21:])
            windows_labels_int["Str"].append(array_data["Labels Str"][i])
        if array_data["Attempt 2 Mask"][i]:
            # NOTE(review): the window index j comes from attempt 1's length;
            # if attempt 2 is shorter, its trailing windows may be truncated
            # or empty — verify this is intended.
            try:
                windows.append({"gazePointAOI_target_x": array_data["Attempt2"][i]["gazePointAOI_target_x"][j*14:(j*14 + 21)], "gazePointAOI_target_y": array_data["Attempt2"][i]["gazePointAOI_target_y"][j*14:(j*14 + 21)]})
                windows_labels_int["Expert"].append(array_data["Labels Expert"][i])
                windows_labels_int["Good"].append(array_data["Labels Good"][i])
                windows_labels_int["Bad"].append(array_data["Labels Bad"][i])
                windows_delta_t.append(array_data["Attempt2"][i]["deltaTimestamp"][j*14:(j*14 + 21)])
                windows_labels_int["Str"].append(array_data["Labels Str"][i])
            except IndexError:
                # Same tail-window fallback for attempt 2.
                windows.append({"gazePointAOI_target_x": array_data["Attempt2"][i]["gazePointAOI_target_x"][-21:], "gazePointAOI_target_y": array_data["Attempt2"][i]["gazePointAOI_target_y"][-21:]})
                windows_labels_int["Expert"].append(array_data["Labels Expert"][i])
                windows_labels_int["Good"].append(array_data["Labels Good"][i])
                windows_labels_int["Bad"].append(array_data["Labels Bad"][i])
                windows_delta_t.append(array_data["Attempt2"][i]["deltaTimestamp"][-21:])
                windows_labels_int["Str"].append(array_data["Labels Str"][i])
                continue
# Pack windows, labels and timestamp deltas into one awkward record array.
windows_dict = {"GazePoints": ak.Array(windows), "Labels": ak.Array(windows_labels_int), "DeltaTimestamps": ak.Array(windows_delta_t)}
array_windows = ak.Array(windows_dict)
# add metrics to array_windows
windowsAOI = []
# Enumerate all 150 AOI cells as (column, row) pairs: the row index cycles
# through 0-9 while the column index advances once every 10 entries,
# matching the original counter-based construction.
AOIs = [(col, row) for col in range(15) for row in range(10)]
# For every window, accumulate per-AOI fixation metrics: dwell time, gaze
# point counts, number of consecutive-visit "instances", incremental means of
# visit duration / visit length, and their spreads.  Two passes per window:
# pass 1 builds counts and incremental means, pass 2 accumulates squared
# deviations for the spread.
for i, GazePoints in enumerate(array_windows["GazePoints"]):
    # One slot per AOI cell (150 = 15 columns x 10 rows, indexed via AOIs).
    GazePointsAOI = [0] * 150
    GazePointsAOIinstances = [0] * 150
    DwellTime = [0] * 150
    averageTime = [0] * 150 #average duration on aoi
    averagePoints = [0] * 150 #average number of successive points on aoi
    standardDevTime = [0] * 150
    standardDevPoints = [0] * 150
    # Pass 1: group consecutive samples on the same AOI into one "visit"
    # (time, points) and fold each finished visit into an incremental mean.
    for j, xGaze in enumerate(array_windows["GazePoints"][i]["gazePointAOI_target_x"]):
        aoi = get_aoi(array_windows["GazePoints"][i]["gazePointAOI_target_x"][j], array_windows["GazePoints"][i]["gazePointAOI_target_y"][j])
        x = AOIs.index(aoi)
        #calculating mean
        if j == 0:
            # First sample opens the first visit; k remembers its AOI slot.
            time = array_windows["DeltaTimestamps"][i][j]
            points = 1
            k = x
        elif j == len(array_windows["GazePoints"][i]["gazePointAOI_target_x"])-1:
            # Last sample: close out the visit(s) so nothing is left pending.
            if aoi == last_aoi:
                time = time + array_windows["DeltaTimestamps"][i][j]
                points = points + 1
                averageTime[x] = (GazePointsAOIinstances[x]*averageTime[x]+time)/(GazePointsAOIinstances[x]+1)
                averagePoints[x] = (GazePointsAOIinstances[x]*averagePoints[x]+points)/(GazePointsAOIinstances[x]+1)
                GazePointsAOIinstances[x] = GazePointsAOIinstances[x] + 1
            else:
                # Close the previous visit (slot k), then record the final
                # single-sample visit on slot x.
                averageTime[k] = (GazePointsAOIinstances[k]*averageTime[k]+time)/(GazePointsAOIinstances[k]+1)
                averagePoints[k] = (GazePointsAOIinstances[k]*averagePoints[k]+points)/(GazePointsAOIinstances[k]+1)
                GazePointsAOIinstances[k] = GazePointsAOIinstances[k] + 1
                time = array_windows["DeltaTimestamps"][i][j]
                points = 1
                averageTime[x] = (GazePointsAOIinstances[x]*averageTime[x]+time)/(GazePointsAOIinstances[x]+1)
                averagePoints[x] = (GazePointsAOIinstances[x]*averagePoints[x]+points)/(GazePointsAOIinstances[x]+1)
                GazePointsAOIinstances[x] = GazePointsAOIinstances[x] + 1
        else: #if index not first and not last: check, if aoi is same as last: if true add together
            if aoi == last_aoi:
                time = time + array_windows["DeltaTimestamps"][i][j]
                points = points + 1
                k = x
            else: #calculate an incremental mean, by multiplying the previous mean with number of instances and adding new value
                averageTime[k] = (GazePointsAOIinstances[k]*averageTime[k]+time)/(GazePointsAOIinstances[k]+1)
                averagePoints[k] = (GazePointsAOIinstances[k]*averagePoints[k]+points)/(GazePointsAOIinstances[k]+1)
                GazePointsAOIinstances[k] = GazePointsAOIinstances[k] + 1
                time = array_windows["DeltaTimestamps"][i][j]
                points = 1
                k = x
        # Per-sample totals are independent of the visit grouping.
        DwellTime[x] = DwellTime[x] + array_windows["DeltaTimestamps"][i][j]
        GazePointsAOI[x] = GazePointsAOI[x] + 1
        last_aoi = aoi
    #calculate standard deviations
    # Pass 2: same visit grouping, now accumulating squared deviations from
    # the pass-1 means.
    for s, xGaze in enumerate(array_windows["GazePoints"][i]["gazePointAOI_target_x"]):
        aoi = get_aoi(array_windows["GazePoints"][i]["gazePointAOI_target_x"][s], array_windows["GazePoints"][i]["gazePointAOI_target_y"][s])
        x = AOIs.index(aoi)
        if s == 0:
            time = array_windows["DeltaTimestamps"][i][s]
            points = 1
            k = x
        elif s == len(array_windows["GazePoints"][i]["gazePointAOI_target_x"])-1:
            if aoi == latest_aoi:
                time = time + array_windows["DeltaTimestamps"][i][s]
                points = points + 1
                standardDevTime[x] = standardDevTime[x] + (averageTime[x] - time)**2
                standardDevPoints[x] = standardDevPoints[x] + (averagePoints[x] - points)**2
            else:
                standardDevTime[k] = standardDevTime[k] + (averageTime[k] - time)**2
                standardDevPoints[k] = standardDevPoints[k] + (averagePoints[k] - points)**2
                time = array_windows["DeltaTimestamps"][i][s]
                points = 1
                standardDevTime[x] = standardDevTime[x] + (averageTime[x] - time)**2
                standardDevPoints[x] = standardDevPoints[x] + (averagePoints[x] - points)**2
        else: #if index not first and not last: check, if aoi is same as last: if true add together
            if aoi == latest_aoi:
                time = time + array_windows["DeltaTimestamps"][i][s]
                points = points + 1
                k = x
            else: #calculate an incremental mean, by multiplying the previous mean with number of instances and adding new value
                standardDevTime[k] = standardDevTime[k] + (averageTime[k] - time)**2
                standardDevPoints[k] = standardDevPoints[k] + (averagePoints[k] - points)**2
                time = array_windows["DeltaTimestamps"][i][s]
                points = 1
                k = x
        latest_aoi = aoi
    # Normalize the accumulated squared deviations per AOI.
    # NOTE(review): this computes sqrt(sum of squared devs)/n rather than the
    # usual sqrt(sum/n) — confirm whether that is the intended spread measure.
    for l in range(0,len(standardDevTime)):
        if GazePointsAOIinstances[l] != 0:
            standardDevTime[l] = math.sqrt(standardDevTime[l])/GazePointsAOIinstances[l]
            standardDevPoints[l] = math.sqrt(standardDevPoints[l])/GazePointsAOIinstances[l]
    # Collect this window's metric vectors (one value per AOI cell).
    AOI_dict = {"AOI": AOIs, "TotalDwellTime": DwellTime, "GazePointsAOI": GazePointsAOI,
                "Instances": GazePointsAOIinstances, "AvTime": averageTime, "AvPoints": averagePoints,
                "StDtime": standardDevTime, "StDpoints": standardDevPoints}
    windowsAOI.append(AOI_dict)
# Bundle the per-window AOI metrics into the window array.
array_features = ak.from_iter(windowsAOI)
array_windows_feat = add_column(array_windows, array_features, 'Features')
array_windows_data = array_windows_feat
# Every feature field except the AOI coordinate list itself.
fields_wo_aoi = [field for field in array_windows_data['Features'].fields if field != 'AOI']
x_data = array_windows_data['Features'][fields_wo_aoi]
# Flatten each window's per-AOI feature vectors into a single row.
X_data = []
for record in x_data:
    row = []
    for field in fields_wo_aoi:
        row.extend(record[field].to_numpy())
    X_data.append(row)
X_data = np.array(X_data)
# One binary target vector per skill category.
y_bad = array_windows_data["Labels"]["Bad"]
y_good = array_windows_data["Labels"]["Good"]
y_expert = array_windows_data["Labels"]["Expert"]
y_dict = {"Bad": y_bad, "Good": y_good, "Expert": y_expert}
# Get the accuracy of a classifier
def get_accuracy(clf, X, labels):
    """Return the fraction of samples in *X* whose prediction equals *labels*.

    Vectorized replacement for the per-element Python loop; *labels* must
    have the same length as ``clf.predict(X)``.
    """
    predictions = np.asarray(clf.predict(X))
    return np.mean(predictions == np.asarray(labels))
# Get the accuracy of a classifier with >p probability
def get_accuracy_prob(clf, X, labels, p=0.5):
    # NOTE(review): despite the name, *labels* is never compared against the
    # predictions — this returns the fraction of samples whose predicted
    # probability for class 1 is >= p, not an accuracy.  Confirm intent
    # before reusing (the function is unused in this script).
    return np.sum([pred1 >= p for (pred0, pred1), label in zip(clf.predict_proba(X), labels)])/len(labels)
# Define the grid of hyperparameters to search
# (commented-out entries were dropped to keep the search tractable).
param_grid = {
    'C': [0.1, 1, 10],# 100], # Regularization parameter
    'kernel': ['linear', 'poly', 'rbf'],# 'sigmoid'], # Type of kernel function
    'gamma': ['scale', 'auto'] # Kernel coefficient
}
# Repeated nested cross-validation over 5 different shuffle seeds for the
# "Bad" target.  Outer loop: 5 stratified folds; inner loop: grid search
# with 5-fold CV on the training split.
seedav = []
seeds = [1, 3, 7, 9, 42]
for seed in seeds:
    accav = []
    # BUGFIX: the previous StratifiedKFold had shuffle disabled, so `seed`
    # was never used and every "repetition" produced identical folds.
    # shuffle=True with random_state=seed makes each repetition distinct
    # and reproducible.
    outer_StratKFold = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
    for train_idx, test_idx in outer_StratKFold.split(X_data, y_bad):
        X_train = X_data[train_idx]
        y_train_bad = y_bad[train_idx]
        X_test = X_data[test_idx]
        y_test_bad = y_bad[test_idx]
        # Define the SVM classifier
        SVM = SVC()
        # Perform grid search with cross-validation on the training split.
        grid_search = GridSearchCV(estimator=SVM, param_grid=param_grid, cv=5, scoring='accuracy', n_jobs=-1, verbose=3, pre_dispatch=1)
        grid_search.fit(X_train, y_train_bad)
        # Print results for each combination of hyperparameters
        means = grid_search.cv_results_['mean_test_score']
        stds = grid_search.cv_results_['std_test_score']
        params = grid_search.cv_results_['params']
        for mean, std, param in zip(means, stds, params):
            print(f"Accuracy: {mean:.3f} (±{std:.3f}) for {param}")
        # Get the best hyperparameters and the corresponding accuracy
        best_params = grid_search.best_params_
        best_score = grid_search.best_score_
        print("Best Hyperparameters:", best_params)
        print("Best Accuracy:", best_score)
        # Evaluate the refit best estimator on the held-out outer test fold.
        best_model = grid_search.best_estimator_
        test_accuracy = best_model.score(X_test, y_test_bad)
        print("Test m-Accuracy:", test_accuracy)
        # Retrain on the training split with the best hyperparameters
        # (equivalent to best_estimator_; kept as an explicit cross-check).
        best_clf = SVC(C=best_params["C"], kernel=best_params["kernel"], gamma=best_params["gamma"])
        best_clf.fit(X_train, y_train_bad)
        acc = get_accuracy(best_clf, X_test, y_test_bad)
        print(f"n-Accuracy {acc}")
        accav = np.append(accav, acc)
    # Mean outer-fold accuracy for this seed.
    meanacc = np.mean(accav)
    seedav = np.append(seedav, meanacc)
# Repeated-nested-CV estimate: mean over all seeds.
rnCV = np.mean(seedav)
print(rnCV)