fern/01-guide/04-baml-basics/streaming.mdx
BAML lets you stream in structured JSON output from LLMs as it comes in.
If you tried streaming in a JSON output from an LLM you'd see something like:
{"items": [{"name": "Appl
{"items": [{"name": "Apple", "quantity": 2, "price": 1.
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost":
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost": 3.00} # Completed
BAML gives you fine-grained control of how it fixes this partial JSON and transforms it into a series of semantically valid partial objects.
<Tip>You can check out more examples (including streaming in FastAPI and NextJS) in the BAML Examples repo.</Tip>
Let's stream the output of this function, `function ExtractReceiptInfo(email: string) -> ReceiptInfo`, for our example:
// One line item on a receipt.
// `description` is the only optional field (marked with `?`).
class ReceiptItem {
name string
description string?
quantity int
price float
}
// The structured result returned by ExtractReceiptInfo: a list of
// ReceiptItem values plus an optional total cost.
class ReceiptInfo {
items ReceiptItem[]
total_cost float?
}
// Takes the raw receipt text as `email` and returns a ReceiptInfo.
// The call goes through the `GPT4o` client; the prompt interpolates the input
// text followed by `ctx.output_format`.
function ExtractReceiptInfo(email: string) -> ReceiptInfo {
client GPT4o
prompt #"
Given the receipt below:
{{ email }}
{{ ctx.output_format }}
"#
}
The BAML code generator creates a set of types in a module called `partial_types`
inside the `baml_client` library. These types are modified
from your original types to support streaming.
By default, BAML will convert all Class fields into nullable fields, and fill those fields with non-null values as much as possible given the tokens received so far.
<Tabs>
<Tab title="Python" language="python">
BAML will generate `b.stream.ExtractReceiptInfo()` for you, which you can use like so:

import asyncio
from baml_client import b, partial_types, types
# Iterating a stream, then reading its final response:
def example1(receipt: str):
    """Stream ExtractReceiptInfo, printing every partial and then the final value."""
    receipt_stream = b.stream.ExtractReceiptInfo(receipt)

    # Each `partial` is a Partial type with all Optional fields.
    for partial in receipt_stream:
        print(f"partial: parsed {len(partial.items)} items (object: {partial})")

    # `final` is the full, original, validated ReceiptInfo type.
    final = receipt_stream.get_final_response()
    print(f"final: {len(final.items)} items (object: {final})")
# Reading nothing but the final response of a stream.
#
# When that is all you need, call b.ExtractReceiptInfo(receipt) instead;
# it is slightly faster and more efficient.
def example2(receipt: str):
    """Open a stream and print only its final response."""
    receipt_stream = b.stream.ExtractReceiptInfo(receipt)
    final = receipt_stream.get_final_response()
    print(f"final: {len(final.items)} items (object: {final})")
# The same flow as example1, driven through the async client:
async def example3(receipt: str):
    """Stream ExtractReceiptInfo asynchronously, printing partials and the final value."""
    # Importing here rebinds `b` to the async client for this function only.
    from baml_client.async_client import b

    receipt_stream = b.stream.ExtractReceiptInfo(receipt)

    async for partial in receipt_stream:
        print(f"partial: parsed {len(partial.items)} items (object: {partial})")

    # With the async client the final response must be awaited.
    final = await receipt_stream.get_final_response()
    print(f"final: {len(final.items)} items (object: {final})")
receipt = """
04/14/2024 1:05 pm
Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item # Price
Guide leash (1 Pair) uni UNI
1 $34.95
The Index Town Walls
1 $35.00
Boot Punch
3 $60.00
Subtotal $129.95
Tax ($129.95 @ 9%) $11.70
Total Tax $11.70
Total $141.65
"""
if __name__ == '__main__':
#uncomment one at a time and run to see the difference
example1(receipt)
#example2(receipt)
#asyncio.run(example3(receipt))
import { b } from './baml_client'
// Async iteration over a stream, followed by getFinalResponse()
const example1 = async (receipt: string) => {
  const receiptStream = b.stream.ExtractReceiptInfo(receipt)

  // Each partial is a Partial type with all Optional fields
  for await (const partial of receiptStream) {
    console.log(`partial: ${partial.items?.length} items (object: ${partial})`)
  }

  // final is the full, original, validated ReceiptInfo type
  const final = await receiptStream.getFinalResponse()
  console.log(`final: ${final.items.length} items (object: ${final})`)
}
// Async iteration only: the final response is never requested
const example2 = async (receipt: string) => {
  const receiptStream = b.stream.ExtractReceiptInfo(receipt)

  for await (const partial of receiptStream) {
    console.log(`partial: ${partial.items?.length} items (object: ${partial})`)
  }
}
// getFinalResponse() only: no partials are consumed
//
// If this is all you need, call b.ExtractReceiptInfo(receipt) instead,
// which is faster and more efficient.
const example3 = async (receipt: string) => {
  const receiptStream = b.stream.ExtractReceiptInfo(receipt)
  const final = await receiptStream.getFinalResponse()
  console.log(`final: ${final.items.length} items (object: ${final})`)
}
const receipt = `
04/14/2024 1:05 pm
Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item # Price
Guide leash (1 Pair) uni UNI
1 $34.95
The Index Town Walls
1 $35.00
Boot Punch
3 $60.00
Subtotal $129.95
Tax ($129.95 @ 9%) $11.70
Total Tax $11.70
Total $141.65
`
if (require.main === module) {
example1(receipt)
example2(receipt)
example3(receipt)
}
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
b "example.com/myproject/baml_client"
"example.com/myproject/baml_client/stream_types"
"example.com/myproject/baml_client/types"
)
// basicStreamingExample streams ExtractReceiptInfo for receipt, printing each
// partial result and then the final result.
//
// The call is bounded by a 30-second context timeout. Failures are logged, not
// returned: the function returns on the first stream error, when the context is
// done, or after the final value has been printed.
func basicStreamingExample(receipt string) {
	// Create context with timeout to prevent hanging
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel() // Always clean up context resources

	stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
	if err != nil {
		log.Printf("Failed to create stream: %v", err)
		return
	}

	// Log on exit. This defer does not close anything itself; it only records
	// that processing of the stream has finished.
	defer func() {
		if stream != nil {
			log.Println("Stream processing completed")
		}
	}()

	for value := range stream {
		// Handle context cancellation
		select {
		case <-ctx.Done():
			log.Printf("Stream cancelled due to context: %v", ctx.Err())
			return
		default:
		}

		// Handle streaming errors
		if value.IsError {
			log.Printf("Stream error: %v", value.Error)
			return
		}

		// Process partial results
		if !value.IsFinal && value.Stream() != nil {
			partial := *value.Stream()
			fmt.Printf("Partial result: parsed %d items so far\n", len(partial.Items))

			// You could process partial results here
			for i, item := range partial.Items {
				if item.Name != "" { // Only show items with names parsed so far
					// %v rather than %s: price is declared as a float in the BAML
					// schema, and %s on a number prints a "%!s(...)" error marker.
					// NOTE(review): if the generated field is a pointer, dereference
					// it before printing.
					fmt.Printf(" Item %d: %s - %v\n", i+1, item.Name, item.Price)
				}
			}
		}

		// Process final result
		if value.IsFinal && value.Final() != nil {
			final := *value.Final()
			fmt.Printf("Final result: %d items total\n", len(final.Items))
			// NOTE(review): the BAML class names this field `total_cost`; confirm
			// the generated Go field name is really `Total`.
			fmt.Printf("Total amount: %v\n", final.Total)
			return
		}
	}
}
// streamWithEarlyTermination streams ExtractReceiptInfo and stops as soon as a
// partial result holds at least three items, returning that partial. If the
// stream finishes first, the final result is returned instead.
//
// An error is returned when the stream cannot be created, the 30-second context
// is done, the stream reports an error, or the stream ends with no final value.
//
// NOTE(review): the early-exit path returns the value from value.Stream() as a
// *types.ReceiptInfo; confirm the partial and final types are interchangeable.
func streamWithEarlyTermination(receipt string) (*types.ReceiptInfo, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	events, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
	if err != nil {
		return nil, fmt.Errorf("failed to create stream: %w", err)
	}

	for event := range events {
		// A non-nil Err means the context's Done channel has been closed.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, fmt.Errorf("stream cancelled: %w", ctxErr)
		}

		if event.IsError {
			return nil, fmt.Errorf("stream error: %w", event.Error)
		}

		if event.IsFinal {
			if event.Final() != nil {
				complete := *event.Final()
				return &complete, nil
			}
			continue
		}

		if event.Stream() == nil {
			continue
		}

		// Early termination condition: three or more items is enough.
		partial := *event.Stream()
		if len(partial.Items) >= 3 {
			fmt.Printf("Early termination: found %d items, stopping stream\n", len(partial.Items))
			cancel() // Cancel context to stop stream
			return &partial, nil
		}
	}

	return nil, fmt.Errorf("stream ended without final response")
}
// concurrentStreamingExample runs one ExtractReceiptInfo stream per receipt in
// its own goroutine and prints a summary of successes and failures.
//
// All goroutines share a 60-second parent context, and each derives its own
// 30-second child. Every goroutine sends exactly one message — a result or an
// error — into a channel buffered to len(receipts), so sends never block.
func concurrentStreamingExample(receipts []string) {
	var wg sync.WaitGroup
	results := make(chan *types.ReceiptInfo, len(receipts))
	errors := make(chan error, len(receipts))

	// Create context with timeout for all goroutines
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	for i, receipt := range receipts {
		wg.Add(1)
		go func(index int, receiptData string) {
			defer wg.Done()

			// Create per-goroutine context
			goroutineCtx, goroutineCancel := context.WithTimeout(ctx, 30*time.Second)
			defer goroutineCancel()

			stream, err := b.Stream.ExtractReceiptInfo(goroutineCtx, receiptData)
			if err != nil {
				errors <- fmt.Errorf("receipt %d: failed to create stream: %w", index, err)
				return
			}

			for value := range stream {
				select {
				case <-goroutineCtx.Done():
					errors <- fmt.Errorf("receipt %d: stream cancelled: %w", index, goroutineCtx.Err())
					return
				default:
				}

				if value.IsError {
					errors <- fmt.Errorf("receipt %d: stream error: %w", index, value.Error)
					return
				}

				if value.IsFinal && value.Final() != nil {
					final := *value.Final()
					fmt.Printf("Receipt %d: processed %d items\n", index, len(final.Items))
					results <- &final
					return
				}
			}

			errors <- fmt.Errorf("receipt %d: stream ended without final response", index)
		}(i, receipt)
	}

	// Close both channels once every goroutine has finished, so the collection
	// loop below can terminate.
	go func() {
		wg.Wait()
		close(results)
		close(errors)
	}()

	// Collect results and errors. A closed channel is set to nil so its select
	// case is never chosen again; the loop ends when both are nil.
	var successCount int
	var errorCount int
	for results != nil || errors != nil {
		select {
		case result, ok := <-results:
			if !ok {
				results = nil
				continue
			}
			if result != nil {
				successCount++
				// %v rather than %s: the total is numeric in the BAML schema.
				// NOTE(review): the BAML field is `total_cost`; confirm the
				// generated Go field name is really `Total`.
				fmt.Printf("Successfully processed receipt with %d items, total: %v\n",
					len(result.Items), result.Total)
			}
		case err, ok := <-errors:
			if !ok {
				errors = nil
				continue
			}
			if err != nil {
				errorCount++
				log.Printf("Error processing receipt: %v", err)
			}
		}
	}

	fmt.Printf("Concurrent processing completed: %d successes, %d errors\n",
		successCount, errorCount)
}
// streamWithRetry runs ExtractReceiptInfo as a stream up to maxRetries times and
// returns the first final response received.
//
// Each attempt uses a fresh 30-second context. An attempt is abandoned when the
// stream cannot be created, its context is done, or the stream reports an
// error. On the last attempt that failure is returned wrapped; otherwise it is
// logged and the next attempt begins. If no attempt yields a final response
// (including when maxRetries <= 0), an "all attempts failed" error is returned.
func streamWithRetry(receipt string, maxRetries int) (*types.ReceiptInfo, error) {
	for attempt := 1; attempt <= maxRetries; attempt++ {
		// Create fresh context for each attempt
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

		stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
		if err != nil {
			cancel()
			if attempt == maxRetries {
				return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
			}
			log.Printf("Attempt %d failed: %v, retrying...", attempt, err)
			// Linear backoff: wait `attempt` seconds before the next try.
			time.Sleep(time.Duration(attempt) * time.Second)
			continue
		}

	consume:
		for value := range stream {
			select {
			case <-ctx.Done():
				cancel()
				if attempt == maxRetries {
					return nil, fmt.Errorf("stream timeout after %d attempts: %w", maxRetries, ctx.Err())
				}
				log.Printf("Attempt %d timed out, retrying...", attempt)
				// A bare break here would only leave the select and keep
				// consuming the timed-out stream; the label exits the loop.
				break consume
			default:
			}

			if value.IsError {
				cancel()
				if attempt == maxRetries {
					return nil, fmt.Errorf("stream failed after %d attempts: %w", maxRetries, value.Error)
				}
				log.Printf("Attempt %d failed with stream error: %v, retrying...", attempt, value.Error)
				time.Sleep(time.Duration(attempt) * time.Second)
				break consume
			}

			if value.IsFinal && value.Final() != nil {
				final := *value.Final()
				cancel()
				return &final, nil
			}
		}

		// Release this attempt's context on every path that leaves the loop,
		// including a stream that ended without a final value. Calling cancel
		// a second time is a no-op.
		cancel()
	}

	return nil, fmt.Errorf("all %d attempts failed", maxRetries)
}
// main runs the four streaming examples in order against one sample receipt,
// printing a banner before each.
func main() {
	receipt := `04/14/2024 1:05 pm
Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item # Price
Guide leash (1 Pair) uni UNI
1 $34.95
The Index Town Walls
1 $35.00
Boot Punch
3 $60.00
Subtotal $129.95
Tax ($129.95 @ 9%) $11.70
Total Tax $11.70
Total $141.65`

	fmt.Println("=== Basic Streaming Example ===")
	basicStreamingExample(receipt)

	fmt.Println("\n=== Stream with Early Termination ===")
	if early, err := streamWithEarlyTermination(receipt); err != nil {
		log.Printf("Early termination example failed: %v", err)
	} else if early != nil {
		fmt.Printf("Early termination result: %d items\n", len(early.Items))
	}

	fmt.Println("\n=== Concurrent Streaming Example ===")
	// The same receipt is submitted three times to exercise concurrency.
	batch := []string{receipt, receipt, receipt}
	concurrentStreamingExample(batch)

	fmt.Println("\n=== Stream with Retry Example ===")
	if retried, err := streamWithRetry(receipt, 3); err != nil {
		log.Printf("Retry example failed: %v", err)
	} else if retried != nil {
		fmt.Printf("Retry example succeeded: %d items\n", len(retried.Items))
	}
}
require_relative "baml_client/client"
$b = Baml.Client
# Iterate over the partials, then ask the same stream for its final response
def example1(receipt)
  receipt_stream = $b.stream.ExtractReceiptInfo(receipt)

  receipt_stream.each { |partial| puts "partial: #{partial.items&.length} items" }

  final = receipt_stream.get_final_response
  puts "final: #{final.items.length} items"
end
# Iterate over the partials only; the final response is never requested
def example2(receipt)
  receipt_stream = $b.stream.ExtractReceiptInfo(receipt)
  receipt_stream.each { |partial| puts "partial: #{partial.items&.length} items" }
end
# Read only the final response of a stream
#
# If this is all you need, call $b.ExtractReceiptInfo(receipt) instead,
# which is faster and more efficient.
def example3(receipt)
  receipt_stream = $b.stream.ExtractReceiptInfo(receipt)
  final = receipt_stream.get_final_response
  puts "final: #{final.items.length} items"
end
receipt = <<~RECEIPT
04/14/2024 1:05 pm
Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item # Price
Guide leash (1 Pair) uni UNI
1 $34.95
The Index Town Walls
1 $35.00
Boot Punch
3 $60.00
Subtotal $129.95
Tax ($129.95 @ 9%) $11.70
Total Tax $11.70
Total $141.65
RECEIPT
if __FILE__ == $0
example1(receipt)
example2(receipt)
example3(receipt)
end
use myproject::baml_client::sync_client::B;
use myproject::baml_client::types::*;
// Walk the partials of a stream, then take its final response
fn example1(receipt: &str) {
    let mut stream = B.ExtractReceiptInfo.stream(receipt).unwrap();

    // Each successfully unwrapped item is a Partial type with all Optional fields
    for next in stream.partials() {
        let partial = next.unwrap();
        let seen = partial.items.len();
        println!("partial: {} items", seen);
    }

    // final_result is the full, original, validated ReceiptInfo type
    let final_result = stream.get_final_response().unwrap();
    let total = final_result.items.len();
    println!("final: {} items", total);
}
// Take only the final response of a stream
//
// If this is all you need, call B.ExtractReceiptInfo.call(receipt) instead,
// which is slightly faster and more efficient.
fn example2(receipt: &str) {
    let final_result = B
        .ExtractReceiptInfo
        .stream(receipt)
        .unwrap()
        .get_final_response()
        .unwrap();
    println!("final: {} items", final_result.items.len());
}
fn main() {
let receipt = "04/14/2024 1:05 pm\n\nTicket: 220000082489\nRegister: Shop Counter\nEmployee: Connor\nCustomer: Sam\nItem\t#\tPrice\nGuide leash (1 Pair) uni UNI\n1\t$34.95\nThe Index Town Walls\n1\t$35.00\nBoot Punch\n3\t$60.00\nSubtotal\t$129.95\nTax ($129.95 @ 9%)\t$11.70\nTotal Tax\t$11.70\nTotal\t$141.65";
example1(receipt);
example2(receipt);
}
You can cancel ongoing streams using abort controllers, which is essential for responsive applications that allow users to stop generation or implement timeouts.
<Tabs>
<Tab title="TypeScript" language="typescript">
```typescript
import { b } from './baml_client'

// The controller is handed to the stream so it can be aborted from outside.
const controller = new AbortController()
const stream = b.stream.ExtractReceiptInfo(receipt, { abortController: controller })

// Process stream with ability to cancel
let itemCount = 0
for await (const partial of stream) {
  itemCount = partial.items?.length || 0
  console.log(`Received ${itemCount} items so far`)

  // Cancel if we have enough items
  if (itemCount >= 5) {
    console.log('Stopping stream - got enough items')
    controller.abort()
    break
  }
}

// Or cancel after a timeout
setTimeout(() => {
  controller.abort()
  console.log('Stream cancelled due to timeout')
}, 5000)
```
</Tab>
<Tab title="Python" language="python">
```python
from baml_client.async_client import b
from baml_py import AbortController
controller = AbortController()
stream = b.stream.ExtractReceiptInfo(
receipt,
baml_options={"abort_controller": controller}
)
# Process stream with ability to cancel
item_count = 0
async for partial in stream:
item_count = len(partial.items) if partial.items else 0
print(f"Received {item_count} items so far")
# Cancel if we have enough items
if item_count >= 5:
print("Stopping stream - got enough items")
controller.abort()
break
# Or cancel after a timeout
import asyncio
async def cancel_after_timeout():
await asyncio.sleep(5)
controller.abort()
print("Stream cancelled due to timeout")
asyncio.create_task(cancel_after_timeout())
stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt) if err != nil { log.Printf("Failed to create stream: %v", err) return }
for value := range stream { // Stream will automatically stop when context is cancelled select { case <-ctx.Done(): log.Printf("Stream cancelled: %v", ctx.Err()) return default: }
// Process partial results
if !value.IsFinal && value.Stream() != nil {
partial := *value.Stream()
if len(partial.Items) >= 5 {
log.Printf("Stopping stream - got %d items", len(partial.Items))
cancel() // Cancel the context to stop the stream
return
}
}
}
</Tab>
<Tab title="Ruby" language="ruby">
```ruby
require 'baml_client'
# The controller is passed through baml_options so the stream can be aborted.
controller = Baml::AbortController.new
stream = $b.stream.ExtractReceiptInfo(
  receipt,
  baml_options: { abort_controller: controller }
)

# Consume partials, aborting once enough items have arrived
item_count = 0
stream.each do |partial|
  item_count = partial.items&.length || 0
  puts "Received #{item_count} items so far"

  # Keep reading until at least five items are present
  next unless item_count >= 5

  puts "Stopping stream - got enough items"
  controller.abort
  break
end

# Alternatively, abort from a separate thread after a timeout
Thread.new do
  sleep(5)
  controller.abort
  puts "Stream cancelled due to timeout"
end
// Cancel a stream after a timeout let token = CancellationToken::new_with_timeout(Duration::from_secs(5));
let mut stream = B.ExtractReceiptInfo .with_cancellation_token(Some(token)) .stream(receipt) .unwrap();
for partial in stream.partials() { match partial { Ok(partial) => { println!("Received {} items so far", partial.items.len());
// Stop consuming the stream if we have enough items
if partial.items.len() >= 5 {
println!("Stopping stream - got enough items");
break;
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
break;
}
}
}
</Tab>
</Tabs>
### Common Streaming Cancellation Patterns
#### User-Initiated Cancellation
Allow users to stop streaming generation with a "Stop" button:
<Tabs>
<Tab title="React" language="react">
```tsx
// Renders Start/Stop buttons around a streamed GenerateContent call.
// The AbortController for the in-flight stream is kept in state so the
// Stop button can reach it.
function StreamingComponent() {
  const [controller, setController] = useState<AbortController | null>(null)
  const [isStreaming, setIsStreaming] = useState(false)
  const [result, setResult] = useState("")

  // Abort the in-flight stream, if there is one.
  const stopStreaming = () => {
    controller?.abort()
  }

  const startStreaming = async () => {
    const activeController = new AbortController()
    setController(activeController)
    setIsStreaming(true)

    try {
      const stream = b.stream.GenerateContent(prompt, {
        abortController: activeController
      })

      // Show whatever content the latest partial carries.
      for await (const partial of stream) {
        setResult(partial.content || "")
      }
    } catch (error) {
      if (error.name === 'BamlAbortError') {
        console.log('Stream cancelled by user')
      }
    } finally {
      // Runs on success, error and abort alike.
      setIsStreaming(false)
      setController(null)
    }
  }

  return (
    <div>
      <button onClick={startStreaming} disabled={isStreaming}>
        Start Streaming
      </button>
      <button onClick={stopStreaming} disabled={!isStreaming}>
        Stop
      </button>
      <div>{result}</div>
    </div>
  )
}
app = FastAPI() active_streams = {}
@app.post("/stream/{stream_id}") async def start_stream(stream_id: str, prompt: str): controller = AbortController() active_streams[stream_id] = controller
async def generate():
try:
stream = b.stream.GenerateContent(
prompt,
baml_options={"abort_controller": controller}
)
async for partial in stream:
if controller.aborted:
break
yield f"data: {partial.content}\n\n"
except BamlAbortError:
yield "data: [CANCELLED]\n\n"
finally:
active_streams.pop(stream_id, None)
return StreamingResponse(generate(), media_type="text/event-stream")
@app.post("/stop/{stream_id}") async def stop_stream(stream_id: str): if controller := active_streams.get(stream_id): controller.abort() return {"status": "stopped"} return {"status": "not found"}
</Tab>
</Tabs>
For more examples and patterns, see the [Abort Controllers guide](/guide/baml-basics/abort-signal).
## Semantic Streaming
BAML provides powerful attributes to control how your data streams, ensuring that partial values always maintain semantic validity. Here are the three key streaming attributes:
### `@stream.done`
This attribute ensures a type or field is only streamed when it's completely finished. It's useful when you need atomic, fully-formed values.
For example:
```baml
class ReceiptItem {
name string
quantity int
price float
// The entire ReceiptItem will only stream when complete
@@stream.done
}
// Receipts is a list of ReceiptItems,
// each internal item will only stream when complete
type Receipts = ReceiptItem[]
class Person {
// Name will only appear when fully complete,
// until then it will be null
name string @stream.done
// Numbers (floats and ints) will only appear
// when fully complete by default
age int
// Bio will stream token by token
bio string
}
A common pattern is streaming a list of items where each item can be one of
several types (e.g. tool calls and messages). You can use @stream.done on
the list element type to ensure each item only appears once it's fully complete:
class ToolCall {
name string
parameters string
}
class Message {
role string
content string
}
type OutputItem = ToolCall | Message
// Each list element appears only when fully complete.
// The list grows incrementally as items finish.
function Run(input: string) -> (OutputItem @stream.done)[] {
client MyClient
prompt #"
{{ input }}
{{ ctx.output_format }}
"#
}
When @stream.done is applied to a union type, it propagates to all variants.
This means you don't need to add @@stream.done to each class individually —
annotating the union is sufficient.
### `@stream.not_null`

This attribute ensures a containing object is only streamed when this field has a value. It's particularly useful for discriminator fields or required metadata.
For example:
class Message {
// Message won't stream until type is known
type "error" | "success" | "info" @stream.not_null
// Timestamp will only appear when fully complete
// until then it will be null
timestamp string @stream.done
// Content can stream token by token
content string
}
### `@stream.with_state`

This attribute adds metadata to track if a field has finished streaming. It's perfect for showing loading states in UIs.
For example:
class BlogPost {
// The blog post will only stream when title is known
title string @stream.done @stream.not_null
// The content will stream token by token, and include completion state
content string @stream.with_state
}
This will generate the following code in the partial_types module:
<Tabs>
<Tab title="Python" language="python">
class StreamState(BaseModel, Generic[T]):
value: T,
state: "incomplete" | "complete"
class BlogPost(BaseModel):
title: str
content: StreamState[str | None]
</Tab>
<Tab title="Typescript" language="typescript">
```typescript
interface BlogPost {
  // @stream.done @stream.not_null: always present and complete, so a plain string.
  title: string
  // @stream.with_state: wrapped in StreamState; the value may still be null.
  content: StreamState<string | null>
}
```
</Tab>
</Tabs>
### Type Transformation Summary
Here's how these attributes affect your types in generated code:
| BAML Type | Generated Type (during streaming) | Description |
|----------------------------------|----------------------------|------------------------------------------------|
| `T` | `Partial[T]?` | Default: Nullable and partial |
| `T @stream.done` | `T?` | Nullable but always complete when present |
| `T @stream.not_null` | `Partial[T]` | Always present but may be partial |
| `T @stream.done @stream.not_null` | `T` | Always present and always complete |
| `T @stream.with_state` | `StreamState[Partial[T]?]` | Includes streaming state metadata |
<Warning>
The return type of a function is not affected by streaming attributes!
</Warning>
## Putting it all together
Let's put all of these concepts together to design an application that
streams a conversation containing stock recommendations, using semantic
streaming to ensure that the streamed data obeys our domain's invariants.
```baml
enum Stock {
APPL
MSFT
GOOG
BAML
}
// Make recommendations atomic - we do not want a recommendation to be
// modified by streaming additional messages.
class Recommendation {
stock Stock
amount float
action "buy" | "sell"
@@stream.done
}
class AssistantMessage {
message_type "greeting" | "conversation" | "farewell" @stream.not_null
message string @stream.with_state @stream.not_null
}
// Produces the next turn of the conversation: either a message or a
// Recommendation, based on the conversation history.
// NOTE(review): `UserMessage` and `Message` are not defined in this snippet
// (only `AssistantMessage` is) — confirm whether `Message` should read
// `AssistantMessage` and where `UserMessage` is declared.
function Respond(
history: (UserMessage | AssistantMessage | Recommendation)[]
) -> Message | Recommendation {
client DeepseekR1
prompt #"
Make the message in the conversation, using a conversational
message or a stock recommendation, based on this conversation history:
{{ history }}.
{{ ctx.output_format }}
"#
}
Recommendation does not have any partial fields because it was marked
@stream.done.Message.message string is wrapped in StreamState, allowing
runtime checking of its completion status. This status could be used
to render a spinner as the message streams in.Message.message_type field may not be null, because it was marked
as @stream.not_null.class StreamState(BaseModel, Generic[T]):
value: T,
state: Literal["Pending", "Incomplete", "Complete"]
class Stock(str, Enum):
APPL = "APPL"
MSFT = "MSFT"
GOOG = "GOOG"
BAML = "BAML"
class Recommendation(BaseClass):
stock: Stock
amount: float
action: Literal["buy", "sell"]
class Message(BaseClass):
message_type: Literal["gretting","conversation","farewell"]
message: StreamState[string]
- `Recommendation` does not have any partial fields because it was marked
  `@stream.done`.
- `Message.message` `string` is wrapped in `StreamState`, allowing
  runtime checking of its completion status. This status could be used
  to render a spinner as the message streams in.
- `Message.message_type` field may not be `null`, because it was marked
  as `@stream.not_null`.

```typescript
export interface StreamState<T> {
  value: T,
  state: "Pending" | "Incomplete" | "Complete"
}

export enum Stock {
  APPL = "APPL",
  MSFT = "MSFT",
  GOOG = "GOOG",
  BAML = "BAML",
}

// Marked @@stream.done, so no field is partial.
export interface Recommendation {
  stock: Stock,
  amount: number,
  action: "buy" | "sell"
}

export interface Message {
  // @stream.not_null: never null.
  message_type: "greeting" | "conversation" | "farewell"
  // @stream.with_state: carries its completion state alongside the value.
  message: StreamState<string>
}
```