Back to Deepface

Perform Experiments with DeepFace on LFW dataset

boosted/Perform-Boosting-Experiments-XGBoost.ipynb

0.0.9911.4 KB
Original Source

Perform Experiments with DeepFace on LFW dataset

python
# built-in dependencies
import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import statistics

# 3rd party dependencies
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.datasets import fetch_lfw_pairs
from deepface import DeepFace
import lightgbm as lgb
import xgboost
from xgboost import plot_importance
from sklearn.model_selection import KFold
from tqdm import tqdm
python
# Notebook-wide settings read by the cells below.
seed = 17  # random_state for re-sampling train/validation rows when more_train is set
detector_backend = "retinaface"  # selects the threshold set and the distance CSV files to load
more_train = False # append some of validation data into train
python
# Report the installed deepface version alongside the experiment output.
print(f"This experiment is done with pip package of deepface with {DeepFace.__version__} version")

Configuration Sets

python
# all configuration alternatives for 4 dimensions of arguments
# (alignment, model, detector, distance metric); perform_experiments()
# walks every combination of these lists.
alignment = [True]

# model names; they are also used as the distance column names once the
# per-model CSV files are merged into data frames
models = ["Facenet", "Facenet512", "VGG-Face", "ArcFace", "Dlib"]
detectors = ["retinaface"]

metrics = ["euclidean_l2"]
# passed through unchanged as DeepFace.verify(expand_percentage=...)
expand_percentage = 0

Create Required Folders if necessary

python
# Working folders for extracted images, cached arrays and experiment outputs.
# Parent folders come before their children because os.mkdir creates a
# single level only.
target_paths = [
    "lfwe",
    "lfwe/test",
    "lfwe/train",
    "lfwe/10_folds",
    "dataset",
    "outputs",
    "outputs/test",
    "outputs/train",
    "outputs/10_folds",
    "results",
]
for target_path in target_paths:
    if os.path.exists(target_path):
        # already there from a previous run
        continue
    os.mkdir(target_path)
    print(f"{target_path} is just created")

Load LFW Datasets

python
def retrieve_lfe(task: str):
    """Cache one LFW pairs subset on disk and extract its images as JPG files.

    The pair array and labels are stored as ``dataset/{task}_lfw.npy`` and
    ``dataset/{task}_labels.npy``; every pair ``i`` is written to
    ``lfwe/{task}/{i}_1.jpg`` and ``lfwe/{task}/{i}_2.jpg``. Files that already
    exist are left untouched, so the function can be re-run safely.

    Args:
        task: subset name, one of "test", "train" or "10_folds".

    Raises:
        ValueError: if ``task`` is not one of the supported subsets.
    """
    # number of pairs extracted per subset
    instances_by_task = {"test": 1000, "train": 2200, "10_folds": 6000}
    if task not in instances_by_task:
        raise ValueError(f"unimplemented task - {task}")
    instances = instances_by_task[task]

    # empty marker file signalling that the images were extracted already
    pairs_touch = f"outputs/{task}_lfwe.txt"

    target_path = f"dataset/{task}_lfw.npy"
    labels_path = f"dataset/{task}_labels.npy"

    # pairs stay None until they are actually needed; loading them takes some
    # time and is pointless once every image is on disk
    pairs = None
    if not (os.path.exists(target_path) and os.path.exists(labels_path)):
        fetched_lfw_pairs = fetch_lfw_pairs(
            subset=task,
            color=True,
            # memory allocation problem occurs for validation set
            resize=2 if task != "10_folds" else 1,
            funneled=False,
            slice_=None,
        )
        print("fetched")
        pairs = fetched_lfw_pairs.pairs
        np.save(target_path, pairs)
        np.save(labels_path, fetched_lfw_pairs.target)

    # store to file system
    for i in tqdm(range(instances)):
        for position in (0, 1):
            img_target = f"lfwe/{task}/{i}_{position + 1}.jpg"
            if os.path.exists(img_target):
                continue

            if pairs is None:
                # an image is missing although the array was cached earlier;
                # previously this path could hit an unbound ``pairs``
                pairs = np.load(target_path)

            # NOTE(review): the original author needed ``img / 255`` on one
            # machine and the raw array on another - confirm the pixel value
            # range returned by the installed scikit-learn version.
            plt.imsave(img_target, pairs[i][position])

    if not os.path.exists(pairs_touch):
        open(pairs_touch, "a").close()
    
python
# Prepare the images of all three LFW subsets, in the same order as before.
for subset_name in ("test", "train", "10_folds"):
    retrieve_lfe(task=subset_name)

Perform Experiments

python
def perform_experiments():
    """Run calculate_distances for every configured argument combination.

    Iterates over the module-level ``models``, ``detectors``, ``metrics`` and
    ``alignment`` lists; combinations that ask for alignment with the "skip"
    detector are not run.
    """
    for current_model in models:
        for current_detector in detectors:
            detection_skipped = current_detector == "skip"
            for current_metric in metrics:
                for use_alignment in alignment:
                    # Alignment is not possible for a skipped detector configuration
                    if not (detection_skipped and use_alignment is True):
                        calculate_distances(
                            model_name=current_model,
                            detector_backend=current_detector,
                            distance_metric=current_metric,
                            align=use_alignment,
                        )
                    
def calculate_distances(
        model_name: str,
        detector_backend: str,
        distance_metric: str = "euclidean_l2",
        align: bool = True
):
    """Compute and store the pair distances of one configuration.

    For each LFW subset ("test", "train", "10_folds") every extracted image
    pair under ``lfwe/{subset}`` is passed to ``DeepFace.verify`` and the
    resulting distances are written, next to the ground-truth labels, to
    ``outputs/{subset}/{model}_{detector}_{metric}_{aligned|unaligned}.csv``.
    Subsets whose CSV file already exists are skipped.

    Args:
        model_name: face recognition model passed to DeepFace.verify.
        detector_backend: face detector passed to DeepFace.verify.
        distance_metric: distance metric passed to DeepFace.verify.
        align: whether DeepFace.verify should align the detected faces.
    """
    # number of extracted image pairs per subset
    instances_by_experiment = {"test": 1000, "train": 2200, "10_folds": 6000}
    alignment_text = "aligned" if align is True else "unaligned"

    for experiment, instances in instances_by_experiment.items():
        task = f"{experiment}/{model_name}_{detector_backend}_{distance_metric}_{alignment_text}"
        output_file = f"outputs/{task}.csv"

        # check file is already available
        if os.path.exists(output_file):
            continue

        # loaded before the long verification loop so that a missing labels
        # file fails fast, but only for subsets that are actually processed
        labels = np.load(f"dataset/{experiment}_labels.npy")

        distances = []
        for i in tqdm(range(instances), desc=task):
            result = DeepFace.verify(
                img1_path=f"lfwe/{experiment}/{i}_1.jpg",
                img2_path=f"lfwe/{experiment}/{i}_2.jpg",
                model_name=model_name,
                detector_backend=detector_backend,
                distance_metric=distance_metric,
                align=align,
                # keep going when no face is detected in an image
                enforce_detection=False,
                expand_percentage=expand_percentage,
            )
            distances.append(result["distance"])

        df = pd.DataFrame({"actuals": list(labels), "distances": distances})
        df.to_csv(output_file, index=False)
python
# Compute the distance CSV files for every configuration that is not on disk yet.
perform_experiments()

Data Frame

python
# pre-tuned threshold for single models, grouped by detector backend
tuned_thresholds = {
    "mtcnn": {
        "Facenet": 1.0927487190831375,
        "Facenet512": 1.0676744382971612,
        "VGG-Face": 1.199458073887602,
        "ArcFace": 1.1853355178343647,
        "Dlib": 0.4020917206804517,
    },
    "retinaface": {
        "Facenet": 1.0771751259493634,
        "Facenet512": 1.080821730376328,
        "VGG-Face": 1.1952250102966764,
        "ArcFace": 1.1601818883318848,
        "Dlib": 0.4022031592966787,
    },
    "yunet": {
        "Facenet": 1.066751738677861,
        "Facenet512": 1.0691771483816928,
        "VGG-Face": 1.1802823845238797,
        "ArcFace": 1.1945138501899335,
        "Dlib": 0.422060409585814,
    },
}
if detector_backend not in tuned_thresholds:
    raise ValueError(f"unimplemented detector - {detector_backend}")
# model name -> distance below which a pair is classified as a match
thresholds = tuned_thresholds[detector_backend]
python
tasks = ["train", "test", "10_folds"]

# subset name -> frame with the "actuals" column followed by one distance
# column per model (named after the model)
dfs = {}
for task in tasks:
    frames = []
    for model in models:
        csv_path = f"outputs/{task}/{model}_{detector_backend}_euclidean_l2_aligned.csv"
        frame = pd.read_csv(csv_path).rename(columns={"distances": model})
        if frames:
            # the labels are already provided by the first frame
            frame = frame.drop(columns=["actuals"])
        frames.append(frame)
    dfs[task] = pd.concat(frames, axis=1) if frames else None
python
# Preview the first rows of the merged train frame (notebook display).
dfs["train"].head()
python
print(f"{more_train=}")
if more_train:
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported
    # way to stack the two frames with a fresh 0..n-1 index.
    tmp_df = pd.concat([dfs["train"], dfs["10_folds"]], ignore_index=True)
    # 70% of the pooled rows become the new train set, the rest validation
    dfs["train"] = tmp_df.sample(frac=0.7, random_state=seed)
    dfs["10_folds"] = tmp_df.drop(dfs["train"].index)
    k = 1
else:
    k = 10
# NOTE(review): the training cell further down rebinds ``k`` as its loop
# variable, so the value chosen here is not read by the visible code - confirm
# whether it was meant to control the number of folds.
python
def add_classification_results(df: pd.DataFrame) -> pd.DataFrame:
    """Add one ``{model}_clf`` vote column per model to ``df``.

    A vote is 1 when the model's distance is below its pre-tuned threshold
    and -1 otherwise. ``df`` is modified in place and also returned.
    """
    for model_name in models:
        vote_column = f"{model_name}_clf"
        is_match = df[model_name] < thresholds[model_name]
        df[vote_column] = -1
        df.loc[is_match, vote_column] = 1
    return df

# add the per-model vote columns to every subset
for split_name in ("train", "10_folds", "test"):
    dfs[split_name] = add_classification_results(dfs[split_name])
python
def add_classification_sum(df: pd.DataFrame) -> pd.DataFrame:
    """Add a ``clf_sum`` column holding the sum of all ``{model}_clf`` votes.

    ``df`` is modified in place and also returned.
    """
    vote_columns = (df[f"{model_name}_clf"] for model_name in models)
    # start value 0 keeps the column at 0 when no model is configured
    df["clf_sum"] = sum(vote_columns, 0)
    return df

# add the vote total to every subset
for split_name in ("train", "10_folds", "test"):
    dfs[split_name] = add_classification_sum(dfs[split_name])
python
def add_distance_additions(df: pd.DataFrame) -> pd.DataFrame:
    """Add a ``distance_sums`` column: the sum of every model's distance.

    ``df`` is modified in place and also returned.
    """
    distance_columns = (df[f"{model_name}"] for model_name in models)
    # start value 0 keeps the column at 0 when no model is configured
    df["distance_sums"] = sum(distance_columns, 0)
    return df

# add the summed distances to every subset
for split_name in ("train", "10_folds", "test"):
    dfs[split_name] = add_distance_additions(dfs[split_name])
python
def add_distance_multiplications(df: pd.DataFrame) -> pd.DataFrame:
    """Add a ``distance_multiplications`` column: the product of every model's distance.

    ``df`` is modified in place and also returned.
    """
    product = 1
    for model_name in models:
        product = product * df[f"{model_name}"]
    df["distance_multiplications"] = product
    return df

# add the multiplied distances to every subset
for split_name in ("train", "10_folds", "test"):
    dfs[split_name] = add_distance_multiplications(dfs[split_name])
python
# Show five random rows of the train frame with the engineered columns (notebook display).
dfs["train"].sample(5)
python
train_columns = dfs["train"].columns
# the per-model vote columns
# NOTE(review): categorical_features is not read by any cell visible here - confirm it is still needed
categorical_features = list(filter(lambda column: column.endswith("_clf"), train_columns))
# every column except the label, in frame order
feature_names = dfs["train"].drop(columns=["actuals"]).columns.tolist()

XGBoost

python
# restore sets
y_train = dfs["train"]["actuals"].values
x_train = dfs["train"].drop(columns=["actuals"]).values

y_test = dfs["test"]["actuals"].values
x_test = dfs["test"].drop(columns=["actuals"]).values

y_val = dfs["10_folds"]["actuals"].values
x_val = dfs["10_folds"].drop(columns=["actuals"]).values
python
# Keyword arguments for xgboost.XGBClassifier(**params).
params = {
    'learning_rate': 0.01,
    'max_depth': 5,
    'max_leaves': pow(2, 5) - 1,
    # upper bound on boosting rounds; the training cell uses early stopping
    'n_estimators': 10000,
    'seed': 17,
    'nthread': 2,
    # this key was misspelled as 'object', so the intended objective was
    # never handed to XGBoost under its real parameter name
    'objective': 'binary:logistic',
}
python
# NOTE: from this cell on ``models`` holds the fitted classifiers and no longer
# the model names defined in the configuration cell.
# NOTE(review): recent XGBoost releases expect eval_metric / early_stopping_rounds
# on the estimator rather than on fit() - confirm against the installed version.
models = []
for k in range(10):
    print(f"Training {k}-th model")

    # the k-th block of 600 validation rows drives early stopping for this model
    valid_from = k * 600
    valid_until = valid_from + 600
    fold = slice(valid_from, valid_until)

    model = xgboost.XGBClassifier(**params)
    model.fit(
        x_train,
        y_train,
        eval_metric='logloss',
        eval_set=[(x_val[fold], y_val[fold])],
        early_stopping_rounds=500,
        verbose=False,
    )

    models.append(model)
python
def analyze_results(x, y, label):
    """Score each of the 10 trained models on the given data.

    Prints every model's accuracy (in percent, rounded to 2 digits) and
    returns the unrounded accuracies.

    Args:
        x: feature matrix.
        y: true labels, ordered like the rows of ``x``.
        label: name of the evaluated set. For "validation" the k-th model is
            scored only on rows ``[k * 600, k * 600 + 600)``, the same slice
            that was used as its early stopping set; any other value scores
            every model on the full data.

    Returns:
        List of 10 accuracy percentages, one per model.
    """
    scores = []
    for k in range(10):
        model = models[k]

        if label == "validation":
            # slice into separate names instead of copying and restoring
            # the full arrays on every iteration
            fold = slice(k * 600, k * 600 + 600)
            x_eval, y_eval = x[fold], y[fold]
        else:
            x_eval, y_eval = x, y

        predictions = model.predict(x_eval)

        classified = sum(
            1 for actual, prediction in zip(y_eval, predictions)
            if actual == prediction
        )

        score = 100 * (classified / len(predictions))
        print(round(score, 2))
        scores.append(score)

    return scores
python
# accuracy of every model on the full train set
train_results = analyze_results(x_train, y_train, "train")
python
# mean train accuracy (percent) over the 10 models
round(sum(train_results)/10, 2)
python
# accuracy of every model on its own 600-row validation slice
val_results = analyze_results(x_val, y_val, "validation")
python
# mean validation accuracy (percent) over the 10 models
round(sum(val_results)/10, 2)
python
# accuracy of every model on the full test set
test_results = analyze_results(x_test, y_test, "test")
python
# mean test accuracy (percent) over the 10 models
round(sum(test_results)/10, 2)