showcase/shell-docs/src/content/ag-ui/sdk/dart/client/client.mdx
The AgUiClient class is the primary interface for connecting to AG-UI compatible servers. It handles HTTP communication, SSE streaming, binary protocol encoding/decoding, and provides a type-safe API for agent interactions.
AgUiClient({
required AgUiClientConfig config,
http.Client? httpClient,
Encoder? encoder,
Decoder? decoder,
})
config (required): Configuration object with server details
httpClient (optional): Custom HTTP client implementation
encoder (optional): Custom encoder for request serialization
decoder (optional): Custom decoder for response parsing
Executes an agent and returns a stream of decoded events.
Stream<BaseEvent> runAgent(
String agentId,
RunAgentInput input, {
Map<String, String>? headers,
})
agentId: Unique identifier for the agent
input: Agent input containing messages, context, and configuration
headers: Optional additional headers for this request
Returns a Stream<BaseEvent> that emits protocol events as they arrive.
final input = SimpleRunAgentInput(
messages: [
UserMessage(id: 'msg_1', content: 'Hello, agent!'),
],
context: {'sessionId': '12345'},
);
await for (final event in client.runAgent('chat-agent', input)) {
switch (event) {
case RunStartedEvent():
print('Agent started');
case TextMessageDeltaEvent(delta: final text):
print('Agent says: $text');
case RunFinishedEvent():
print('Agent finished');
default:
// Ignore event types this example does not handle.
break;
}
}
Executes an agent and returns raw SSE messages without decoding.
Stream<SseMessage> runAgentRaw(
String agentId,
RunAgentInput input, {
Map<String, String>? headers,
})
await for (final message in client.runAgentRaw('agent', input)) {
print('Raw event: ${message.event}');
print('Raw data: ${message.data}');
}
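To make the relationship between a raw SSE message and its payload more concrete, the sketch below splits an SSE body into event/data pairs and JSON-decodes the data. It uses only `dart:convert`. `RawSseMessage` and `parseSse` are hypothetical stand-ins, not SDK APIs, and the payload shape (a JSON object with a `delta` field) is an assumption made for illustration.

```dart
import 'dart:convert';

/// Hypothetical stand-in for the SDK's SseMessage; only `event` and `data`
/// are modeled here.
class RawSseMessage {
  final String? event;
  final String data;
  RawSseMessage(this.event, this.data);
}

/// Splits an SSE body into messages. Messages are separated by a blank line;
/// multiple `data:` lines in one message are joined with '\n'.
List<RawSseMessage> parseSse(String body) {
  final messages = <RawSseMessage>[];
  String? event;
  final dataLines = <String>[];

  void flush() {
    if (dataLines.isNotEmpty) {
      messages.add(RawSseMessage(event, dataLines.join('\n')));
    }
    event = null;
    dataLines.clear();
  }

  for (final line in const LineSplitter().convert(body)) {
    if (line.isEmpty) {
      flush();
    } else if (line.startsWith('event:')) {
      event = line.substring(6).trim();
    } else if (line.startsWith('data:')) {
      dataLines.add(line.substring(5).trimLeft());
    }
    // Comment lines (starting with ':') and other fields are ignored.
  }
  flush(); // Lenient: also accept a final message with no trailing blank line.
  return messages;
}

void main() {
  // Assumed payload shape, for illustration only.
  const body = 'event: message\n'
      'data: {"delta":"Hi"}\n'
      '\n';
  for (final message in parseSse(body)) {
    final payload = jsonDecode(message.data) as Map<String, dynamic>;
    print('${message.event}: ${payload['delta']}');
  }
}
```

With the real client, `runAgentRaw` already yields parsed messages, so a parser like this should only be needed when inspecting captured traffic or writing fixtures.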
Cancels an active agent execution.
Future<void> cancelAgent(String agentId)
agentId: The agent ID to cancel
// Requires: import 'dart:async'; (for Timer)
// Start long-running agent
final stream = client.runAgent('analysis-agent', input);
// Set up listener with timeout
final subscription = stream.listen(
(event) => processEvent(event),
onError: (error) => handleError(error),
);
// Cancel after 10 seconds
Timer(Duration(seconds: 10), () async {
await client.cancelAgent('analysis-agent');
await subscription.cancel();
});
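The deadline pattern above can be tried without a server. The following is a minimal sketch using only `dart:async`: `slowEvents` is a hypothetical stand-in for the stream returned by `runAgent`, and `collectUntil` is an illustrative helper, not part of the SDK. It also cancels the timer when the stream ends early, which the shorter example omits.

```dart
import 'dart:async';

/// Hypothetical stand-in for a long-running agent stream: emits an
/// increasing counter every 20 ms, forever.
Stream<int> slowEvents() =>
    Stream.periodic(const Duration(milliseconds: 20), (i) => i);

/// Listens to [source] until [limit] elapses, then cancels the subscription
/// and returns whatever was received so far.
Future<List<T>> collectUntil<T>(Stream<T> source, Duration limit) async {
  final received = <T>[];
  final done = Completer<void>();

  final subscription = source.listen(
    received.add,
    onError: (Object error, StackTrace stack) {
      if (!done.isCompleted) done.completeError(error, stack);
    },
    onDone: () {
      if (!done.isCompleted) done.complete();
    },
  );

  // Stop waiting when the deadline passes, even if the stream is still open.
  final timer = Timer(limit, () {
    if (!done.isCompleted) done.complete();
  });

  try {
    await done.future;
  } finally {
    timer.cancel();
    await subscription.cancel();
  }
  return received;
}

Future<void> main() async {
  final events = await collectUntil(
    slowEvents(),
    const Duration(milliseconds: 110),
  );
  print('Received ${events.length} events before the deadline');
}
```

With the real client, the `finally` block would presumably be the place to call `cancelAgent` as well, so the server-side run is stopped along with the local subscription.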
Cleans up all resources held by the client.
void dispose()
final AgUiClientConfig config;
The configuration used to initialize the client. Read-only.
Map<String, bool> get activeStreams;
Returns a map of currently active agent IDs and their status.
The client throws specific exceptions for different error scenarios:
General client-side errors.
try {
await for (final event in client.runAgent('agent', input)) {
// Process events
}
} on AgUiClientError catch (e) {
print('Client error: ${e.message}');
print('Error code: ${e.code}');
}
Network connectivity issues.
on NetworkError catch (e) {
print('Network error: ${e.message}');
// Implement retry logic
}
Input validation failures.
on ValidationError catch (e) {
print('Validation failed: ${e.message}');
print('Failed fields: ${e.fields}');
}
Server-side errors (5xx status codes).
on ServerError catch (e) {
print('Server error: ${e.statusCode}');
print('Message: ${e.message}');
}
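The network error handler above leaves retry logic as an exercise. One possible shape is exponential backoff, sketched here with only `dart:async`. `TransientNetworkError` is a hypothetical stand-in for the SDK's `NetworkError`, and `retryWithBackoff` is an illustrative helper, not an SDK function.

```dart
import 'dart:async';

/// Hypothetical stand-in for the SDK's NetworkError.
class TransientNetworkError implements Exception {
  final String message;
  TransientNetworkError(this.message);
  @override
  String toString() => 'TransientNetworkError: $message';
}

/// Runs [action], retrying on [TransientNetworkError] with exponential
/// backoff: baseDelay, 2x baseDelay, 4x baseDelay, and so on.
/// Any other exception, or a failure on the last attempt, is rethrown.
Future<T> retryWithBackoff<T>(
  Future<T> Function() action, {
  int maxAttempts = 3,
  Duration baseDelay = const Duration(milliseconds: 200),
}) async {
  for (var attempt = 1; ; attempt++) {
    try {
      return await action();
    } on TransientNetworkError {
      if (attempt >= maxAttempts) rethrow;
      await Future<void>.delayed(baseDelay * (1 << (attempt - 1)));
    }
  }
}

Future<void> main() async {
  var calls = 0;
  final result = await retryWithBackoff(() async {
    calls++;
    if (calls < 3) throw TransientNetworkError('connection reset');
    return 'ok';
  }, baseDelay: const Duration(milliseconds: 10));
  print('$result after $calls calls');
}
```

Note that retrying a streaming run starts it again from the beginning, so events that were already processed before the failure may be delivered a second time.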
Provide a custom HTTP client for advanced scenarios:
// RetryClient is provided by package:http/retry.dart
final retryClient = RetryClient(http.Client());
final client = AgUiClient(
config: AgUiClientConfig(baseUrl: 'http://localhost:8000'),
httpClient: retryClient,
);
Implement custom encoders/decoders for specialized formats:
// Requires: import 'dart:convert'; (for utf8, jsonEncode, jsonDecode)
class CustomEncoder implements Encoder {
@override
List<int> encode(RunAgentInput input) {
// Custom encoding logic
return utf8.encode(jsonEncode(input.toJson()));
}
}
class CustomDecoder implements Decoder {
@override
BaseEvent decode(List<int> data) {
// Custom decoding logic
final json = jsonDecode(utf8.decode(data));
return BaseEvent.fromJson(json);
}
}
final client = AgUiClient(
config: config,
encoder: CustomEncoder(),
decoder: CustomDecoder(),
);
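Before wiring a custom encoder and decoder into the client, it can help to check that they round-trip. The sketch below does this with only `dart:convert`. `SketchInput`, `encodeInput`, and `decodePayload` are hypothetical stand-ins for `RunAgentInput` and the encoder/decoder bodies above; the field names are assumptions.

```dart
import 'dart:convert';

/// Hypothetical, simplified stand-in for RunAgentInput.
class SketchInput {
  final String threadId;
  final List<String> messages;
  SketchInput(this.threadId, this.messages);
  Map<String, dynamic> toJson() =>
      {'threadId': threadId, 'messages': messages};
}

/// Encodes the input as UTF-8 JSON bytes, mirroring CustomEncoder above.
List<int> encodeInput(SketchInput input) =>
    utf8.encode(jsonEncode(input.toJson()));

/// Decodes UTF-8 JSON bytes into a map, rejecting non-object payloads
/// instead of passing an unchecked dynamic value along.
Map<String, dynamic> decodePayload(List<int> bytes) {
  final decoded = jsonDecode(utf8.decode(bytes));
  if (decoded is! Map<String, dynamic>) {
    throw FormatException(
        'Expected a JSON object, got ${decoded.runtimeType}');
  }
  return decoded;
}

void main() {
  final bytes = encodeInput(SketchInput('thread-1', ['Hello, agent!']));
  final payload = decodePayload(bytes);
  print('${payload['threadId']}: ${payload['messages']}');
}
```

Validating the decoded shape before handing it to a `fromJson` factory tends to give clearer errors than a failed cast deeper in the parsing code.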
Transform the event stream for specific use cases:
// Filter only message events
final messageStream = client
.runAgent('agent', input)
.where((event) => event is TextMessageEvent);
// Collect all text into a single string
final completeText = await client
.runAgent('agent', input)
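// Stream has no built-in whereType; filter with where, then cast
.where((event) => event is TextMessageDeltaEvent)
.cast<TextMessageDeltaEvent>()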
.map((event) => event.delta)
.join();
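The text-collection pipeline can be exercised without a server. This is a minimal sketch that uses hypothetical stand-in event classes (`SketchEvent` and its subclasses are not SDK types) and only operators built into `Stream`: `where`, `cast`, `map`, and `join`.

```dart
/// Hypothetical stand-ins for the SDK's event types.
abstract class SketchEvent {}

class SketchRunStarted extends SketchEvent {}

class SketchTextDelta extends SketchEvent {
  final String delta;
  SketchTextDelta(this.delta);
}

class SketchRunFinished extends SketchEvent {}

/// Concatenates the text of every delta event in [events], ignoring
/// all other event types.
Future<String> collectText(Stream<SketchEvent> events) {
  return events
      .where((event) => event is SketchTextDelta)
      .cast<SketchTextDelta>()
      .map((event) => event.delta)
      .join();
}

Future<void> main() async {
  final events = Stream<SketchEvent>.fromIterable([
    SketchRunStarted(),
    SketchTextDelta('Hel'),
    SketchTextDelta('lo'),
    SketchRunFinished(),
  ]);
  print(await collectText(events)); // prints "Hello"
}
```

Because `join` waits for the stream to close, this approach suits cases where the full text is needed at the end; for incremental display, listening to the mapped stream directly is likely a better fit.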
Run multiple agents concurrently:
final agent1 = client.runAgent('agent1', input1);
final agent2 = client.runAgent('agent2', input2);
// Process both streams
await Future.wait([
agent1.forEach((event) => processAgent1(event)),
agent2.forEach((event) => processAgent2(event)),
]);
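When several runs are awaited together with `Future.wait`, an error in one of them surfaces as an error for the whole group. If each run should succeed or fail independently, one option is to catch errors per stream. The sketch below assumes nothing about the SDK: `drainAll` is a hypothetical helper, and the streams are plain Dart streams standing in for `runAgent` results.

```dart
/// Drains every stream concurrently. For each name, the result is either
/// the number of events received or the error that ended the stream.
Future<Map<String, Object>> drainAll(
  Map<String, Stream<Object>> streams,
) async {
  final results = <String, Object>{};
  await Future.wait(streams.entries.map((entry) async {
    try {
      results[entry.key] = await entry.value.length;
    } catch (error) {
      // Record the failure instead of letting it fail the whole group.
      results[entry.key] = error;
    }
  }));
  return results;
}

Future<void> main() async {
  final results = await drainAll({
    'agent1': Stream.fromIterable([1, 2, 3]),
    'agent2': Stream.error(StateError('boom')),
  });
  results.forEach((name, outcome) => print('$name: $outcome'));
}
```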
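The client reuses HTTP connections when possible. For high-throughput scenarios, create one http.Client and share it rather than creating a new one per request:
final httpClient = http.Client();
// Reusing a single http.Client lets requests share its connections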
final client = AgUiClient(
config: config,
httpClient: httpClient,
);
For long-running streams:
The binary protocol is more efficient than JSON for large payloads:
// Binary protocol is used automatically when supported
final stream = client.runAgent('agent', input);
Mock the client for unit tests:
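class MockAgUiClient implements AgUiClient {
@override
Stream<BaseEvent> runAgent(
String agentId,
RunAgentInput input, {
Map<String, String>? headers, // must match the overridden signature
}) {
return Stream.fromIterable([
RunStartedEvent(runId: 'test-run'),
TextMessageStartedEvent(messageId: 'msg-1'),
TextMessageDeltaEvent(messageId: 'msg-1', delta: 'Hello'),
TextMessageFinishedEvent(messageId: 'msg-1'),
RunFinishedEvent(runId: 'test-run'),
]);
}
// Members not overridden here fall through to noSuchMethod, which lets
// this partial mock satisfy the AgUiClient interface.
@override
dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation);
}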