Back to Copilotkit

Encoder

showcase/shell-docs/src/content/ag-ui/sdk/dart/encoder/overview.mdx

1.57.0 · 10.3 KB
Original Source

Encoder/Decoder

The AG-UI Dart SDK includes a highly efficient binary encoding system for optimal data transmission between clients and servers. The encoder package provides serialization and deserialization of protocol messages using a compact binary format.

Overview

The encoding system consists of three main components:

  1. Encoder: Serializes Dart objects to binary format
  2. Decoder: Deserializes binary data to Dart objects
  3. ClientCodec: Combines encoder and decoder for bidirectional communication

Encoder

The Encoder class handles serialization of AG-UI protocol objects to binary format.

dart
/// Contract for turning AG-UI protocol objects into an encoded byte list.
///
/// Each method takes one kind of protocol value and returns its encoded
/// form as a `List<int>`.
abstract class Encoder {
  /// Encodes a complete [RunAgentInput] to binary format.
  List<int> encode(RunAgentInput input);

  /// Encodes a single [Message].
  List<int> encodeMessage(Message message);

  /// Encodes a list of [Tool] definitions.
  List<int> encodeTools(List<Tool> tools);

  /// Encodes arbitrary JSON data given as a string-keyed map.
  List<int> encodeJson(Map<String, dynamic> json);
}

DefaultEncoder

The standard encoder implementation:

dart
/// Standard [Encoder] implementation.
///
/// Serializes the input and, when [compressed] is true, compresses the
/// serialized bytes before returning them.
class DefaultEncoder implements Encoder {
  /// Whether the serialized bytes are passed through compression.
  final bool compressed;

  /// Wire format selected for serialization.
  final Encoding encoding;

  /// Requested compression effort; only relevant when [compressed] is true.
  ///
  /// NOTE(review): the compression helper is not shown in this snippet —
  /// confirm that it honors this setting.
  final CompressionLevel compressionLevel;

  /// Creates an encoder.
  ///
  /// Defaults to compressed MessagePack output at
  /// [CompressionLevel.balanced].
  DefaultEncoder({
    this.compressed = true,
    this.encoding = Encoding.msgpack,
    this.compressionLevel = CompressionLevel.balanced,
  });

  /// Serializes [input] and compresses the result when [compressed] is set.
  @override
  List<int> encode(RunAgentInput input) {
    final data = _serialize(input);
    return compressed ? _compress(data) : data;
  }
}

Usage Example

dart
// Encoder with the default constructor arguments.
final encoder = DefaultEncoder();

// Minimal input carrying a single user message.
final input = SimpleRunAgentInput(
  messages: [UserMessage(id: 'msg_1', content: 'Hello')],
);

// Serialize the input and report how many bytes were produced.
final encoded = encoder.encode(input);
print('Encoded size: ${encoded.length} bytes');

Decoder

The Decoder class handles deserialization of binary data to AG-UI events.

dart
/// Contract for turning encoded bytes back into AG-UI events.
abstract class Decoder {
  /// Decodes [data] into a single [BaseEvent].
  BaseEvent decode(List<int> data);

  /// Decodes [data] into a list of events.
  List<BaseEvent> decodeBatch(List<int> data);

  /// Attempts to decode [data], which may be only part of an event.
  ///
  /// The nullable return type lets implementations signal that no event
  /// could be produced yet.
  DecodedResult? tryDecode(List<int> data);
}

DefaultDecoder

The standard decoder implementation:

dart
/// Standard [Decoder] implementation.
///
/// Optionally decompresses incoming bytes and then deserializes them into a
/// [BaseEvent].
class DefaultDecoder implements Decoder {
  /// Whether incoming bytes are decompressed before deserialization.
  final bool compressed;

  /// Wire format selected for deserialization.
  final Encoding encoding;

  /// Creates a decoder; defaults to compressed MessagePack input.
  DefaultDecoder({
    this.compressed = true,
    this.encoding = Encoding.msgpack,
  });

  /// Decompresses [data] when [compressed] is set, then deserializes it.
  @override
  BaseEvent decode(List<int> data) {
    final decompressed = compressed ? _decompress(data) : data;
    return _deserialize(decompressed);
  }

  /// Tries to decode [data] as one event.
  ///
  /// Returns `null` when [decode] throws [IncompleteDataError], meaning more
  /// bytes are needed. Any other error propagates unchanged.
  ///
  /// On success the result always carries an empty `remainingData`: [data]
  /// is treated as holding exactly one event.
  @override
  DecodedResult? tryDecode(List<int> data) {
    try {
      final event = decode(data);
      return DecodedResult(event: event, remainingData: []);
    } on IncompleteDataError {
      // Only the "need more data" condition is handled here; everything
      // else is left to the caller.
      return null;
    }
  }
}

Usage Example

dart
final decoder = DefaultDecoder();

// Decode single event
final eventData = receivedFromServer();
final event = decoder.decode(eventData);

// Dispatch on the concrete event type, binding the field of interest.
switch (event) {
  case TextMessageDeltaEvent(:final delta):
    print('Received text: $delta');
  case ToolCallStartedEvent(:final name):
    print('Tool called: $name');
  default:
    // Event types this example does not care about are ignored. The
    // explicit fallback also keeps the switch valid should the event
    // hierarchy be sealed.
    break;
}

// Decode batch
final batchData = receivedBatchFromServer();
final events = decoder.decodeBatch(batchData);
for (final event in events) {
  processEvent(event);
}

ClientCodec

Combines encoder and decoder for bidirectional communication:

dart
/// Pairs an [Encoder] with a [Decoder] for request/response traffic.
class ClientCodec {
  /// Encoder used for outgoing requests.
  final Encoder encoder;

  /// Decoder used for incoming data.
  final Decoder decoder;

  /// Creates a codec, falling back to [DefaultEncoder] and [DefaultDecoder]
  /// for whichever argument is omitted.
  ClientCodec({
    Encoder? encoder,
    Decoder? decoder,
  })  : encoder = encoder ?? DefaultEncoder(),
        decoder = decoder ?? DefaultDecoder();

  /// Encodes input for sending to server
  List<int> encodeRequest(RunAgentInput input) {
    return encoder.encode(input);
  }

  /// Decodes response from server
  BaseEvent decodeResponse(List<int> data) {
    return decoder.decode(data);
  }

  /// Decodes a chunked byte stream into events.
  ///
  /// Chunks are accumulated in a buffer. After each chunk the buffer is
  /// offered to [Decoder.tryDecode] repeatedly until it reports that more
  /// data is needed or the buffer is empty. Bytes still buffered when
  /// [dataStream] closes are discarded.
  Stream<BaseEvent> decodeStream(Stream<List<int>> dataStream) async* {
    final buffer = BytesBuilder();

    await for (final chunk in dataStream) {
      buffer.add(chunk);

      // Stop as soon as the buffer is drained: handing an empty buffer to
      // the decoder cannot yield an event and may surface a decoding error
      // instead of the "need more data" signal.
      while (buffer.isNotEmpty) {
        final result = decoder.tryDecode(buffer.toBytes());
        if (result == null) break; // Need more data

        yield result.event;

        // Keep only the bytes the decoder did not consume.
        buffer.clear();
        if (result.remainingData.isNotEmpty) {
          buffer.add(result.remainingData);
        }
      }
    }
  }
}

Usage Example

dart
final codec = ClientCodec();

// Serialize the outgoing request.
final requestData = codec.encodeRequest(
  SimpleRunAgentInput(messages: messages),
);

// Hand the bytes to the transport and obtain the response byte stream.
final responseStream = sendToServer(requestData);

// Decode the response incrementally and handle each event as it arrives.
final decodedEvents = codec.decodeStream(responseStream);
await for (final event in decodedEvents) {
  handleEvent(event);
}

Encoding Formats

The SDK supports multiple encoding formats:

MessagePack (Default)

Efficient binary serialization format:

dart
// MessagePack is already the constructor default; it is spelled out here
// to make the choice explicit.
final encoder = DefaultEncoder(encoding: Encoding.msgpack);

Advantages:

  • Compact binary format
  • Fast encoding/decoding
  • Schema-less flexibility
  • Wide language support

JSON

Human-readable format for debugging:

dart
// JSON output, left uncompressed so the bytes stay human-readable.
final encoder = DefaultEncoder(
  compressed: false, // Optional: disable compression for readability
  encoding: Encoding.json,
);

Use cases:

  • Debugging and development
  • Logging and auditing
  • Systems requiring human-readable data

Protocol Buffers

Type-safe binary format:

dart
// Select Protocol Buffers as the wire format.
final encoder = DefaultEncoder(encoding: Encoding.protobuf);

Advantages:

  • Strong typing
  • Excellent performance
  • Schema evolution support
  • Smallest message size

Compression

The encoder supports optional compression:

dart
// Enable compression (default)
final encoder = DefaultEncoder(compressed: true);

// Disable compression
final encoder = DefaultEncoder(compressed: false);

// Custom compression level
final encoder = DefaultEncoder(
  compressed: true,
  compressionLevel: CompressionLevel.best,
);

Compression Strategies

dart
/// Available trade-offs between compression speed and output size.
enum CompressionLevel {
  none,      // No compression
  fast,      // Fast compression, larger size
  balanced,  // Balance between speed and size (default)
  best,      // Best compression, slower
}

Stream Adapter

The EventStreamAdapter converts SSE messages (with base64-encoded payloads) and raw byte streams into decoded events:

dart
/// Adapts transport-level streams into streams of decoded [BaseEvent]s.
class EventStreamAdapter {
  /// Decoder applied to every payload.
  final Decoder decoder;

  /// Creates an adapter; uses a [DefaultDecoder] when [decoder] is omitted.
  EventStreamAdapter({Decoder? decoder})
      : decoder = decoder ?? DefaultDecoder();

  /// Converts SSE messages to events.
  ///
  /// Messages whose `data` is null are skipped. Every other message's data
  /// is base64-decoded and passed to [Decoder.decode].
  Stream<BaseEvent> adaptSseStream(Stream<SseMessage> sseStream) async* {
    await for (final message in sseStream) {
      // Bind the payload once through a null-check pattern instead of
      // re-reading the field with a `!` assertion.
      if (message.data case final payload?) {
        final bytes = base64Decode(payload);
        yield decoder.decode(bytes);
      }
    }
  }

  /// Converts raw byte stream to events.
  ///
  /// Delegates chunk buffering and decoding to [ClientCodec.decodeStream],
  /// using this adapter's [decoder].
  Stream<BaseEvent> adaptByteStream(Stream<List<int>> byteStream) {
    return ClientCodec(decoder: decoder).decodeStream(byteStream);
  }
}

Usage Example

dart
final adapter = EventStreamAdapter();

// From SSE
final sseClient = SseClient(url);
final sseEvents = adapter.adaptSseStream(sseClient.stream);

// From raw bytes
final socket = await WebSocket.connect(url);
// A dart:io WebSocket is a Stream<dynamic> that can also deliver text frames
// as Strings, so keep only the binary frames and give the stream the element
// type that adaptByteStream expects.
final binaryFrames =
    socket.where((frame) => frame is List<int>).cast<List<int>>();
final socketEvents = adapter.adaptByteStream(binaryFrames);

Error Handling

The encoder/decoder system includes comprehensive error handling:

EncodingError

Thrown when encoding fails:

dart
// Encode the input and report the failure details if encoding throws.
try {
  final encoded = encoder.encode(input);
} on EncodingError catch (e) {
  // The error exposes a message and the type of the offending object.
  print('Encoding failed: ${e.message}');
  print('Object type: ${e.objectType}');
}

DecodingError

Thrown when decoding fails:

dart
// Decode the data and report the failure details if decoding throws.
try {
  final event = decoder.decode(data);
} on DecodingError catch (e) {
  // The error exposes a message, the input length and an error position.
  print('Decoding failed: ${e.message}');
  print('Data length: ${e.dataLength}');
  print('Error position: ${e.position}');
}

IncompleteDataError

Thrown when partial data is received:

dart
// Decoding a partial payload directly surfaces IncompleteDataError.
try {
  final event = decoder.decode(partialData);
} on IncompleteDataError catch (e) {
  // The error exposes an expectedBytes value, printed here.
  print('Need more data: ${e.expectedBytes} bytes');
  // Buffer and wait for more data
}

Performance Optimization

Buffer Management

Efficient buffer handling for streaming:

dart
/// A [DefaultDecoder] with a streaming entry point that buffers chunks
/// without copying them.
class OptimizedDecoder extends DefaultDecoder {
  /// Buffers holding at most this many bytes are not handed to [tryDecode].
  ///
  /// NOTE(review): 4 is taken to be the minimum event size — confirm against
  /// the wire format.
  static const _minEventSize = 4;

  /// Decodes a chunked byte stream into events.
  ///
  /// Chunks are accumulated and offered to [tryDecode] while more than
  /// [_minEventSize] bytes are buffered. Bytes still buffered when [input]
  /// closes are discarded.
  Stream<BaseEvent> decodeOptimized(Stream<List<int>> input) async* {
    // One buffer per call: leftover bytes from an earlier stream must not be
    // prepended to the next one. With `copy: false` the chunks are kept by
    // reference, so the producer must not modify a chunk after emitting it.
    final buffer = BytesBuilder(copy: false);

    await for (final chunk in input) {
      buffer.add(chunk);

      // Try to decode multiple events from buffer
      while (buffer.length > _minEventSize) {
        final result = tryDecode(buffer.toBytes());
        if (result == null) break; // Need more data

        yield result.event;

        // Keep only the bytes the decoder did not consume.
        buffer.clear();
        if (result.remainingData.isNotEmpty) {
          buffer.add(result.remainingData);
        }
      }
    }
  }
}

Pooling and Reuse

Reuse encoder/decoder instances:

dart
/// A bounded stash of idle [ClientCodec] instances available for reuse.
class CodecPool {
  /// Codecs that have been released and are waiting to be handed out again.
  final _idle = <ClientCodec>[];

  /// Upper bound on the number of idle codecs retained.
  final int maxSize;

  CodecPool({this.maxSize = 10});

  /// Hands out the most recently released codec, or a new one when none is
  /// idle.
  ClientCodec acquire() => _idle.isEmpty ? ClientCodec() : _idle.removeLast();

  /// Returns [codec] to the pool; it is dropped when the pool is full.
  void release(ClientCodec codec) {
    if (_idle.length >= maxSize) return;
    _idle.add(codec);
  }
}

Custom Implementations

Create custom encoders for specific requirements:

dart
/// Example of a hand-rolled [Encoder] that frames a JSON payload as
/// `header + payload + checksum`.
///
/// NOTE(review): [Encoder] also declares `encodeMessage`, `encodeTools` and
/// `encodeJson`; this example does not implement them.
class CustomEncoder implements Encoder {
  /// Encodes [input] as UTF-8 JSON wrapped in a header and a checksum.
  @override
  List<int> encode(RunAgentInput input) {
    // Custom encoding logic
    final json = input.toJson();

    // Add custom headers: a 0xFF marker, the ASCII letters "AG"
    // (0x41, 0x47) as magic bytes, then format version 1. Hex literals may
    // only contain the digits 0-9 and A-F.
    final header = [0xFF, 0x41, 0x47, 0x01];

    // Encode payload
    final payload = utf8.encode(jsonEncode(json));

    // Add checksum
    final checksum = calculateChecksum(payload);

    return [...header, ...payload, ...checksum];
  }

  /// Computes the checksum bytes appended after the payload.
  ///
  /// Placeholder: currently returns an empty list, so no checksum bytes are
  /// emitted.
  List<int> calculateChecksum(List<int> data) {
    // Implement checksum algorithm
    return []; // Placeholder
  }
}

Testing

Test encoder/decoder implementations:

dart
// Round-trip check: encode an input, decode the bytes, compare the result.
test('encodes and decodes correctly', () {
  final encoder = DefaultEncoder();
  final decoder = DefaultDecoder();

  final original = SimpleRunAgentInput(
    messages: [
      UserMessage(id: 'test', content: 'Hello'),
    ],
  );

  final encoded = encoder.encode(original);
  // NOTE(review): Decoder.decode is declared to return a BaseEvent, while
  // `original` is a run-agent input. Confirm that the two can compare equal,
  // or round-trip a type that both sides share.
  final decoded = decoder.decode(encoded);

  expect(decoded, equals(original));
});

// Compressed output for the same input must be strictly smaller.
test('handles compression', () {
  final input = largeInput();

  // Encode the same input with compression switched off and on.
  final plainBytes = DefaultEncoder(compressed: false).encode(input);
  final packedBytes = DefaultEncoder(compressed: true).encode(input);

  expect(packedBytes.length, lessThan(plainBytes.length));
});