Back to Baml

Streaming

fern/01-guide/04-baml-basics/streaming.mdx

0.222.0 · 32.0 KB
Original Source

BAML lets you stream in structured JSON output from LLMs as it comes in.

If you tried streaming in a JSON output from an LLM you'd see something like:

{"items": [{"name": "Appl
{"items": [{"name": "Apple", "quantity": 2, "price": 1.
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost":
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost": 3.00} # Completed

BAML gives you fine-grained control of how it fixes this partial JSON and transforms it into a series of semantically valid partial objects.

<Tip>You can check out more examples (including streaming in FastAPI and NextJS) in the BAML Examples repo.</Tip>

Let's stream the output of the function `ExtractReceiptInfo(email: string) -> ReceiptInfo` for our example:

<Accordion title="extract-receipt-info.baml">
baml
class ReceiptItem {
  name string
  description string?
  quantity int
  price float
}

class ReceiptInfo {
    items ReceiptItem[]
    total_cost float?
}

function ExtractReceiptInfo(email: string) -> ReceiptInfo {
  client GPT4o
  prompt #"
    Given the receipt below:

    {{ email }}

    {{ ctx.output_format }}
  "#
}
</Accordion>

The BAML code generator creates a set of types in the `baml_client` library, in a module called `partial_types`. These types are modified from your original types to support streaming.

By default, BAML will convert all Class fields into nullable fields, and fill those fields with non-null values as much as possible given the tokens received so far.

<Tabs> <Tab title="Python" language="python"> BAML will generate `b.stream.ExtractReceiptInfo()` for you, which you can use like so:
python
import asyncio
from baml_client import b, partial_types, types

# Using a stream:
def example1(receipt: str):
    stream = b.stream.ExtractReceiptInfo(receipt)

    # partial is a Partial type with all Optional fields
    for partial in stream:
        print(f"partial: parsed {len(partial.items or [])} items (object: {partial})")

    # final is the full, original, validated ReceiptInfo type
    final = stream.get_final_response()
    print(f"final: {len(final.items)} items (object: {final})")

# Using only get_final_response() of a stream
#
# In this case, you should just use b.ExtractReceiptInfo(receipt) instead,
# which is slightly faster and more efficient.
def example2(receipt: str):
    final = b.stream.ExtractReceiptInfo(receipt).get_final_response()
    print(f"final: {len(final.items)} items (object: {final})")

# Using the async client:
async def example3(receipt: str):
    # Note the import of the async client
    from baml_client.async_client import b
    stream = b.stream.ExtractReceiptInfo(receipt)
    async for partial in stream:
        print(f"partial: parsed {len(partial.items or [])} items (object: {partial})")

    final = await stream.get_final_response()
    print(f"final: {len(final.items)} items (object: {final})")

receipt = """
04/14/2024 1:05 pm

Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item	#	Price
Guide leash (1 Pair) uni UNI
1	$34.95
The Index Town Walls
1	$35.00
Boot Punch
3	$60.00
Subtotal	$129.95
Tax ($129.95 @ 9%)	$11.70
Total Tax	$11.70
Total	$141.65
"""

if __name__ == '__main__':
    #uncomment one at a time and run to see the difference
    example1(receipt)
    #example2(receipt)
    #asyncio.run(example3(receipt))
</Tab> <Tab title="TypeScript" language="typescript"> BAML will generate `b.stream.ExtractReceiptInfo()` for you, which you can use like so:
ts
import { b } from './baml_client'

// Using both async iteration and getFinalResponse() from a stream
const example1 = async (receipt: string) => {
  const stream = b.stream.ExtractReceiptInfo(receipt)

  // partial is a Partial type with all Optional fields
  for await (const partial of stream) {
    console.log(`partial: ${partial.items?.length} items (object: ${partial})`)
  }

  // final is the full, original, validated ReceiptInfo type
  const final = await stream.getFinalResponse()
  console.log(`final: ${final.items.length} items (object: ${final})`)
}

// Using only async iteration of a stream
const example2 = async (receipt: string) => {
  for await (const partial of b.stream.ExtractReceiptInfo(receipt)) {
    console.log(`partial: ${partial.items?.length} items (object: ${partial})`)
  }
}

// Using only getFinalResponse() of a stream
//
// In this case, you should just use b.ExtractReceiptInfo(receipt) instead,
// which is faster and more efficient.
const example3 = async (receipt: string) => {
  const final = await b.stream.ExtractReceiptInfo(receipt).getFinalResponse()
  console.log(`final: ${final.items.length} items (object: ${final})`)
}

const receipt = `
04/14/2024 1:05 pm

Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item	#	Price
Guide leash (1 Pair) uni UNI
1	$34.95
The Index Town Walls
1	$35.00
Boot Punch
3	$60.00
Subtotal	$129.95
Tax ($129.95 @ 9%)	$11.70
Total Tax	$11.70
Total	$141.65
`

if (require.main === module) {
  example1(receipt)
  example2(receipt)
  example3(receipt)
}
</Tab> <Tab title="Go" language="go" > BAML will generate `b.Stream.ExtractReceiptInfo()` for you, which you can use like so:
go
package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    b "example.com/myproject/baml_client"
    "example.com/myproject/baml_client/stream_types"
    "example.com/myproject/baml_client/types"
)

// Basic streaming with comprehensive error handling and context cancellation
func basicStreamingExample(receipt string) {
    // Create context with timeout to prevent hanging
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel() // Always clean up context resources

    stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
    if err != nil {
        log.Printf("Failed to create stream: %v", err)
        return
    }

    // Ensure stream is properly closed on exit
    defer func() {
        if stream != nil {
            // Note: In practice, range automatically handles closing
            // but explicit cleanup is shown here for demonstration
            log.Println("Stream processing completed")
        }
    }()

    for value := range stream {
        // Handle context cancellation
        select {
        case <-ctx.Done():
            log.Printf("Stream cancelled due to context: %v", ctx.Err())
            return
        default:
        }

        // Handle streaming errors
        if value.IsError {
            log.Printf("Stream error: %v", value.Error)
            return
        }

        // Process partial results
        if !value.IsFinal && value.Stream() != nil {
            partial := *value.Stream()
            fmt.Printf("Partial result: parsed %d items so far\n", len(partial.Items))

            // You could process partial results here
            for i, item := range partial.Items {
                if item.Name != "" { // Only show items with names parsed so far
                    fmt.Printf("  Item %d: %s - %s\n", i+1, item.Name, item.Price)
                }
            }
        }

        // Process final result
        if value.IsFinal && value.Final() != nil {
            final := *value.Final()
            fmt.Printf("Final result: %d items total\n", len(final.Items))
            fmt.Printf("Total amount: %s\n", final.Total)
            return
        }
    }
}

// Stream with early termination based on conditions
func streamWithEarlyTermination(receipt string) (*types.ReceiptInfo, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
    if err != nil {
        return nil, fmt.Errorf("failed to create stream: %w", err)
    }

    for value := range stream {
        // Check for cancellation
        select {
        case <-ctx.Done():
            return nil, fmt.Errorf("stream cancelled: %w", ctx.Err())
        default:
        }

        if value.IsError {
            return nil, fmt.Errorf("stream error: %w", value.Error)
        }

        // Early termination condition: stop if we have enough items
        if !value.IsFinal && value.Stream() != nil {
            partial := *value.Stream()
            if len(partial.Items) >= 3 { // Stop early if we have 3+ items
                fmt.Printf("Early termination: found %d items, stopping stream\n", len(partial.Items))
                cancel() // Cancel context to stop stream
                return &partial, nil
            }
        }

        if value.IsFinal && value.Final() != nil {
            final := *value.Final()
            return &final, nil
        }
    }

    return nil, fmt.Errorf("stream ended without final response")
}

// Concurrent streaming - process multiple receipts concurrently
func concurrentStreamingExample(receipts []string) {
    var wg sync.WaitGroup
    results := make(chan *types.ReceiptInfo, len(receipts))
    errors := make(chan error, len(receipts))

    // Create context with timeout for all goroutines
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    for i, receipt := range receipts {
        wg.Add(1)
        go func(index int, receiptData string) {
            defer wg.Done()

            // Create per-goroutine context
            goroutineCtx, goroutineCancel := context.WithTimeout(ctx, 30*time.Second)
            defer goroutineCancel()

            stream, err := b.Stream.ExtractReceiptInfo(goroutineCtx, receiptData)
            if err != nil {
                errors <- fmt.Errorf("receipt %d: failed to create stream: %w", index, err)
                return
            }

            for value := range stream {
                select {
                case <-goroutineCtx.Done():
                    errors <- fmt.Errorf("receipt %d: stream cancelled: %w", index, goroutineCtx.Err())
                    return
                default:
                }

                if value.IsError {
                    errors <- fmt.Errorf("receipt %d: stream error: %w", index, value.Error)
                    return
                }

                if value.IsFinal && value.Final() != nil {
                    final := *value.Final()
                    fmt.Printf("Receipt %d: processed %d items\n", index, len(final.Items))
                    results <- &final
                    return
                }
            }

            errors <- fmt.Errorf("receipt %d: stream ended without final response", index)
        }(i, receipt)
    }

    // Wait for all goroutines and close channels
    go func() {
        wg.Wait()
        close(results)
        close(errors)
    }()

    // Collect results and errors
    var successCount int
    var errorCount int

    for results != nil || errors != nil {
        select {
        case result, ok := <-results:
            if !ok {
                results = nil
                continue
            }
            if result != nil {
                successCount++
                fmt.Printf("Successfully processed receipt with %d items, total: %s\n",
                    len(result.Items), result.Total)
            }

        case err, ok := <-errors:
            if !ok {
                errors = nil
                continue
            }
            if err != nil {
                errorCount++
                log.Printf("Error processing receipt: %v", err)
            }
        }
    }

    fmt.Printf("Concurrent processing completed: %d successes, %d errors\n",
        successCount, errorCount)
}

// Robust streaming with retry logic
func streamWithRetry(receipt string, maxRetries int) (*types.ReceiptInfo, error) {
    for attempt := 1; attempt <= maxRetries; attempt++ {
        // Create fresh context for each attempt
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

        stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
        if err != nil {
            cancel()
            if attempt == maxRetries {
                return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
            }
            log.Printf("Attempt %d failed: %v, retrying...", attempt, err)
            time.Sleep(time.Duration(attempt) * time.Second) // Exponential backoff
            continue
        }

        for value := range stream {
            select {
            case <-ctx.Done():
                cancel()
                if attempt == maxRetries {
                    return nil, fmt.Errorf("stream timeout after %d attempts: %w", maxRetries, ctx.Err())
                }
                log.Printf("Attempt %d timed out, retrying...", attempt)
                break
            default:
            }

            if value.IsError {
                cancel()
                if attempt == maxRetries {
                    return nil, fmt.Errorf("stream failed after %d attempts: %w", maxRetries, value.Error)
                }
                log.Printf("Attempt %d failed with stream error: %v, retrying...", attempt, value.Error)
                time.Sleep(time.Duration(attempt) * time.Second)
                break
            }

            if value.IsFinal && value.Final() != nil {
                final := *value.Final()
                cancel()
                return &final, nil
            }
        }
    }

    return nil, fmt.Errorf("all %d attempts failed", maxRetries)
}

func main() {
    receipt := `04/14/2024 1:05 pm

Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item	#	Price
Guide leash (1 Pair) uni UNI
1	$34.95
The Index Town Walls
1	$35.00
Boot Punch
3	$60.00
Subtotal	$129.95
Tax ($129.95 @ 9%)	$11.70
Total Tax	$11.70
Total	$141.65`

    fmt.Println("=== Basic Streaming Example ===")
    basicStreamingExample(receipt)

    fmt.Println("\n=== Stream with Early Termination ===")
    result, err := streamWithEarlyTermination(receipt)
    if err != nil {
        log.Printf("Early termination example failed: %v", err)
    } else if result != nil {
        fmt.Printf("Early termination result: %d items\n", len(result.Items))
    }

    fmt.Println("\n=== Concurrent Streaming Example ===")
    receipts := []string{receipt, receipt, receipt} // Process same receipt 3 times concurrently
    concurrentStreamingExample(receipts)

    fmt.Println("\n=== Stream with Retry Example ===")
    retryResult, err := streamWithRetry(receipt, 3)
    if err != nil {
        log.Printf("Retry example failed: %v", err)
    } else if retryResult != nil {
        fmt.Printf("Retry example succeeded: %d items\n", len(retryResult.Items))
    }
}
</Tab> <Tab title="Ruby (beta)" language="ruby"> BAML will generate `Baml.Client.stream.ExtractReceiptInfo()` for you, which you can use like so:
ruby
require_relative "baml_client/client"

$b = Baml.Client

# Using both iteration and get_final_response() from a stream
def example1(receipt)
  stream = $b.stream.ExtractReceiptInfo(receipt)

  stream.each do |partial|
    puts "partial: #{partial.items&.length} items"
  end

  final = stream.get_final_response
  puts "final: #{final.items.length} items"
end

# Using only iteration of a stream
def example2(receipt)
  $b.stream.ExtractReceiptInfo(receipt).each do |partial|
    puts "partial: #{partial.items&.length} items"
  end
end

# Using only get_final_response() of a stream
#
# In this case, you should just use $b.ExtractReceiptInfo(receipt) instead,
# which is faster and more efficient.
def example3(receipt)
  final = $b.stream.ExtractReceiptInfo(receipt).get_final_response
  puts "final: #{final.items.length} items"
end

receipt = <<~RECEIPT
  04/14/2024 1:05 pm

  Ticket: 220000082489
  Register: Shop Counter
  Employee: Connor
  Customer: Sam
  Item  #  Price
  Guide leash (1 Pair) uni UNI
  1 $34.95
  The Index Town Walls
  1 $35.00
  Boot Punch
  3 $60.00
  Subtotal $129.95
  Tax ($129.95 @ 9%) $11.70
  Total Tax $11.70
  Total $141.65
RECEIPT

if __FILE__ == $0
  example1(receipt)
  example2(receipt)
  example3(receipt)
end
</Tab> <Tab title="Rust" language="rust"> BAML will generate `B.ExtractReceiptInfo.stream()` for you, which you can use like so:
rust
use myproject::baml_client::sync_client::B;
use myproject::baml_client::types::*;

// Using both partials and get_final_response() from a stream
fn example1(receipt: &str) {
    let mut stream = B.ExtractReceiptInfo.stream(receipt).unwrap();

    // partial is a Partial type with all Optional fields
    for partial in stream.partials() {
        let partial = partial.unwrap();
        println!("partial: {} items", partial.items.len());
    }

    // final is the full, original, validated ReceiptInfo type
    let final_result = stream.get_final_response().unwrap();
    println!("final: {} items", final_result.items.len());
}

// Using only get_final_response() of a stream
//
// In this case, you should just use B.ExtractReceiptInfo.call(receipt) instead,
// which is slightly faster and more efficient.
fn example2(receipt: &str) {
    let stream = B.ExtractReceiptInfo.stream(receipt).unwrap();
    let final_result = stream.get_final_response().unwrap();
    println!("final: {} items", final_result.items.len());
}

fn main() {
    let receipt = "04/14/2024 1:05 pm\n\nTicket: 220000082489\nRegister: Shop Counter\nEmployee: Connor\nCustomer: Sam\nItem\t#\tPrice\nGuide leash (1 Pair) uni UNI\n1\t$34.95\nThe Index Town Walls\n1\t$35.00\nBoot Punch\n3\t$60.00\nSubtotal\t$129.95\nTax ($129.95 @ 9%)\t$11.70\nTotal Tax\t$11.70\nTotal\t$141.65";

    example1(receipt);
    example2(receipt);
}
</Tab> <Tab title="OpenAPI" language="openapi"> <Tip> When using `baml-cli serve`, streaming is available via `http://localhost:2024/stream/{FunctionName}`. However streaming routes are not added to the `openapi.yaml` file because there are no partial type definitions for JSON schema yet. </Tip> </Tab> </Tabs> <Note> Number fields are always streamed in only when the LLM completes them. E.g. if the final number is 129.95, you'll only see null or 129.95 instead of partial numbers like 1, 12, 129.9, etc. </Note>

Cancelling Streams

You can cancel ongoing streams using abort controllers, which is essential for responsive applications that allow users to stop generation or implement timeouts.

<Tabs> <Tab title="TypeScript" language="typescript">
```typescript
import { b } from './baml_client'

const controller = new AbortController()

const stream = b.stream.ExtractReceiptInfo(receipt, { abortController: controller })

// Process stream with ability to cancel
let itemCount = 0
for await (const partial of stream) {
  itemCount = partial.items?.length || 0
  console.log(`Received ${itemCount} items so far`)

  // Cancel if we have enough items
  if (itemCount >= 5) {
    console.log('Stopping stream - got enough items')
    controller.abort()
    break
  }
}

// Or cancel after a timeout
setTimeout(() => {
  controller.abort()
  console.log('Stream cancelled due to timeout')
}, 5000)
```

</Tab>

<Tab title="Python" language="python">
```python
from baml_client.async_client import b
from baml_py import AbortController

controller = AbortController()

stream = b.stream.ExtractReceiptInfo(
    receipt,
    baml_options={"abort_controller": controller}
)

# Process stream with ability to cancel
item_count = 0
async for partial in stream:
    item_count = len(partial.items) if partial.items else 0
    print(f"Received {item_count} items so far")

    # Cancel if we have enough items
    if item_count >= 5:
        print("Stopping stream - got enough items")
        controller.abort()
        break

# Or cancel after a timeout
import asyncio
async def cancel_after_timeout():
    await asyncio.sleep(5)
    controller.abort()
    print("Stream cancelled due to timeout")

asyncio.create_task(cancel_after_timeout())
</Tab> <Tab title="Go" language="go">
```go
// Go already uses context for cancellation in the examples above
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
if err != nil {
    log.Printf("Failed to create stream: %v", err)
    return
}

for value := range stream {
    // Stream will automatically stop when context is cancelled
    select {
    case <-ctx.Done():
        log.Printf("Stream cancelled: %v", ctx.Err())
        return
    default:
    }

    // Process partial results
    if !value.IsFinal && value.Stream() != nil {
        partial := *value.Stream()
        if len(partial.Items) >= 5 {
            log.Printf("Stopping stream - got %d items", len(partial.Items))
            cancel() // Cancel the context to stop the stream
            return
        }
    }
}
```

</Tab>

<Tab title="Ruby" language="ruby">
```ruby
require 'baml_client'

controller = Baml::AbortController.new

stream = $b.stream.ExtractReceiptInfo(
  receipt,
  baml_options: { abort_controller: controller }
)

# Process stream with ability to cancel
item_count = 0
stream.each do |partial|
  item_count = partial.items&.length || 0
  puts "Received #{item_count} items so far"

  # Cancel if we have enough items
  if item_count >= 5
    puts "Stopping stream - got enough items"
    controller.abort
    break
  end
end

# Or cancel after a timeout (in a separate thread)
Thread.new do
  sleep(5)
  controller.abort
  puts "Stream cancelled due to timeout"
end
</Tab> <Tab title="Rust" language="rust">
```rust
use baml::CancellationToken;
use myproject::baml_client::sync_client::B;
use std::time::Duration;

// Cancel a stream after a timeout
let token = CancellationToken::new_with_timeout(Duration::from_secs(5));

let mut stream = B.ExtractReceiptInfo
    .with_cancellation_token(Some(token))
    .stream(receipt)
    .unwrap();

for partial in stream.partials() {
    match partial {
        Ok(partial) => {
            println!("Received {} items so far", partial.items.len());

            // Stop consuming the stream if we have enough items
            if partial.items.len() >= 5 {
                println!("Stopping stream - got enough items");
                break;
            }
        }
        Err(e) => {
            eprintln!("Stream error: {}", e);
            break;
        }
    }
}
```

</Tab>
</Tabs>

### Common Streaming Cancellation Patterns

#### User-Initiated Cancellation
Allow users to stop streaming generation with a "Stop" button:

<Tabs>
<Tab title="React" language="react">
```tsx
function StreamingComponent() {
  const [controller, setController] = useState<AbortController | null>(null)
  const [isStreaming, setIsStreaming] = useState(false)
  const [result, setResult] = useState("")

  const startStreaming = async () => {
    const newController = new AbortController()
    setController(newController)
    setIsStreaming(true)

    try {
      const stream = b.stream.GenerateContent(prompt, {
        abortController: newController
      })

      let accumulated = ""
      for await (const partial of stream) {
        accumulated = partial.content || ""
        setResult(accumulated)
      }
    } catch (error) {
      if (error.name === 'BamlAbortError') {
        console.log('Stream cancelled by user')
      }
    } finally {
      setIsStreaming(false)
      setController(null)
    }
  }

  const stopStreaming = () => {
    controller?.abort()
  }

  return (
    <div>
      <button onClick={startStreaming} disabled={isStreaming}>
        Start Streaming
      </button>
      <button onClick={stopStreaming} disabled={!isStreaming}>
        Stop
      </button>
      <div>{result}</div>
    </div>
  )
}
```
</Tab> <Tab title="FastAPI" language="python">
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from baml_py import AbortController
import asyncio

app = FastAPI()
active_streams = {}

@app.post("/stream/{stream_id}")
async def start_stream(stream_id: str, prompt: str):
    controller = AbortController()
    active_streams[stream_id] = controller

    async def generate():
        try:
            stream = b.stream.GenerateContent(
                prompt,
                baml_options={"abort_controller": controller}
            )
            async for partial in stream:
                if controller.aborted:
                    break
                yield f"data: {partial.content}\n\n"
        except BamlAbortError:
            yield "data: [CANCELLED]\n\n"
        finally:
            active_streams.pop(stream_id, None)

    return StreamingResponse(generate(), media_type="text/event-stream")

@app.post("/stop/{stream_id}")
async def stop_stream(stream_id: str):
    if controller := active_streams.get(stream_id):
        controller.abort()
        return {"status": "stopped"}
    return {"status": "not found"}
```

</Tab>
</Tabs>

For more examples and patterns, see the [Abort Controllers guide](/guide/baml-basics/abort-signal).

## Semantic Streaming

BAML provides powerful attributes to control how your data streams, ensuring that partial values always maintain semantic validity. Here are the three key streaming attributes:

### `@stream.done`
This attribute ensures a type or field is only streamed when it's completely finished. It's useful when you need atomic, fully-formed values.

For example:
```baml
class ReceiptItem {
  name string
  quantity int
  price float

  // The entire ReceiptItem will only stream when complete
  @@stream.done
}

// Receipts is a list of ReceiptItems,
// each internal item will only stream when complete
type Receipts = ReceiptItem[]

class Person {
  // Name will only appear when fully complete,
  // until then it will be null
  name string @stream.done
  // Numbers (floats and ints) will only appear
  // when fully complete by default
  age int
  // Bio will stream token by token
  bio string
}

### Atomic list items with union types

A common pattern is streaming a list of items where each item can be one of several types (e.g. tool calls and messages). You can use @stream.done on the list element type to ensure each item only appears once it's fully complete:

baml
class ToolCall {
  name string
  parameters string
}

class Message {
  role string
  content string
}

type OutputItem = ToolCall | Message

// Each list element appears only when fully complete.
// The list grows incrementally as items finish.
function Run(input: string) -> (OutputItem @stream.done)[] {
  client MyClient
  prompt #"
    {{ input }}
    {{ ctx.output_format }}
  "#
}

When @stream.done is applied to a union type, it propagates to all variants. This means you don't need to add @@stream.done to each class individually — annotating the union is sufficient.

<Tip> You can also achieve the same behavior by adding `@@stream.done` to each class in the union. The `(T @stream.done)[]` syntax is more concise when the classes are used in other contexts where you don't want `@@stream.done`. </Tip>

### `@stream.not_null`

This attribute ensures a containing object is only streamed when this field has a value. It's particularly useful for discriminator fields or required metadata.

For example:

baml
class Message {
  // Message won't stream until type is known
  type "error" | "success" | "info" @stream.not_null
  // Timestamp will only appear when fully complete
  // until then it will be null
  timestamp string @stream.done
  // Content can stream token by token
  content string
}

### `@stream.with_state`

This attribute adds metadata to track if a field has finished streaming. It's perfect for showing loading states in UIs.

For example:

baml
class BlogPost {
  // The blog post will only stream when title is known
  title string @stream.done @stream.not_null
  // The content will stream token by token, and include completion state
  content string @stream.with_state
}

This will generate the following code in the partial_types module: <Tabs> <Tab title="Python" language="python">

python
class StreamState(BaseModel, Generic[T]):
  value: T
  state: Literal["incomplete", "complete"]

class BlogPost(BaseModel):
  title: str
  content: StreamState[str | None]
</Tab> <Tab title="Typescript" language="typescript"> ```typescript interface StreamState<T> { value: T, state: "incomplete" | "complete" }

interface BlogPost { title: string, content: StreamState<string | null> }

</Tab>
</Tabs>

### Type Transformation Summary

Here's how these attributes affect your types in generated code:

| BAML Type                         | Generated Type (during streaming)              | Description                                    |
|----------------------------------|----------------------------|------------------------------------------------|
| `T`                               | `Partial[T]?`              | Default: Nullable and partial                   |
| `T @stream.done`                  | `T?`                       | Nullable but always complete when present       |
| `T @stream.not_null`              | `Partial[T]`               | Always present but may be partial              |
| `T @stream.done @stream.not_null` | `T`                        | Always present and always complete             |
| `T @stream.with_state`            | `StreamState[Partial[T]?]` | Includes streaming state metadata              |

<Warning>
The return type of a function is not affected by streaming attributes!
</Warning>

## Putting it all together

Let's put all of these concepts together to design an application that
streams a conversation containing stock recommendations, using semantic
streaming to ensure that the streamed data obeys our domain's invariants.

```baml
enum Stock {
  APPL
  MSFT
  GOOG
  BAML
}

// Make recommendations atomic - we do not want a recommendation to be
// modified by streaming additional messages.
class Recommendation {
  stock Stock
  amount float
  action "buy" | "sell"
  @@stream.done
}

class AssistantMessage {
  message_type "greeting" | "conversation" | "farewell" @stream.not_null
  message string @stream.with_state @stream.not_null
}

function Respond(
  history: (UserMessage | AssistantMessage | Recommendation)[]
) -> AssistantMessage | Recommendation {
  client DeepseekR1
  prompt #"
    Make the message in the conversation, using a conversational
    message or a stock recommendation, based on this conversation history:
    {{ history }}.

    {{ ctx.output_format }}
  "#
}
```
<Tabs> <Tab title="Python" language="python"> The above BAML code will generate the following Python definitions in the `partial_types` module. The use of streaming attributes has several effects on the generated code:
  • Recommendation does not have any partial fields because it was marked @stream.done.
  • The Message.message string is wrapped in StreamState, allowing runtime checking of its completion status. This status could be used to render a spinner as the message streams in.
  • The Message.message_type field may not be null, because it was marked as @stream.not_null.
python
class StreamState(BaseModel, Generic[T]):
  value: T
  state: Literal["Pending", "Incomplete", "Complete"]

class Stock(str, Enum):
    APPL = "APPL"
    MSFT = "MSFT"
    GOOG = "GOOG"
    BAML = "BAML"

class Recommendation(BaseModel):
    stock: Stock
    amount: float
    action: Literal["buy", "sell"]

class Message(BaseModel):
  message_type: Literal["greeting","conversation","farewell"]
  message: StreamState[str]
</Tab> <Tab title="Typescript" language="typescript" > This BAML code will generate the following Typescript definitions in the `partial_types` module. The use of streaming attributes has several effects on the generated code:
  • Recommendation does not have any partial fields because it was marked @stream.done.
  • The Message.message string is wrapped in StreamState, allowing runtime checking of its completion status. This status could be used to render a spinner as the message streams in.
  • The Message.message_type field may not be null, because it was marked as @stream.not_null.
typescript
export interface StreamState<T> {
  value: T,
  state: "Pending" | "Incomplete" | "Complete"
}

export enum Stock {
  APPL = "APPL",
  MSFT = "MSFT",
  GOOG = "GOOG",
  BAML = "BAML",
}

export interface Recommendation {
  stock: Stock,
  amount: number,
  action: "buy" | "sell"
}

export interface Message {
  message_type: "greeting" | "conversation" | "farewell"
  message: StreamState<string>
}
</Tab> </Tabs>