Collector

<Info> This feature was added in 0.79.0 </Info>

The Collector allows you to inspect the internal state of BAML function calls, including raw HTTP requests, responses, usage metrics, and timing information, so you can always see the raw data, without any abstraction layers.

Quick Start

<Tabs>
<Tab title="Python" language="python">
```python
from baml_client import b
from baml_py import Collector

# Create a collector with an optional name
collector = Collector(name="my-collector")

# Use it with a function call
result = b.ExtractResume("...", baml_options={"collector": collector})

# Access logging information
print(collector.last.usage)  # Print usage metrics
print(collector.last.raw_llm_response)  # Print the final response as a string
# Since there may be retries, print the last HTTP response received
print(collector.last.calls[-1].http_response)
```
</Tab>

<Tab title="TypeScript" language="typescript">
```typescript
import { b } from 'baml_client'
import { Collector } from '@boundaryml/baml'

// Create a collector with an optional name
const collector = new Collector("my-collector")

// Use it with a function call
const result = await b.ExtractResume("...", { collector })

// Access logging information
console.log(collector.last?.usage)  // Print usage metrics
console.log(collector.last?.rawLlmResponse)  // Print the final response
// Since there may be retries, print the last HTTP response received
// (negative indexing doesn't work on JS arrays, so use .at(-1))
console.log(collector.last?.calls.at(-1)?.httpResponse)
```
</Tab>

<Tab title="Ruby" language="ruby">
```ruby
require_relative "baml_client/client"

b = Baml.Client

# Create a collector with an optional name
collector = Baml::Collector.new(name: "my-collector")

# Use it with a function call
res = b.ExtractResume(input: '...', baml_options: { collector: collector })

# Access logging information
print(collector.last.usage)  # Print usage metrics
print(collector.last.calls[-1].http_response)  # Print the last raw HTTP response
print(collector.last.raw_llm_response)  # A string of the last response
```
</Tab>

<Tab title="Rust" language="rust">
```rust
use myproject::baml_client::sync_client::B;
use myproject::baml_client::new_collector;

// Create a collector with optional name
let collector = new_collector("my-collector");

// Use it with a function call
let result = B.ExtractResume
    .with_collector(&collector)
    .call("...")
    .unwrap();

// Access logging information
let logs = collector.logs();
if let Some(log) = logs.last() {
    println!("{:?}", log.usage());
}
```
</Tab>
</Tabs>

Common Use Cases

Basic Logging

<Tabs>
<Tab title="Python" language="python">
```python
from baml_client import b
from baml_py import Collector  # Import the Collector class

def run():
    # Create a collector instance with an optional name
    collector = Collector(name="my-collector")
    # The collector will be modified by the function to include all internal state
    res = b.ExtractResume("...", baml_options={"collector": collector})
    # This will print the return type of the function
    print(res)

    # This is guaranteed to be set by the function
    assert collector.last is not None

    # This will print the id of the last request
    print(collector.last.id)

    # This will print the usage of the last request
    # (This aggregates usage from all retries if there was usage emitted)
    print(collector.last.usage)

    # This will print the raw response of the last request
    print(collector.last.calls[-1].http_response)

    # This will print the raw text we used to run the parser
    print(collector.last.raw_llm_response)
```
</Tab>

<Tab title="TypeScript" language="typescript">
```typescript
import assert from 'node:assert'
import { b } from 'baml_client'
import { Collector } from '@boundaryml/baml'

async function run() {
    // Create a collector instance with an optional name
    const collector = new Collector("my-collector")
    // The collector will be modified by the function to include all internal state
    const res = await b.ExtractResume("...", { collector })
    // This will print the return type of the function
    console.log(res)

    // This is guaranteed to be set by the function
    assert(collector.last)

    // This will print the id of the last request
    console.log(collector.last.id)

    // This will print the usage of the last request
    // (This aggregates usage from all retries if there was usage emitted)
    console.log(collector.last.usage)

    // This will print the raw response of the last request
    // (negative indexing doesn't work on JS arrays, so use .at(-1))
    console.log(collector.last.calls.at(-1)?.httpResponse)

    // This will print the raw text we used to run the parser
    console.log(collector.last.rawLlmResponse)
}
```
</Tab>

<Tab title="Ruby" language="ruby">
```ruby
require_relative "baml_client/client"

b = Baml.Client

def run
  # Create a collector instance
  collector = Baml::Collector.new(name: "my-collector")
  # The function will now use the collector to track internal state
  res = b.ExtractResume(input: 'hi there', baml_options: { collector: collector })

  # This will print the return type of the function
  print(res)

  # This is guaranteed to be set by the function
  raise "Assertion failed" unless collector.last

  # This will print the id of the last request
  print(collector.last.id)

  # This will print the usage of the last request
  # (This aggregates usage from all retries if there was usage emitted)
  print(collector.last.usage)

  # This will print the raw response of the last request
  print(collector.last.calls[-1].http_response)

  # This will print the raw text we used to run the parser
  print(collector.last.raw_llm_response)
end

# Call the function
run
```
</Tab>

<Tab title="Rust" language="rust">
```rust
use myproject::baml_client::sync_client::B;
use myproject::baml_client::new_collector;

fn run() {
    let collector = new_collector("my-collector");
    let res = B.ExtractResume
        .with_collector(&collector)
        .call("...")
        .unwrap();
    println!("{:?}", res);

    let logs = collector.logs();
    let last = logs.last().expect("Should have at least one log");

    // Print the id of the last request
    println!("{}", last.id());

    // Print usage (aggregated from all retries)
    println!("{:?}", last.usage());

    // Print function name
    println!("{}", last.function_name());
}
```
</Tab>
</Tabs>

Tags

You can attach custom metadata to function calls using tags. These can come from a parent trace context or be specified per call.

<Tabs>
<Tab title="Python" language="python">
```python
from baml_client import b
from baml_client.tracing import trace, set_tags
from baml_py import Collector

@trace
async def run_with_tags():
    # Parent trace tags
    set_tags(parent_id="p123", run="xyz")

    collector = Collector(name="tags-collector")

    # Per-call tags via baml_options
    await b.TestOpenAIGPT4oMini(
        "hi",
        baml_options={"collector": collector, "tags": {"call_id": "first"}},
    )

    # Retrieve merged tags from the last log
    log = collector.last
    assert log is not None
    print(log.tags)  # {'parent_id': 'p123', 'run': 'xyz', 'call_id': 'first'}
```
</Tab>

<Tab title="TypeScript" language="typescript">
```typescript
import { b } from "baml_client";
import { Collector } from "@boundaryml/baml";
import { traceAsync, setTags } from "../baml_client/tracing";

const parent = traceAsync("parentTS", async () => {
  setTags({ parentId: "p123", run: "xyz" });
  const collector = new Collector("tags-collector");
  await b.TestOpenAIGPT4oMini("hi", { collector, tags: { callId: "first" } });
  const tags = collector.last!.tags;
  console.log(tags);
});

await parent();
```
</Tab>

<Tab title="Go" language="go">
```go
ctx := context.Background()
collector, _ := b.NewCollector("tags-collector")
_, _ = b.TestOpenAIGPT4oMini(
    ctx,
    "hi",
    b.WithCollector(collector),
    b.WithTags(map[string]string{"callId": "first", "version": "v1"}),
)

logs, _ := collector.Logs()
if len(logs) > 0 {
    tags, _ := logs[0].Tags()
    fmt.Printf("Tags: %+v\n", tags)
}
```
</Tab>

<Tab title="Rust" language="rust">
```rust
use myproject::baml_client::sync_client::B;
use myproject::baml_client::new_collector;

let collector = new_collector("tags-collector");
let result = B.TestOpenAIGPT4oMini
    .with_collector(&collector)
    .with_tag("call_id", "first")
    .with_tag("version", "v1")
    .call("hi")
    .unwrap();

let logs = collector.logs();
if let Some(log) = logs.last() {
    println!("Tags: {:?}", log.tags());
}
```
</Tab>
</Tabs>

Managing Collector State

<Tabs>
<Tab title="Python" language="python">
```python
from baml_client import b
from baml_py import Collector

def run():
    collector = Collector(name="reusable-collector")
    res = b.ExtractResume("...", baml_options={"collector": collector})

    # Reuse the same collector
    res = b.TestOpenAIGPT4oMini("Second call", baml_options={"collector": collector})
```
</Tab>

<Tab title="TypeScript" language="typescript">
```typescript
import { b } from 'baml_client'
import { Collector } from '@boundaryml/baml'

async function run() {
    const collector = new Collector("reusable-collector")
    const res = await b.ExtractResume("...", { collector })

    // Reuse the same collector
    const res2 = await b.ExtractResume("...", { collector })
}
```
</Tab>

<Tab title="Ruby" language="ruby">
```ruby
require_relative "baml_client/client"

b = Baml.Client

def run
  collector = Baml::Collector.new(name: "reusable-collector")
  res = b.ExtractResume(input: 'First call', baml_options: { collector: collector })

  # Reuse the same collector
  res = b.ExtractResume(input: 'Second call', baml_options: { collector: collector })
end
```
</Tab>

<Tab title="Rust" language="rust">
```rust
use myproject::baml_client::sync_client::B;
use myproject::baml_client::new_collector;

let collector = new_collector("reusable-collector");
let res = B.ExtractResume
    .with_collector(&collector)
    .call("First call")
    .unwrap();

// Reuse the same collector
let res2 = B.ExtractResume
    .with_collector(&collector)
    .call("Second call")
    .unwrap();
```
</Tab>
</Tabs>

Using Multiple Collectors

You can use multiple collectors to track different aspects of your application:

<Tabs>
<Tab title="Python" language="python">
```python
from baml_client import b
from baml_py import Collector

def run():
    # Create separate collectors for different parts of your application
    collector_a = Collector(name="collector-a")
    collector_b = Collector(name="collector-b")

    # Use both collectors for the same function call
    res = b.ExtractResume("...", baml_options={"collector": [collector_a, collector_b]})

    # Both collectors will have the same logs
    assert collector_a.last.usage.input_tokens == collector_b.last.usage.input_tokens

    # Use only collector_a for another call
    res2 = b.TestOpenAIGPT4oMini("another call", baml_options={"collector": collector_a})

    # collector_a will have 2 logs, collector_b will still have 1
    assert len(collector_a.logs) == 2
    assert len(collector_b.logs) == 1
```
</Tab>

<Tab title="TypeScript" language="typescript">
```typescript
import assert from 'node:assert'
import { b } from 'baml_client'
import { Collector } from '@boundaryml/baml'

async function run() {
    // Create separate collectors for different parts of your application
    const collector_a = new Collector("collector-a")
    const collector_b = new Collector("collector-b")

    // Use both collectors for the same function call
    const res = await b.ExtractResume("...", { collector: [collector_a, collector_b] })

    // Both collectors will have the same logs
    assert(collector_a.last?.usage.inputTokens === collector_b.last?.usage.inputTokens)

    // Use only collector_a for another call
    const res2 = await b.ExtractResume("...", { collector: collector_a })

    // collector_a will have 2 logs, collector_b will still have 1
    assert(collector_a.logs.length === 2)
    assert(collector_b.logs.length === 1)
}
```
</Tab>

<Tab title="Ruby" language="ruby">
```ruby
require_relative "baml_client/client"

b = Baml.Client

def run
  # Create separate collectors for different parts of your application
  collector_a = Baml::Collector.new(name: "collector-a")
  collector_b = Baml::Collector.new(name: "collector-b")

  # Use both collectors for the same function call
  res = b.ExtractResume(input: 'hi there', baml_options: { collector: [collector_a, collector_b] })

  # Both collectors will have the same logs
  raise "Assertion failed" unless collector_a.last.usage.input_tokens == collector_b.last.usage.input_tokens

  # Use only collector_a for another call
  res2 = b.ExtractResume(input: 'another call', baml_options: { collector: collector_a })

  # collector_a will have 2 logs, collector_b will still have 1
  raise "Assertion failed" unless collector_a.logs.length == 2
  raise "Assertion failed" unless collector_b.logs.length == 1
end
```
</Tab>

<Tab title="Rust" language="rust">
```rust
use myproject::baml_client::sync_client::B;
use myproject::baml_client::new_collector;

let collector_a = new_collector("collector-a");
let collector_b = new_collector("collector-b");

// Use both collectors for the same function call
let res = B.ExtractResume
    .with_collectors(&[collector_a.clone(), collector_b.clone()])
    .call("...")
    .unwrap();

// Use only collector_a for another call
let res2 = B.ExtractResume
    .with_collector(&collector_a)
    .call("another call")
    .unwrap();

// collector_a will have 2 logs, collector_b will still have 1
assert_eq!(collector_a.logs().len(), 2);
assert_eq!(collector_b.logs().len(), 1);
```
</Tab>
</Tabs>

Usage Tracking

<Tabs>
<Tab title="Python" language="python">
```python
from baml_client import b
from baml_py import Collector

def run():
    collector_a = Collector(name="collector-a")
    res = b.ExtractResume("...", baml_options={"collector": collector_a})

    collector_b = Collector(name="collector-b")
    res = b.ExtractResume("...", baml_options={"collector": collector_b})

    # The total usage tracked by each collector is now available
    print(collector_a.usage)
    print(collector_b.usage)
```
</Tab>

<Tab title="TypeScript" language="typescript">
```typescript
import { b } from 'baml_client'
import { Collector } from '@boundaryml/baml'

async function run() {
    const collector_a = new Collector("collector-a")
    const res = await b.ExtractResume("...", { collector: collector_a })

    const collector_b = new Collector("collector-b")
    const res2 = await b.ExtractResume("...", { collector: collector_b })

    // The total usage tracked by each collector is now available
    console.log(collector_a.usage)
    console.log(collector_b.usage)
}
```
</Tab>

<Tab title="Ruby" language="ruby">
```ruby
require_relative "baml_client/client"

def run
  collector_a = Baml::Collector.new(name: "collector-a")
  res = Baml.Client.ExtractResume(input: 'First call', baml_options: { collector: collector_a })

  collector_b = Baml::Collector.new(name: "collector-b")
  res = Baml.Client.ExtractResume(input: 'Second call', baml_options: { collector: collector_b })

  # The total usage tracked by each collector is now available
  print(collector_a.usage)
  print(collector_b.usage)
end
```
</Tab>

<Tab title="Rust" language="rust">
```rust
use myproject::baml_client::sync_client::B;
use myproject::baml_client::new_collector;

let collector_a = new_collector("collector-a");
let res = B.ExtractResume
    .with_collector(&collector_a)
    .call("...")
    .unwrap();

let collector_b = new_collector("collector-b");
let res2 = B.ExtractResume
    .with_collector(&collector_b)
    .call("...")
    .unwrap();

// The total usage of both logs is now available
println!("{:?}", collector_a.usage());
println!("{:?}", collector_b.usage());
```
</Tab>
</Tabs>

Accessing SSE Responses (Streaming)

When using streaming, you can access the raw Server-Sent Events (SSE) responses received from the LLM provider. This is useful for debugging, logging, or accessing provider-specific data not exposed in the standard response.

<Tabs>
<Tab title="Python" language="python">
```python
from baml_client import b
from baml_py import Collector

async def run():
    collector = Collector(name="stream-collector")

    # Use streaming
    stream = b.stream.ExtractResume("...", baml_options={"collector": collector})
    async for chunk in stream:
        print(chunk)  # Process streamed chunks

    # After streaming completes, access the raw SSE responses
    log = collector.last
    if log and log.selected_call:
        sse_responses = log.selected_call.sse_responses()
        if sse_responses:
            for sse in sse_responses:
                # Raw text of the SSE data field
                print(f"Raw: {sse.text}")
                # Parse as JSON if valid (returns None if not valid JSON)
                parsed = sse.json()
                if parsed:
                    print(f"JSON: {parsed}")
```
</Tab>

<Tab title="TypeScript" language="typescript">
```typescript
import { b } from 'baml_client'
import { Collector } from '@boundaryml/baml'

async function run() {
    const collector = new Collector("stream-collector")

    // Use streaming
    const stream = b.stream.ExtractResume("...", { collector })
    for await (const chunk of stream) {
        console.log(chunk)  // Process streamed chunks
    }

    // After streaming completes, access the raw SSE responses
    const log = collector.last
    if (log?.selectedCall) {
        const sseResponses = log.selectedCall.sseResponses()
        if (sseResponses) {
            for (const sse of sseResponses) {
                // Raw text of the SSE data field
                console.log(`Raw: ${sse.text}`)
                // Parse as JSON if valid (returns null if not valid JSON)
                const parsed = sse.json()
                if (parsed) {
                    console.log(`JSON: ${JSON.stringify(parsed)}`)
                }
            }
        }
    }
}
```
</Tab>
</Tabs>

<Info> SSE responses capture the raw streaming data from the provider. For HTTP-based providers (OpenAI, Anthropic, Google, etc.), this is the actual SSE event data. For AWS Bedrock, which uses a binary protocol, the responses contain a JSON wrapper with the Debug representation of the SDK types (see [AWS SDK serialization issue](https://github.com/awslabs/aws-sdk-rust/issues/645)). </Info>

Cached Token Tracking

When using providers that support prompt caching (such as Anthropic, OpenAI, Google, or Vertex), you can track cached input tokens via the `cached_input_tokens` field:

<Tabs>
<Tab title="Python" language="python">
```python
from baml_client import b
from baml_py import Collector

async def run():
    collector = Collector(name="cache-tracker")

    # First call - content will be cached by the provider
    res = await b.TestCaching(large_content, "Question 1", baml_options={"collector": collector})

    # Second call with same content - should use cached tokens
    res2 = await b.TestCaching(large_content, "Question 2", baml_options={"collector": collector})

    # Access cached token counts
    first_log = collector.logs[0]
    second_log = collector.logs[1]

    print(f"First call cached tokens: {first_log.usage.cached_input_tokens}")
    print(f"Second call cached tokens: {second_log.usage.cached_input_tokens}")

    # Collector aggregates cached tokens across all calls
    print(f"Total cached tokens: {collector.usage.cached_input_tokens}")

    # You can also access cached tokens per LLM call (including retries)
    print(f"Per-call cached tokens: {first_log.calls[0].usage.cached_input_tokens}")
```
</Tab>

<Tab title="TypeScript" language="typescript">
```typescript
import { b } from 'baml_client'
import { Collector } from '@boundaryml/baml'

async function run() {
    const collector = new Collector("cache-tracker")
    
    // First call - content will be cached by the provider
    const res = await b.TestCaching(largeContent, "Question 1", { collector })
    
    // Second call with same content - should use cached tokens
    const res2 = await b.TestCaching(largeContent, "Question 2", { collector })
    
    // Access cached token counts
    const firstLog = collector.logs[0]
    const secondLog = collector.logs[1]
    
    console.log(`First call cached tokens: ${firstLog.usage.cachedInputTokens}`)
    console.log(`Second call cached tokens: ${secondLog.usage.cachedInputTokens}`)
    
    // Collector aggregates cached tokens across all calls
    console.log(`Total cached tokens: ${collector.usage.cachedInputTokens}`)
    
    // You can also access cached tokens per LLM call (including retries)
    console.log(`Per-call cached tokens: ${firstLog.calls[0].usage?.cachedInputTokens}`)
}
```
</Tab>
</Tabs>

<Info> Cached token tracking is supported for Anthropic, OpenAI, Google AI, and Vertex AI providers. AWS Bedrock does not currently support cached token reporting and will return `null` for this field. </Info>

API Reference

Collector Class

The Collector class provides properties to introspect the internal state of BAML function calls.

| Property | Type | Description |
| --- | --- | --- |
| `logs` | `List[FunctionLog]` | A list of all function calls, ordered from oldest to newest. |
| `last` | `FunctionLog \| null` | The most recent function log. |
| `usage` | `Usage` | The cumulative usage of all requests this collector has tracked, including any tokens used by retries and fallbacks. |
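For example, when walking `collector.logs` you might aggregate token usage per function. A minimal self-contained sketch, where the plain dicts stand in for `FunctionLog` objects and their usage fields (they are illustrative, not the real types):

```python
from collections import defaultdict

# Hypothetical stand-ins for collector.logs entries
logs = [
    {"function_name": "ExtractResume", "input_tokens": 900, "output_tokens": 120},
    {"function_name": "ExtractResume", "input_tokens": 910, "output_tokens": 100},
    {"function_name": "ClassifyEmail", "input_tokens": 300, "output_tokens": 12},
]

# Sum input + output tokens per function name
totals: dict[str, int] = defaultdict(int)
for log in logs:
    totals[log["function_name"]] += log["input_tokens"] + log["output_tokens"]

print(dict(totals))  # {'ExtractResume': 2030, 'ClassifyEmail': 312}
```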

The Collector class provides the following methods:

| Method | Type | Description |
| --- | --- | --- |
| `clear()` | `void` | Clears all logs. |

Note: IDs are not exposed as methods on the Collector itself; use tags to correlate calls.

FunctionLog Class

The FunctionLog class has the following properties:

| Property | Type | Description |
| --- | --- | --- |
| `id` | `string` | The id of the request. |
| `function_name` | `string` | The name of the function. |
| `log_type` | `"call" \| "stream"` | The manner in which the function was called. |
| `timing` | `Timing` | The timing of the request. |
| `usage` | `Usage` | The usage of the request (aggregated from all calls). |
| `calls` | `(LLMCall \| LLMStreamCall)[]` | Every call made to the LLM (including fallbacks and retries), sorted from oldest to newest. |
| `selected_call` | `(LLMCall \| LLMStreamCall)?` | The call whose response BAML parsed (there may be many calls due to fallbacks and retries). |
| `raw_llm_response` | `string \| null` | The raw text from the best matching LLM. |
| `tags` | `Map[str, any]` | Any user-provided metadata. |

Timing Class

The Timing class has the following properties:

| Property | Type | Description |
| --- | --- | --- |
| `start_time_utc_ms` | `int` | The start time of the request, in milliseconds since the epoch. |
| `duration_ms` | `int \| null` | The duration of the request, in milliseconds. |

StreamTiming Class (extends Timing)

No unique properties.

Usage Class

The Usage class has the following properties:

| Property | Type | Description |
| --- | --- | --- |
| `input_tokens` | `int \| null` | The cumulative number of tokens used in the inputs. |
| `output_tokens` | `int \| null` | The cumulative number of tokens used in the outputs. |
| `cached_input_tokens` | `int \| null` | The number of cached input tokens (e.g., Anthropic's `cache_read_input_tokens`). |
<Info> Note: Usage may not include all provider-specific token types like "thinking_tokens" or "cache_creation_input_tokens". For those, you may need to look at the raw HTTP response and build your own adapters. </Info>

LLMCall Class

The LLMCall class has the following properties:

| Property | Type | Description |
| --- | --- | --- |
| `client_name` | `str` | The name of the client used. |
| `provider` | `str` | The provider of the client used. |
| `timing` | `Timing` | The timing of the request. |
| `http_request` | `HttpRequest` | The raw HTTP request sent to the client. |
| `http_response` | `HttpResponse \| null` | The raw HTTP response from the client (`null` for streaming). |
| `usage` | `Usage \| null` | The usage of the request (if available). |
| `selected` | `bool` | Whether this call was selected and used for parsing. |
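When a function uses retries or fallbacks, iterating `log.calls` and checking `selected` shows which attempt BAML actually parsed. A self-contained sketch, with a minimal `LLMCall` stand-in holding just the fields used here:

```python
from dataclasses import dataclass

@dataclass
class LLMCall:  # stand-in with a subset of the fields documented above
    client_name: str
    provider: str
    selected: bool

def summarize_calls(calls: list[LLMCall]) -> str:
    """One line per call, flagging the call whose response was parsed."""
    lines = []
    for i, call in enumerate(calls):
        marker = " (selected)" if call.selected else ""
        lines.append(f"call {i}: {call.client_name} via {call.provider}{marker}")
    return "\n".join(lines)

calls = [
    LLMCall("GPT4oMini", "openai", selected=False),      # e.g. a failed first attempt
    LLMCall("ClaudeFallback", "anthropic", selected=True),  # the fallback that succeeded
]
print(summarize_calls(calls))
```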

LLMStreamCall Class (extends LLMCall)

The LLMStreamCall includes the same properties as LLMCall plus the following:

| Property | Type | Description |
| --- | --- | --- |
| `timing` | `StreamTiming` | The timing of the request. |
| `sse_responses()` | `SSEResponse[] \| null` | The raw SSE responses received during streaming. Returns `null` if not available. |

HttpRequest Class

The HttpRequest class has the following properties:

| Property | Type | Description |
| --- | --- | --- |
| `url` | `str` | The URL of the request. |
| `method` | `str` | The HTTP method of the request. |
| `headers` | `object` | The request headers. |
| `body` | `HTTPBody` | The request body. |

HttpResponse Class

The HttpResponse class has the following properties:

| Property | Type | Description |
| --- | --- | --- |
| `status` | `int` | The HTTP status code. |
| `headers` | `object` | The response headers. |
| `body` | `HTTPBody` | The response body. |

HTTPBody Class

The HTTPBody class has the following properties:

| Method | Type | Description |
| --- | --- | --- |
| `text()` | `string` | The body as a string. |
| `json()` | `object` | The body as a JSON object. |
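Since `http_request.headers` exposes the raw request headers, it is worth redacting credentials before writing them to logs. A small sketch under the assumption that the headers arrive as a plain string-to-string dict (the helper and the set of sensitive header names are ours):

```python
import json

def redact_headers(headers: dict[str, str]) -> dict[str, str]:
    # Mask auth-bearing headers before logging the raw request
    sensitive = {"authorization", "x-api-key", "api-key"}
    return {k: ("***" if k.lower() in sensitive else v) for k, v in headers.items()}

headers = {"Authorization": "Bearer sk-secret", "Content-Type": "application/json"}
print(json.dumps(redact_headers(headers)))
```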

SSEResponse Class

The SSEResponse class represents a single Server-Sent Event received during streaming.

| Property/Method | Type | Description |
| --- | --- | --- |
| `text` | `string` | The raw text content of the SSE data field. |
| `json()` | `object \| null` | Parses and returns the text as JSON. Returns `null` if the text is not valid JSON. |
<Note> The SSE event type (e.g., "message_delta", "content_block_delta") and event ID are not currently exposed. Only the data payload is available via `text` and `json()`. </Note>

Best Practices

  1. Use a single collector instance when tracking related function calls in a chain.
  2. Clear the collector when reusing it for unrelated operations.
  3. Consider using multiple collectors to track different parts of your application.
  4. Use function IDs when tracking specific calls in parallel operations.
  5. For streaming calls, be aware that `http_response` will be `null`, but you can still access usage information.