Back to Copilotkit

AbstractAgent

showcase/shell-docs/src/content/ag-ui/sdk/kotlin/client/abstract-agent.mdx

1.57.0 · 10.4 KB
Original Source

AbstractAgent

AbstractAgent is the base class that provides the foundation for implementing custom agent connectivity patterns. It defines the core contract and common functionality that all agent implementations must follow, making it the starting point for building specialized agent types.

Overview

AbstractAgent provides:

  • Core HTTP client configuration and management
  • Authentication handling
  • Request/response lifecycle hooks
  • Common utility methods
  • Standardized error handling patterns

Usage

Creating Custom Agent Types

kotlin
/**
 * Example agent that rewrites the outgoing input and every incoming event.
 *
 * The input gets an extra `customFlag` context entry before it is posted to
 * [agentUrl]; text message deltas coming back are upper-cased, and every other
 * event is forwarded unchanged.
 */
class CustomAgent(
    url: String,
    configure: AgUiAgentConfig.() -> Unit = {}
) : AbstractAgent(url, configure) {

    /** Posts the pre-processed [input] as JSON and re-emits each event after post-processing. */
    override fun run(input: RunAgentInput): Flow<BaseEvent> = flow {
        val requestBody = preprocessInput(input)

        // The inherited HTTP client performs the call.
        val incomingEvents = httpClient.post(agentUrl) {
            contentType(ContentType.Application.Json)
            setBody(requestBody)
        }.body<Flow<BaseEvent>>()

        incomingEvents.collect { incoming -> emit(processEvent(incoming)) }
    }

    /** Returns a copy of [input] whose context additionally carries `customFlag = "true"`. */
    private fun preprocessInput(input: RunAgentInput): RunAgentInput =
        input.copy(context = input.context + mapOf("customFlag" to "true"))

    /** Upper-cases the delta of text message content events; other events pass through as-is. */
    private fun processEvent(event: BaseEvent): BaseEvent =
        if (event is TextMessageContentEvent) {
            event.copy(delta = event.delta.uppercase())
        } else {
            event
        }
}

Specialized Agent Implementation

kotlin
/**
 * Example agent that buffers plain-text messages and sends them as one run.
 *
 * Messages are collected with [queueMessage] and flushed by [processBatch].
 * Every run is tagged with `batchSize` and `batchId` context entries.
 */
class BatchAgent(
    url: String,
    configure: AgUiAgentConfig.() -> Unit = {}
) : AbstractAgent(url, configure) {

    private val messageQueue = mutableListOf<String>()

    /** Appends [message] to the pending batch. */
    fun queueMessage(message: String) {
        messageQueue += message
    }

    /**
     * Converts all queued messages to user messages, empties the queue, and
     * starts a run on [threadId] containing them.
     *
     * @param threadId Thread the batch is submitted on; defaults to `"batch"`.
     * @return The event stream returned by [run].
     */
    fun processBatch(threadId: String = "batch"): Flow<BaseEvent> {
        val queuedAsUserMessages = messageQueue.map { text ->
            UserMessage(id = generateId("user"), content = text)
        }
        val batchInput = RunAgentInput(
            threadId = threadId,
            runId = generateRunId(),
            messages = queuedAsUserMessages
        )

        // The queue is emptied before the run starts.
        messageQueue.clear()
        return run(batchInput)
    }

    /** Delegates to the parent `run` after attaching batch metadata to the context. */
    override fun run(input: RunAgentInput): Flow<BaseEvent> {
        val batchMetadata = mapOf(
            "batchSize" to input.messages.size.toString(),
            "batchId" to UUID.randomUUID().toString()
        )
        val taggedInput = input.copy(context = input.context + batchMetadata)
        return super.run(taggedInput)
    }
}

Configuration

AbstractAgent uses AgUiAgentConfig for configuration:

kotlin
/**
 * Shows how a subclass reads its configuration: the inherited `config`
 * property is available as soon as the subclass initializer runs.
 */
abstract class MyAgent(
    url: String,
    configure: AgUiAgentConfig.() -> Unit,
) : AbstractAgent(url, configure) {

    init {
        // Print a few of the configured values via the inherited 'config' property.
        println("System prompt: ${config.systemPrompt}")
        println("Debug mode: ${config.debug}")
        println("Headers: ${config.headers}")
    }
}

Core Methods

Abstract Methods

run

Must be implemented by subclasses to define request execution:

kotlin
/**
 * Defines how a request is executed; must be implemented by subclasses.
 *
 * @param input Complete AG-UI protocol input.
 * @return Stream of protocol events.
 */
abstract fun run(input: RunAgentInput): Flow<BaseEvent>

Parameters:

  • input: Complete AG-UI protocol input

Returns: Flow<BaseEvent> - Stream of protocol events

Protected Properties

httpClient

Pre-configured HTTP client with authentication:

kotlin
/** Shows the inherited `httpClient` being used to post the run input as JSON. */
class MyAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> = flow {
        // The HTTP client comes from AbstractAgent.
        val httpResponse = httpClient.post(agentUrl) {
            contentType(ContentType.Application.Json)
            setBody(input)
        }
        // Process httpResponse...
    }
}

config

Access to agent configuration:

kotlin
/** Shows the inherited `config` being used to build a request customizer. */
class MyAgent : AbstractAgent(url, config) {
    /**
     * Returns a request-builder block that applies the configured request and
     * connect timeouts and copies every configured header onto the request.
     */
    private fun customizeRequest(): HttpRequestBuilder.() -> Unit = {
        // Timeouts come straight from the agent configuration.
        timeout {
            requestTimeoutMillis = config.requestTimeout.inWholeMilliseconds
            connectTimeoutMillis = config.connectTimeout.inWholeMilliseconds
        }

        // Copy each configured header onto the request.
        for ((name, value) in config.headers) {
            header(name, value)
        }
    }
}

agentUrl

The configured agent endpoint URL:

kotlin
/** Shows the inherited `agentUrl` being read inside `run`. */
class MyAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> = flow {
        println("Connecting to: $agentUrl")
        // Issue the request against agentUrl here
    }
}

Utility Methods

generateId

Generate unique IDs for messages and runs:

kotlin
/** Shows the inherited `generateId` utility being used to build a message. */
class MyAgent : AbstractAgent(url, config) {
    /** Wraps [content] in a user message whose id comes from `generateId("user")`. */
    private fun createMessage(content: String): UserMessage =
        UserMessage(id = generateId("user"), content = content)
}

generateRunId

Generate unique run identifiers:

kotlin
/** Shows the inherited `generateRunId` utility filling in a missing run id. */
class MyAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> {
        // Keep the caller's run id if present; otherwise generate one for request tracking.
        val trackedInput = input.copy(runId = input.runId ?: generateRunId())
        return processRequest(trackedInput)
    }
}

HTTP Client Configuration

AbstractAgent automatically configures the HTTP client with:

Authentication

Based on config, sets up:

  • Bearer token authentication
  • API key authentication
  • Basic authentication
  • Custom authentication providers

Platform-Specific Engines

  • Android: ktor-client-android
  • iOS: ktor-client-darwin
  • JVM: ktor-client-cio

Content Negotiation

  • JSON serialization with kotlinx.serialization
  • Automatic request/response handling

Logging

Request/response logging is optional and is enabled when config.debug = true.

Common Implementation Patterns

Request Preprocessing

kotlin
/**
 * Adds `clientVersion` and `requestTime` context entries to every request
 * before handing it to `processRequest`.
 */
class PreprocessingAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> = flow {
        // Built inside the flow, so the timestamp is taken when the flow is collected.
        val requestMetadata = mapOf(
            "clientVersion" to "1.0.0",
            "requestTime" to System.currentTimeMillis().toString()
        )
        val enhancedInput = input.copy(context = input.context + requestMetadata)

        // Forward every event produced for the enhanced input.
        processRequest(enhancedInput).collect { event -> emit(event) }
    }
}

Event Filtering

kotlin
/**
 * Forwards only text message content, tool call start, and run finished
 * events; every other event type is dropped.
 */
class FilteringAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> =
        processRequest(input).filter { candidate ->
            candidate is TextMessageContentEvent ||
                candidate is ToolCallStartEvent ||
                candidate is RunFinishedEvent
        }
}

Response Transformation

kotlin
/**
 * Rewrites the delta of text message content events before they are emitted;
 * all other events are passed through unchanged.
 */
class TransformingAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> =
        processRequest(input).map { incoming ->
            if (incoming is TextMessageContentEvent) {
                incoming.copy(delta = formatContent(incoming.delta))
            } else {
                incoming
            }
        }

    /**
     * Trims surrounding whitespace from [content], then turns each literal
     * backslash-n sequence into a real newline character.
     */
    private fun formatContent(content: String): String {
        val trimmed = content.trim()
        return trimmed.replace("\\n", "\n")
    }
}

Error Handling

kotlin
/**
 * Converts upstream failures into a protocol-level error event instead of
 * letting the exception terminate the collector.
 */
class RobustAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> {
        // Use the `catch` operator rather than try/catch around `emit`: a
        // try/catch inside `flow { }` also intercepts exceptions thrown by the
        // downstream collector and the CancellationException used to cancel
        // the flow, and emitting after either of those is invalid.
        return processRequest(input).catch { cause ->
            // Only Exception subtypes are converted; other Throwables propagate.
            if (cause !is Exception) throw cause

            // Convert exceptions to protocol errors
            emit(ErrorEvent(
                error = AgentError(
                    type = "custom_error",
                    message = cause.message ?: "Unknown error",
                    details = mapOf("exception" to cause::class.simpleName)
                )
            ))
        }
    }
}

Best Practices

Configuration Validation

kotlin
/**
 * Rejects construction unless a non-blank system prompt and at least one
 * non-blank credential (bearer token or API key) are configured.
 */
class ValidatingAgent : AbstractAgent(url, config) {
    init {
        // A missing or blank system prompt is a configuration error.
        require(!config.systemPrompt.isNullOrBlank()) {
            "System prompt is required"
        }

        // At least one of the two credentials must be present and non-blank.
        val hasBearerToken = !config.bearerToken.isNullOrBlank()
        require(hasBearerToken || !config.apiKey.isNullOrBlank()) {
            "Authentication is required"
        }
    }
}

Resource Management

kotlin
/**
 * Numbers each `run` call and, when `config.debug` is set, prints a line when
 * the request starts and another when it completes.
 */
class ResourceAwareAgent : AbstractAgent(url, config) {
    private val requestCounter = AtomicInteger(0)

    override fun run(input: RunAgentInput): Flow<BaseEvent> {
        // The number is taken when run() is called, before the flow is collected.
        val requestId = requestCounter.incrementAndGet()

        return flow {
            try {
                debugLog("Starting request $requestId")
                processRequest(input).collect { emit(it) }
            } finally {
                // Runs on normal completion, failure, and cancellation alike.
                debugLog("Completed request $requestId")
            }
        }
    }

    /** Prints [message] only when debug mode is enabled in the configuration. */
    private fun debugLog(message: String) {
        if (config.debug) {
            println(message)
        }
    }
}

Thread Safety

kotlin
/**
 * Tracks the coroutine job of every in-flight run so that a run can be
 * cancelled from outside via [cancelRequest].
 *
 * Runs are keyed by `"<threadId>-<runId>"`.
 */
class ThreadSafeAgent : AbstractAgent(url, config) {
    private val activeRequests = ConcurrentHashMap<String, Job>()

    override fun run(input: RunAgentInput): Flow<BaseEvent> {
        return flow {
            val requestKey = "${input.threadId}-${input.runId}"
            val collectorJob = currentCoroutineContext().job

            // Track active request
            activeRequests[requestKey] = collectorJob

            try {
                processRequest(input).collect { event ->
                    emit(event)
                }
            } finally {
                // Remove the entry only if it still points at this job. An
                // unconditional remove(key) would delete the registration of
                // a newer run that reused the same key, leaving that run
                // impossible to cancel.
                activeRequests.remove(requestKey, collectorJob)
            }
        }
    }

    /**
     * Cancels the job registered for the given [threadId] and [runId], if any.
     * Does nothing when no matching run is currently tracked.
     */
    fun cancelRequest(threadId: String, runId: String) {
        val requestKey = "$threadId-$runId"
        activeRequests[requestKey]?.cancel()
    }
}