Back to Graphrag

Metrics

packages/graphrag-llm/notebooks/04_metrics.ipynb

3.0.912.0 KB
Original Source

Metrics

Metrics are automatically tracked for completion and embedding calls.

python
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

import json
import os

from dotenv import load_dotenv
from graphrag_llm.completion import LLMCompletion, create_completion
from graphrag_llm.config import AuthMethod, ModelConfig

# Pull Azure OpenAI settings from the environment (.env file supported).
load_dotenv()

api_key = os.getenv("GRAPHRAG_API_KEY")
# The same env var supplies both the model name and the Azure deployment name.
_deployment = os.getenv("GRAPHRAG_MODEL", "gpt-4o")
model_config = ModelConfig(
    model_provider="azure",
    model=_deployment,
    azure_deployment_name=_deployment,
    api_base=os.getenv("GRAPHRAG_API_BASE"),
    api_version=os.getenv("GRAPHRAG_API_VERSION", "2025-04-01-preview"),
    api_key=api_key,
    # Fall back to managed identity when no API key is configured.
    auth_method=AuthMethod.ApiKey if api_key else AuthMethod.AzureManagedIdentity,
)
llm_completion: LLMCompletion = create_completion(model_config)

response = llm_completion.completion(
    messages="What is the capital of France?",
)

# Metrics are collected automatically for the call above.
print(f"Metrics for: {llm_completion.metrics_store.id}")
print(json.dumps(llm_completion.metrics_store.get_metrics(), indent=2))

Disable Metrics

Set metrics to None in the ModelConfig to disable metrics.

python
# Setting metrics to None disables metrics collection entirely.
model_config.metrics = None
llm_completion_no_metrics: LLMCompletion = create_completion(model_config)

response = llm_completion_no_metrics.completion(
    messages="What is the capital of France?",
)

# With metrics disabled, .metrics_store is expected to be a NoOpMetricsStore,
# so get_metrics() below should report nothing useful.
store = llm_completion_no_metrics.metrics_store
print(f"Metrics for: {store.id}")
print(json.dumps(store.get_metrics(), indent=2))

Automatic Metrics Logging

Metrics for each instantiated model are automatically logged on process exit. To see this, update the log level to INFO.

python
import logging

# Raise verbosity so the exit-time metrics log line becomes visible.
logging.basicConfig(level=logging.INFO)

# Start from a clean slate, then make a single tracked call.
llm_completion.metrics_store.clear_metrics()
response = llm_completion.completion(
    messages="What is the capital of France?",
)

# NOTE: _on_exit_ is invoked manually here only because a notebook kernel
# never exits (and llm_completion is never garbage collected), so the
# automatic exit hook would not fire. Never call this in normal scripts.
llm_completion.metrics_store._on_exit_()  # type: ignore

Save Metrics to a File

Instead of being logged on exit, metrics can automatically be saved to a file on exit by using the MetricsWriterType.File metrics writer.

python
from pathlib import Path

from graphrag_llm.config import MetricsConfig, MetricsWriterType

# Swap the metrics writer: persist metrics as ./metrics/*.jsonl on exit
# instead of logging them.
model_config.metrics = MetricsConfig(
    writer=MetricsWriterType.File,
    base_dir="./metrics",  # Default
)
llm_completion: LLMCompletion = create_completion(model_config)

response = llm_completion.completion(
    messages="What is the capital of France?",
)

# NOTE: _on_exit_ is invoked manually here only because a notebook kernel
# never exits, so the automatic exit hook would not run. Never call this
# in normal python scripts.
llm_completion.metrics_store._on_exit_()  # type: ignore

# Show the contents of a single metrics file for brevity.
metric_file = next(Path("./metrics").glob("*.jsonl"), None)
if metric_file is not None:
    print(f"Contents of {metric_file}:")
    print(metric_file.read_text())

Default Metrics

  • attempted_request_count: Number of network requests made, not including retries.
  • successful_response_count: Number of successful responses.
  • failed_response_count: Number of network requests that threw errors and could not be resolved even after retries. successful_response_count + failed_response_count should equal attempted_request_count unless the job or process was killed early.
  • failure_rate: failed_response_count / attempted_request_count.
  • requests_with_retries: Number of original requests that had to go through a retry loop.
  • retries: Number of network requests that were retries.
  • retry_rate: retries / (retries + attempted_request_count)
  • compute_duration_seconds: Total number of seconds to complete all non-streaming network requests.
  • compute_duration_per_response_seconds: compute_duration_seconds / successful non-streaming responses
  • runtime_duration_seconds: Only present if using the batching utilities. The batching utilities run multiple completions/embedding in parallel so runtime_duration_seconds is the actual runtime duration. Comparing this with compute_duration_seconds indicates how much time was saved using the batching utilities vs if all network requests ran in series.
  • cached_responses: Number of cached responses. Only present if using a cache. When a response is cached, so are the corresponding metrics. When a response is retrieved from the cache, the corresponding metrics are also retrieved from the cache and provided in the overall metrics, so metrics like compute_duration_seconds, input_cost, output_cost, etc. include cached responses' metrics. This is helpful when having to resume stopped jobs or rerunning failed jobs. At the end of the job the metrics indicate how long and costly the job would have been when running off a fresh cache/no cache. The cached_responses only indicates how many network requests were skipped and retrieved from cache.
  • streaming_responses: Number of requests using the stream=True parameter. Many metrics such as token counts and costs are not tracked for streaming requests as that would require analyzing the stream to completion within the middleware stack, preventing the ability to build true streaming interfaces with graphrag-llm.
  • responses_with_tokens: Number of responses in which token counts were obtained. Typically this should equal successful_response_count - streaming_responses.
  • prompt_tokens: Total number of prompt tokens used across all successful non-streaming network requests.
  • completion_tokens: Total number of completion tokens across all successful non-streaming network requests.
  • total_tokens: prompt_tokens + completion_tokens
  • tokens_per_response: total_tokens / responses_with_tokens
  • responses_with_cost: Number of responses in which costs were calculated. Typically this should equal successful_response_count - streaming_responses.
  • input_cost: Cost of the input tokens across all successful non-streaming network requests.
  • output_cost: Cost of the output tokens across all successful non-streaming network requests.
  • total_cost: input_cost + output_cost
  • cost_per_response: total_cost / responses_with_cost.

Custom Model Costs

The default metrics include costs for prompt tokens and completion tokens. These are calculated using a registry of known models and associated costs managed by litellm: https://github.com/BerriAI/litellm/blob/main/model_prices_and_context_window.json

One can register custom model costs if using a custom model that is not in the registry or one that differs from the known/default cost.

python
from graphrag_llm.model_cost_registry import model_cost_registry

# Override the registry pricing for this model. The key must use the
# "{model_provider}/{model_name}" format, never the Azure deployment name.
custom_costs = {
    # Deliberately absurd per-token prices so the effect is obvious below.
    "input_cost_per_token": 1000,
    "output_cost_per_token": 5000,
}
model_cost_registry.register_model_costs(model="azure/gpt-4o", costs=custom_costs)

llm_completion.metrics_store.clear_metrics()
response = llm_completion.completion(
    messages="What is the capital of France?",
)

print(json.dumps(llm_completion.metrics_store.get_metrics(), indent=2))

Custom Metrics Processor

It is possible to register a custom metrics processor if one needs to track metrics not already tracked.

python
import json
import os
from collections.abc import AsyncIterator, Iterator
from typing import Any

from dotenv import load_dotenv
from graphrag_llm.completion import LLMCompletion, create_completion
from graphrag_llm.config import MetricsConfig, MetricsWriterType, ModelConfig
from graphrag_llm.metrics import metrics_aggregator, register_metrics_processor
from graphrag_llm.metrics.default_metrics_processor import DefaultMetricsProcessor
from graphrag_llm.types import (
    LLMCompletionChunk,
    LLMCompletionResponse,
    LLMEmbeddingResponse,
    Metrics,
)

# Load GRAPHRAG_* settings from a local .env file, if one exists.
load_dotenv()


class MyCustomMetricsProcessor(DefaultMetricsProcessor):
    """Metrics processor that additionally tracks temperature usage.

    Inherits from DefaultMetricsProcessor so every default metric keeps
    being collected; only one extra per-request counter is added here.

    Metrics = dict[str, float]. The dict handed to process_metrics holds
    the metrics for a single request. Per-request values recorded here are
    later combined across requests by the metrics_aggregator.
    """

    def __init__(self, some_custom_option: str, **kwargs: Any) -> None:
        """Create the processor, forwarding remaining kwargs to the base class."""
        super().__init__(**kwargs)
        self._some_custom_option = some_custom_option  # Not actually used

    def process_metrics(
        self,
        *,
        model_config: ModelConfig,
        metrics: Metrics,
        input_args: dict[str, Any],
        response: LLMCompletionResponse
        | Iterator[LLMCompletionChunk]
        | AsyncIterator[LLMCompletionChunk]
        | LLMEmbeddingResponse,
    ) -> None:
        """Record the default metrics plus whether temperature was supplied.

        Mutates the metrics dict in place. This hook only runs for
        successful requests and receives the completion or embedding
        response for that request.

        Args
        ----
            model_config: ModelConfig
                Model configuration used for this request.
            metrics: Metrics
                Per-request metrics dict to mutate in place.
            input_args: dict[str, Any]
                Arguments that were passed to completion or embedding.
            response: LLMCompletionResponse | Iterator[LLMCompletionChunk] | AsyncIterator[LLMCompletionChunk] | LLMEmbeddingResponse
                The completion or embedding response from the LLM.
        """
        # Let the base class populate the default metrics first.
        super().process_metrics(
            model_config=model_config,
            metrics=metrics,
            input_args=input_args,
            response=response,
        )

        # Per-request flag (1/0); the _temperature_rate aggregator turns
        # these into a usage rate across requests.
        metrics["responses_with_temperature"] = int("temperature" in input_args)


# Register the processor under a type name that a MetricsConfig(type=...)
# can reference later.
register_metrics_processor(
    processor_type="custom_with_temperature",
    processor_initializer=MyCustomMetricsProcessor,
)


# Custom aggregator to calculate temperature usage rate
def _temperature_rate(metrics: "Metrics") -> None:
    """Calculate temperature usage rate.

    Custom aggregate function to track the usage rate of temperature parameter.

    Here, metrics represents the aggregated metrics for the current model.
    """
    responses = metrics.get("successful_response_count", 0)
    temperature_responses = metrics.get("responses_with_temperature", 0)
    if responses > 0:
        metrics["temperature_rate"] = temperature_responses / responses
    else:
        metrics["temperature_rate"] = 0.0


# Register the aggregate hook so "temperature_rate" shows up in get_metrics().
metrics_aggregator.register("temperature_rate", _temperature_rate)

api_key = os.getenv("GRAPHRAG_API_KEY")
# The same env var supplies both the model name and the Azure deployment name.
deployment = os.getenv("GRAPHRAG_MODEL", "gpt-4o")
# NOTE: AuthMethod is in scope from the first cell's imports in this notebook.
model_config = ModelConfig(
    model_provider="azure",
    model=deployment,
    azure_deployment_name=deployment,
    api_base=os.getenv("GRAPHRAG_API_BASE"),
    api_version=os.getenv("GRAPHRAG_API_VERSION", "2025-04-01-preview"),
    api_key=api_key,
    auth_method=AuthMethod.ApiKey if api_key else AuthMethod.AzureManagedIdentity,
    metrics=MetricsConfig(
        # Use the custom metrics processor registered above
        type="custom_with_temperature",
        some_custom_option="example_option_value",  # type: ignore
        writer=MetricsWriterType.File,
        base_dir="./metrics",  # Default
    ),
)
llm_completion: LLMCompletion = create_completion(model_config)

# One request without temperature and one with it, so the aggregated
# temperature_rate should come out as 0.5 (assuming both calls succeed).
response = llm_completion.completion(
    messages="What is the capital of France?",
)

response_with_temperature = llm_completion.completion(
    messages="What is the capital of France?",
    temperature=0.7,
)

print(f"Metrics for: {llm_completion.metrics_store.id}")
print(json.dumps(llm_completion.metrics_store.get_metrics(), indent=2))