Back to Copilotkit

Encoding Overview

showcase/shell-docs/src/content/ag-ui/sdk/go/encoding/overview.mdx

1.57.0 · 9.5 KB
Original Source

Encoding Package

The encoding package provides a comprehensive system for encoding, decoding, and content negotiation in the AG-UI Go SDK. It supports multiple formats, streaming operations, and intelligent content type selection based on client preferences.

Package Overview

The encoding system is built around a set of well-defined interfaces that follow the Interface Segregation Principle, allowing components to implement only the functionality they need.

go

Core Interfaces

Encoder Interface

The Encoder interface defines methods for encoding events to bytes:

go
// Encoder serializes events to bytes and reports the MIME type of the
// format it produces.
type Encoder interface {
    // Encode serializes a single event and returns the encoded bytes, or an
    // error if the event could not be encoded.
    Encode(ctx context.Context, event events.Event) ([]byte, error)

    // EncodeMultiple serializes a slice of events into a single byte payload.
    // NOTE(review): how individual events are framed inside the payload is
    // format-specific and not visible here; confirm per implementation.
    EncodeMultiple(ctx context.Context, events []events.Event) ([]byte, error)

    // ContentType returns the MIME type for this encoder (the JSON encoder,
    // for example, reports "application/json").
    ContentType() string
}

Decoder Interface

The Decoder interface defines methods for decoding events from bytes:

go
// Decoder parses events from raw bytes and reports the MIME type of the
// format it understands.
type Decoder interface {
    // Decode parses a single event from raw data and returns it, or an error
    // if the data could not be decoded.
    Decode(ctx context.Context, data []byte) (events.Event, error)

    // DecodeMultiple parses every event contained in raw data and returns
    // them as a slice.
    // NOTE(review): the expected framing of multiple events is
    // format-specific and not visible here; confirm per implementation.
    DecodeMultiple(ctx context.Context, data []byte) ([]events.Event, error)

    // ContentType returns the MIME type for this decoder
    ContentType() string
}

Codec Interface

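The Codec interface combines the Encoder and Decoder interfaces with content-type and streaming-capability reporting (ContentTypeProvider and StreamingCapabilityProvider):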

go
// Codec is the bidirectional contract for a format: it embeds Encoder and
// Decoder and additionally requires ContentTypeProvider and
// StreamingCapabilityProvider.
//
// Encoder and Decoder both declare ContentType() string, so the embedded
// method sets overlap; embedding interfaces with identical overlapping
// methods requires Go 1.14 or newer.
//
// NOTE(review): ContentTypeProvider and StreamingCapabilityProvider are not
// declared in this document; presumably they expose ContentType() and
// SupportsStreaming() respectively. Confirm against their definitions.
type Codec interface {
    Encoder
    Decoder
    ContentTypeProvider
    StreamingCapabilityProvider
}

JSON Encoding

The JSON encoder provides high-performance JSON serialization with support for cross-SDK compatibility:

go

// Create a JSON encoder with options.
// Note: json here is the SDK's JSON encoding package (it provides
// NewJSONEncoder), not the standard library's encoding/json.
encoder := json.NewJSONEncoder(&encoding.EncodingOptions{
    Pretty:                true,  // Format output for readability
    CrossSDKCompatibility: true,  // Ensure compatibility with other SDKs
    ValidateOutput:        true,  // Validate events before encoding
})

// Encode an event; data holds the serialized bytes when err is nil
data, err := encoder.Encode(ctx, event)
if err != nil {
    // Handle encoding error (data must not be used in this case)
}

// The encoder returns "application/json" as its content type,
// suitable for the response's Content-Type header
contentType := encoder.ContentType()

SSE Writer

The SSE (Server-Sent Events) writer is used for streaming events to clients in real time:

go

// Create an SSE writer
writer := sse.NewSSEWriter()

// Write an event to an HTTP response writer (w is the http.ResponseWriter)
err := writer.WriteEvent(ctx, w, event)
if err != nil {
    // Handle write error
}

// Write an error event; the last argument is the request ID it relates to
err = writer.WriteErrorEvent(ctx, w, errors.New("something went wrong"), "request-123")
if err != nil {
    // Handle write error: the error event itself could not be delivered
}

// Flush the writer to ensure data is sent immediately. Not every
// http.ResponseWriter implements http.Flusher, hence the type assertion.
if flusher, ok := w.(http.Flusher); ok {
    flusher.Flush()
}

Server Implementation Example

Here's a complete example of using the SSE writer in an HTTP endpoint:

go
// handleAgentStream streams the events produced by generateEvents to the
// client as Server-Sent Events, flushing after every event so each one is
// delivered as soon as it is written. It stops at the first write error,
// which is logged.
func handleAgentStream(w http.ResponseWriter, r *http.Request) {
    // Set headers for SSE
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // Create SSE writer
    sseWriter := sse.NewSSEWriter()

    // Resolve the optional http.Flusher once instead of on every iteration.
    // When the ResponseWriter cannot flush, events are still written but
    // delivery timing is left to the server's buffering.
    flusher, canFlush := w.(http.Flusher)

    ctx := r.Context()

    // Send events. The slice is deliberately not named "events" so that it
    // does not shadow the events package inside this function.
    outgoing := generateEvents() // Your event generation logic

    for _, event := range outgoing {
        if err := sseWriter.WriteEvent(ctx, w, event); err != nil {
            log.Printf("Failed to write event: %v", err)
            return
        }

        // Flush after each event for real-time streaming
        if canFlush {
            flusher.Flush()
        }
    }
}

Content Negotiation

The negotiation package implements RFC 7231-compliant content-type negotiation:

go

// Create a negotiator with preferred content type
negotiator := negotiation.NewContentNegotiator("application/json")

// Negotiate based on the request's Accept header (r is the *http.Request)
acceptHeader := r.Header.Get("Accept")
contentType, err := negotiator.Negotiate(acceptHeader)
if err != nil {
    // No acceptable content type found; do not rely on the returned
    // contentType here
    contentType = "application/json" // Fall back to default
}

// Check if a specific content type is supported before choosing an encoder
if negotiator.CanHandle("application/protobuf") {
    // Use protobuf encoding
}

// Get list of supported types (e.g. to report them to the client)
supportedTypes := negotiator.SupportedTypes()

Custom Content Types

You can register custom content types with their capabilities:

go
// Register a custom content type together with the capabilities the
// negotiator should associate with it.
// NOTE(review): field meanings below are read from the field names and the
// example values; confirm against the TypeCapabilities declaration.
negotiator.RegisterType(&negotiation.TypeCapabilities{
    ContentType:        "application/vnd.myapp+json", // MIME type being registered
    CanStream:          true,                         // this type can be used for streaming
    CompressionSupport: []string{"gzip", "deflate"},  // compression schemes usable with this type
    Priority:           0.95,                         // presumably a relative preference weight; confirm range
    Extensions:         []string{".myapp.json"},      // file extensions associated with this type
    Aliases:            []string{"application/x-myapp"}, // alternative names for the same type
})

Streaming Operations

For handling large volumes of events, the encoding package supports streaming operations:

go
// StreamEncoder encodes events incrementally to an io.Writer. It offers two
// usage styles: a single channel-driven call (EncodeStream) and an explicit
// session made of StartStream, WriteEvent and EndStream.
type StreamEncoder interface {
    // EncodeStream encodes events received from the input channel and writes
    // them to output.
    // NOTE(review): presumably returns once input is closed or ctx is
    // cancelled; confirm against the implementation.
    EncodeStream(ctx context.Context, input <-chan events.Event, output io.Writer) error

    // Session management: StartStream begins a streaming session that writes
    // to w, and EndStream finishes the current session.
    StartStream(ctx context.Context, w io.Writer) error
    EndStream(ctx context.Context) error

    // Event processing: WriteEvent encodes a single event.
    // NOTE(review): it takes no writer, so it presumably targets the writer
    // passed to StartStream; confirm.
    WriteEvent(ctx context.Context, event events.Event) error
}

Channel-Based Streaming

Stream events from a channel directly to an output writer:

go
// Create event channel. The buffer of 100 lets the producer run ahead of the
// encoder by up to 100 events before a send blocks.
eventChan := make(chan events.Event, 100)

// Start streaming in a goroutine; it consumes eventChan and writes to w
go func() {
    if err := streamEncoder.EncodeStream(ctx, eventChan, w); err != nil {
        log.Printf("Streaming error: %v", err)
    }
}()

// Send events to the channel. pendingEvents is your []events.Event to
// stream; it must not be named "events", because that identifier already
// refers to the events package in this scope (see events.Event above).
for _, event := range pendingEvents {
    select {
    case eventChan <- event:
        // Event sent
    case <-ctx.Done():
        // Context cancelled: this code is the only sender, so close the
        // channel here too, letting the encoder goroutine observe end of
        // input instead of depending solely on ctx to stop.
        close(eventChan)
        return
    }
}

// All events sent: closing the channel signals end of input to the encoder
close(eventChan)

Configuration Options

The encoding package provides comprehensive configuration options:

go
// EncodingOptions configures encoding operations.
type EncodingOptions struct {
    Pretty                bool   // Format output for readability
    Compression           string // Compression algorithm (e.g., "gzip")
    BufferSize            int    // Buffer size for streaming
    MaxSize               int64  // Maximum encoded size (0 for unlimited)
    // NOTE(review): the JSON encoder example describes this flag as
    // validating events before encoding; confirm which is accurate.
    ValidateOutput        bool   // Validate output after encoding
    CrossSDKCompatibility bool   // Ensure compatibility with other SDKs
}

// DecodingOptions configures decoding operations.
type DecodingOptions struct {
    Strict             bool  // Enable strict validation
    MaxSize            int64 // Maximum input size (0 for unlimited)
    BufferSize         int   // Buffer size for streaming
    // NOTE(review): how AllowUnknownFields interacts with Strict is not
    // specified here; confirm against the decoder implementations.
    AllowUnknownFields bool  // Allow unknown fields in input
    ValidateEvents     bool  // Validate events after decoding
}

Error Handling

The encoding package defines specific error types for better error handling:

go
// Handle encoding errors
data, err := encoder.Encode(ctx, event)
if err != nil {
    // errors.As walks the error chain, so a wrapped *encoding.EncodingError
    // is found as well.
    var encErr *encoding.EncodingError
    if errors.As(err, &encErr) {
        log.Printf("Encoding failed for format %s: %s", encErr.Format, encErr.Message)
        if encErr.Cause != nil {
            log.Printf("Underlying error: %v", encErr.Cause)
        }
    } else {
        // Not an EncodingError (e.g. a context error): still report it
        // rather than dropping it silently.
        log.Printf("Encoding failed: %v", err)
    }
}

// Handle decoding errors. The result gets its own name: event is already
// declared in this scope, so "event, err :=" would introduce no new variable
// and fail to compile.
decoded, err := decoder.Decode(ctx, data)
if err != nil {
    var decErr *encoding.DecodingError
    if errors.As(err, &decErr) {
        log.Printf("Decoding failed for format %s: %s", decErr.Format, decErr.Message)
    } else {
        // Not a DecodingError: still report it
        log.Printf("Decoding failed: %v", err)
    }
}
// When err is nil, decoded holds the decoded event and is ready to use.

Examples

Complete Server Endpoint

Here's a complete example of an AG-UI agent endpoint with content negotiation and SSE streaming:

go
// handleAgent serves an AG-UI agent request. It negotiates the response
// format from the Accept header, then either streams the generated events as
// Server-Sent Events or returns all of them in a single JSON body.
//
// Adding ?pretty=true to the URL enables pretty-printed JSON output.
func handleAgent(w http.ResponseWriter, r *http.Request) {
    const (
        defaultType = "application/json"
        streamType  = "text/event-stream"
    )

    // Content negotiation
    negotiator := negotiation.NewContentNegotiator(defaultType)
    acceptHeader := r.Header.Get("Accept")
    contentType, err := negotiator.Negotiate(acceptHeader)
    if err != nil {
        // No acceptable content type found: fall back to the default
        // explicitly rather than discarding the error.
        contentType = defaultType
    }

    ctx := r.Context()

    // Process request and generate events once for both response formats.
    // The slice is deliberately not named "events" so that it does not
    // shadow the events package inside this function.
    agentEvents := processAgentRequest(r)

    // For SSE streaming
    if contentType == streamType || acceptHeader == streamType {
        w.Header().Set("Content-Type", streamType)
        w.Header().Set("Cache-Control", "no-cache")

        sseWriter := sse.NewSSEWriter()

        // Resolve the optional http.Flusher once instead of per event.
        flusher, canFlush := w.(http.Flusher)

        for _, event := range agentEvents {
            if err := sseWriter.WriteEvent(ctx, w, event); err != nil {
                log.Printf("Failed to write event: %v", err)
                // Best effort: try to tell the client what went wrong. The
                // connection may already be unusable, so a failure here is
                // only logged. "req-123" is a placeholder request ID.
                if werr := sseWriter.WriteErrorEvent(ctx, w, err, "req-123"); werr != nil {
                    log.Printf("Failed to write error event: %v", werr)
                }
                return
            }

            if canFlush {
                flusher.Flush()
            }
        }
        return
    }

    // For JSON response
    w.Header().Set("Content-Type", defaultType)

    encoder := json.NewJSONEncoder(&encoding.EncodingOptions{
        Pretty: r.URL.Query().Get("pretty") == "true",
    })

    data, err := encoder.EncodeMultiple(ctx, agentEvents)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    // The status line and headers are already sent once Write starts, so a
    // failure here can only be logged.
    if _, err := w.Write(data); err != nil {
        log.Printf("Failed to write response: %v", err)
    }
}

Custom Codec Implementation

Implement a custom codec for a specific format:

go
// CustomCodec is a skeleton codec for a custom binary format.
//
// NOTE(review): as shown it provides Encode, Decode, ContentType and
// SupportsStreaming only. The Encoder and Decoder interfaces documented
// above also require EncodeMultiple and DecodeMultiple, so those must be
// added before this type can be used where an Encoder, Decoder or Codec is
// expected.
type CustomCodec struct {
    // options holds the encoding configuration; none of the methods shown
    // here read it yet.
    options *encoding.EncodingOptions
}

// Encode serializes a single event by delegating to customSerialize.
// ctx is accepted to match the Encoder signature but is not used here.
func (c *CustomCodec) Encode(ctx context.Context, event events.Event) ([]byte, error) {
    // Custom encoding logic
    return customSerialize(event)
}

// Decode parses a single event by delegating to customDeserialize.
// ctx is accepted to match the Decoder signature but is not used here.
func (c *CustomCodec) Decode(ctx context.Context, data []byte) (events.Event, error) {
    // Custom decoding logic
    return customDeserialize(data)
}

// ContentType returns the MIME type this codec produces and consumes.
func (c *CustomCodec) ContentType() string {
    return "application/vnd.custom+binary"
}

// SupportsStreaming reports that this codec can be used for streaming.
func (c *CustomCodec) SupportsStreaming() bool {
    return true
}