showcase/shell-docs/src/content/ag-ui/sdk/kotlin/client/abstract-agent.mdx
AbstractAgent is the base class that provides the foundation for implementing custom agent connectivity patterns. It defines the core contract and common functionality that all agent implementations must follow, making it the starting point for building specialized agent types.
AbstractAgent provides:
class CustomAgent(
url: String,
configure: AgUiAgentConfig.() -> Unit = {}
) : AbstractAgent(url, configure) {
override fun run(input: RunAgentInput): Flow<BaseEvent> {
return flow {
// Custom pre-processing
val processedInput = preprocessInput(input)
// Execute request using inherited HTTP client
httpClient.post(agentUrl) {
contentType(ContentType.Application.Json)
setBody(processedInput)
}.body<Flow<BaseEvent>>().collect { event ->
// Custom event processing
emit(processEvent(event))
}
}
}
private fun preprocessInput(input: RunAgentInput): RunAgentInput {
// Custom input processing logic
return input.copy(
context = input.context + mapOf("customFlag" to "true")
)
}
private fun processEvent(event: BaseEvent): BaseEvent {
// Custom event processing logic
return when (event) {
is TextMessageContentEvent -> {
// Transform content
event.copy(delta = event.delta.uppercase())
}
else -> event
}
}
}
class BatchAgent(
url: String,
configure: AgUiAgentConfig.() -> Unit = {}
) : AbstractAgent(url, configure) {
private val messageQueue = mutableListOf<String>()
fun queueMessage(message: String) {
messageQueue.add(message)
}
fun processBatch(threadId: String = "batch"): Flow<BaseEvent> {
val messages = messageQueue.map { content ->
UserMessage(id = generateId("user"), content = content)
}
val input = RunAgentInput(
threadId = threadId,
runId = generateRunId(),
messages = messages
)
messageQueue.clear()
return run(input)
}
override fun run(input: RunAgentInput): Flow<BaseEvent> {
// Custom batching logic
return super.run(input.copy(
context = input.context + mapOf(
"batchSize" to input.messages.size.toString(),
"batchId" to UUID.randomUUID().toString()
)
))
}
}
AbstractAgent uses AgUiAgentConfig for configuration:
abstract class MyAgent(
url: String,
configure: AgUiAgentConfig.() -> Unit
) : AbstractAgent(url, configure) {
init {
// Access configuration through inherited 'config' property
println("System prompt: ${config.systemPrompt}")
println("Debug mode: ${config.debug}")
println("Headers: ${config.headers}")
}
}
Must be implemented by subclasses to define request execution:
abstract fun run(input: RunAgentInput): Flow<BaseEvent>
Parameters:
input: Complete AG-UI protocol inputReturns: Flow<BaseEvent> - Stream of protocol events
Pre-configured HTTP client with authentication:
class MyAgent : AbstractAgent(url, config) {
override fun run(input: RunAgentInput): Flow<BaseEvent> {
return flow {
// Use inherited HTTP client
val response = httpClient.post(agentUrl) {
contentType(ContentType.Application.Json)
setBody(input)
}
// Process response...
}
}
}
Access to agent configuration:
class MyAgent : AbstractAgent(url, config) {
private fun customizeRequest(): HttpRequestBuilder.() -> Unit = {
// Use config properties
timeout {
requestTimeoutMillis = config.requestTimeout.inWholeMilliseconds
connectTimeoutMillis = config.connectTimeout.inWholeMilliseconds
}
// Add custom headers from config
config.headers.forEach { (key, value) ->
header(key, value)
}
}
}
The configured agent endpoint URL:
class MyAgent : AbstractAgent(url, config) {
override fun run(input: RunAgentInput): Flow<BaseEvent> {
return flow {
println("Connecting to: $agentUrl")
// Make request to agentUrl
}
}
}
Generate unique IDs for messages and runs:
class MyAgent : AbstractAgent(url, config) {
private fun createMessage(content: String): UserMessage {
return UserMessage(
id = generateId("user"), // Inherited utility
content = content
)
}
}
Generate unique run identifiers:
class MyAgent : AbstractAgent(url, config) {
override fun run(input: RunAgentInput): Flow<BaseEvent> {
val runId = input.runId ?: generateRunId() // Inherited utility
// Use runId for request tracking
return processRequest(input.copy(runId = runId))
}
}
AbstractAgent automatically configures the HTTP client with:
Based on config, sets up:
ktor-client-androidktor-client-darwinktor-client-cioOptional request/response logging when config.debug = true
class PreprocessingAgent : AbstractAgent(url, config) {
override fun run(input: RunAgentInput): Flow<BaseEvent> {
return flow {
// Add metadata to all requests
val enhancedInput = input.copy(
context = input.context + mapOf(
"clientVersion" to "1.0.0",
"requestTime" to System.currentTimeMillis().toString()
)
)
// Execute with enhanced input
processRequest(enhancedInput).collect { emit(it) }
}
}
}
class FilteringAgent : AbstractAgent(url, config) {
override fun run(input: RunAgentInput): Flow<BaseEvent> {
return processRequest(input)
.filter { event ->
// Only emit certain event types
when (event) {
is TextMessageContentEvent,
is ToolCallStartEvent,
is RunFinishedEvent -> true
else -> false
}
}
}
}
class TransformingAgent : AbstractAgent(url, config) {
override fun run(input: RunAgentInput): Flow<BaseEvent> {
return processRequest(input)
.map { event ->
// Transform events before emission
when (event) {
is TextMessageContentEvent -> {
event.copy(delta = formatContent(event.delta))
}
else -> event
}
}
}
private fun formatContent(content: String): String {
// Custom content formatting
return content.trim().replace("\\n", "\n")
}
}
class RobustAgent : AbstractAgent(url, config) {
override fun run(input: RunAgentInput): Flow<BaseEvent> {
return flow {
try {
processRequest(input).collect { event ->
emit(event)
}
} catch (e: Exception) {
// Convert exceptions to protocol errors
emit(ErrorEvent(
error = AgentError(
type = "custom_error",
message = e.message ?: "Unknown error",
details = mapOf("exception" to e::class.simpleName)
)
))
}
}
}
}
class ValidatingAgent : AbstractAgent(url, config) {
init {
// Validate configuration
require(config.systemPrompt?.isNotBlank() == true) {
"System prompt is required"
}
require(config.bearerToken?.isNotBlank() == true ||
config.apiKey?.isNotBlank() == true) {
"Authentication is required"
}
}
}
class ResourceAwareAgent : AbstractAgent(url, config) {
private val requestCounter = AtomicInteger(0)
override fun run(input: RunAgentInput): Flow<BaseEvent> {
val requestId = requestCounter.incrementAndGet()
return flow {
try {
if (config.debug) {
println("Starting request $requestId")
}
processRequest(input).collect { event ->
emit(event)
}
} finally {
if (config.debug) {
println("Completed request $requestId")
}
}
}
}
}
class ThreadSafeAgent : AbstractAgent(url, config) {
private val activeRequests = ConcurrentHashMap<String, Job>()
override fun run(input: RunAgentInput): Flow<BaseEvent> {
return flow {
val requestKey = "${input.threadId}-${input.runId}"
// Track active request
activeRequests[requestKey] = currentCoroutineContext().job
try {
processRequest(input).collect { event ->
emit(event)
}
} finally {
activeRequests.remove(requestKey)
}
}
}
fun cancelRequest(threadId: String, runId: String) {
val requestKey = "$threadId-$runId"
activeRequests[requestKey]?.cancel()
}
}