fern/01-guide/04-baml-basics/concurrent-calls.mdx
We'll use the function `ClassifyMessage(input: string) -> Category` for our example:

<Accordion title="ClassifyMessage">
```baml
function ClassifyMessage(input: string) -> Category {
  client GPT4o
  prompt #"
    Classify the following INPUT into ONE of the following categories:

    INPUT: {{ input }}

    {{ ctx.output_format }}

    Response:
  "#
}
```
</Accordion>
<Tabs>
<Tab title="Python" language="python">
You can make concurrent `b.ClassifyMessage()` calls like so:
```python main.py
import asyncio

from baml_client.async_client import b
from baml_client.types import Category

async def main():
    await asyncio.gather(
        b.ClassifyMessage("I want to cancel my order"),
        b.ClassifyMessage("I want a refund"),
    )

if __name__ == '__main__':
    asyncio.run(main())
```
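If one call raises, `asyncio.gather` propagates that exception and drops the other results. A minimal variation of the gather in `main()` above (same generated client; `return_exceptions` is standard asyncio) that keeps partial results instead:

```python
# return_exceptions=True hands failures back in-place
# instead of raising out of the gather.
results = await asyncio.gather(
    b.ClassifyMessage("I want to cancel my order"),
    b.ClassifyMessage("I want a refund"),
    return_exceptions=True,
)
for r in results:
    if isinstance(r, Exception):
        print(f"Call failed: {r}")
    else:
        assert isinstance(r, Category)
```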
</Tab>
<Tab title="TypeScript" language="typescript">
You can make concurrent `b.ClassifyMessage()` calls like so:

```typescript main.ts
import { b } from './baml_client'
import { Category } from './baml_client/types'

const main = async () => {
  // Promise.all takes an array of promises
  const [category1, category2] = await Promise.all([
    b.ClassifyMessage('I want to cancel my order'),
    b.ClassifyMessage('I want a refund'),
  ])
}

if (require.main === module) {
  main()
}
```
</Tab>
<Tab title="Go" language="go">
You can make concurrent `b.ClassifyMessage()` calls using goroutines:

```go main.go
package main

import (
	"context"
	"sync"

	b "example.com/myproject/baml_client"
	"example.com/myproject/baml_client/types"
)

func main() {
	ctx := context.Background()
	var wg sync.WaitGroup
	results := make(chan types.Category, 2)

	// Launch concurrent goroutines
	wg.Add(2)
	go func() {
		defer wg.Done()
		result, err := b.ClassifyMessage(ctx, "I want to cancel my order")
		if err == nil {
			results <- result
		}
	}()
	go func() {
		defer wg.Done()
		result, err := b.ClassifyMessage(ctx, "I want a refund")
		if err == nil {
			results <- result
		}
	}()

	wg.Wait()
	close(results)

	// Collect results
	for result := range results {
		// Handle each result
		_ = result
	}
}
```
</Tab>
<Tab title="Ruby" language="ruby">
BAML Ruby (beta) does not currently support async/concurrent calls.
Please contact us if this is something you need.
</Tab>
<Tab title="Rust" language="rust">
You can make concurrent `B.ClassifyMessage.call()` calls using threads:

```rust
use myproject::baml_client::sync_client::B;
use myproject::baml_client::types::*;
use std::thread;

fn main() {
    let handles: Vec<_> = vec![
        "I want to cancel my order",
        "I want a refund",
    ]
    .into_iter()
    .map(|msg| {
        let msg = msg.to_string();
        thread::spawn(move || B.ClassifyMessage.call(&msg))
    })
    .collect();

    let results: Vec<Category> = handles
        .into_iter()
        .map(|h| h.join().unwrap().unwrap())
        .collect();
}
```
</Tab>
</Tabs>
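If you're fanning out over many inputs, you may also want to cap how many calls run at once. A minimal sketch for the async Python client using `asyncio.Semaphore` (the limit of 5 is an arbitrary assumption, not a BAML default):

```python
import asyncio

from baml_client.async_client import b

async def classify_many(messages: list[str], limit: int = 5):
    # Cap the number of in-flight LLM calls so a large input
    # list doesn't open too many requests at once.
    semaphore = asyncio.Semaphore(limit)

    async def classify(msg: str):
        async with semaphore:
            return await b.ClassifyMessage(msg)

    return await asyncio.gather(*(classify(m) for m in messages))
```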
When running multiple operations in parallel, you can use abort controllers to cancel them all at once or individually.
Use a single abort controller to cancel all parallel operations:
<Tabs>
<Tab title="TypeScript" language="typescript">
```typescript
import { b } from './baml_client'

const controller = new AbortController()

// Start multiple operations with the same controller
const promises = [
  b.ClassifyMessage('I want to cancel my order', { abortController: controller }),
  b.ClassifyMessage('I want a refund', { abortController: controller }),
  b.ClassifyMessage('Is my package shipped?', { abortController: controller })
]

// Cancel all operations after 2 seconds
setTimeout(() => {
  controller.abort()
  console.log('All operations cancelled')
}, 2000)

try {
  const results = await Promise.all(promises)
  console.log('All completed:', results)
} catch (error) {
  if (error.name === 'BamlAbortError') {
    console.log('Operations were cancelled')
  }
}
```
</Tab>
<Tab title="Python" language="python">
```python
import asyncio

from baml_client.async_client import b
from baml_py import AbortController, BamlAbortError

async def main():
    controller = AbortController()

    # Start multiple operations with the same controller
    tasks = [
        b.ClassifyMessage(
            'I want to cancel my order',
            baml_options={"abort_controller": controller}
        ),
        b.ClassifyMessage(
            'I want a refund',
            baml_options={"abort_controller": controller}
        ),
        b.ClassifyMessage(
            'Is my package shipped?',
            baml_options={"abort_controller": controller}
        )
    ]

    # Cancel all operations after 2 seconds
    async def cancel_after_timeout():
        await asyncio.sleep(2)
        controller.abort()
        print('All operations cancelled')

    asyncio.create_task(cancel_after_timeout())

    try:
        results = await asyncio.gather(*tasks)
        print('All completed:', results)
    except BamlAbortError:
        print('Operations were cancelled')
```
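A single controller is the right tool when the operations should succeed or fail as a unit: one `abort()` cancels every in-flight call that shares it, and `asyncio.gather` surfaces the cancellation as a single `BamlAbortError`.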
</Tab>
<Tab title="Go" language="go">
```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	b "example.com/myproject/baml_client"
)

func main() {
	// Create a cancellable context
	ctx, cancel := context.WithCancel(context.Background())

	// Cancel all operations after 2 seconds
	go func() {
		time.Sleep(2 * time.Second)
		cancel()
		fmt.Println("All operations cancelled")
	}()

	var wg sync.WaitGroup
	messages := []string{
		"I want to cancel my order",
		"I want a refund",
		"Is my package shipped?",
	}

	for _, msg := range messages {
		wg.Add(1)
		go func(message string) {
			defer wg.Done()
			result, err := b.ClassifyMessage(ctx, message)
			if err != nil {
				if err == context.Canceled {
					fmt.Printf("Cancelled: %s\n", message)
				}
				return
			}
			fmt.Printf("Completed: %s -> %v\n", message, result)
		}(msg)
	}

	wg.Wait()
}
```
</Tab>
<Tab title="Rust" language="rust">
```rust
use baml::CancellationToken;
use myproject::baml_client::sync_client::B;
use std::thread;
use std::time::Duration;

fn main() {
    // Create a shared cancellation token
    let token = CancellationToken::new();
    let token_for_cancel = token.clone();

    // Cancel all operations after 2 seconds
    thread::spawn(move || {
        thread::sleep(Duration::from_secs(2));
        token_for_cancel.cancel();
        println!("All operations cancelled");
    });

    let messages = vec![
        "I want to cancel my order",
        "I want a refund",
        "Is my package shipped?",
    ];

    let handles: Vec<_> = messages
        .into_iter()
        .map(|msg| {
            let t = token.clone();
            thread::spawn(move || {
                let result = B.ClassifyMessage
                    .with_cancellation_token(Some(t))
                    .call(msg);
                match result {
                    Ok(category) => println!("Completed: {} -> {:?}", msg, category),
                    Err(_) => println!("Cancelled: {}", msg),
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
```
</Tab>
</Tabs>
Use separate controllers to cancel operations independently:
<Tabs>
<Tab title="TypeScript" language="typescript">
```typescript
const controllers = [
  new AbortController(),
  new AbortController(),
  new AbortController()
]

const promises = [
  b.ClassifyMessage('I want to cancel my order', { abortController: controllers[0] }),
  b.ClassifyMessage('I want a refund', { abortController: controllers[1] }),
  b.ClassifyMessage('Is my package shipped?', { abortController: controllers[2] })
]

// Cancel only the second operation
controllers[1].abort()

const results = await Promise.allSettled(promises)
results.forEach((result, index) => {
  if (result.status === 'fulfilled') {
    console.log(`Operation ${index} completed:`, result.value)
  } else {
    console.log(`Operation ${index} failed:`, result.reason.message)
  }
})
```
</Tab>
<Tab title="Python" language="python">
```python
controllers = [
    AbortController(),
    AbortController(),
    AbortController()
]

tasks = [
    b.ClassifyMessage(
        'I want to cancel my order',
        baml_options={"abort_controller": controllers[0]}
    ),
    b.ClassifyMessage(
        'I want a refund',
        baml_options={"abort_controller": controllers[1]}
    ),
    b.ClassifyMessage(
        'Is my package shipped?',
        baml_options={"abort_controller": controllers[2]}
    )
]

# Cancel only the second operation
controllers[1].abort()

# Use gather with return_exceptions to handle partial failures
results = await asyncio.gather(*tasks, return_exceptions=True)

for index, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Operation {index} failed: {result}")
    else:
        print(f"Operation {index} completed: {result}")
```
</Tab>
</Tabs>

Race multiple LLM providers and cancel the slower ones when the fastest completes. This pattern is useful for optimizing latency by using whichever provider responds first.
<Tabs>
<Tab title="TypeScript" language="typescript">
```typescript
import { ClientRegistry } from '@boundaryml/baml'

async function fastestProviderWins(message: string) {
  const controllers = [
    new AbortController(),
    new AbortController(),
    new AbortController()
  ]

  // Create separate client registries for each provider
  const openaiRegistry = new ClientRegistry()
  openaiRegistry.addLlmClient('OpenAI', 'openai', {
    model: 'gpt-5-mini',
    api_key: process.env.OPENAI_API_KEY
  })
  openaiRegistry.setPrimary('OpenAI')

  const anthropicRegistry = new ClientRegistry()
  anthropicRegistry.addLlmClient('Anthropic', 'anthropic', {
    model: 'claude-3-5-haiku-20241022',
    api_key: process.env.ANTHROPIC_API_KEY
  })
  anthropicRegistry.setPrimary('Anthropic')

  const geminiRegistry = new ClientRegistry()
  geminiRegistry.addLlmClient('Gemini', 'vertex-ai', {
    model: 'gemini-2.5-flash',
    location: 'us-central1',
    credentials: process.env.GOOGLE_APPLICATION_CREDENTIALS
  })
  geminiRegistry.setPrimary('Gemini')

  const promises = [
    b.ClassifyMessage(message, { clientRegistry: openaiRegistry, abortController: controllers[0] }),
    b.ClassifyMessage(message, { clientRegistry: anthropicRegistry, abortController: controllers[1] }),
    b.ClassifyMessage(message, { clientRegistry: geminiRegistry, abortController: controllers[2] })
  ]

  try {
    // Wait for the first to complete
    const result = await Promise.race(promises)

    // Cancel the others
    controllers.forEach(c => c.abort())

    return result
  } catch (error) {
    // All failed - cancel any still running
    controllers.forEach(c => c.abort())
    throw error
  }
}
```
</Tab>
<Tab title="Python" language="python">
```python
import asyncio
import os

from baml_client.async_client import b
from baml_py import AbortController, ClientRegistry

async def fastest_provider_wins(message: str):
    controllers = [
        AbortController(),
        AbortController(),
        AbortController()
    ]

    # Create separate client registries for each provider
    openai_registry = ClientRegistry()
    openai_registry.add_llm_client('OpenAI', 'openai', {
        'model': 'gpt-5-mini',
        'api_key': os.environ.get('OPENAI_API_KEY')
    })
    openai_registry.set_primary('OpenAI')

    anthropic_registry = ClientRegistry()
    anthropic_registry.add_llm_client('Anthropic', 'anthropic', {
        'model': 'claude-3-5-haiku-20241022',
        'api_key': os.environ.get('ANTHROPIC_API_KEY')
    })
    anthropic_registry.set_primary('Anthropic')

    gemini_registry = ClientRegistry()
    gemini_registry.add_llm_client('Gemini', 'vertex-ai', {
        'model': 'gemini-2.5-flash',
        'location': 'us-central1',
        'credentials': os.environ.get('GOOGLE_APPLICATION_CREDENTIALS_CONTENT')
    })
    gemini_registry.set_primary('Gemini')

    # Create tasks
    tasks = [
        asyncio.create_task(
            b.ClassifyMessage(message, baml_options={
                'client_registry': openai_registry,
                'abort_controller': controllers[0]
            })
        ),
        asyncio.create_task(
            b.ClassifyMessage(message, baml_options={
                'client_registry': anthropic_registry,
                'abort_controller': controllers[1]
            })
        ),
        asyncio.create_task(
            b.ClassifyMessage(message, baml_options={
                'client_registry': gemini_registry,
                'abort_controller': controllers[2]
            })
        )
    ]

    try:
        # Wait for the first to complete
        done, pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED
        )

        # Cancel the others
        for controller in controllers:
            controller.abort()

        # Cancel pending tasks
        for task in pending:
            task.cancel()

        # Get the result from the completed task
        return done.pop().result()
    except Exception:
        # Cancel all on error
        for controller in controllers:
            controller.abort()
        for task in tasks:
            if not task.done():
                task.cancel()
        raise
```
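One caveat: `asyncio.FIRST_COMPLETED` fires for the first task that *finishes*, even if it raised. If you want the first *successful* provider instead, a hedged variation (`first_success` is a hypothetical helper, reusing the `tasks` and `controllers` built above) keeps waiting until some task returns cleanly:

```python
async def first_success(tasks, controllers):
    # Keep racing until some provider succeeds, then abort the rest.
    pending = set(tasks)
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            if task.exception() is None:
                for controller in controllers:
                    controller.abort()
                for t in pending:
                    t.cancel()
                return task.result()
    raise RuntimeError("All providers failed")
```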
b "example.com/myproject/baml_client"
)
func fastestProviderWins(message string) (interface{}, error) { // Create separate contexts for each provider ctx1, cancel1 := context.WithCancel(context.Background()) ctx2, cancel2 := context.WithCancel(context.Background()) ctx3, cancel3 := context.WithCancel(context.Background())
// Defer cleanup
defer cancel1()
defer cancel2()
defer cancel3()
// Create client registries for each provider
openaiRegistry, _ := b.NewClientRegistry()
openaiRegistry.AddLlmClient("OpenAI", "openai", map[string]interface{}{
"model": "gpt-5-mini",
"api_key": os.Getenv("OPENAI_API_KEY"),
})
openaiRegistry.SetPrimary("OpenAI")
anthropicRegistry, _ := b.NewClientRegistry()
anthropicRegistry.AddLlmClient("Anthropic", "anthropic", map[string]interface{}{
"model": "claude-3-5-haiku-20241022",
"api_key": os.Getenv("ANTHROPIC_API_KEY"),
})
anthropicRegistry.SetPrimary("Anthropic")
geminiRegistry, _ := b.NewClientRegistry()
geminiRegistry.AddLlmClient("Gemini", "vertex-ai", map[string]interface{}{
"model": "gemini-2.5-flash",
"location": "us-central1",
"credentials": os.Getenv("GOOGLE_APPLICATION_CREDENTIALS"),
})
geminiRegistry.SetPrimary("Gemini")
type result struct {
data interface{}
err error
provider string
}
resultChan := make(chan result, 3)
var wg sync.WaitGroup
wg.Add(3)
// Launch goroutines for each provider
go func() {
defer wg.Done()
data, err := b.ClassifyMessage(ctx1, message, b.WithClientRegistry(openaiRegistry))
resultChan <- result{data: data, err: err, provider: "OpenAI"}
}()
go func() {
defer wg.Done()
data, err := b.ClassifyMessage(ctx2, message, b.WithClientRegistry(anthropicRegistry))
resultChan <- result{data: data, err: err, provider: "Anthropic"}
}()
go func() {
defer wg.Done()
data, err := b.ClassifyMessage(ctx3, message, b.WithClientRegistry(geminiRegistry))
resultChan <- result{data: data, err: err, provider: "Gemini"}
}()
// Wait for first successful result
go func() {
wg.Wait()
close(resultChan)
}()
// Get first result and cancel others
firstResult := <-resultChan
// Cancel all contexts to stop other operations
cancel1()
cancel2()
cancel3()
if firstResult.err != nil {
// If first failed, try to get another result
select {
case secondResult := <-resultChan:
if secondResult.err == nil {
fmt.Printf("Provider %s won\n", secondResult.provider)
return secondResult.data, nil
}
default:
// No more results
}
return nil, firstResult.err
}
fmt.Printf("Provider %s won\n", firstResult.provider)
return firstResult.data, nil
}
</Tab>
<Tab title="Rust" language="rust">
```rust
use baml::{CancellationToken, ClientRegistry};
use myproject::baml_client::sync_client::B;
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

fn fastest_provider_wins(message: &str) -> Result<String, String> {
    let (tx, rx) = mpsc::channel();
    let cancel_token = CancellationToken::new();

    // OpenAI
    let msg = message.to_string();
    let tx1 = tx.clone();
    let t1 = cancel_token.clone();
    thread::spawn(move || {
        let mut registry = ClientRegistry::new();
        let mut options = HashMap::new();
        options.insert("model".to_string(), serde_json::json!("gpt-5-mini"));
        registry.add_llm_client("OpenAI", "openai", options);
        registry.set_primary_client("OpenAI");

        if let Ok(result) = B.ClassifyMessage
            .with_client_registry(&registry)
            .with_cancellation_token(Some(t1))
            .call(&msg)
        {
            let _ = tx1.send(("OpenAI", result));
        }
    });

    // Anthropic
    let msg = message.to_string();
    let tx2 = tx.clone();
    let t2 = cancel_token.clone();
    thread::spawn(move || {
        let mut registry = ClientRegistry::new();
        let mut options = HashMap::new();
        options.insert("model".to_string(), serde_json::json!("claude-3-5-haiku-20241022"));
        registry.add_llm_client("Anthropic", "anthropic", options);
        registry.set_primary_client("Anthropic");

        if let Ok(result) = B.ClassifyMessage
            .with_client_registry(&registry)
            .with_cancellation_token(Some(t2))
            .call(&msg)
        {
            let _ = tx2.send(("Anthropic", result));
        }
    });

    drop(tx);

    // Wait for the first result and cancel the slower operations
    match rx.recv() {
        Ok((provider, result)) => {
            cancel_token.cancel();
            println!("Provider {} won", provider);
            Ok(format!("{:?}", result))
        }
        Err(_) => Err("All providers failed".to_string()),
    }
}
```
</Tab>
</Tabs>
Set automatic timeouts to prevent operations from running indefinitely:
<Tabs>
<Tab title="TypeScript" language="typescript">
```typescript
async function classifyWithTimeout(messages: string[], timeoutMs: number = 5000) {
  const controller = new AbortController()

  // Set a timeout for all operations
  const timeoutId = setTimeout(() => {
    controller.abort()
  }, timeoutMs)

  try {
    const promises = messages.map(msg =>
      b.ClassifyMessage(msg, { abortController: controller })
    )

    const results = await Promise.all(promises)
    clearTimeout(timeoutId)
    return results
  } catch (error) {
    clearTimeout(timeoutId)
    if (error.name === 'BamlAbortError') {
      throw new Error(`Operations timed out after ${timeoutMs}ms`)
    }
    throw error
  }
}
```
</Tab>
<Tab title="Python" language="python">
```python
import asyncio

from baml_client.async_client import b
from baml_py import AbortController, BamlAbortError

async def classify_with_timeout(messages: list[str], timeout_seconds: float = 5):
    controller = AbortController()

    async def timeout_task():
        await asyncio.sleep(timeout_seconds)
        controller.abort()

    # Start the timeout
    timeout = asyncio.create_task(timeout_task())

    try:
        tasks = [
            b.ClassifyMessage(msg, baml_options={"abort_controller": controller})
            for msg in messages
        ]
        results = await asyncio.gather(*tasks)
        timeout.cancel()
        return results
    except BamlAbortError:
        raise TimeoutError(f"Operations timed out after {timeout_seconds}s")
    except Exception:
        timeout.cancel()
        raise
```
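For example (the messages here are illustrative; a timeout surfaces as a `TimeoutError`):

```python
# Give the whole batch 5 seconds to finish.
results = await classify_with_timeout(
    ["I want to cancel my order", "I want a refund"],
    timeout_seconds=5,
)
```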
</Tab>
<Tab title="Go" language="go">
```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	b "example.com/myproject/baml_client"
	"example.com/myproject/baml_client/types"
)

func classifyWithTimeout(messages []string, timeout time.Duration) ([]types.Category, error) {
	// Create a context that is cancelled automatically after the timeout
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	results := make([]types.Category, len(messages))
	errs := make([]error, len(messages))
	var wg sync.WaitGroup

	for i, msg := range messages {
		wg.Add(1)
		go func(index int, message string) {
			defer wg.Done()
			result, err := b.ClassifyMessage(ctx, message)
			results[index] = result
			errs[index] = err
		}(i, msg)
	}
	wg.Wait()

	// Check for errors
	for i, err := range errs {
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) {
				return nil, fmt.Errorf("operations timed out after %v", timeout)
			}
			return nil, fmt.Errorf("message %d failed: %w", i, err)
		}
	}
	return results, nil
}
```
</Tab>
<Tab title="Rust" language="rust">
```rust
use baml::CancellationToken;
use myproject::baml_client::sync_client::B;
use myproject::baml_client::types::*;
use std::thread;
use std::time::Duration;

fn classify_with_timeout(
    messages: &[&str],
    timeout: Duration,
) -> Result<Vec<Category>, String> {
    let token = CancellationToken::new_with_timeout(timeout);

    let handles: Vec<_> = messages
        .iter()
        .map(|msg| {
            let t = token.clone();
            let msg = msg.to_string();
            thread::spawn(move || {
                B.ClassifyMessage
                    .with_cancellation_token(Some(t))
                    .call(&msg)
            })
        })
        .collect();

    let mut results = Vec::new();
    for handle in handles {
        match handle.join().unwrap() {
            Ok(result) => results.push(result),
            Err(e) => return Err(format!("Classification failed: {}", e)),
        }
    }
    Ok(results)
}
```
</Tab>
</Tabs>
Process items in batches with the ability to cancel remaining batches:
<Tabs>
<Tab title="TypeScript" language="typescript">
```typescript
async function processBatches<T, R>(
  items: T[],
  batchSize: number,
  processor: (item: T, controller: AbortController) => Promise<R>
): Promise<R[]> {
  const results: R[] = []
  const masterController = new AbortController()

  try {
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize)

      // Check if we should stop
      if (masterController.signal.aborted) {
        throw new Error('Batch processing cancelled')
      }

      // Process the batch in parallel
      const batchPromises = batch.map(item =>
        processor(item, masterController)
      )
      const batchResults = await Promise.all(batchPromises)
      results.push(...batchResults)

      console.log(`Completed batch ${Math.floor(i / batchSize) + 1}`)
    }
    return results
  } catch (error) {
    masterController.abort()
    throw error
  }
}

// Usage
const messages = ['message1', 'message2', 'message3', /* ... */]
const results = await processBatches(
  messages,
  5, // batch size
  (msg, controller) => b.ClassifyMessage(msg, { abortController: controller })
)
```
</Tab>
<Tab title="Python" language="python">
```python
async def process_batches(items, batch_size, processor):
    results = []
    master_controller = AbortController()

    try:
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]

            # Check if we should stop
            if master_controller.aborted:
                raise Exception('Batch processing cancelled')

            # Process the batch in parallel
            batch_tasks = [
                processor(item, master_controller)
                for item in batch
            ]
            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)

            print(f"Completed batch {i // batch_size + 1}")
        return results
    except Exception:
        master_controller.abort()
        raise

# Usage
messages = ['message1', 'message2', 'message3']
results = await process_batches(
    messages,
    5,  # batch size
    lambda msg, ctrl: b.ClassifyMessage(msg, baml_options={"abort_controller": ctrl})
)
```
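Because the `aborted` check runs between batches, cancellation is observed at batch boundaries: an abort stops new batches from starting, while the shared controller also cancels whatever is still in flight in the current batch.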
</Tab>
</Tabs>

For basic abort controller usage and error handling, see the Abort Controllers guide.