showcase/shell-docs/src/content/ag-ui/sdk/kotlin/client/http-agent.mdx
HttpAgent is a low-level HTTP transport implementation that provides direct control over AG-UI protocol communication. It handles the core HTTP communication, Server-Sent Events (SSE) parsing, and event verification, serving as the foundation for higher-level agent implementations.
Build custom agent behavior on top of the HTTP transport:
class CustomAgent(url: String, config: AgUiAgentConfig) {
private val httpAgent = HttpAgent(url, config)
fun customInteraction(input: RunAgentInput): Flow<BaseEvent> {
return httpAgent.run(input)
.onEach { event ->
// Custom event processing
when (event) {
is TextMessageContentEvent -> logMessage(event.delta)
is ToolCallStartEvent -> handleToolCall(event)
}
}
}
}
Direct access to AG-UI protocol events:
val httpAgent = HttpAgent("https://api.example.com/agent") {
bearerToken = "your-token"
}
val input = RunAgentInput(
threadId = "thread-123",
runId = "run-456",
messages = listOf(
UserMessage(id = "user-1", content = "Hello")
)
)
httpAgent.run(input).collect { event ->
// Raw protocol events
println("Event: ${event.eventType}")
}
HttpAgent uses the same configuration as other agents:
val httpAgent = HttpAgent("https://api.example.com/agent") {
// Authentication
bearerToken = "your-token"
// Request configuration
requestTimeout = 30.seconds
connectTimeout = 10.seconds
// Debug logging
debug = true
// Custom headers
headers = mapOf("X-Custom" to "value")
}
Execute an AG-UI protocol request:
fun run(input: RunAgentInput): Flow<BaseEvent>
Parameters:
input: Complete AG-UI protocol input with messages, state, tools, etc.Returns: Flow<BaseEvent> - Stream of protocol events
Example:
val input = RunAgentInput(
threadId = "conversation-1",
runId = UUID.randomUUID().toString(),
messages = listOf(
SystemMessage(id = "sys-1", content = "You are helpful"),
UserMessage(id = "user-1", content = "What's 2+2?")
),
state = buildJsonObject { put("context", "math") },
tools = emptyList(),
context = emptyMap()
)
httpAgent.run(input).collect { event ->
when (event) {
is RunStartedEvent -> println("Run started: ${event.runId}")
is TextMessageStartEvent -> println("Agent responding...")
is TextMessageContentEvent -> print(event.delta)
is TextMessageEndEvent -> println("\nResponse complete")
is RunFinishedEvent -> println("Run finished")
}
}
HttpAgent emits all AG-UI protocol events:
httpAgent.run(input).collect { event ->
when (event) {
// Run lifecycle
is RunStartedEvent -> { /* Run began */ }
is RunFinishedEvent -> { /* Run completed */ }
// Text messages
is TextMessageStartEvent -> { /* Agent started message */ }
is TextMessageContentEvent -> { /* Message content chunk */ }
is TextMessageEndEvent -> { /* Agent finished message */ }
// Tool calls
is ToolCallStartEvent -> { /* Tool call initiated */ }
is ToolCallArgsEvent -> { /* Tool arguments chunk */ }
is ToolCallEndEvent -> { /* Tool call complete */ }
is ToolResultEvent -> { /* Tool execution result */ }
// State management
is StateSnapshotEvent -> { /* Complete state snapshot */ }
is StateDeltaEvent -> { /* Incremental state change */ }
// Errors
is ErrorEvent -> { /* Protocol or execution error */ }
}
}
HttpAgent includes automatic event verification:
// Events are automatically verified against the protocol
httpAgent.run(input).collect { event ->
// All events here have passed protocol validation
// Invalid sequences will emit ErrorEvent instead
}
HttpAgent uses SSE for real-time streaming:
text/event-streamPOST requests to /agent endpoint with:
{
"threadId": "thread-123",
"runId": "run-456",
"messages": [...],
"state": {...},
"tools": [...],
"context": {...}
}
ktor-client-android (OkHttp under the hood)ktor-client-darwin (NSURLSession under the hood)ktor-client-ciohttpAgent.run(input).collect { event ->
if (event is ErrorEvent) {
when (event.error.type) {
"network" -> {
// Connection issues, timeouts, etc.
println("Network error: ${event.error.message}")
}
"protocol" -> {
// Invalid protocol messages
println("Protocol error: ${event.error.message}")
}
"authentication" -> {
// Auth failures
println("Auth error: ${event.error.message}")
}
}
}
}
val httpAgent = HttpAgent(url) {
bearerToken = token
// Ktor client handles connection-level retries
// Application-level retries should be implemented by caller
}
class EventProcessor {
fun process(input: RunAgentInput): Flow<ProcessedEvent> {
return HttpAgent(url, config)
.run(input)
.filter { event ->
// Filter relevant events
event is TextMessageContentEvent || event is ToolCallStartEvent
}
.map { event ->
// Transform events
when (event) {
is TextMessageContentEvent -> ProcessedEvent.TextChunk(event.delta)
is ToolCallStartEvent -> ProcessedEvent.ToolStart(event.toolCallName)
else -> ProcessedEvent.Other
}
}
}
}
class StatefulHttpAgent(url: String, config: AgUiAgentConfig) {
private val httpAgent = HttpAgent(url, config)
private var currentState: JsonElement = JsonObject(emptyMap())
fun runWithState(
messages: List<Message>,
threadId: String = "default"
): Flow<BaseEvent> {
val input = RunAgentInput(
threadId = threadId,
runId = UUID.randomUUID().toString(),
messages = messages,
state = currentState
)
return httpAgent.run(input).onEach { event ->
// Update state from events
when (event) {
is StateSnapshotEvent -> currentState = event.snapshot
is StateDeltaEvent -> {
// Apply JSON patch (simplified)
currentState = applyPatch(currentState, event.delta)
}
}
}
}
}
HttpAgent is thread-safe and can handle concurrent requests:
val httpAgent = HttpAgent(url) { bearerToken = token }
// Multiple concurrent requests
launch {
httpAgent.run(input1).collect { }
}
launch {
httpAgent.run(input2).collect { }
}
// Reuse HttpAgent instance for multiple requests
val httpAgent = HttpAgent(url, config)
repeat(10) { i ->
val input = RunAgentInput(/* ... */)
httpAgent.run(input).collect { }
}
// HTTP connections are reused automatically
// Handle large streaming responses efficiently
httpAgent.run(input)
.buffer(capacity = 100) // Buffer events
.collect { event ->
// Process events in batches
}
fun runSafely(messages: List<Message>): Flow<BaseEvent> {
require(messages.isNotEmpty()) { "Messages cannot be empty" }
require(messages.any { it is UserMessage }) { "Must include user message" }
val input = RunAgentInput(
threadId = "validated",
runId = UUID.randomUUID().toString(),
messages = messages
)
return httpAgent.run(input)
}
fun runWithRetry(input: RunAgentInput, maxRetries: Int = 3): Flow<BaseEvent> {
return flow {
repeat(maxRetries) { attempt ->
try {
httpAgent.run(input).collect { event ->
emit(event)
if (event is ErrorEvent && event.error.type == "network") {
throw IOException("Network error")
}
}
return@flow // Success, exit retry loop
} catch (e: IOException) {
if (attempt == maxRetries - 1) throw e
delay(1000 * (attempt + 1)) // Exponential backoff
}
}
}
}
// HttpAgent automatically manages HTTP resources
// No explicit cleanup required, but lifecycle should be considered:
class AgentService {
private val httpAgent = HttpAgent(url, config)
// Agent instance lifecycle matches service lifecycle
// Resources cleaned up when service is garbage collected
}