showcase/shell-docs/src/content/ag-ui/sdk/dart/encoder/overview.mdx
The AG-UI Dart SDK includes a highly efficient binary encoding system for optimal data transmission between clients and servers. The encoder package provides serialization and deserialization of protocol messages using a compact binary format.
The encoding system consists of three main components: the Encoder, the Decoder, and the ClientCodec.
The Encoder class handles serialization of AG-UI protocol objects to binary format.
/// Contract for turning AG-UI protocol objects into encoded bytes.
abstract class Encoder {
  /// Returns the encoded form of a complete [RunAgentInput].
  List<int> encode(RunAgentInput input);

  /// Returns the encoded form of a single [Message].
  List<int> encodeMessage(Message message);

  /// Returns the encoded form of a list of [Tool] definitions.
  List<int> encodeTools(List<Tool> tools);

  /// Returns the encoded form of an arbitrary JSON object.
  List<int> encodeJson(Map<String, dynamic> json);
}
The standard encoder implementation:
/// Standard [Encoder] that serializes the input and optionally compresses it.
///
/// Only [encode] is shown in this excerpt.
class DefaultEncoder implements Encoder {
  /// Whether [encode] passes the serialized bytes through compression.
  final bool compressed;

  /// Encoding format this encoder is configured with.
  final Encoding encoding;

  /// Creates an encoder; compression is on and MessagePack is used by default.
  DefaultEncoder({
    this.compressed = true,
    this.encoding = Encoding.msgpack,
  });

  @override
  List<int> encode(RunAgentInput input) {
    final serialized = _serialize(input);
    if (!compressed) {
      return serialized;
    }
    return _compress(serialized);
  }
}
// Encode a request that carries a single user message, using the default
// encoder settings, and report the size of the result.
final encoder = DefaultEncoder();
final input = SimpleRunAgentInput(
  messages: [UserMessage(id: 'msg_1', content: 'Hello')],
);

final encoded = encoder.encode(input);
print('Encoded size: ${encoded.length} bytes');
The Decoder class handles deserialization of binary data to AG-UI events.
/// Contract for turning encoded bytes back into AG-UI events.
abstract class Decoder {
  /// Returns the single [BaseEvent] encoded in [data].
  BaseEvent decode(List<int> data);

  /// Returns every [BaseEvent] encoded in a batch payload.
  List<BaseEvent> decodeBatch(List<int> data);

  /// Attempts to decode [data], which may be only part of an event.
  ///
  /// The result is nullable; the streaming examples treat `null` as
  /// "more data is required".
  DecodedResult? tryDecode(List<int> data);
}
The standard decoder implementation:
/// Standard [Decoder] that optionally decompresses the input and then
/// deserializes it into an event.
///
/// Only [decode] and [tryDecode] are shown in this excerpt.
class DefaultDecoder implements Decoder {
  /// Whether [decode] decompresses the input before deserializing it.
  final bool compressed;

  /// Encoding format this decoder is configured with.
  final Encoding encoding;

  /// Creates a decoder; compression is on and MessagePack is used by default.
  DefaultDecoder({
    this.compressed = true,
    this.encoding = Encoding.msgpack,
  });

  @override
  BaseEvent decode(List<int> data) {
    final decompressed = compressed ? _decompress(data) : data;
    return _deserialize(decompressed);
  }

  /// Decodes [data], returning `null` when the data is incomplete.
  ///
  /// Any failure other than [IncompleteDataError] propagates to the caller
  /// with its original stack trace.
  @override
  DecodedResult? tryDecode(List<int> data) {
    try {
      final event = decode(data);
      // This implementation always reports an empty remainder.
      return DecodedResult(event: event, remainingData: []);
    } on IncompleteDataError {
      // Need more data.
      return null;
    }
  }
}
final decoder = DefaultDecoder();

// Decode single event
final eventData = receivedFromServer();
final event = decoder.decode(eventData);
switch (event) {
  case TextMessageDeltaEvent(:final delta):
    print('Received text: $delta');
  case ToolCallStartedEvent(:final name):
    print('Tool called: $name');
  default:
    // Event types not handled by this example are ignored. The fallback
    // also keeps the switch valid if BaseEvent is a sealed type, where a
    // statement switch must be exhaustive.
    break;
}

// Decode batch
final batchData = receivedBatchFromServer();
final events = decoder.decodeBatch(batchData);
for (final event in events) {
  processEvent(event);
}
The ClientCodec class combines an encoder and a decoder for bidirectional communication:
/// Pairs an [Encoder] with a [Decoder] so one object handles both
/// directions of client/server traffic.
class ClientCodec {
  /// Encoder used for outgoing requests.
  final Encoder encoder;

  /// Decoder used for incoming responses.
  final Decoder decoder;

  /// Creates a codec, falling back to [DefaultEncoder] and [DefaultDecoder]
  /// when no implementation is supplied.
  ClientCodec({
    Encoder? encoder,
    Decoder? decoder,
  })  : encoder = encoder ?? DefaultEncoder(),
        decoder = decoder ?? DefaultDecoder();

  /// Encodes input for sending to server.
  List<int> encodeRequest(RunAgentInput input) {
    return encoder.encode(input);
  }

  /// Decodes response from server.
  BaseEvent decodeResponse(List<int> data) {
    return decoder.decode(data);
  }

  /// Decodes a chunked byte stream into events.
  ///
  /// Chunks are accumulated in a buffer. After each chunk the buffer is
  /// decoded repeatedly until the decoder reports that more data is needed
  /// or the buffer is empty. Bytes still buffered when [dataStream] closes
  /// are not emitted.
  Stream<BaseEvent> decodeStream(Stream<List<int>> dataStream) async* {
    final buffer = BytesBuilder();
    await for (final chunk in dataStream) {
      buffer.add(chunk);
      // Stop as soon as the buffer is drained: asking the decoder to decode
      // zero bytes is pointless and may raise an error other than
      // IncompleteDataError, which would terminate the stream.
      while (buffer.isNotEmpty) {
        final result = decoder.tryDecode(buffer.toBytes());
        if (result == null) break; // Need more data

        yield result.event;

        // Keep only the bytes the decoder did not consume.
        buffer.clear();
        if (result.remainingData.isNotEmpty) {
          buffer.add(result.remainingData);
        }
      }
    }
  }
}
final codec = ClientCodec();

// Encode the request and send it; the server answers with a byte stream.
final requestData = codec.encodeRequest(
  SimpleRunAgentInput(messages: messages),
);
final responseStream = sendToServer(requestData);

// Handle each event as it is decoded from the response stream.
await for (final event in codec.decodeStream(responseStream)) {
  handleEvent(event);
}
The SDK supports multiple encoding formats:
Efficient binary serialization format:
// Select MessagePack explicitly.
final encoder = DefaultEncoder(encoding: Encoding.msgpack);
Advantages:
Human-readable format for debugging:
// JSON output; compression is turned off (optional) so the bytes stay
// readable.
final encoder = DefaultEncoder(
  compressed: false,
  encoding: Encoding.json,
);
Use cases:
Type-safe binary format:
// Select Protocol Buffers explicitly.
final encoder = DefaultEncoder(encoding: Encoding.protobuf);
Advantages:
The encoder supports optional compression:
// Each variant gets its own name: declaring `encoder` three times in one
// scope does not compile.

// Enable compression (default)
final compressedEncoder = DefaultEncoder(compressed: true);

// Disable compression
final uncompressedEncoder = DefaultEncoder(compressed: false);

// Custom compression level
// NOTE(review): `compressionLevel` is not among the DefaultEncoder
// constructor parameters shown earlier on this page; confirm it exists.
final bestCompressionEncoder = DefaultEncoder(
  compressed: true,
  compressionLevel: CompressionLevel.best,
);
/// Available trade-offs between compression speed and output size.
enum CompressionLevel {
  /// No compression.
  none,

  /// Fast compression, larger size.
  fast,

  /// Balance between speed and size (default).
  balanced,

  /// Best compression, slower.
  best,
}
The EventStreamAdapter converts SSE messages and raw byte streams into decoded events:
/// Adapts transport-level streams (SSE messages or raw bytes) into streams
/// of decoded [BaseEvent]s.
class EventStreamAdapter {
  /// Decoder applied to every payload.
  final Decoder decoder;

  /// Creates an adapter, using [DefaultDecoder] when none is supplied.
  EventStreamAdapter({Decoder? decoder})
      : decoder = decoder ?? DefaultDecoder();

  /// Converts SSE messages to events.
  ///
  /// Each message's data is base64-decoded and passed to [decoder].
  /// Messages without data are skipped.
  Stream<BaseEvent> adaptSseStream(Stream<SseMessage> sseStream) async* {
    await for (final message in sseStream) {
      // Copy to a local so the null check promotes the value and no `!`
      // assertion is needed.
      final payload = message.data;
      if (payload == null) continue;

      yield decoder.decode(base64Decode(payload));
    }
  }

  /// Converts raw byte stream to events by delegating to
  /// [ClientCodec.decodeStream] with this adapter's [decoder].
  Stream<BaseEvent> adaptByteStream(Stream<List<int>> byteStream) {
    return ClientCodec(decoder: decoder).decodeStream(byteStream);
  }
}
final adapter = EventStreamAdapter();

// From SSE
final sseClient = SseClient(url);
final sseEvents = adapter.adaptSseStream(sseClient.stream);

// From raw bytes
final socket = await WebSocket.connect(url);
// A dart:io WebSocket is a Stream<dynamic> whose messages are either String
// or List<int>, so it must be adapted before being passed as a
// Stream<List<int>>. A text frame surfaces as a type error on the stream.
final socketEvents = adapter.adaptByteStream(socket.cast<List<int>>());
The encoder/decoder system includes comprehensive error handling:
Thrown when encoding fails:
// Report the failure message and the offending object type when encoding
// fails.
try {
  final encoded = encoder.encode(input);
} on EncodingError catch (e) {
  print('Encoding failed: ${e.message}');
  print('Object type: ${e.objectType}');
}
Thrown when decoding fails:
// Report the failure message, the input length, and the error position
// when decoding fails.
try {
  final event = decoder.decode(data);
} on DecodingError catch (e) {
  print('Decoding failed: ${e.message}');
  print('Data length: ${e.dataLength}');
  print('Error position: ${e.position}');
}
Thrown when partial data is received:
// Decoding a partial payload raises IncompleteDataError; the error exposes
// `expectedBytes`, which is printed here.
try {
  final event = decoder.decode(partialData);
} on IncompleteDataError catch (e) {
  print('Need more data: ${e.expectedBytes} bytes');
  // Buffer and wait for more data
}
Efficient buffer handling for streaming:
/// [DefaultDecoder] variant that accumulates stream chunks in a reusable
/// buffer and decodes as many events as possible after each chunk.
class OptimizedDecoder extends DefaultDecoder {
  // Created with `copy: false`, so added chunks are not copied on insertion.
  final _buffer = BytesBuilder(copy: false);

  /// Decodes [input] chunk by chunk, yielding each event once it can be
  /// decoded from the buffered bytes.
  Stream<BaseEvent> decodeOptimized(Stream<List<int>> input) async* {
    // Decoding is attempted only while more than this many bytes are
    // buffered (the original comment calls this the minimum event size).
    const minEventSize = 4;

    await for (final chunk in input) {
      _buffer.add(chunk);

      // Try to decode multiple events from buffer
      while (_buffer.length > minEventSize) {
        final decoded = tryDecode(_buffer.toBytes());
        if (decoded == null) {
          break;
        }

        yield decoded.event;

        // Replace the buffer contents with whatever was left undecoded.
        _buffer.clear();
        if (decoded.remainingData.isNotEmpty) {
          _buffer.add(decoded.remainingData);
        }
      }
    }
  }
}
Reuse encoder/decoder instances:
/// Simple pool that hands out [ClientCodec] instances for reuse.
class CodecPool {
  /// Maximum number of idle codecs the pool retains.
  final int maxSize;

  // Idle codecs; the most recently released one is handed out first.
  final _pool = <ClientCodec>[];

  /// Creates a pool that keeps at most [maxSize] idle codecs (10 by default).
  CodecPool({this.maxSize = 10});

  /// Returns an idle codec when one is available, otherwise a new one.
  ClientCodec acquire() => _pool.isEmpty ? ClientCodec() : _pool.removeLast();

  /// Returns [codec] to the pool; it is dropped when the pool is full.
  void release(ClientCodec codec) {
    if (_pool.length >= maxSize) {
      return;
    }
    _pool.add(codec);
  }
}
Create custom encoders for specific requirements:
/// Example [Encoder] that frames a JSON payload with a header and a
/// checksum.
///
/// Only [encode] is shown; the remaining [Encoder] members are omitted from
/// this example.
class CustomEncoder implements Encoder {
  /// Encodes [input] as `header + UTF-8 JSON payload + checksum`.
  @override
  List<int> encode(RunAgentInput input) {
    // Custom encoding logic
    final json = input.toJson();

    // Add custom headers: a 0xFF marker, the ASCII bytes for "AG", and a
    // version byte. (`0xAG` is not a valid hexadecimal literal.)
    const header = [0xFF, 0x41, 0x47, 0x01];

    // Encode payload
    final payload = utf8.encode(jsonEncode(json));

    // Add checksum
    final checksum = calculateChecksum(payload);

    return [...header, ...payload, ...checksum];
  }

  /// Returns the checksum bytes for [data].
  ///
  /// Placeholder: currently returns an empty list.
  List<int> calculateChecksum(List<int> data) {
    // Implement checksum algorithm
    return []; // Placeholder
  }
}
Test encoder/decoder implementations:
// Round-trip check: encodes a request with the default encoder, decodes the
// resulting bytes with the default decoder, and compares the two values.
// NOTE(review): Encoder.encode takes a RunAgentInput while Decoder.decode
// returns a BaseEvent, so it is unclear how `decoded` can equal `original`.
// Confirm the intended round-trip types before relying on this example.
test('encodes and decodes correctly', () {
final encoder = DefaultEncoder();
final decoder = DefaultDecoder();
final original = SimpleRunAgentInput(
messages: [
UserMessage(id: 'test', content: 'Hello'),
],
);
final encoded = encoder.encode(original);
final decoded = decoder.decode(encoded);
expect(decoded, equals(original));
});
// Checks that enabling compression produces fewer bytes than disabling it
// for the same large input.
test('handles compression', () {
  final plainEncoder = DefaultEncoder(compressed: false);
  final compressingEncoder = DefaultEncoder(compressed: true);
  final input = largeInput();

  final uncompressedSize = plainEncoder.encode(input).length;
  final compressedSize = compressingEncoder.encode(input).length;

  expect(compressedSize, lessThan(uncompressedSize));
});