Back to Smolagents

Benchmark date

examples/smolagents_benchmark/score.ipynb

1.24.0 · 10.3 KB
Original Source
python
!pip install -e .. datasets sympy numpy matplotlib seaborn -q  # Install dev version of smolagents + some packages
python
# Benchmark date: used as the config name under which the results are pushed.
# - set a concrete date (ISO format, YYYY-MM-DD):
DATE = "2024-12-26"
# - or set it to None to fall back to today's date at scoring time:
# DATE = None

# Evaluation dataset
# - the dataset is gated, so you must first visit its page to request access: https://huggingface.co/datasets/smolagents/benchmark-v1
EVAL_DATASET = "smolagents/benchmark-v1"

# Answers dataset: it must be a gated dataset; the scoring step loads the answers from it
ANSWERS_DATASET = "smolagents/answers"
# Whether to push the answers dataset to the Hub
PUSH_ANSWERS_DATASET_TO_HUB = True

# Results dataset: where the per-subset accuracies are pushed by default
RESULTS_DATASET = "smolagents/results"
# Whether to push the results dataset to the Hub (when False, scoring only returns the results locally)
PUSH_RESULTS_DATASET_TO_HUB = True

Constants and utilities/tools

python
import datetime
import re
import string
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
from tqdm import tqdm


def normalize_number_str(number_str: str) -> float:
    """Parse a decorated numeric string into a float.

    Dollar signs, percent signs and commas are stripped before parsing so that
    values such as "$1,234.50" or "45%" convert cleanly.

    Parameters:
    - number_str: str, the text to convert
    Returns:
    - float, the parsed value, or positive infinity when the cleaned text is
      still not a valid float
    """
    # One pass that deletes every unit/separator character we tolerate.
    cleaned = number_str.translate(str.maketrans("", "", "$%,"))
    try:
        parsed = float(cleaned)
    except ValueError:
        parsed = float("inf")
    return parsed


def split_string(
    s: str,
    char_list: "list[str] | None" = None,
) -> list[str]:
    """Split a string on any of the given separator characters.

    Parameters:
    - s: str, the string to split
    - char_list: list of separator characters (default: "," and ";")
    Returns:
    - list[str], the pieces of `s` between separators; `[s]` when there is
      nothing to split on
    """
    if char_list is None:
        char_list = [",", ";"]

    # Escape every separator: characters such as "^", "]", "-" or "\" are
    # special inside a regex character class and would otherwise change the
    # pattern's meaning (or make it invalid).
    separators = "".join(re.escape(char) for char in char_list)
    if not separators:
        # An empty character class is not a valid pattern.
        return [s]
    return re.split(f"[{separators}]", s)


def is_float(element: object) -> bool:
    """Tell whether `element` can be converted with `float()`.

    Parameters:
    - element: any object (typically a string)
    Returns:
    - bool, True when `float(element)` succeeds, False otherwise
    """
    try:
        float(element)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported types such as None or list.
        # ValueError: text that is not a number.
        # OverflowError: integers too large to represent as a float.
        return False
    return True


def normalize_str(input_str, remove_punct=True) -> str:
    """
    Produce a canonical form of a string for lenient comparison.

    The string is stripped of every whitespace character, lowercased, and
    (unless disabled) stripped of ASCII punctuation.
    Parameters:
    - input_str: str, the string to normalize
    - remove_punct: bool, whether to remove punctuation (default: True)
    Returns:
    - str, the normalized string
    """
    # Dropping whitespace makes e.g. "sea gull" and "seagull" compare equal.
    compact = re.sub(r"\s", "", input_str).lower()

    if not remove_punct:
        return compact

    punctuation_table = str.maketrans("", "", string.punctuation)
    return compact.translate(punctuation_table)


def extract_numbers(text: str) -> list[str]:
    """Return every number found in `text`, in order, with thousands commas removed.

    A number is:
    - an optional leading minus sign,
    - digits, either plain or grouped with comma thousand separators,
    - an optional decimal part.
    """
    number_re = re.compile(r"-?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?")

    return [match.group(0).replace(",", "") for match in number_re.finditer(text)]


def get_question_score_gaia(
    model_answer: str,
    ground_truth: str,
) -> bool:
    """Score a model answer against the ground truth with GAIA-style matching.

    The comparison depends on the shape of the ground truth:
    - a number: the answer is parsed as a number and compared for equality;
    - a list (contains "," or ";"): both sides are split and compared element
      by element, each element as a number or as a string (punctuation kept);
    - any other string: both sides are compared after whitespace, case and
      punctuation normalization.

    Parameters:
    - model_answer: str, the answer produced by the model
    - ground_truth: str, the reference answer
    Returns:
    - bool, True when the answer matches the ground truth
    """
    # Numeric ground truth.
    if is_float(ground_truth):
        return normalize_number_str(str(model_answer)) == float(ground_truth)

    # Plain string ground truth (no list separators).
    if "," not in ground_truth and ";" not in ground_truth:
        return normalize_str(model_answer) == normalize_str(ground_truth)

    # List ground truth.
    expected_items = split_string(ground_truth)
    answered_items = split_string(model_answer)

    if len(expected_items) != len(answered_items):
        warnings.warn("Answer lists have different lengths, returning False.", UserWarning)
        return False

    def _item_matches(answered: str, expected: str) -> bool:
        # Each element is compared as a float when the reference element is numeric.
        if is_float(expected):
            return normalize_number_str(answered) == float(expected)
        # Punctuation is kept here since list elements can legitimately contain it.
        return normalize_str(answered, remove_punct=False) == normalize_str(expected, remove_punct=False)

    return all(_item_matches(answered, expected) for answered, expected in zip(answered_items, expected_items))


def get_correct(row):
    """Decide whether the answer stored in a row is correct.

    Rows whose "source" is "MATH" are judged on the last number appearing in the
    answer, compared to the true answer with a small numeric tolerance; a MATH
    answer containing no number is incorrect. Every other source is scored with
    `get_question_score_gaia`.

    Parameters:
    - row: mapping-like row providing "source", "answer" and "true_answer"
    Returns:
    - truthy when the answer is judged correct
    """
    if row["source"] != "MATH":
        return get_question_score_gaia(str(row["answer"]), str(row["true_answer"]))

    candidates = extract_numbers(str(row["answer"]))
    if not candidates:
        return False

    # Only the final number in the answer is taken as the model's result.
    final_number = float(candidates[-1])
    return np.isclose(final_number, float(row["true_answer"]), rtol=1e-5, atol=1e-7)


def score_answers_subset(answers_dataset, answers_subset):
    """Compute the accuracy of one answers subset.

    Loads the "test" split of `answers_subset` from `answers_dataset`, marks each
    row as correct or not with `get_correct`, and averages the result.

    Parameters:
        answers_dataset: Dataset ID containing the answers
        answers_subset: Config name of the subset to score, e.g.
                        "meta-llama__Llama-3.1-8B-Instruct__code__gaia"

    Returns:
        dict with the "model_id", "agent_action_type" and "source" values of the
        first row plus the accuracy under "acc", or None when anything goes wrong
        (the error is printed; scoring is best-effort per subset).
    """
    try:
        print(answers_dataset, answers_subset)
        # Subset names are "__"-separated and end with the action type and the task.
        # The parts themselves are not needed (the result is built from the data);
        # unpacking only fails fast on names that do not have at least two parts.
        *_model_id_parts, _action_type, _task = answers_subset.split("__")
        ds = datasets.load_dataset(answers_dataset, answers_subset, split="test")
        df = ds.to_pandas()
        df["correct"] = df.apply(get_correct, axis=1)
        # Explicit raise instead of `assert`, so the check survives `python -O`.
        if df["correct"].notnull().sum() <= 30:
            raise ValueError("Missing answers")
        acc = df["correct"].mean().item()
        result = df.loc[0, ["model_id", "agent_action_type", "source"]].to_dict()
        result["acc"] = acc
        return result
    except Exception as e:
        print(f"Error with {answers_subset}: {e}")
        return None


def score_answers(
    answers_subsets,
    answers_dataset=ANSWERS_DATASET,
    date=DATE,
    push_to_hub_dataset=RESULTS_DATASET if PUSH_RESULTS_DATASET_TO_HUB else None,
    set_default=True,
):
    """
    Score answers from the given dataset subsets.

    Subsets are scored concurrently with `score_answers_subset`; subsets for which
    scoring returns nothing are left out of the results.

    Parameters:
        answers_subsets: List of dataset subsets to score
        answers_dataset: Dataset containing the answers
        date: Date to use for the config name; today's date (ISO format) when empty
        push_to_hub_dataset: Dataset ID to push results to, or None to skip pushing
        set_default: If True, sets this config as the default config in the Hugging Face Hub dataset.
                     This means when users load the dataset without specifying a config,
                     this version will be loaded by default.

    Returns:
        pandas.DataFrame with one row per successfully scored subset.

    Raises:
        ValueError: If `answers_dataset` is empty.
    """
    if not answers_dataset:
        raise ValueError("Pass 'answers_dataset' to load the answers from it")
    date = date or datetime.date.today().isoformat()
    results = []
    with ThreadPoolExecutor(max_workers=16) as exe:
        futures = [
            exe.submit(score_answers_subset, answers_dataset, answers_subset) for answers_subset in answers_subsets
        ]
        # Size the progress bar from the futures actually submitted, so any
        # iterable of subsets works, not only sized sequences.
        for f in tqdm(as_completed(futures), total=len(futures), desc="Processing tasks"):
            result = f.result()
            if result:
                results.append(result)
    df = pd.DataFrame(results)

    if push_to_hub_dataset:
        ds = datasets.Dataset.from_pandas(df)
        config = date
        # Forward `set_default` so the documented behavior actually takes effect.
        ds.push_to_hub(
            push_to_hub_dataset,
            config_name=config,
            set_default=set_default,
            commit_message=f"Upload {config} results",
        )
    return df

Score answers

python
import datasets
import pandas as pd


# Choose the answers subsets to score:
# - either list them explicitly:
# answers_subsets = ["meta-llama__Llama-3.1-8B-Instruct__code__gaia"]
# - or get all the answers subsets (config names) present in the ANSWERS_DATASET
answers_subsets = datasets.get_dataset_config_names(ANSWERS_DATASET)
print("Number of answers_subsets", len(answers_subsets))
# NOTE(review): indexing [0] raises IndexError if no subset is found.
print("Example of answers_subset", answers_subsets[0])

# Score every subset. With the default arguments this also pushes the results
# to RESULTS_DATASET when PUSH_RESULTS_DATASET_TO_HUB is True.
result_df = score_answers(answers_subsets)
# Express accuracy as a percentage rounded to two decimals.
# NOTE(review): if every subset failed to score, result_df is empty and has no "acc" column.
result_df["acc"] = (result_df["acc"] * 100).round(2)
# Last expression of the cell: shows the first rows in the notebook output.
result_df.head()
python
# Reshape the results to one row per (model_id, source) pair, with one accuracy
# column per agent_action_type. Combinations without a score are filled with NaN.
pivot_df = result_df.pivot_table(
    index=["model_id", "source"],
    columns=["agent_action_type"],
    values="acc",
    fill_value=float("nan"),
).reset_index()  # turn the index levels back into regular columns

Display results

python
# Show the pivoted accuracy table. `display` is not imported in this notebook;
# it is presumably the helper that IPython/Jupyter makes available in notebook cells.
display(pivot_df)
python
import matplotlib.pyplot as plt
from matplotlib.legend_handler import HandlerTuple  # Added import


# Grouped bar chart of accuracy per model: for every source, one solid bar for the
# "code" column (labelled "Agent") and one hatched bar for the "vanilla" column.
# Expects pivot_df to have "model_id", "source", "code" and "vanilla" columns.
# NOTE(review): a KeyError is raised below if no "code" or "vanilla" results were scored.
models = pivot_df["model_id"].unique()
sources = pivot_df["source"].unique()

# Create figure and axis
plt.style.use("seaborn-v0_8-white")
fig, ax = plt.subplots(figsize=(15, 6))

# Set the width of each bar group and positions of the bars
width = 0.15  # width of each bar
spacing = 0.02  # space between bars within a group
group_spacing = 0.2  # space between model groups

# Calculate positions for the bars: x holds the left edge of each model's group
num_sources = len(sources)
total_width_per_group = (width + spacing) * num_sources * 2  # *2 for agent and vanilla
x = np.arange(len(models)) * (total_width_per_group + group_spacing)

# Plot bars for each source
for i, source in enumerate(sources):
    source_data = pivot_df[pivot_df["source"] == source]
    # One score per model, in the order of `models`; NaN when the model has no row for this source
    agent_scores = [
        source_data[source_data["model_id"] == model]["code"].values[0]
        if len(source_data[source_data["model_id"] == model]) > 0
        else np.nan
        for model in models
    ]
    vanilla_scores = [
        source_data[source_data["model_id"] == model]["vanilla"].values[0]
        if len(source_data[source_data["model_id"] == model]) > 0
        else np.nan
        for model in models
    ]

    # Position calculation for each pair of bars
    pos = x + i * (width * 2 + spacing)

    agent_bars = ax.bar(pos, agent_scores, width, label=f"{source} (Agent)", alpha=0.8)
    # The vanilla bar is shifted by less than one bar width, so it partly overlaps its
    # agent bar; it is drawn white with a hatch in the agent bar's color to pair them visually.
    # NOTE(review): `hatch_linewidth` may not be accepted by older matplotlib versions — confirm.
    vanilla_bars = ax.bar(
        pos + width * 0.6,
        vanilla_scores,
        width,
        hatch="////",
        alpha=0.5,
        hatch_linewidth=2,
        label=f"{source} (Vanilla)",
        color="white",
        edgecolor=agent_bars[0].get_facecolor(),
    )

# Customize the plot
ax.set_ylabel("Score")
ax.set_title("Model Performance Comparison")

# Set x-axis ticks in the middle of each group
group_centers = x + (total_width_per_group - spacing) / 2
ax.set_xticks(group_centers)

# Wrap long model names to prevent overlap: put each "/"-separated part on its own line
wrapped_labels = ["\n".join(model.split("/")) for model in models]
ax.set_xticklabels(wrapped_labels, rotation=0, ha="center")

# Modify legend to combine agent and vanilla entries into one entry per source.
# Relies on the handles being in plotting order: agent then vanilla for each source.
handles, labels = ax.get_legend_handles_labels()
unique_sources = sources
legend_elements = [
    (handles[i * 2], handles[i * 2 + 1], labels[i * 2].replace(" (Agent)", "")) for i in range(len(unique_sources))
]
# Each legend entry is a tuple of two handles, drawn side by side by HandlerTuple;
# the legend is anchored outside the axes, to the right.
custom_legend = ax.legend(
    [(agent_handle, vanilla_handle) for agent_handle, vanilla_handle, _ in legend_elements],
    [label for _, _, label in legend_elements],
    handler_map={tuple: HandlerTuple(ndivide=None)},
    bbox_to_anchor=(1.05, 1),
    loc="upper left",
)

# Final cosmetics: light horizontal grid, y axis starting at 0, no top/right frame lines
ax.yaxis.grid(True, linestyle="--", alpha=0.3)
ax.set_ylim(bottom=0)
plt.tight_layout()
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)

plt.show()