Back to Baml

Concurrent function calls

fern/01-guide/04-baml-basics/concurrent-calls.mdx

0.222.0 · 24.8 KB
Original Source

We’ll use function ClassifyMessage(input: string) -> Category for our example:

<Accordion title="classify-message.baml">

```baml
enum Category {
  Refund
  CancelOrder
  TechnicalSupport
  AccountIssue
  Question
}

function ClassifyMessage(input: string) -> Category {
  client GPT4o
  prompt #"
    Classify the following INPUT into ONE of the following categories:

    INPUT: {{ input }}

    {{ ctx.output_format }}

    Response:
  "#
}
```

</Accordion>

<Tabs>
<Tab title="Python" language="python">

You can make concurrent `b.ClassifyMessage()` calls like so:

```python main.py
import asyncio

from baml_client.async_client import b
from baml_client.types import Category

async def main():
    await asyncio.gather(
        b.ClassifyMessage("I want to cancel my order"),
        b.ClassifyMessage("I want a refund")
    )

if __name__ == '__main__':
    asyncio.run(main())
```

</Tab> <Tab title="TypeScript" language="typescript">

You can make concurrent b.ClassifyMessage() calls like so:

```typescript main.ts
import { b } from './baml_client'
import { Category } from './baml_client/types'
import assert from 'assert'

const main = async () => {
  // Promise.all takes an array (iterable) of promises, not separate arguments.
  const categories = await Promise.all([
    b.ClassifyMessage('I want to cancel my order'),
    b.ClassifyMessage('I want a refund'),
  ])
}

if (require.main === module) {
  main()
}
```

</Tab> <Tab title="Go" language="go">

You can make concurrent b.ClassifyMessage() calls using goroutines:

```go
package main

import (
    "context"
    "sync"

    b "example.com/myproject/baml_client"
    "example.com/myproject/baml_client/types"
)

func main() {
    ctx := context.Background()

    var wg sync.WaitGroup
    results := make(chan types.Category, 2)

    // Launch concurrent goroutines
    wg.Add(2)

    go func() {
        defer wg.Done()
        result, err := b.ClassifyMessage(ctx, "I want to cancel my order")
        if err == nil {
            results <- result
        }
    }()

    go func() {
        defer wg.Done()
        result, err := b.ClassifyMessage(ctx, "I want a refund")
        if err == nil {
            results <- result
        }
    }()

    wg.Wait()
    close(results)

    // Collect results
    for result := range results {
        // Handle each result
        _ = result
    }
}
```

</Tab> <Tab title="Ruby (beta)" language="ruby">

BAML Ruby (beta) does not currently support async/concurrent calls.

Please contact us if this is something you need.

</Tab> <Tab title="Rust" language="rust">

You can make concurrent B.ClassifyMessage.call() calls using threads:

```rust
use myproject::baml_client::sync_client::B;
use myproject::baml_client::types::*;
use std::thread;

fn main() {
    let handles: Vec<_> = vec![
        "I want to cancel my order",
        "I want a refund",
    ]
    .into_iter()
    .map(|msg| {
        let msg = msg.to_string();
        thread::spawn(move || B.ClassifyMessage.call(&msg))
    })
    .collect();

    let results: Vec<Category> = handles
        .into_iter()
        .map(|h| h.join().unwrap().unwrap())
        .collect();
}
```

</Tab> </Tabs>

Cancelling Parallel Operations

When running multiple operations in parallel, you can use abort controllers to cancel them all at once or individually.

Cancel All Operations

Use a single abort controller to cancel all parallel operations:

<Tabs> <Tab title="TypeScript" language="typescript">

```typescript
import { b } from './baml_client'

const controller = new AbortController()

// Start multiple operations with the same controller
const promises = [
  b.ClassifyMessage('I want to cancel my order', { abortController: controller }),
  b.ClassifyMessage('I want a refund', { abortController: controller }),
  b.ClassifyMessage('Is my package shipped?', { abortController: controller })
]

// Cancel all operations after 2 seconds
setTimeout(() => {
  controller.abort()
  console.log('All operations cancelled')
}, 2000)

try {
  const results = await Promise.all(promises)
  console.log('All completed:', results)
} catch (error) {
  if (error.name === 'BamlAbortError') {
    console.log('Operations were cancelled')
  }
}
```

</Tab>

<Tab title="Python" language="python">
```python
import asyncio
from baml_client.async_client import b
from baml_py import AbortController, BamlAbortError

async def main():
    controller = AbortController()
    
    # Start multiple operations with the same controller
    tasks = [
        b.ClassifyMessage(
            'I want to cancel my order',
            baml_options={"abort_controller": controller}
        ),
        b.ClassifyMessage(
            'I want a refund',
            baml_options={"abort_controller": controller}
        ),
        b.ClassifyMessage(
            'Is my package shipped?',
            baml_options={"abort_controller": controller}
        )
    ]
    
    # Cancel all operations after 2 seconds
    async def cancel_after_timeout():
        await asyncio.sleep(2)
        controller.abort()
        print('All operations cancelled')
    
    asyncio.create_task(cancel_after_timeout())
    
    try:
        results = await asyncio.gather(*tasks)
        print('All completed:', results)
    except BamlAbortError:
        print('Operations were cancelled')
```

</Tab> <Tab title="Go" language="go">

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    b "example.com/myproject/baml_client"
)

func main() {
    // Create a cancellable context
    ctx, cancel := context.WithCancel(context.Background())

    // Cancel all operations after 2 seconds
    go func() {
        time.Sleep(2 * time.Second)
        cancel()
        fmt.Println("All operations cancelled")
    }()

    var wg sync.WaitGroup
    messages := []string{
        "I want to cancel my order",
        "I want a refund",
        "Is my package shipped?",
    }

    for _, msg := range messages {
        wg.Add(1)
        go func(message string) {
            defer wg.Done()
            result, err := b.ClassifyMessage(ctx, message)
            if err != nil {
                if err == context.Canceled {
                    fmt.Printf("Cancelled: %s\n", message)
                }
                return
            }
            fmt.Printf("Completed: %s -> %v\n", message, result)
        }(msg)
    }

    wg.Wait()
}
```

</Tab>

<Tab title="Rust" language="rust">
```rust
use baml::CancellationToken;
use myproject::baml_client::sync_client::B;
use std::thread;
use std::time::Duration;

fn main() {
    // Create a shared cancellation token
    let token = CancellationToken::new();
    let token_for_cancel = token.clone();

    // Cancel all operations after 2 seconds
    thread::spawn(move || {
        thread::sleep(Duration::from_secs(2));
        token_for_cancel.cancel();
        println!("All operations cancelled");
    });

    let messages = vec![
        "I want to cancel my order",
        "I want a refund",
        "Is my package shipped?",
    ];

    let handles: Vec<_> = messages
        .into_iter()
        .map(|msg| {
            let t = token.clone();
            thread::spawn(move || {
                let result = B.ClassifyMessage
                    .with_cancellation_token(Some(t))
                    .call(msg);
                match result {
                    Ok(category) => println!("Completed: {} -> {:?}", msg, category),
                    Err(_) => println!("Cancelled: {}", msg),
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
```
</Tab> </Tabs>

Cancel Individual Operations

Use separate controllers to cancel operations independently:

<Tabs> <Tab title="TypeScript" language="typescript">

```typescript
const controllers = [
  new AbortController(),
  new AbortController(),
  new AbortController()
]

const promises = [
  b.ClassifyMessage('I want to cancel my order', { abortController: controllers[0] }),
  b.ClassifyMessage('I want a refund', { abortController: controllers[1] }),
  b.ClassifyMessage('Is my package shipped?', { abortController: controllers[2] })
]

// Cancel only the second operation
controllers[1].abort()

const results = await Promise.allSettled(promises)
results.forEach((result, index) => {
  if (result.status === 'fulfilled') {
    console.log(`Operation ${index} completed:`, result.value)
  } else {
    console.log(`Operation ${index} failed:`, result.reason.message)
  }
})
```

</Tab>

<Tab title="Python" language="python">
```python
controllers = [
    AbortController(),
    AbortController(),
    AbortController()
]

tasks = [
    b.ClassifyMessage(
        'I want to cancel my order',
        baml_options={"abort_controller": controllers[0]}
    ),
    b.ClassifyMessage(
        'I want a refund',
        baml_options={"abort_controller": controllers[1]}
    ),
    b.ClassifyMessage(
        'Is my package shipped?',
        baml_options={"abort_controller": controllers[2]}
    )
]

# Cancel only the second operation
controllers[1].abort()

# Use gather with return_exceptions to handle partial failures
results = await asyncio.gather(*tasks, return_exceptions=True)
for index, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Operation {index} failed: {result}")
    else:
        print(f"Operation {index} completed: {result}")
```
</Tab> </Tabs>

Fastest Request Wins

Race multiple LLM providers and cancel slower ones when the fastest completes. This pattern is useful for optimizing latency by using whichever provider responds first.

<Tabs> <Tab title="TypeScript" language="typescript">

```typescript
import { ClientRegistry } from '@boundaryml/baml'

async function fastestProviderWins(message: string) {
  const controllers = [
    new AbortController(),
    new AbortController(),
    new AbortController()
  ]

  // Create separate client registries for each provider
  const openaiRegistry = new ClientRegistry()
  openaiRegistry.addLlmClient('OpenAI', 'openai', {
    model: 'gpt-5-mini',
    api_key: process.env.OPENAI_API_KEY
  })
  openaiRegistry.setPrimary('OpenAI')

  const anthropicRegistry = new ClientRegistry()
  anthropicRegistry.addLlmClient('Anthropic', 'anthropic', {
    model: 'claude-3-5-haiku-20241022',
    api_key: process.env.ANTHROPIC_API_KEY
  })
  anthropicRegistry.setPrimary('Anthropic')

  const geminiRegistry = new ClientRegistry()
  geminiRegistry.addLlmClient('Gemini', 'vertex-ai', {
    model: 'gemini-2.5-flash',
    location: 'us-central1',
    credentials: process.env.GOOGLE_APPLICATION_CREDENTIALS
  })
  geminiRegistry.setPrimary('Gemini')

  const promises = [
    b.ClassifyMessage(message, { clientRegistry: openaiRegistry, abortController: controllers[0] }),
    b.ClassifyMessage(message, { clientRegistry: anthropicRegistry, abortController: controllers[1] }),
    b.ClassifyMessage(message, { clientRegistry: geminiRegistry, abortController: controllers[2] })
  ]

  try {
    // Wait for the first to complete
    const result = await Promise.race(promises)

    // Cancel the others
    controllers.forEach(c => c.abort())

    return result
  } catch (error) {
    // All failed - cancel any still running
    controllers.forEach(c => c.abort())
    throw error
  }
}
```

</Tab>

<Tab title="Python" language="python">
```python
import os
from baml_py import ClientRegistry

async def fastest_provider_wins(message: str):
    controllers = [
        AbortController(),
        AbortController(),
        AbortController()
    ]
    
    # Create separate client registries for each provider
    openai_registry = ClientRegistry()
    openai_registry.add_llm_client('OpenAI', 'openai', {
        'model': 'gpt-5-mini',
        'api_key': os.environ.get('OPENAI_API_KEY')
    })
    openai_registry.set_primary('OpenAI')
    
    anthropic_registry = ClientRegistry()
    anthropic_registry.add_llm_client('Anthropic', 'anthropic', {
        'model': 'claude-3-5-haiku-20241022',
        'api_key': os.environ.get('ANTHROPIC_API_KEY')
    })
    anthropic_registry.set_primary('Anthropic')
    
    gemini_registry = ClientRegistry()
    gemini_registry.add_llm_client('Gemini', 'vertex-ai', {
        'model': 'gemini-2.5-flash',
        'location': 'us-central1',
        'credentials': os.environ.get('GOOGLE_APPLICATION_CREDENTIALS_CONTENT')
    })
    gemini_registry.set_primary('Gemini')
    
    # Create tasks
    tasks = [
        asyncio.create_task(
            b.ClassifyMessage(message, baml_options={
                'client_registry': openai_registry,
                'abort_controller': controllers[0]
            })
        ),
        asyncio.create_task(
            b.ClassifyMessage(message, baml_options={
                'client_registry': anthropic_registry,
                'abort_controller': controllers[1]
            })
        ),
        asyncio.create_task(
            b.ClassifyMessage(message, baml_options={
                'client_registry': gemini_registry,
                'abort_controller': controllers[2]
            })
        )
    ]
    
    try:
        # Wait for first to complete
        done, pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED
        )
        
        # Cancel the others
        for controller in controllers:
            controller.abort()
        
        # Cancel pending tasks
        for task in pending:
            task.cancel()
        
        # Get result from completed task
        result = done.pop().result()
        return result
        
    except Exception as e:
        # Cancel all on error
        for controller in controllers:
            controller.abort()
        for task in tasks:
            if not task.done():
                task.cancel()
        raise
```

</Tab> <Tab title="Go" language="go">

```go
import (
    "context"
    "fmt"
    "os"
    "sync"

    b "example.com/myproject/baml_client"
)

func fastestProviderWins(message string) (interface{}, error) {
// Create separate contexts for each provider
ctx1, cancel1 := context.WithCancel(context.Background())
ctx2, cancel2 := context.WithCancel(context.Background())
ctx3, cancel3 := context.WithCancel(context.Background())

// Defer cleanup
defer cancel1()
defer cancel2()
defer cancel3()

// Create client registries for each provider
openaiRegistry, _ := b.NewClientRegistry()
openaiRegistry.AddLlmClient("OpenAI", "openai", map[string]interface{}{
    "model": "gpt-5-mini",
    "api_key": os.Getenv("OPENAI_API_KEY"),
})
openaiRegistry.SetPrimary("OpenAI")

anthropicRegistry, _ := b.NewClientRegistry()
anthropicRegistry.AddLlmClient("Anthropic", "anthropic", map[string]interface{}{
    "model": "claude-3-5-haiku-20241022",
    "api_key": os.Getenv("ANTHROPIC_API_KEY"),
})
anthropicRegistry.SetPrimary("Anthropic")

geminiRegistry, _ := b.NewClientRegistry()
geminiRegistry.AddLlmClient("Gemini", "vertex-ai", map[string]interface{}{
    "model": "gemini-2.5-flash",
    "location": "us-central1",
    "credentials": os.Getenv("GOOGLE_APPLICATION_CREDENTIALS"),
})
geminiRegistry.SetPrimary("Gemini")

type result struct {
    data interface{}
    err  error
    provider string
}

resultChan := make(chan result, 3)
var wg sync.WaitGroup
wg.Add(3)

// Launch goroutines for each provider
go func() {
    defer wg.Done()
    data, err := b.ClassifyMessage(ctx1, message, b.WithClientRegistry(openaiRegistry))
    resultChan <- result{data: data, err: err, provider: "OpenAI"}
}()

go func() {
    defer wg.Done()
    data, err := b.ClassifyMessage(ctx2, message, b.WithClientRegistry(anthropicRegistry))
    resultChan <- result{data: data, err: err, provider: "Anthropic"}
}()

go func() {
    defer wg.Done()
    data, err := b.ClassifyMessage(ctx3, message, b.WithClientRegistry(geminiRegistry))
    resultChan <- result{data: data, err: err, provider: "Gemini"}
}()

// Wait for first successful result
go func() {
    wg.Wait()
    close(resultChan)
}()

// Get first result and cancel others
firstResult := <-resultChan

// Cancel all contexts to stop other operations
cancel1()
cancel2()
cancel3()

if firstResult.err != nil {
    // If first failed, try to get another result
    select {
    case secondResult := <-resultChan:
        if secondResult.err == nil {
            fmt.Printf("Provider %s won\n", secondResult.provider)
            return secondResult.data, nil
        }
    default:
        // No more results
    }
    return nil, firstResult.err
}

fmt.Printf("Provider %s won\n", firstResult.provider)
return firstResult.data, nil

}

```

</Tab>

<Tab title="Rust" language="rust">
```rust
use baml::{CancellationToken, ClientRegistry};
use myproject::baml_client::sync_client::B;
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

fn fastest_provider_wins(message: &str) -> Result<String, String> {
    let (tx, rx) = mpsc::channel();
    let cancel_token = CancellationToken::new();

    // OpenAI
    let msg = message.to_string();
    let tx1 = tx.clone();
    let t1 = cancel_token.clone();
    thread::spawn(move || {
        let mut registry = ClientRegistry::new();
        let mut options = HashMap::new();
        options.insert("model".to_string(), serde_json::json!("gpt-5-mini"));
        registry.add_llm_client("OpenAI", "openai", options);
        registry.set_primary_client("OpenAI");
        if let Ok(result) = B.ClassifyMessage
            .with_client_registry(&registry)
            .with_cancellation_token(Some(t1))
            .call(&msg)
        {
            let _ = tx1.send(("OpenAI", result));
        }
    });

    // Anthropic
    let msg = message.to_string();
    let tx2 = tx.clone();
    let t2 = cancel_token.clone();
    thread::spawn(move || {
        let mut registry = ClientRegistry::new();
        let mut options = HashMap::new();
        options.insert("model".to_string(), serde_json::json!("claude-3-5-haiku-20241022"));
        registry.add_llm_client("Anthropic", "anthropic", options);
        registry.set_primary_client("Anthropic");
        if let Ok(result) = B.ClassifyMessage
            .with_client_registry(&registry)
            .with_cancellation_token(Some(t2))
            .call(&msg)
        {
            let _ = tx2.send(("Anthropic", result));
        }
    });

    drop(tx);

    // Wait for the first result and cancel slower operations
    match rx.recv() {
        Ok((provider, result)) => {
            cancel_token.cancel();
            println!("Provider {} won", provider);
            Ok(format!("{:?}", result))
        }
        Err(_) => Err("All providers failed".to_string()),
    }
}
```
</Tab> </Tabs>

Implementing Timeouts for Parallel Operations

Set automatic timeouts to prevent operations from running indefinitely:

<Tabs> <Tab title="TypeScript" language="typescript">

```typescript
async function classifyWithTimeout(messages: string[], timeoutMs: number = 5000) {
  const controller = new AbortController()

  // Set timeout for all operations
  const timeoutId = setTimeout(() => {
    controller.abort()
  }, timeoutMs)

  try {
    const promises = messages.map(msg =>
      b.ClassifyMessage(msg, { abortController: controller })
    )

    const results = await Promise.all(promises)
    clearTimeout(timeoutId)
    return results
  } catch (error) {
    clearTimeout(timeoutId)
    if (error.name === 'BamlAbortError') {
      throw new Error(`Operations timed out after ${timeoutMs}ms`)
    }
    throw error
  }
}
```

</Tab>

<Tab title="Python" language="python">
```python
import asyncio
from baml_py import AbortController, BamlAbortError

async def classify_with_timeout(messages: list[str], timeout_seconds: float = 5):
    controller = AbortController()
    
    async def timeout_task():
        await asyncio.sleep(timeout_seconds)
        controller.abort()
    
    # Start timeout
    timeout = asyncio.create_task(timeout_task())
    
    try:
        tasks = [
            b.ClassifyMessage(msg, baml_options={"abort_controller": controller})
            for msg in messages
        ]
        
        results = await asyncio.gather(*tasks)
        timeout.cancel()
        return results
    except BamlAbortError:
        raise TimeoutError(f"Operations timed out after {timeout_seconds}s")
    except Exception:
        timeout.cancel()
        raise
```

</Tab> <Tab title="Go" language="go">

```go
func classifyWithTimeout(messages []string, timeout time.Duration) ([]types.Category, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    results := make([]types.Category, len(messages))
    // Named errs (not errors) so it does not shadow the standard
    // errors package used below via errors.Is.
    errs := make([]error, len(messages))
    var wg sync.WaitGroup

    for i, msg := range messages {
        wg.Add(1)
        go func(index int, message string) {
            defer wg.Done()
            result, err := b.ClassifyMessage(ctx, message)
            results[index] = result
            errs[index] = err
        }(i, msg)
    }

    wg.Wait()

    // Check for errors
    for i, err := range errs {
        if err != nil {
            if errors.Is(err, context.DeadlineExceeded) {
                return nil, fmt.Errorf("operations timed out after %v", timeout)
            }
            return nil, fmt.Errorf("message %d failed: %w", i, err)
        }
    }

    return results, nil
}
```

</Tab>

<Tab title="Rust" language="rust">
```rust
use baml::CancellationToken;
use myproject::baml_client::sync_client::B;
use myproject::baml_client::types::*;
use std::thread;
use std::time::Duration;

fn classify_with_timeout(
    messages: &[&str],
    timeout: Duration,
) -> Result<Vec<Category>, String> {
    let token = CancellationToken::new_with_timeout(timeout);

    let handles: Vec<_> = messages
        .iter()
        .map(|msg| {
            let t = token.clone();
            let msg = msg.to_string();
            thread::spawn(move || {
                B.ClassifyMessage
                    .with_cancellation_token(Some(t))
                    .call(&msg)
            })
        })
        .collect();

    let mut results = Vec::new();
    for handle in handles {
        match handle.join().unwrap() {
            Ok(result) => results.push(result),
            Err(e) => return Err(format!("Classification failed: {}", e)),
        }
    }

    Ok(results)
}
```
</Tab> </Tabs>

Batching with Cancellation Support

Process items in batches with the ability to cancel remaining batches:

<Tabs> <Tab title="TypeScript" language="typescript">

```typescript
async function processBatches<T, R>(
  items: T[],
  batchSize: number,
  processor: (item: T, controller: AbortController) => Promise<R>
): Promise<R[]> {
  const results: R[] = []
  const masterController = new AbortController()

  try {
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize)

      // Check if we should stop
      if (masterController.signal.aborted) {
        throw new Error('Batch processing cancelled')
      }

      // Process batch in parallel
      const batchPromises = batch.map(item =>
        processor(item, masterController)
      )

      const batchResults = await Promise.all(batchPromises)
      results.push(...batchResults)

      console.log(`Completed batch ${Math.floor(i / batchSize) + 1}`)
    }

    return results
  } catch (error) {
    masterController.abort()
    throw error
  }
}

// Usage
const messages = ['message1', 'message2', 'message3', /* ... */]
const results = await processBatches(
  messages,
  5, // batch size
  (msg, controller) => b.ClassifyMessage(msg, { abortController: controller })
)
```

</Tab>

<Tab title="Python" language="python">
```python
async def process_batches(items, batch_size, processor):
    results = []
    master_controller = AbortController()
    
    try:
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            
            # Check if we should stop
            if master_controller.aborted:
                raise Exception('Batch processing cancelled')
            
            # Process batch in parallel
            batch_tasks = [
                processor(item, master_controller)
                for item in batch
            ]
            
            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)
            
            print(f"Completed batch {i // batch_size + 1}")
        
        return results
    except Exception as e:
        master_controller.abort()
        raise

# Usage
messages = ['message1', 'message2', 'message3']
results = await process_batches(
    messages,
    5,  # batch size
    lambda msg, ctrl: b.ClassifyMessage(msg, baml_options={"abort_controller": ctrl})
)
```
</Tab> </Tabs>

For basic abort controller usage and error handling, see the Abort Controllers guide.