Evaluating your LLM application gives you confidence in its performance. In this guide, we will walk through the process of evaluating complex applications like LLM chains or agents.

<Tip>
  In this guide, we will focus on evaluating complex LLM applications. If you are looking to evaluate single prompts, you can refer to the [Evaluate A Prompt](/v1/evaluation/evaluate_prompt) guide.
</Tip>

The evaluation is done in five steps:

1. Add tracking to your LLM application
2. Define the evaluation task
3. Choose the `Dataset` that you would like to evaluate your application on
4. Choose the metrics that you would like to evaluate your application with
5. Create and run the evaluation experiment

## 1. Add tracking to your LLM application

While not required, we recommend adding tracking to your LLM application. This allows you to have full visibility into each evaluation run. In the example below, we use a combination of the `track` decorator and the `track_openai` function to trace the LLM application.

```python title="Python" language="python"
from opik import track
from opik.integrations.openai import track_openai
import openai

openai_client = track_openai(openai.OpenAI())
# This method is the LLM application that you want to evaluate
# Typically this is not updated when creating evaluations
@track
def your_llm_application(input: str) -> str:
    response = openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": input}],
    )
    return response.choices[0].message.content
```
## 2. Define the evaluation task

Once you have added instrumentation to your LLM application, we can define the evaluation task. The evaluation task takes a dataset item as input and needs to return a dictionary with keys that match the parameters expected by the metrics you are using. In this example, we can define the evaluation task as follows:
<CodeBlocks>
```typescript title="TypeScript" language="typescript"
import { EvaluationTask } from "opik";
import { OpenAI } from "openai";

// Define dataset item type
type DatasetItem = {
  input: string;
  expected: string;
};

const llmTask: EvaluationTask<DatasetItem> = async (datasetItem) => {
  const { input } = datasetItem;

  const openai = new OpenAI();
  const response = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [
      { role: "system", content: "You are a coding assistant" },
      { role: "user", content: input },
    ],
  });

  return { output: response.choices[0].message.content };
};
```
```python title="Python" language="python"
def evaluation_task(x):
    return {
        "output": your_llm_application(x['user_question'])
    }
```

</CodeBlocks>
## 3. Choose the dataset you would like to evaluate your application on

In order to create an evaluation experiment, you will need to have a Dataset that includes all your test cases.

If you have already created a Dataset, you can use the `getOrCreateDataset` (TypeScript) or `get_or_create_dataset` (Python) method to fetch it:
<CodeBlocks>
```typescript title="TypeScript" language="typescript"
import { Opik } from "opik";

const client = new Opik();
const dataset = await client.getOrCreateDataset<DatasetItem>("Example dataset", "Evaluation dataset", "my-project");

// Opik deduplicates items that are inserted into a dataset, so it is safe to
// insert the same items multiple times
await dataset.insert([
  {
    input: "Hello, world!",
    expected: "Hello, world!",
  },
  {
    input: "What is the capital of France?",
    expected: "Paris",
  },
]);
```
```python title="Python" language="python"
from opik import Opik
client = Opik()
dataset = client.get_or_create_dataset(name="Example dataset", project_name="my-project")
# Opik deduplicates items that are inserted into a dataset, so it is safe to
# insert the same items multiple times
dataset.insert([
    {
        "input": "Hello, world!",
        "expected_output": "Hello, world!"
    },
    {
        "input": "What is the capital of France?",
        "expected_output": "Paris"
    },
])
```
</CodeBlocks>
## 4. Choose evaluation metrics

Opik provides a set of built-in evaluation metrics that you can choose from. These are broken down into two main categories:

1. Heuristic metrics: deterministic checks such as `equals` or `contains`
2. LLM-as-a-judge metrics: model-based judgments, typically used for detecting `hallucinations` or context relevance

In the same evaluation experiment, you can use multiple metrics to evaluate your application:
<CodeBlocks>
```typescript title="TypeScript" language="typescript"
import { ExactMatch } from "opik";

const exact_match_metric = new ExactMatch();
```
```python title="Python" language="python"
from opik.evaluation.metrics import Hallucination

hallucination_metric = Hallucination()
```

</CodeBlocks>
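Since metrics are passed to the experiment as a list, using several at once is just a matter of instantiating each one. A minimal sketch, assuming the built-in `Equals` and `Hallucination` metrics and the `dataset` and `evaluation_task` defined in this guide:

```python title="Python" language="python"
from opik.evaluation import evaluate
from opik.evaluation.metrics import Equals, Hallucination

# Every metric in the list is scored for each dataset item in the experiment
evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[Equals(), Hallucination()],
)
```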
## 5. Run the evaluation

Now that we have the task we want to evaluate, the dataset to evaluate on, and the metrics we want to evaluate with, we can run the evaluation:
<CodeBlocks>
```typescript title="TypeScript" language="typescript" maxLines=1000
import { EvaluationTask, Opik, ExactMatch, evaluate } from "opik";
import { OpenAI } from "openai";

// Define dataset item type
type DatasetItem = {
  input: string;
  expected: string;
};

// Define the evaluation task
const llmTask: EvaluationTask<DatasetItem> = async (datasetItem) => {
  const { input } = datasetItem;

  const openai = new OpenAI();
  const response = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [
      { role: "system", content: "You are a coding assistant" },
      { role: "user", content: input },
    ],
  });

  return { output: response.choices[0].message.content };
};

// Get or create the dataset - items are automatically deduplicated
const client = new Opik();
const dataset = await client.getOrCreateDataset<DatasetItem>("Example dataset", "Evaluation dataset", "my-project");

await dataset.insert([
  {
    input: "Hello, world!",
    expected: "Hello, world!",
  },
  {
    input: "What is the capital of France?",
    expected: "Paris",
  },
]);

// Define the metric
const exact_match_metric = new ExactMatch();

// Run the evaluation
const result = await evaluate({
  dataset,
  task: llmTask,
  scoringMetrics: [exact_match_metric],
  experimentName: "Example Evaluation",
  projectName: "my-project",
});

console.log(`Experiment ID: ${result.experimentId}`);
console.log(`Experiment Name: ${result.experimentName}`);
console.log(`Total test cases: ${result.testResults.length}`);
```
```python title="Python" language="python" maxLines=1000
import opik
from opik import Opik, track
from opik.evaluation import evaluate
from opik.evaluation.metrics import Equals, Hallucination
from opik.integrations.openai import track_openai
import openai
opik.configure(project_name="my-project")
# Define the task to evaluate
openai_client = track_openai(openai.OpenAI())
MODEL = "gpt-3.5-turbo"
@track
def your_llm_application(input: str) -> str:
    response = openai_client.chat.completions.create(
        model=MODEL,
        messages=[{"role": "user", "content": input}],
    )
    return response.choices[0].message.content

# Define the evaluation task
def evaluation_task(x):
    return {
        "output": your_llm_application(x['input'])
    }

# Create a simple dataset
client = Opik()
dataset = client.get_or_create_dataset(name="Example dataset", project_name="my-project")
dataset.insert([
    {"input": "What is the capital of France?"},
    {"input": "What is the capital of Germany?"},
])

# Define the metrics
hallucination_metric = Hallucination()

evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    project_name="my-project",
    experiment_config={
        "model": MODEL
    }
)
```

</CodeBlocks>
Once the evaluation is complete, you will get a link to the Opik UI where you can analyze the evaluation results. In addition to being able to deep dive into each test case, you will also be able to compare multiple experiments side by side.
<Frame> </Frame>

## Advanced usage

### Missing arguments for scoring methods

When you face the `opik.exceptions.ScoreMethodMissingArguments` exception, it means that the dataset item and task output dictionaries do not contain all the arguments expected by the scoring method.

The way the `evaluate` function works is by merging the dataset item and task output dictionaries and then passing the result to the scoring method. For example, if the dataset item contains the keys `user_question` and `context` while the evaluation task returns a dictionary with the key `output`, the scoring method will be called as `scoring_method.score(user_question='...', context='...', output='...')`. This can be an issue if the scoring method expects a different set of arguments.

You can solve this by either updating the dataset item or evaluation task to return the missing arguments, or by using the `scoring_key_mapping` parameter of the `evaluate` function. In the example above, if the scoring method expects `input` as an argument, you can map the `user_question` key to the `input` key as follows:

```python title="Python" language="python"
evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    scoring_key_mapping={"input": "user_question"},
)
```
### Linking prompts to experiments

The Opik prompt library can be used to version your prompt templates. When creating an Experiment, you can link the Experiment to a specific prompt version:
<CodeBlocks>
```typescript title="TypeScript" language="typescript"
import { Opik, Prompt, evaluate, evaluatePrompt } from "opik";
import { Hallucination } from "opik";

// Create a prompt
const prompt = new Prompt({
  name: "My prompt",
  prompt: "Translate to French: {{input}}",
  projectName: "my-project",
});

// Link prompt to evaluation experiment
await evaluatePrompt({
  dataset: myDataset,
  messages: [
    { role: "user", content: "Translate to French: {{input}}" },
  ],
  model: "gpt-4o",
  scoringMetrics: [new Hallucination()],
  prompts: [prompt],
  projectName: "my-project",
});
```
```python title="Python" language="python"
import opik
# Create a prompt
prompt = opik.Prompt(
    name="My prompt",
    prompt="...",
    project_name="my-project",
)

# Run the evaluation
evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    prompts=[prompt],
    project_name="my-project",
)
```

</CodeBlocks>
The experiment will now be linked to the prompt allowing you to view all experiments that use a specific prompt:
<Frame> </Frame>

### Logging traces to a specific project

You can use the `project_name` parameter of the `evaluate` function to log evaluation traces to a specific project:
```python title="Python" language="python"
evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    project_name="hallucination-detection",
)
```
### Evaluating a subset of the dataset

You can use the `nb_samples` parameter to specify the number of samples to use for the evaluation. This is useful if you only want to evaluate a subset of the dataset.
```python title="Python" language="python"
evaluation = evaluate(
    experiment_name="My experiment",
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    nb_samples=10,
)
```
### Filtering dataset items

You can evaluate only a subset of your dataset items by using the `dataset_filter_string` parameter. This is useful when you want to run experiments on specific categories of data or test particular scenarios:

```python title="Python" language="python"
# Evaluate only items with specific tags
evaluation = evaluate(
    experiment_name="Production test cases",
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    dataset_filter_string='tags contains "production"',
)

# Evaluate items matching multiple conditions
evaluation = evaluate(
    experiment_name="Hard finance questions",
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    dataset_filter_string='data.category = "finance" AND data.difficulty = "hard"',
)

# Filter by date range
evaluation = evaluate(
    experiment_name="Recent test cases",
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    dataset_filter_string='created_at >= "2024-06-01T00:00:00Z"',
)
```
The filter uses Opik Query Language (OQL) syntax. For more details on filter syntax and supported columns, see Filtering syntax.
<Tip>
  You can combine filtering with other parameters like `nb_samples` to evaluate a specific number of items from a filtered subset.
</Tip>

### Sampling the dataset

You can use the `dataset_sampler` parameter to specify the dataset sampler instance used to sample the dataset. This is useful if you want to sample the dataset differently than the default sampling strategy (accept all items). For example, you can use the `RandomDatasetSampler` to sample the dataset randomly:

```python title="Python" language="python"
from opik.evaluation import samplers

evaluation = evaluate(
    experiment_name="My experiment",
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    dataset_sampler=samplers.RandomDatasetSampler(max_samples=10),
)
```
In the example above, the evaluation will sample 10 random items from the dataset.
You can also implement your own dataset sampler by extending `BaseDatasetSampler` and overriding its `sample` method:

```python title="Python" language="python"
import re
from typing import List

from opik.api_objects.dataset import dataset_item
from opik.evaluation import samplers

class MyDatasetSampler(samplers.BaseDatasetSampler):
    def __init__(self, filter_string: str, field_name: str) -> None:
        self.filter_regex = re.compile(filter_string)
        self.field_name = field_name

    def sample(self, dataset: List[dataset_item.DatasetItem]) -> List[dataset_item.DatasetItem]:
        # Sample items from the dataset that match the filter string in the 'field_name' field
        return [item for item in filter(lambda x: self.filter_regex.search(x[self.field_name]), dataset)]

# Example usage
evaluation = evaluate(
    experiment_name="My experiment",
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    dataset_sampler=MyDatasetSampler(filter_string="\\.*SUCCESS\\.*", field_name="output"),
)
```
Implementing your own dataset sampler is useful if you want to implement a custom sampling strategy. For instance, you can implement a dataset sampler that samples the dataset using some filtering criteria as in the example above.
### Analyzing the evaluation results

The `evaluate` function returns an `EvaluationResult` object that contains the evaluation results. You can create aggregated statistics for each metric by calling its `aggregate_evaluation_scores` method:

```python title="Python" language="python"
# Retrieve and print the aggregated score statistics (mean, min, max, std) per metric
scores = evaluation.aggregate_evaluation_scores()

for metric_name, statistics in scores.aggregated_scores.items():
    print(f"{metric_name}: {statistics}")
```
Aggregated statistics can help analyze evaluation results and are useful for comparing the performance of different models or different versions of the same model, for example.
### Computing experiment-level metrics

In addition to per-item metrics, you can compute experiment-level aggregate metrics that are calculated across all test results. These experiment scores are displayed in the Opik UI alongside feedback scores and can be used for sorting and filtering experiments.

Experiment scores are computed after all test results are collected. You define experiment score functions that take a list of `TestResult` objects and return a list of `ScoreResult` objects representing aggregate metrics:
<CodeBlocks>
```python title="Python" language="python"
from typing import List

from opik.evaluation import evaluate, test_result
from opik.evaluation.metrics import Hallucination, score_result

def compute_hallucination_max(
    test_results: List[test_result.TestResult],
) -> List[score_result.ScoreResult]:
    """Compute the maximum hallucination score across all test results."""
    hallucination_scores = [
        result.score_results[0].value
        for result in test_results
        if result.score_results and len(result.score_results) > 0
    ]

    if not hallucination_scores:
        return []

    return [
        score_result.ScoreResult(
            name="hallucination_metric (max)",
            value=max(hallucination_scores),
            reason=f"Maximum hallucination score across {len(hallucination_scores)} test cases"
        )
    ]

evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[Hallucination()],
    experiment_scoring_functions=[compute_hallucination_max],
    experiment_name="My experiment"
)

print(f"Experiment scores: {evaluation.experiment_scores}")
```
</CodeBlocks>
<Tip>
Experiment scores are displayed in the Opik UI in the experiments table alongside feedback scores. They can be used for sorting and filtering experiments, making it easy to compare experiments based on aggregate metrics.
</Tip>
You can define multiple experiment score functions to compute different aggregate metrics:
<CodeBlocks>
```python title="Python" language="python"
from typing import List
from opik.evaluation import evaluate, test_result
from opik.evaluation.metrics import Equals, score_result
def compute_accuracy_stats(
    test_results: List[test_result.TestResult],
) -> List[score_result.ScoreResult]:
    """Compute accuracy statistics across all test results."""
    accuracy_scores = [
        result.score_results[0].value
        for result in test_results
        if result.score_results and len(result.score_results) > 0
    ]

    if not accuracy_scores:
        return []

    return [
        score_result.ScoreResult(
            name="accuracy (mean)",
            value=sum(accuracy_scores) / len(accuracy_scores),
            reason=f"Mean accuracy across {len(accuracy_scores)} test cases"
        ),
        score_result.ScoreResult(
            name="accuracy (min)",
            value=min(accuracy_scores),
            reason=f"Minimum accuracy across {len(accuracy_scores)} test cases"
        ),
        score_result.ScoreResult(
            name="accuracy (max)",
            value=max(accuracy_scores),
            reason=f"Maximum accuracy across {len(accuracy_scores)} test cases"
        ),
    ]

evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[Equals()],
    experiment_scoring_functions=[compute_accuracy_stats],
    experiment_name="My experiment"
)
```

</CodeBlocks>
### Using async evaluation tasks

The `evaluate` function does not support async evaluation tasks. If you pass an async task, you will get an error similar to:

```
Input should be a valid dictionary [type=dict_type, input_value='<coroutine object kyc_qu...ng_task at 0x3336d0a40>', input_type=str]
```
As it might not always be possible to remove async logic from your LLM application, we recommend using `asyncio.run` within the evaluation task:

```python title="Python" language="python"
import asyncio

async def your_llm_application(input: str) -> str:
    return "Hello, World"

def evaluation_task(x):
    # your_llm_application here is an async function
    result = asyncio.run(your_llm_application(x['input']))

    return {
        "output": result
    }
```
This should solve the issue and allow you to run the evaluation.
<Tip>
  If you are running in a Jupyter notebook, you will need to add the following lines to the top of your notebook:

  ```python
  import nest_asyncio
  nest_asyncio.apply()
  ```

  Otherwise you might get the error `RuntimeError: asyncio.run() cannot be called from a running event loop`.
</Tip>
### Disabling threading

If you run into any issues, you can disable multi-threading in the SDK by setting `task_threads` to 1:

```python title="Python" language="python"
evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    task_threads=1
)
```
In order to evaluate datasets more efficiently, Opik uses multiple background threads to evaluate the dataset. If this is causing issues, you can disable these by setting `task_threads` and `scoring_threads` to 1, which will lead Opik to run all calculations in the main thread.
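For example, a minimal sketch that forces both task execution and metric scoring onto the main thread:

```python title="Python" language="python"
# Run both task execution and metric scoring in the main thread
evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[hallucination_metric],
    task_threads=1,
    scoring_threads=1,
)
```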
### Passing additional arguments to the evaluation task

Sometimes your evaluation task needs extra context besides the dataset item (commonly referred to as `x`). For example, you may want to pass a model name, a system prompt, or a pre-initialized client.

Since `evaluate` calls the task as `task(x)` for each dataset item, the recommended pattern is to create a wrapper (or use `functools.partial`, as shown after this example) that closes over any additional arguments.

Using a wrapper function:

```python title="Python" language="python"
# Extra dependencies you want to provide to the task
MODEL = "gpt-4o"
IMAGE_TYPE = "thumbnail"

def evaluation_task(x, model, image_type, client, prompt):
    full_response = client.get_answer(
        x["question"],
        x["image_paths"][image_type],
        prompt.format(),
        model=model,
    )
    response = full_response["response"]
    return {
        "response": response,
        "bbox": full_response.get("bounding_boxes"),
        "image_url": full_response.get("image_url"),
    }

def make_task(model, image_type, client, prompt):
    # Return a unary function that evaluate() can call as task(x)
    def _task(x):
        return evaluation_task(x, model, image_type, client, prompt)
    return _task

task = make_task(MODEL, IMAGE_TYPE, bot, system_prompt)

evaluation = evaluate(
    dataset=dataset,
    task=task,  # evaluate will call task(x) for each item
    scoring_metrics=[levenshteinratio_metric],
    scoring_key_mapping={
        "input": "question",
        "output": "response",
        "reference": "expected_answer",
    },
)
```
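Alternatively, `functools.partial` achieves the same result by pre-binding the extra arguments and leaving a unary task. A minimal sketch reusing the names from the example above:

```python title="Python" language="python"
import functools

# Pre-bind the extra arguments; the result is a unary function that
# evaluate() can call as task(x)
task = functools.partial(
    evaluation_task,
    model=MODEL,
    image_type=IMAGE_TYPE,
    client=bot,
    prompt=system_prompt,
)
```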
### Using custom scoring functions

In addition to using built-in metrics, Opik allows you to define custom scoring functions to evaluate your LLM applications. Scoring functions give you complete control over how your outputs are evaluated and can be tailored to your specific use cases.

There are two types of scoring functions you can use:

1. Plain scoring functions that receive the `dataset_item` and `task_outputs` parameters
2. Task span scoring functions that receive a `task_span` parameter for advanced evaluation

#### Plain scoring functions

Plain scoring functions receive dataset inputs and task outputs, making them ideal for evaluating the final results of your LLM application:
<CodeBlocks>
```python title="Python" language="python"
from typing import Dict, Any
from opik.evaluation.metrics import score_result

def custom_equals_scorer(
    dataset_item: Dict[str, Any],
    task_outputs: Dict[str, Any]
) -> score_result.ScoreResult:
    """
    Custom scoring function that compares expected output with actual output.

    Args:
        dataset_item: Data from the dataset item (includes expected outputs)
        task_outputs: Outputs from the evaluation task
    """
    expected = dataset_item.get("expected_output")
    actual = task_outputs.get("output")

    if expected == actual:
        score = 1.0
        reason = "Perfect match"
    else:
        score = 0.0
        reason = f"Mismatch: expected '{expected}', got '{actual}'"

    return score_result.ScoreResult(
        name="custom_equals_scorer",
        value=score,
        reason=reason
    )
```
</CodeBlocks>
You can use your custom scoring functions alongside built-in metrics:
<CodeBlocks>
```python title="Python" language="python"
from opik.evaluation import evaluate
from opik.evaluation.metrics import Hallucination

# Create dataset
dataset = opik_client.create_dataset("custom_evaluation_dataset", project_name="my-project")
dataset.insert([
    {
        "input": "What is the capital of France?",
        "expected_output": "Paris"
    },
    {
        "input": "What is 2 + 2?",
        "expected_output": "4"
    }
])

# Define evaluation task
def evaluation_task(item):
    # Your LLM application logic here
    return {"output": your_llm_application(item["input"])}

# Run evaluation with custom scoring functions
evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_functions=[
        custom_equals_scorer
    ],
    scoring_metrics=[
        Hallucination()  # Mix with built-in metrics
    ],
    experiment_name="Custom Scoring Experiment"
)
```

</CodeBlocks>
#### Task span scoring functions

Task span scoring functions provide access to detailed execution information about your LLM tasks. These functions receive a `task_span` parameter containing structured data about the task execution, including input, output, metadata, and nested operations.

Task span functions are particularly useful for evaluating:

- The internal structure and behavior of your LLM applications
- Performance characteristics like execution patterns
- Quality of intermediate steps in complex workflows

Task span scoring functions accept a `task_span` parameter which is a `SpanModel` object:
<CodeBlocks>
```python title="Python" language="python"
from typing import Any

from opik.evaluation.metrics import score_result
from opik.message_processing.emulation.models import SpanModel

def execution_time_scorer(
    task_span: SpanModel
) -> score_result.ScoreResult:
    """
    Scoring function that evaluates based on execution time.

    Args:
        task_span: Complete execution information including timing
    """
    if task_span.start_time and task_span.end_time:
        duration = (task_span.end_time - task_span.start_time).total_seconds()

        # Score based on execution speed
        if duration < 1.0:
            score = 1.0
            reason = f"Fast execution: {duration:.2f}s"
        elif duration < 5.0:
            score = 0.8
            reason = f"Acceptable execution time: {duration:.2f}s"
        else:
            score = 0.5
            reason = f"Slow execution: {duration:.2f}s"
    else:
        score = 0.0
        reason = "Cannot determine execution time"

    return score_result.ScoreResult(
        name="execution_time_scorer",
        value=score,
        reason=reason
    )

def task_name_scorer(
    task_span: SpanModel
) -> score_result.ScoreResult:
    """
    Scoring function that validates the task span name.
    """
    expected_name = "your_llm_application"  # Adjust to your function name

    score = 1.0 if task_span.name == expected_name else 0.0
    reason = f"Task name: '{task_span.name}'"

    return score_result.ScoreResult(
        name="task_name_scorer",
        value=score,
        reason=reason
    )
```
</CodeBlocks>
##### Combined Scoring Functions
You can also create scoring functions that use both dataset inputs/outputs AND task span information:
<CodeBlocks>
```python title="Python" language="python"
from typing import Any, Dict

from opik.evaluation.metrics import score_result
from opik.message_processing.emulation.models import SpanModel

def comprehensive_scorer(
    dataset_item: Dict[str, Any],
    task_outputs: Dict[str, Any],
    task_span: SpanModel
) -> score_result.ScoreResult:
    """
    Comprehensive scoring function using all available information.

    Args:
        dataset_item: Dataset item data
        task_outputs: Task execution outputs
        task_span: Detailed execution information
    """
    # Check output correctness
    expected = dataset_item.get("expected_output")
    actual = task_outputs.get("output")
    correctness_score = 1.0 if expected == actual else 0.0

    # Check execution efficiency
    if task_span.start_time and task_span.end_time:
        duration = (task_span.end_time - task_span.start_time).total_seconds()
        efficiency_score = 1.0 if duration < 2.0 else 0.5
    else:
        efficiency_score = 0.0

    # Combined score (weighted average)
    final_score = (correctness_score * 0.7) + (efficiency_score * 0.3)

    return score_result.ScoreResult(
        name="comprehensive_scorer",
        value=final_score,
        reason=f"Correctness: {correctness_score}, Efficiency: {efficiency_score}"
    )
```

</CodeBlocks>
Task span scoring functions work seamlessly with the evaluation framework:
<CodeBlocks>
```python title="Python" language="python"
from opik import track

@track  # Enable span collection for task span metrics
def evaluation_task(item):
    return {"output": your_llm_application(item["input"])}

evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,  # Must be decorated with @track
    scoring_functions=[
        execution_time_scorer,
        task_name_scorer,
        comprehensive_scorer  # Mix different types
    ],
    experiment_name="Task Span Evaluation"
)
```
</CodeBlocks>
<Tip>
When you use task span scoring functions, Opik automatically enables span collection and analysis. You don't need to configure anything special - the system will detect functions with `task_span` parameters and handle them appropriately.
</Tip>
<Warning>
Task span scoring functions have access to detailed execution information including inputs, outputs, and metadata. Be mindful of sensitive data and ensure your functions handle this information appropriately.
</Warning>
### Using task span evaluation metrics
Opik supports advanced evaluation metrics that can analyze the detailed execution information of your LLM tasks. These metrics receive a `task_span` parameter containing structured data about the task execution, including input, output, metadata, and nested operations.
Task span metrics are particularly useful for evaluating:
- The internal structure and behavior of your LLM applications
- Performance characteristics like execution patterns
- Quality of intermediate steps in complex workflows
- Cost and usage optimization opportunities
- Agent trajectory
#### Creating task span metrics
To create a task span evaluation metric, define a metric class that accepts a `task_span` parameter in its `score` method. The `task_span` parameter is a [`SpanModel`](https://www.comet.com/docs/opik/python-sdk-reference/message_processing_emulation/SpanModel.html) object that contains detailed information about the task execution:
<CodeBlocks>
```python title="Python" language="python"
from typing import Any, Optional
from opik.evaluation.metrics import BaseMetric, score_result
from opik.message_processing.emulation.models import SpanModel
class ExecutionTimeMetric(BaseMetric):
    def score(self, task_span: SpanModel, **ignored_kwargs: Any) -> score_result.ScoreResult:
        # Calculate execution duration
        if task_span.start_time and task_span.end_time:
            duration = (task_span.end_time - task_span.start_time).total_seconds()

            # Score based on execution speed
            if duration < 1.0:
                score = 1.0
                reason = f"Fast execution: {duration:.2f}s"
            elif duration < 5.0:
                score = 0.8
                reason = f"Acceptable execution time: {duration:.2f}s"
            else:
                score = 0.5
                reason = f"Slow execution: {duration:.2f}s"
        else:
            score = 0.0
            reason = "Cannot determine execution time"

        return score_result.ScoreResult(
            value=score,
            name=self.name,
            reason=reason
        )
```

</CodeBlocks>
Task span metrics work alongside regular evaluation metrics and are automatically detected by the evaluation engine:
<CodeBlocks>
```python title="Python" language="python"
from opik.evaluation import evaluate
from opik.evaluation.metrics import Equals

equals_metric = Equals()
timing_metric = ExecutionTimeMetric()

evaluation = evaluate(
    dataset=dataset,
    task=evaluation_task,
    scoring_metrics=[
        equals_metric,  # Regular metric
        timing_metric,  # Task span metric
    ],
    experiment_name="Comprehensive Evaluation"
)
```
</CodeBlocks>
<Tip>
When you use task span metrics, Opik automatically enables span collection and
analysis. You don't need to configure anything special - the system will
detect metrics with `task_span` parameters and handle them appropriately.
</Tip>
#### Accessing span hierarchy
Task spans can contain nested spans representing sub-operations. You can analyze the complete execution hierarchy.
Here's an example of a tracked function that produces nested spans:
<CodeBlocks>
```python title="Python" language="python"
from opik import track
from opik.integrations.openai import track_openai
import openai
openai_client = track_openai(openai.OpenAI())
@track
def research_topic(topic: str) -> str:
"""Main research function that creates nested spans."""
# This will create a nested span for gathering context
context = gather_context(topic)
# This will create another nested span for analysis
analysis = analyze_information(context, topic)
# Final span for generating summary
summary = generate_summary(analysis, topic)
return summary
@track
def gather_context(topic: str) -> str:
"""Gather background context - creates its own span."""
response = openai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": f"Provide background context about: {topic}"
}]
)
return response.choices[0].message.content
@track
def analyze_information(context: str, topic: str) -> str:
"""Analyze the gathered information - creates its own span."""
response = openai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": f"Analyze this context about {topic}: {context}"
}]
)
return response.choices[0].message.content
@track
def generate_summary(analysis: str, topic: str) -> str:
"""Generate final summary - creates its own span."""
response = openai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": f"Create a summary for {topic} based on: {analysis}"
}]
)
return response.choices[0].message.content
When you call `research_topic("artificial intelligence")`, Opik will create a hierarchy of spans:
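A sketch of the resulting span hierarchy (the LLM spans are created automatically by the `track_openai` wrapper; the span names shown are illustrative):

```
research_topic (root span)
├── gather_context
│   └── OpenAI chat completion (LLM span)
├── analyze_information
│   └── OpenAI chat completion (LLM span)
└── generate_summary
    └── OpenAI chat completion (LLM span)
```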
You can then analyze this complete execution hierarchy using task span metrics:
<CodeBlocks>
```python title="Python" language="python"
class HierarchyAnalysisMetric(BaseMetric):
    def _analyze_hierarchy_recursively(self, span: SpanModel, hierarchy_stats: dict = None) -> dict:
        """Recursively analyze span hierarchy across the entire span tree."""
        if hierarchy_stats is None:
            hierarchy_stats = {
                'total_spans': 0,
                'llm_spans': 0,
                'tool_spans': 0,
                'other_spans': 0,
                'max_depth': 0,
                'current_depth': 0,
                'llm_span_names': [],
                'tool_span_names': []
            }

        # Count current span
        hierarchy_stats['total_spans'] += 1
        hierarchy_stats['max_depth'] = max(hierarchy_stats['max_depth'], hierarchy_stats['current_depth'])

        # Categorize span types
        if span.type == "llm":
            hierarchy_stats['llm_spans'] += 1
            hierarchy_stats['llm_span_names'].append(span.name)
        elif span.type == "tool":
            hierarchy_stats['tool_spans'] += 1
            hierarchy_stats['tool_span_names'].append(span.name)
        else:
            hierarchy_stats['other_spans'] += 1

        # Recursively analyze nested spans with depth tracking
        for nested_span in span.spans:
            hierarchy_stats['current_depth'] += 1
            self._analyze_hierarchy_recursively(nested_span, hierarchy_stats)
            hierarchy_stats['current_depth'] -= 1

        return hierarchy_stats
    def score(self, task_span: SpanModel, **ignored_kwargs: Any) -> score_result.ScoreResult:
        # Analyze hierarchy across the entire span tree
        # Only for illustrative purposes.
        # Please adjust for your specific use case!
        hierarchy_stats = self._analyze_hierarchy_recursively(task_span)

        total_operations = hierarchy_stats['total_spans']
        llm_operations = hierarchy_stats['llm_spans']
        tool_operations = hierarchy_stats['tool_spans']
        max_depth = hierarchy_stats['max_depth']

        # Analyze the complexity and structure of the operation
        if llm_operations > 5:
            # Many LLM calls might indicate inefficient processing
            if tool_operations == 0:
                score = 0.4
                reason = f"Over-complex operation: {llm_operations} LLM calls with no tool usage (depth: {max_depth})"
            else:
                score = 0.6
                reason = f"Complex operation: {llm_operations} LLM calls, {tool_operations} tool calls (depth: {max_depth})"
        elif llm_operations == 0:
            # No reasoning might indicate a purely mechanical process
            score = 0.3 if tool_operations > 0 else 0.1
            reason = f"No reasoning detected: {tool_operations} tool calls only" if tool_operations > 0 else "No LLM or tool operations detected"
        else:
            # Balanced approach with reasonable LLM usage
            balance_ratio = min(llm_operations, tool_operations) / max(llm_operations, tool_operations) if tool_operations > 0 else 0.8
            depth_bonus = 1.0 if max_depth <= 3 else max(0.8, 1.0 - (max_depth - 3) * 0.05)
            score = min(1.0, 0.7 + balance_ratio * 0.2 + depth_bonus * 0.1)

            if tool_operations > 0:
                reason = f"Well-structured operation: {llm_operations} LLM calls, {tool_operations} tool calls across {total_operations} spans (depth: {max_depth})"
            else:
                reason = f"Reasoning-focused operation: {llm_operations} LLM calls across {total_operations} spans (depth: {max_depth})"

        return score_result.ScoreResult(
            value=score,
            name=self.name,
            reason=reason
        )
```
</CodeBlocks>
For the span hierarchy given above, the `HierarchyAnalysisMetric` score will be:

```
Score: 0.96, Reason: Reasoning-focused operation: 3 LLM calls across 7 spans (depth: 2)
```
#### Quickly testing task span metrics locally
You can validate a task span metric without running a full evaluation by recording spans locally. The SDK provides a context manager that captures all spans/traces created in the block and exposes them in-memory:
<CodeBlocks>
```python title="Python" language="python"
import opik
from opik import track
from opik.evaluation.metrics import score_result
from opik.message_processing.emulation.models import SpanModel
# Example metric under test
class ExecutionTimeMetric:
    def __init__(self, name: str = "execution_time_metric"):
        self.name = name

    def score(self, task_span: SpanModel, **_):
        if task_span.start_time and task_span.end_time:
            duration = (task_span.end_time - task_span.start_time).total_seconds()
            value = 1.0 if duration < 2.0 else 0.5
            reason = f"Duration: {duration:.2f}s"
        else:
            value = 0.0
            reason = "Missing timing information"
        return score_result.ScoreResult(value=value, name=self.name, reason=reason)

@track
def my_tracked_function(question: str) -> str:
    # Your LLM/tool code here that produces spans
    return f"Answer to: {question}"

with opik.record_traces_locally() as storage:
    # Execute tracked code that creates spans
    _ = my_tracked_function("What is the capital of France?")

    # Access the in-memory span tree (flush is automatic before reading)
    span_trees = storage.span_trees
    assert len(span_trees) > 0, "No spans recorded"
    root_span = span_trees[0]

    # Evaluate your task span metric directly
    metric = ExecutionTimeMetric()
    result = metric.score(task_span=root_span)
    print(result)
```
</CodeBlocks>
<Warning>
Local recording cannot be nested. If a recording block is already active, entering another will raise an error.
</Warning>
#### Best practices for task span metrics
1. **Focus on execution patterns**: Use task span metrics to evaluate how your application executes, not just the final output
2. **Combine with regular metrics**: Mix task span metrics with traditional output-based metrics for comprehensive evaluation
3. **Analyze performance**: Leverage timing, cost, and usage information for optimization insights
4. **Handle missing data gracefully**: Always check for None values in optional span attributes
<Warning>
Task span metrics have access to detailed execution information including inputs, outputs, and metadata. Be mindful of sensitive data and ensure your metrics handle this information appropriately.
</Warning>
### Accessing logged experiments
You can access all the experiments logged to the platform from the SDK with the `getExperimentsByName` (TypeScript) and `get_experiments_by_name` (Python) methods:
<CodeBlocks>
```typescript title="TypeScript" language="typescript"
import { Opik } from "opik";
const client = new Opik({
  apiKey: "your-api-key",
  apiUrl: "https://www.comet.com/opik/api",
  projectName: "your-project-name",
  workspaceName: "your-workspace-name",
});
const experiments = await client.getExperimentsByName("My experiment");
// Access the first experiment content
const items = await experiments[0].getItems();
console.log(items);
```
```python title="Python" language="python"
import opik
# Get the experiment
opik_client = opik.Opik()
experiments = opik_client.get_experiments_by_name("My experiment")
# Access the first experiment content
items = experiments[0].get_items()
print(items)
```
</CodeBlocks>