AgUiClient


The AgUiClient class is the primary interface for connecting to AG-UI compatible servers. It handles HTTP communication, SSE streaming, binary protocol encoding/decoding, and provides a type-safe API for agent interactions.

Constructor

dart
AgUiClient({
  required AgUiClientConfig config,
  http.Client? httpClient,
  Encoder? encoder,
  Decoder? decoder,
})

Parameters

  • config (required): Configuration object with server details
  • httpClient (optional): Custom HTTP client implementation
  • encoder (optional): Custom encoder for request serialization
  • decoder (optional): Custom decoder for response parsing

Methods

runAgent

Executes an agent and returns a stream of decoded events.

dart
Stream<BaseEvent> runAgent(
  String agentId,
  RunAgentInput input, {
  Map<String, String>? headers,
})

Parameters

  • agentId: Unique identifier for the agent
  • input: Agent input containing messages, context, and configuration
  • headers: Optional additional headers for this request

Returns

A Stream<BaseEvent> that emits protocol events as they arrive.

Example

dart
final input = SimpleRunAgentInput(
  messages: [
    UserMessage(id: 'msg_1', content: 'Hello, agent!'),
  ],
  context: {'sessionId': '12345'},
);

await for (final event in client.runAgent('chat-agent', input)) {
  switch (event) {
    case RunStartedEvent():
      print('Agent started');
    case TextMessageDeltaEvent(delta: final text):
      print('Agent says: $text');
    case RunFinishedEvent():
      print('Agent finished');
    default:
      // Ignore event types this example does not handle.
      break;
  }
}

runAgentRaw

Executes an agent and returns raw SSE messages without decoding.

dart
Stream<SseMessage> runAgentRaw(
  String agentId,
  RunAgentInput input, {
  Map<String, String>? headers,
})

Use Cases

  • Custom event processing
  • Debugging and logging
  • Performance optimization when decoding isn't needed

Example

dart
await for (final message in client.runAgentRaw('agent', input)) {
  print('Raw event: ${message.event}');
  print('Raw data: ${message.data}');
}
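
For custom event processing, the raw `data` field can be decoded by hand. The following is a minimal sketch, assuming `data` carries a JSON object as a string; `tryDecodeData` is a hypothetical helper and not part of the SDK.

```dart
import 'dart:convert';

/// Hypothetical helper: parses the `data` field of a raw SSE message.
/// Returns null when the payload is missing, is not valid JSON,
/// or is valid JSON but not an object.
Map<String, dynamic>? tryDecodeData(String? data) {
  if (data == null || data.isEmpty) return null;
  try {
    final decoded = jsonDecode(data);
    return decoded is Map<String, dynamic> ? decoded : null;
  } on FormatException {
    // jsonDecode throws FormatException on malformed input.
    return null;
  }
}
```

Inside the `await for` loop above, this could be called as `tryDecodeData(message.data)`, skipping messages for which it returns null.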

cancelAgent

Cancels an active agent execution.

dart
Future<void> cancelAgent(String agentId)

Parameters

  • agentId: The agent ID to cancel

Behavior

  • Immediately closes the SSE connection
  • Cleans up resources
  • Causes the event stream to complete

Example

dart
import 'dart:async';

// Start long-running agent
final stream = client.runAgent('analysis-agent', input);

// Set up a listener
final subscription = stream.listen(
  (event) => processEvent(event),
  onError: (error) => handleError(error),
);

// Cancel after 10 seconds
Timer(const Duration(seconds: 10), () async {
  await client.cancelAgent('analysis-agent');
  await subscription.cancel();
});
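
The timer in the example above keeps running even if the agent finishes early. One way to avoid that is to tie the timer to the stream's lifetime. This is a sketch only: `withDeadline` is a hypothetical helper built on `dart:async`, not an SDK API, and it does not forward pause and resume to the source.

```dart
import 'dart:async';

/// Hypothetical helper: forwards [source] and invokes [onDeadline] if the
/// stream has not completed within [limit]. The timer is cancelled as soon
/// as the stream finishes or the listener cancels.
Stream<T> withDeadline<T>(
  Stream<T> source,
  Duration limit,
  Future<void> Function() onDeadline,
) {
  late final StreamController<T> controller;
  StreamSubscription<T>? subscription;
  Timer? timer;

  controller = StreamController<T>(
    onListen: () {
      timer = Timer(limit, () async {
        await onDeadline();
        await subscription?.cancel();
        // The source may have finished while onDeadline was running.
        if (!controller.isClosed) unawaited(controller.close());
      });
      subscription = source.listen(
        controller.add,
        onError: controller.addError,
        onDone: () {
          timer?.cancel();
          controller.close();
        },
      );
    },
    onCancel: () async {
      timer?.cancel();
      await subscription?.cancel();
    },
  );
  return controller.stream;
}
```

With the client described here, the callback could be `() => client.cancelAgent('analysis-agent')` and the source `client.runAgent('analysis-agent', input)`.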

dispose

Cleans up all resources held by the client.

dart
void dispose()

Important

  • Call this when the client is no longer needed
  • Cancels all active streams
  • Closes HTTP client connections
  • Releases memory resources
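
A common way to make sure `dispose` always runs is a `try`/`finally` wrapper. The sketch below is generic so that it stands alone; `withResource` is a hypothetical helper, not something the SDK provides.

```dart
/// Hypothetical helper: runs [body] with [resource] and always calls
/// [dispose] afterwards, even when [body] throws.
Future<R> withResource<C, R>(
  C resource,
  Future<R> Function(C resource) body,
  void Function(C resource) dispose,
) async {
  try {
    return await body(resource);
  } finally {
    dispose(resource);
  }
}
```

For the client, `resource` would be an `AgUiClient` and the last argument `(c) => c.dispose()`.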

Properties

config

dart
final AgUiClientConfig config;

The configuration used to initialize the client. Read-only.

activeStreams

dart
Map<String, bool> get activeStreams;

Returns a map of currently active agent IDs and their status.
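
As an illustration of reading this map, the sketch below lists the agent IDs whose flag is set. It assumes that `true` marks a stream that is still running, which the description above does not spell out; `runningAgentIds` is a hypothetical helper.

```dart
/// Hypothetical helper: returns the IDs whose status flag is true,
/// assuming `true` means the stream is still running.
List<String> runningAgentIds(Map<String, bool> activeStreams) => [
      for (final entry in activeStreams.entries)
        if (entry.value) entry.key,
    ];
```

It would be called as `runningAgentIds(client.activeStreams)`.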

Error Handling

The client throws specific exceptions for different error scenarios:

AgUiClientError

General client-side errors.

dart
try {
  await for (final event in client.runAgent('agent', input)) {
    // Process events
  }
} on AgUiClientError catch (e) {
  print('Client error: ${e.message}');
  print('Error code: ${e.code}');
}

NetworkError

Network connectivity issues.

dart
try {
  await client.runAgent('agent', input).drain<void>();
} on NetworkError catch (e) {
  print('Network error: ${e.message}');
  // Implement retry logic
}
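
The retry logic mentioned in the comment could take the form of exponential backoff. This is a sketch under assumptions: `retryWithBackoff` is a hypothetical helper, and the default attempt count and delay are illustrative values, not SDK settings.

```dart
import 'dart:async';

/// Hypothetical helper: retries [action] with exponential backoff.
/// [shouldRetry] decides which errors are worth retrying, for example
/// `(e) => e is NetworkError`. Without it, every error is retried.
Future<T> retryWithBackoff<T>(
  Future<T> Function() action, {
  int maxAttempts = 3,
  Duration baseDelay = const Duration(milliseconds: 200),
  bool Function(Object error)? shouldRetry,
}) async {
  for (var attempt = 1; ; attempt++) {
    try {
      return await action();
    } catch (error) {
      final retryable = shouldRetry?.call(error) ?? true;
      // Give up on non-retryable errors or when attempts are exhausted.
      if (!retryable || attempt >= maxAttempts) rethrow;
      // The delay doubles each attempt: base, 2x base, 4x base, ...
      await Future<void>.delayed(baseDelay * (1 << (attempt - 1)));
    }
  }
}
```

Note that retrying a run sends the request again, so events already handled in a failed attempt may be delivered a second time.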

ValidationError

Input validation failures.

dart
try {
  await client.runAgent('agent', input).drain<void>();
} on ValidationError catch (e) {
  print('Validation failed: ${e.message}');
  print('Failed fields: ${e.fields}');
}

ServerError

Server-side errors (5xx status codes).

dart
try {
  await client.runAgent('agent', input).drain<void>();
} on ServerError catch (e) {
  print('Server error: ${e.statusCode}');
  print('Message: ${e.message}');
}

Advanced Usage

Custom HTTP Client

Provide a custom HTTP client for advanced scenarios:

dart

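import 'package:http/http.dart' as http;
import 'package:http/retry.dart';

// RetryClient retries failed requests (by default, 503 responses).
final retryClient = RetryClient(http.Client());

final client = AgUiClient(
  config: AgUiClientConfig(baseUrl: 'http://localhost:8000'),
  httpClient: retryClient,
);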

Custom Encoding/Decoding

Implement custom encoders/decoders for specialized formats:

dart
import 'dart:convert';

class CustomEncoder implements Encoder {
  @override
  List<int> encode(RunAgentInput input) {
    // Custom encoding logic: here, UTF-8 encoded JSON
    return utf8.encode(jsonEncode(input.toJson()));
  }
}

class CustomDecoder implements Decoder {
  @override
  BaseEvent decode(List<int> data) {
    // Custom decoding logic: here, UTF-8 encoded JSON
    final json = jsonDecode(utf8.decode(data)) as Map<String, dynamic>;
    return BaseEvent.fromJson(json);
  }
}

final client = AgUiClient(
  config: config,
  encoder: CustomEncoder(),
  decoder: CustomDecoder(),
);

Stream Transformations

Transform the event stream for specific use cases:

dart
// Filter only message events
final messageStream = client
    .runAgent('agent', input)
    .where((event) => event is TextMessageEvent);

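// Collect all text into a single string.
// dart:async's Stream has no whereType, so filter with where and then cast.
final completeText = await client
    .runAgent('agent', input)
    .where((event) => event is TextMessageDeltaEvent)
    .cast<TextMessageDeltaEvent>()
    .map((event) => event.delta)
    .join();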
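
Another transformation that can be useful for incremental UI updates is emitting the text accumulated so far rather than each delta. A minimal sketch, assuming the deltas have already been mapped to plain strings; `accumulateText` is a hypothetical helper.

```dart
/// Hypothetical helper: turns a stream of text deltas into a stream of the
/// full text accumulated so far.
Stream<String> accumulateText(Stream<String> deltas) async* {
  final buffer = StringBuffer();
  await for (final delta in deltas) {
    buffer.write(delta);
    yield buffer.toString();
  }
}
```

It could be fed with the delta stream from the example above, stopping before the final `.join()`.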

Concurrent Agents

Run multiple agents concurrently:

dart
final agent1 = client.runAgent('agent1', input1);
final agent2 = client.runAgent('agent2', input2);

// Process both streams
await Future.wait([
  agent1.forEach((event) => processAgent1(event)),
  agent2.forEach((event) => processAgent2(event)),
]);
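
When the events of several agents should be handled in one place, the streams can instead be merged and tagged with their origin. The sketch below assumes Dart 3 (it uses records); `mergeTagged` is a hypothetical helper, and it does not forward pause and resume to the sources.

```dart
import 'dart:async';

/// Hypothetical helper: merges several streams into one, tagging each event
/// with the key of the stream it came from. The merged stream closes once
/// every source has finished.
Stream<(String, T)> mergeTagged<T>(Map<String, Stream<T>> sources) {
  final subscriptions = <StreamSubscription<T>>[];
  late final StreamController<(String, T)> controller;
  var remaining = sources.length;

  controller = StreamController<(String, T)>(
    onListen: () {
      if (remaining == 0) {
        controller.close();
        return;
      }
      sources.forEach((id, stream) {
        subscriptions.add(stream.listen(
          (event) => controller.add((id, event)),
          onError: controller.addError,
          onDone: () {
            if (--remaining == 0) controller.close();
          },
        ));
      });
    },
    onCancel: () async {
      // Cancelling the merged stream cancels every source.
      await Future.wait(subscriptions.map((s) => s.cancel()));
    },
  );
  return controller.stream;
}
```

For the two agents above, the argument would be `{'agent1': agent1, 'agent2': agent2}`, and each merged event is an `(id, event)` pair.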

Performance Considerations

Connection Pooling

The client reuses HTTP connections when possible. For high-throughput scenarios, pass in your own HTTP client so that you control its connection settings:

dart
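import 'dart:io';
import 'package:http/io_client.dart';

// dart:io platforms only (not web). Caps concurrent connections per host
// and keeps idle connections open for reuse. Values are illustrative.
final ioHttpClient = HttpClient()
  ..maxConnectionsPerHost = 8
  ..idleTimeout = const Duration(seconds: 30);
final client = AgUiClient(
  config: config,
  httpClient: IOClient(ioHttpClient),
);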

Memory Management

For long-running streams:

  1. Process events immediately rather than buffering
  2. Cancel streams when no longer needed
  3. Dispose of the client when done
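
The first two points can be sketched with a plain `await for` loop: each event is handled as it arrives, and leaving the loop cancels the subscription. `processUntil` is a hypothetical helper, written generically so that it does not depend on SDK types.

```dart
/// Hypothetical helper: handles events one at a time and stops as soon as
/// [isLast] matches. Leaving the `await for` loop cancels the underlying
/// subscription, so nothing is buffered and the source is released early.
Future<int> processUntil<T>(
  Stream<T> events,
  void Function(T event) handle,
  bool Function(T event) isLast,
) async {
  var handled = 0;
  await for (final event in events) {
    handle(event);
    handled++;
    if (isLast(event)) break;
  }
  return handled;
}
```

With the client, `isLast` could be `(event) => event is RunFinishedEvent`.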

Binary Protocol

The binary protocol is more efficient than JSON for large payloads:

dart
// Binary protocol is used automatically when supported
final stream = client.runAgent('agent', input);

Testing

Mock the client for unit tests:

dart
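class MockAgUiClient implements AgUiClient {
  // The override must accept the same named parameters as the original.
  @override
  Stream<BaseEvent> runAgent(
    String agentId,
    RunAgentInput input, {
    Map<String, String>? headers,
  }) {
    return Stream.fromIterable([
      RunStartedEvent(runId: 'test-run'),
      TextMessageStartedEvent(messageId: 'msg-1'),
      TextMessageDeltaEvent(messageId: 'msg-1', delta: 'Hello'),
      TextMessageFinishedEvent(messageId: 'msg-1'),
      RunFinishedEvent(runId: 'test-run'),
    ]);
  }

  // Members the test does not need throw NoSuchMethodError if called.
  @override
  dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation);
}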