# boosted/Perform-Boosting-Experiments-XGBoost.ipynb (notebook export: boosting experiments with XGBoost)
# built-in dependencies
import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import statistics
# 3rd party dependencies
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.datasets import fetch_lfw_pairs
from deepface import DeepFace
import lightgbm as lgb
import xgboost
from xgboost import plot_importance
from sklearn.model_selection import KFold
from tqdm import tqdm
# reproducibility seed used for the train/validation re-split below
seed = 17
# detector used when selecting the pre-tuned thresholds later in the file
detector_backend = "retinaface"
# when True, part of the 10_folds validation data is merged into train
more_train = False # append some of validation data into train
print(f"This experiment is done with pip package of deepface with {DeepFace.__version__} version")
# all configuration alternatives for 4 dimensions of arguments
alignment = [True]
models = ["Facenet", "Facenet512", "VGG-Face", "ArcFace", "Dlib"]
detectors = ["retinaface"]
metrics = ["euclidean_l2"]
expand_percentage = 0
# working directories; parents are listed before their children so that
# plain mkdir succeeds in order
target_paths = [
"lfwe",
"lfwe/test",
"lfwe/train",
"lfwe/10_folds",
"dataset",
"outputs",
"outputs/test",
"outputs/train",
"outputs/10_folds",
"results"
]
# Create the working directory tree once; report only newly created folders.
for target_path in target_paths:
    if not os.path.exists(target_path):
        # makedirs also tolerates a missing parent, unlike bare mkdir,
        # so the list above no longer has to stay parent-first ordered
        os.makedirs(target_path)
        print(f"{target_path} is just created")
def retrieve_lfe(task: str):
    """Fetch the LFW pairs for *task* and export each pair as jpg files.

    task: one of "test" (1000 pairs), "train" (2200) or "10_folds" (6000).
    Raw pairs/labels are cached as .npy files under dataset/, the extracted
    images under lfwe/<task>/, and an empty outputs/<task>_lfwe.txt marker
    file records that extraction already happened.

    Raises ValueError for an unknown task.
    """
    # pair counts matching sklearn's LFW subsets
    instance_counts = {"test": 1000, "train": 2200, "10_folds": 6000}
    if task not in instance_counts:
        raise ValueError(f"unimplemented task - {task}")
    instances = instance_counts[task]
    pairs_touch = f"outputs/{task}_lfwe.txt"
    target_path = f"dataset/{task}_lfw.npy"
    labels_path = f"dataset/{task}_labels.npy"
    if not os.path.exists(target_path):
        fetched_lfw_pairs = fetch_lfw_pairs(
            subset = task,
            color = True,
            # memory allocation problem occurs for validation set at resize=2
            resize = 2 if task != "10_folds" else 1,
            funneled = False,
            slice_=None,
        )
        print("fetched")
        pairs = fetched_lfw_pairs.pairs
        labels = fetched_lfw_pairs.target
        np.save(target_path, pairs)
        np.save(labels_path, labels)
    else:
        if not os.path.exists(pairs_touch):
            # loading pairs takes some time. but if we extract these pairs as
            # image, no need to load it anymore
            pairs = np.load(target_path)
            labels = np.load(labels_path)
    # NOTE(review): if the npy cache and the marker file both exist while a
    # jpg is missing, `pairs` is unbound below — assumed never to happen in
    # practice; confirm before re-ordering the caches.
    # store to file system
    for i in tqdm(range(instances)):
        img1_target = f"lfwe/{task}/{i}_1.jpg"
        img2_target = f"lfwe/{task}/{i}_2.jpg"
        if not os.path.exists(img1_target):
            img1 = pairs[i][0]
            # plt.imsave(img1_target, img1/255) #works for my mac
            plt.imsave(img1_target, img1) #works for my debian
        if not os.path.exists(img2_target):
            img2 = pairs[i][1]
            # plt.imsave(img2_target, img2/255) #works for my mac
            plt.imsave(img2_target, img2) #works for my debian
    if not os.path.exists(pairs_touch):
        # empty marker file: extraction is complete for this task
        with open(pairs_touch, 'a'):
            pass
# Materialize all three LFW splits on disk before computing distances.
for lfw_task in ("test", "train", "10_folds"):
    retrieve_lfe(task=lfw_task)
def perform_experiments():
    """Sweep every (model, detector, metric, alignment) combination and
    compute pairwise distances for each configuration."""
    for current_model in models:
        for current_detector in detectors:
            for current_metric in metrics:
                for current_align in alignment:
                    if current_detector == "skip" and current_align is True:
                        # a skipped detector yields no landmarks to align on
                        continue
                    calculate_distances(
                        model_name=current_model,
                        detector_backend=current_detector,
                        distance_metric=current_metric,
                        align=current_align,
                    )
def calculate_distances(
    model_name: str,
    detector_backend: str,
    distance_metric: str = "euclidean_l2",
    align: bool = True
):
    """Verify every LFW pair of each split with DeepFace and persist distances.

    Writes one CSV per split to outputs/<split>/<config>.csv with columns
    "actuals" (ground-truth label) and "distances". An already-existing CSV
    is treated as a cache and the split is skipped.
    """
    # pair counts matching sklearn's LFW subsets
    instance_counts = {"test": 1000, "train": 2200, "10_folds": 6000}
    for experiment in ["test", "train", "10_folds"]:
        try:
            instances = instance_counts[experiment]
        except KeyError:
            # defensive: keeps the original error if the split list changes
            raise ValueError(f"unimplemented experiment - {experiment}") from None
        labels = np.load(f"dataset/{experiment}_labels.npy")
        alignment_text = "aligned" if align is True else "unaligned"
        task = f"{experiment}/{model_name}_{detector_backend}_{distance_metric}_{alignment_text}"
        output_file = f"outputs/{task}.csv"
        # check file is already available (cached from a previous run)
        if os.path.exists(output_file):
            continue
        distances = []
        for i in tqdm(range(instances), desc = task):
            img1_target = f"lfwe/{experiment}/{i}_1.jpg"
            img2_target = f"lfwe/{experiment}/{i}_2.jpg"
            result = DeepFace.verify(
                img1_path=img1_target,
                img2_path=img2_target,
                model_name=model_name,
                detector_backend=detector_backend,
                distance_metric=distance_metric,
                align=align,
                enforce_detection=False,
                expand_percentage=expand_percentage,
            )
            distances.append(result["distance"])
        # -----------------------------------
        df = pd.DataFrame(list(labels), columns = ["actuals"])
        df["distances"] = distances
        df.to_csv(output_file, index=False)
# run the full sweep; existing CSVs act as a cache, so re-runs are cheap
perform_experiments()
# Pre-tuned decision thresholds (euclidean_l2) for single models, keyed by
# detector backend; a distance below the threshold means "same person".
pretuned_thresholds = {
    "mtcnn": {
        "Facenet": 1.0927487190831375,
        "Facenet512": 1.0676744382971612,
        "VGG-Face": 1.199458073887602,
        "ArcFace": 1.1853355178343647,
        "Dlib": 0.4020917206804517,
    },
    "retinaface": {
        "Facenet": 1.0771751259493634,
        "Facenet512": 1.080821730376328,
        "VGG-Face": 1.1952250102966764,
        "ArcFace": 1.1601818883318848,
        "Dlib": 0.4022031592966787,
    },
    "yunet": {
        "Facenet": 1.066751738677861,
        "Facenet512": 1.0691771483816928,
        "VGG-Face": 1.1802823845238797,
        "ArcFace": 1.1945138501899335,
        "Dlib": 0.422060409585814,
    },
}
if detector_backend not in pretuned_thresholds:
    raise ValueError(f"unimplemented detector - {detector_backend}")
thresholds = pretuned_thresholds[detector_backend]
# Load the per-model distance CSVs and merge them into one frame per split:
# one "actuals" column plus one distance column per model.
tasks = ["train", "test", "10_folds"]
dfs = {}
for task in tasks:
    merged = None
    for model in models:
        model_df = pd.read_csv(
            f"outputs/{task}/{model}_{detector_backend}_euclidean_l2_aligned.csv"
        ).rename(columns={"distances": model})
        if merged is None:
            # first model keeps the shared "actuals" column
            merged = model_df.copy()
        else:
            merged = pd.concat(
                [merged, model_df.drop(columns=["actuals"])], axis=1
            )
    dfs[task] = merged
dfs["train"].head()
print(f"{more_train=}")
if more_train:
    # pool train + validation, then re-split 70/30.
    # BUG FIX: DataFrame.append was removed in pandas 2.0; pd.concat is the
    # supported equivalent (same result, ignore_index re-numbers rows).
    tmp_df = pd.concat([dfs["train"], dfs["10_folds"]], ignore_index=True)
    dfs["train"] = tmp_df.sample(frac = 0.7, random_state=seed)
    dfs["10_folds"] = tmp_df.drop(dfs["train"].index)
    k = 1
else:
    k = 10
def add_classification_results(df: pd.DataFrame, model_names=None, model_thresholds=None) -> pd.DataFrame:
    """Add a +1/-1 vote column per model based on its distance threshold.

    For each model M, writes column "M_clf": +1 when df[M] is strictly below
    the model's threshold (predicted same person), -1 otherwise.
    Mutates and returns *df*.

    model_names / model_thresholds default to the module-level `models` and
    `thresholds`, keeping the original call sites working unchanged.
    """
    names = models if model_names is None else model_names
    cutoffs = thresholds if model_thresholds is None else model_thresholds
    for model_name in names:
        below = df[df[model_name] < cutoffs[model_name]].index
        df[f"{model_name}_clf"] = -1
        df.loc[below, f"{model_name}_clf"] = 1
    return df
# add the per-model vote columns to every split
for split_name in ("train", "10_folds", "test"):
    dfs[split_name] = add_classification_results(dfs[split_name])
def add_classification_sum(df: pd.DataFrame, model_names=None) -> pd.DataFrame:
    """Add a "clf_sum" column: the sum of all per-model +1/-1 vote columns.

    Acts as a majority-vote feature. Mutates and returns *df*.
    model_names defaults to the module-level `models` (backward compatible).
    """
    names = models if model_names is None else model_names
    df["clf_sum"] = 0
    for model_name in names:
        df["clf_sum"] += df[f"{model_name}_clf"]
    return df
# add the vote-sum feature to every split
for split_name in ("train", "10_folds", "test"):
    dfs[split_name] = add_classification_sum(dfs[split_name])
def add_distance_additions(df: pd.DataFrame, model_names=None) -> pd.DataFrame:
    """Add a "distance_sums" column: the sum of every model's raw distance.

    Mutates and returns *df*.
    model_names defaults to the module-level `models` (backward compatible).
    """
    names = models if model_names is None else model_names
    df["distance_sums"] = 0
    for model_name in names:
        df["distance_sums"] += df[f"{model_name}"]
    return df
# add the distance-sum feature to every split
for split_name in ("train", "10_folds", "test"):
    dfs[split_name] = add_distance_additions(dfs[split_name])
def add_distance_multiplications(df: pd.DataFrame, model_names=None) -> pd.DataFrame:
    """Add a "distance_multiplications" column: the product of all raw distances.

    Mutates and returns *df*.
    model_names defaults to the module-level `models` (backward compatible).
    """
    names = models if model_names is None else model_names
    df["distance_multiplications"] = 1
    for model_name in names:
        df["distance_multiplications"] *= df[f"{model_name}"]
    return df
# add the distance-product feature to every split
for split_name in ("train", "10_folds", "test"):
    dfs[split_name] = add_distance_multiplications(dfs[split_name])
# quick visual sanity check — notebook residue, no effect as a plain script
dfs["train"].sample(5)
# bookkeeping of engineered feature names (the *_clf vote columns, and
# everything except the label column)
categorical_features = [column for column in dfs["train"].columns if column.endswith("_clf")]
feature_names = list(dfs["train"].drop(columns=["actuals"]).columns)
# restore sets
# label/feature matrices for each split as plain numpy arrays
y_train = dfs["train"]["actuals"].values
x_train = dfs["train"].drop(columns=["actuals"]).values
y_test = dfs["test"]["actuals"].values
x_test = dfs["test"].drop(columns=["actuals"]).values
y_val = dfs["10_folds"]["actuals"].values
x_val = dfs["10_folds"].drop(columns=["actuals"]).values
# XGBoost hyper-parameters shared by all 10 fold models.
params = {
    'learning_rate': 0.01,
    'max_depth': 5,
    'max_leaves': pow(2, 5) - 1,  # complete binary tree of depth 5
    'n_estimators': 10000,        # upper bound; early stopping cuts this short
    'seed': 17,
    'nthread': 2,
    # BUG FIX: the key was misspelled 'object'; XGBClassifier silently ignores
    # unknown kwargs, so the objective was never actually being set.
    'objective': 'binary:logistic',
}
# NOTE(review): this rebinds `models` from the list of model NAMES to the
# list of trained classifiers; analyze_results below depends on that, but it
# makes the earlier name unusable from here on — confirm intended.
models = []
# 10-fold training: model k early-stops against validation slice
# [k*600, (k+1)*600). Also rebinds the fold-count variable `k` set earlier.
for k in range(0, 10):
    print(f"Training {k}-th model")
    model = xgboost.XGBClassifier(**params)
    valid_from = k * 600
    valid_until = valid_from + 600
    # NOTE(review): eval_metric / early_stopping_rounds as fit() kwargs were
    # removed in xgboost >= 2.0 (they moved to the constructor) — confirm the
    # pinned xgboost version supports this call signature.
    _ = model.fit(
        x_train,
        y_train,
        eval_metric='logloss',
        eval_set=[(x_val[valid_from:valid_until], y_val[valid_from:valid_until])],
        early_stopping_rounds=500,
        verbose=False,
    )
    models.append(model)
def analyze_results(x, y, label, fold_models=None):
    """Return accuracy percentages (0-100) of the 10 fold models on (x, y).

    label: "validation" evaluates model k only on its own held-out slice
    [k*600, (k+1)*600); any other label evaluates every model on all of
    (x, y). Each accuracy is also printed rounded to 2 decimals.

    fold_models: optional list of 10 trained classifiers; defaults to the
    module-level `models` (backward compatible with existing call sites).
    """
    classifiers = models if fold_models is None else fold_models
    scores = []
    for k in range(10):
        model = classifiers[k]
        if label == "validation":
            # model k never trained on this slice, so this is a fair score
            fold_start = k * 600
            x_eval = x[fold_start:fold_start + 600]
            y_eval = y[fold_start:fold_start + 600]
        else:
            x_eval, y_eval = x, y
        predictions = model.predict(x_eval)
        # fixed: the original rebound x/y to the slice and restored them with
        # full-array copies each iteration; local eval views avoid all of that
        correct = sum(
            1 for actual, predicted in zip(y_eval, predictions) if actual == predicted
        )
        score = 100 * (correct / len(predictions))
        print(round(score, 2))
        scores.append(score)
    return scores
# mean accuracy per split; the bare round(...) expressions are notebook
# display residue and have no effect when executed as a plain script
train_results = analyze_results(x_train, y_train, "train")
round(sum(train_results)/10, 2)
val_results = analyze_results(x_val, y_val, "validation")
round(sum(val_results)/10, 2)
test_results = analyze_results(x_test, y_test, "test")
round(sum(test_results)/10, 2)