Back to Copilotkit

HttpAgent

showcase/shell-docs/src/content/ag-ui/sdk/kotlin/client/http-agent.mdx

1.57.09.4 KB
Original Source

HttpAgent

HttpAgent is a low-level HTTP transport implementation that provides direct control over AG-UI protocol communication. It handles the core HTTP communication, Server-Sent Events (SSE) parsing, and event verification, serving as the foundation for higher-level agent implementations.

Usage Scenarios

Custom Agent Implementation

Build custom agent behavior on top of the HTTP transport:

kotlin
class CustomAgent(url: String, config: AgUiAgentConfig) {
    private val httpAgent = HttpAgent(url, config)

    fun customInteraction(input: RunAgentInput): Flow<BaseEvent> {
        return httpAgent.run(input)
            .onEach { event ->
                // Custom event processing
                when (event) {
                    is TextMessageContentEvent -> logMessage(event.delta)
                    is ToolCallStartEvent -> handleToolCall(event)
                }
            }
    }
}

Protocol-Level Access

Direct access to AG-UI protocol events:

kotlin
val httpAgent = HttpAgent("https://api.example.com/agent") {
    bearerToken = "your-token"
}

val input = RunAgentInput(
    threadId = "thread-123",
    runId = "run-456",
    messages = listOf(
        UserMessage(id = "user-1", content = "Hello")
    )
)

httpAgent.run(input).collect { event ->
    // Raw protocol events
    println("Event: ${event.eventType}")
}

Configuration

HttpAgent uses the same configuration as other agents:

kotlin
val httpAgent = HttpAgent("https://api.example.com/agent") {
    // Authentication
    bearerToken = "your-token"

    // Request configuration
    requestTimeout = 30.seconds
    connectTimeout = 10.seconds

    // Debug logging
    debug = true

    // Custom headers
    headers = mapOf("X-Custom" to "value")
}

Methods

run

Execute an AG-UI protocol request:

kotlin
fun run(input: RunAgentInput): Flow<BaseEvent>

Parameters:

  • input: Complete AG-UI protocol input with messages, state, tools, etc.

Returns: Flow<BaseEvent> - Stream of protocol events

Example:

kotlin
val input = RunAgentInput(
    threadId = "conversation-1",
    runId = UUID.randomUUID().toString(),
    messages = listOf(
        SystemMessage(id = "sys-1", content = "You are helpful"),
        UserMessage(id = "user-1", content = "What's 2+2?")
    ),
    state = buildJsonObject { put("context", "math") },
    tools = emptyList(),
    context = emptyMap()
)

httpAgent.run(input).collect { event ->
    when (event) {
        is RunStartedEvent -> println("Run started: ${event.runId}")
        is TextMessageStartEvent -> println("Agent responding...")
        is TextMessageContentEvent -> print(event.delta)
        is TextMessageEndEvent -> println("\nResponse complete")
        is RunFinishedEvent -> println("Run finished")
    }
}

Event Processing

Raw Protocol Events

HttpAgent emits all AG-UI protocol events:

kotlin
httpAgent.run(input).collect { event ->
    when (event) {
        // Run lifecycle
        is RunStartedEvent -> { /* Run began */ }
        is RunFinishedEvent -> { /* Run completed */ }

        // Text messages
        is TextMessageStartEvent -> { /* Agent started message */ }
        is TextMessageContentEvent -> { /* Message content chunk */ }
        is TextMessageEndEvent -> { /* Agent finished message */ }

        // Tool calls
        is ToolCallStartEvent -> { /* Tool call initiated */ }
        is ToolCallArgsEvent -> { /* Tool arguments chunk */ }
        is ToolCallEndEvent -> { /* Tool call complete */ }
        is ToolResultEvent -> { /* Tool execution result */ }

        // State management
        is StateSnapshotEvent -> { /* Complete state snapshot */ }
        is StateDeltaEvent -> { /* Incremental state change */ }

        // Errors
        is ErrorEvent -> { /* Protocol or execution error */ }
    }
}

Event Verification

HttpAgent includes automatic event verification:

kotlin
// Events are automatically verified against the protocol
httpAgent.run(input).collect { event ->
    // All events here have passed protocol validation
    // Invalid sequences will emit ErrorEvent instead
}

HTTP Transport Details

Server-Sent Events (SSE)

HttpAgent uses SSE for real-time streaming:

  • Automatic connection management
  • Reconnection on connection loss
  • Proper SSE event parsing
  • Content-Type: text/event-stream

Request Format

POST requests to /agent endpoint with:

json
{
  "threadId": "thread-123",
  "runId": "run-456",
  "messages": [...],
  "state": {...},
  "tools": [...],
  "context": {...}
}

Platform HTTP Engines

  • Android: Uses ktor-client-android (OkHttp under the hood)
  • iOS: Uses ktor-client-darwin (NSURLSession under the hood)
  • JVM: Uses ktor-client-cio

Error Handling

Network Errors

kotlin
httpAgent.run(input).collect { event ->
    if (event is ErrorEvent) {
        when (event.error.type) {
            "network" -> {
                // Connection issues, timeouts, etc.
                println("Network error: ${event.error.message}")
            }
            "protocol" -> {
                // Invalid protocol messages
                println("Protocol error: ${event.error.message}")
            }
            "authentication" -> {
                // Auth failures
                println("Auth error: ${event.error.message}")
            }
        }
    }
}

Automatic Retries

kotlin
val httpAgent = HttpAgent(url) {
    bearerToken = token

    // Ktor client handles connection-level retries
    // Application-level retries should be implemented by caller
}

Advanced Usage

Custom Event Processing Pipeline

kotlin
class EventProcessor {
    fun process(input: RunAgentInput): Flow<ProcessedEvent> {
        return HttpAgent(url, config)
            .run(input)
            .filter { event ->
                // Filter relevant events
                event is TextMessageContentEvent || event is ToolCallStartEvent
            }
            .map { event ->
                // Transform events
                when (event) {
                    is TextMessageContentEvent -> ProcessedEvent.TextChunk(event.delta)
                    is ToolCallStartEvent -> ProcessedEvent.ToolStart(event.toolCallName)
                    else -> ProcessedEvent.Other
                }
            }
    }
}

State Management Integration

kotlin
class StatefulHttpAgent(url: String, config: AgUiAgentConfig) {
    private val httpAgent = HttpAgent(url, config)
    private var currentState: JsonElement = JsonObject(emptyMap())

    fun runWithState(
        messages: List<Message>,
        threadId: String = "default"
    ): Flow<BaseEvent> {
        val input = RunAgentInput(
            threadId = threadId,
            runId = UUID.randomUUID().toString(),
            messages = messages,
            state = currentState
        )

        return httpAgent.run(input).onEach { event ->
            // Update state from events
            when (event) {
                is StateSnapshotEvent -> currentState = event.snapshot
                is StateDeltaEvent -> {
                    // Apply JSON patch (simplified)
                    currentState = applyPatch(currentState, event.delta)
                }
            }
        }
    }
}

Thread Safety

HttpAgent is thread-safe and can handle concurrent requests:

kotlin
val httpAgent = HttpAgent(url) { bearerToken = token }

// Multiple concurrent requests
launch {
    httpAgent.run(input1).collect { }
}

launch {
    httpAgent.run(input2).collect { }
}

Performance Considerations

Connection Reuse

kotlin
// Reuse HttpAgent instance for multiple requests
val httpAgent = HttpAgent(url, config)

repeat(10) { i ->
    val input = RunAgentInput(/* ... */)
    httpAgent.run(input).collect { }
}
// HTTP connections are reused automatically

Large Responses

kotlin
// Handle large streaming responses efficiently
httpAgent.run(input)
    .buffer(capacity = 100) // Buffer events
    .collect { event ->
        // Process events in batches
    }

Best Practices

Input Validation

kotlin
fun runSafely(messages: List<Message>): Flow<BaseEvent> {
    require(messages.isNotEmpty()) { "Messages cannot be empty" }
    require(messages.any { it is UserMessage }) { "Must include user message" }

    val input = RunAgentInput(
        threadId = "validated",
        runId = UUID.randomUUID().toString(),
        messages = messages
    )

    return httpAgent.run(input)
}

Error Recovery

kotlin
fun runWithRetry(input: RunAgentInput, maxRetries: Int = 3): Flow<BaseEvent> {
    return flow {
        repeat(maxRetries) { attempt ->
            try {
                httpAgent.run(input).collect { event ->
                    emit(event)
                    if (event is ErrorEvent && event.error.type == "network") {
                        throw IOException("Network error")
                    }
                }
                return@flow // Success, exit retry loop
            } catch (e: IOException) {
                if (attempt == maxRetries - 1) throw e
                delay(1000 * (attempt + 1)) // Exponential backoff
            }
        }
    }
}

Resource Management

kotlin
// HttpAgent automatically manages HTTP resources
// No explicit cleanup required, but lifecycle should be considered:

class AgentService {
    private val httpAgent = HttpAgent(url, config)

    // Agent instance lifecycle matches service lifecycle
    // Resources cleaned up when service is garbage collected
}