SSE Client

The SSE Client provides real-time streaming connectivity to AG-UI agents using Server-Sent Events (SSE). It handles connection management, authentication, and event streaming with built-in error handling and timeout configuration.

Configuration

The SSE client is configured using a Config struct with the following fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| Endpoint | string | Required | The agent endpoint URL |
| APIKey | string | Optional | API key for authentication |
| AuthHeader | string | "Authorization" | Header name for authentication |
| AuthScheme | string | "Bearer" | Authentication scheme (used with Authorization header) |
| ConnectTimeout | time.Duration | 30s | Timeout for establishing connection |
| ReadTimeout | time.Duration | 5m | Timeout for reading from stream |
| BufferSize | int | 100 | Size of the frame channel buffer |
| Logger | *logrus.Logger | New logger | Logger instance for debugging |

Creating a Client

Initialize a new SSE client with your configuration:

go
client := sse.NewClient(sse.Config{
    Endpoint: "https://api.example.com/agent",
    APIKey:   "your-api-key",
})

With custom configuration:

go
client := sse.NewClient(sse.Config{
    Endpoint:       "https://api.example.com/agent",
    APIKey:         "your-api-key",
    AuthHeader:     "X-API-Key",
    ConnectTimeout: 60 * time.Second,
    ReadTimeout:    10 * time.Minute,
    BufferSize:     200,
    Logger:         logrus.New(),
})

Streaming

The Stream method establishes an SSE connection and returns channels for receiving frames and errors.

StreamOptions

Configure the stream with StreamOptions:

go
type StreamOptions struct {
    Context context.Context   // Context for cancellation
    Payload interface{}       // Request payload (will be JSON encoded)
    Headers map[string]string // Additional HTTP headers
}

Stream Method

go
frames, errors, err := client.Stream(sse.StreamOptions{
    Context: context.Background(),
    Payload: map[string]interface{}{
        "threadId": "thread_123",
        "messages": []map[string]interface{}{
            {
                "role":    "user",
                "content": "Hello, agent!",
            },
        },
    },
})

The method returns:

  • frames <-chan Frame: Channel for receiving SSE frames
  • errors <-chan error: Channel for receiving stream errors
  • err error: Immediate error if connection fails

Frame Processing

The Frame struct contains SSE data and metadata:

go
type Frame struct {
    Data      []byte    // Raw SSE data
    Timestamp time.Time // Timestamp when frame was received
}

Process frames using the event decoder:

go

decoder := events.NewEventDecoder()

for {
    select {
    case frame := <-frames:
        if frame.Data == nil {
            // Stream ended
            return
        }

        event, err := decoder.Decode(frame.Data)
        if err != nil {
            log.Printf("decode error: %v", err)
            continue
        }

        switch e := event.(type) {
        case *events.TextMessageStartEvent:
            fmt.Printf("Message started: %s\n", e.MessageID)
        case *events.TextMessageContentEvent:
            fmt.Printf("Content: %s\n", e.Delta)
        case *events.TextMessageEndEvent:
            fmt.Printf("Message ended: %s\n", e.MessageID)
        }

    case err := <-errors:
        if err != nil {
            log.Printf("stream error: %v", err)
            return
        }
    }
}

Connection Management

Context Cancellation

Use context for graceful shutdown:

go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

frames, errors, err := client.Stream(sse.StreamOptions{
    Context: ctx,
    Payload: payload,
})

// Cancel the stream when needed
cancel()

Timeouts

The client applies two timeouts, both taken from its Config:

  • ConnectTimeout: Applied when establishing the initial connection
  • ReadTimeout: Applied to each read operation from the stream

Graceful Shutdown

Close the client to clean up resources:

go
defer client.Close()

Authentication

The client supports multiple authentication methods:

Bearer Token (Default)

go
client := sse.NewClient(sse.Config{
    Endpoint: "https://api.example.com/agent",
    APIKey:   "your-api-key",
    // Uses Authorization: Bearer your-api-key
})

Custom Authentication Scheme

go
client := sse.NewClient(sse.Config{
    Endpoint:   "https://api.example.com/agent",
    APIKey:     "your-api-key",
    AuthScheme: "Token",
    // Uses Authorization: Token your-api-key
})

Custom Header

go
client := sse.NewClient(sse.Config{
    Endpoint:   "https://api.example.com/agent",
    APIKey:     "your-api-key",
    AuthHeader: "X-API-Key",
    // Uses X-API-Key: your-api-key
})

Additional Headers

Pass custom headers via StreamOptions:

go
frames, errors, err := client.Stream(sse.StreamOptions{
    Context: ctx,
    Payload: payload,
    Headers: map[string]string{
        "X-Request-ID": "req_123",
        "X-Session-ID": "session_456",
    },
})

Error Handling

The client provides errors through the error channel during streaming:

go
for {
    select {
    case frame := <-frames:
        _ = frame // Placeholder: process the frame here

    case err := <-errors:
        if err != nil {
            // Handle error based on type
            if strings.Contains(err.Error(), "timeout") {
                // Handle timeout
                log.Println("Stream timeout, reconnecting...")
                // Implement reconnection logic
            } else if strings.Contains(err.Error(), "read error") {
                // Handle read error
                log.Printf("Read error: %v", err)
                return
            }
        }
    }
}

Reconnection Pattern

Implement automatic reconnection for resilient streaming:

go
func streamWithReconnect(client *sse.Client, opts sse.StreamOptions) {
    maxRetries := 5
    retryDelay := time.Second

    for attempt := 0; attempt < maxRetries; attempt++ {
        if attempt > 0 {
            log.Printf("Reconnecting... (attempt %d/%d)", attempt+1, maxRetries)
            time.Sleep(retryDelay)
            retryDelay *= 2 // Exponential backoff
        }

        frames, errors, err := client.Stream(opts)
        if err != nil {
            log.Printf("Connection failed: %v", err)
            continue
        }

        // Process frames until the stream ends or fails, then reconnect.
        // The label is required: a bare break would only exit the select.
    stream:
        for {
            select {
            case frame := <-frames:
                if frame.Data == nil {
                    // Stream ended, reconnect
                    break stream
                }
                // Process frame

            case err := <-errors:
                if err != nil {
                    log.Printf("Stream error: %v", err)
                    break stream
                }

            case <-opts.Context.Done():
                log.Println("Context cancelled, stopping")
                return
            }
        }
    }

    log.Printf("Max retries exceeded, giving up")
}

Complete Example

Here's a complete example demonstrating the SSE client:

go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/ag-ui-protocol/ag-ui/sdks/community/go/pkg/client/sse"
    "github.com/ag-ui-protocol/ag-ui/sdks/community/go/pkg/core/events"
)

func main() {
    // Create SSE client
    client := sse.NewClient(sse.Config{
        Endpoint: "https://api.example.com/agent",
        APIKey:   "your-api-key",
    })
    defer client.Close()

    // Create context with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    // Start streaming
    frames, errors, err := client.Stream(sse.StreamOptions{
        Context: ctx,
        Payload: map[string]interface{}{
            "threadId": events.NewThreadID(),
            "runId":    events.NewRunID(),
            "messages": []map[string]interface{}{
                {
                    "role":    "user",
                    "content": "What's the weather like?",
                },
            },
        },
    })

    if err != nil {
        log.Fatalf("Failed to start stream: %v", err)
    }

    // Create event decoder
    decoder := events.NewEventDecoder()

    // Process stream
    for {
        select {
        case frame := <-frames:
            if frame.Data == nil {
                fmt.Println("Stream completed")
                return
            }

            event, err := decoder.Decode(frame.Data)
            if err != nil {
                log.Printf("Decode error: %v", err)
                continue
            }

            // Handle different event types
            switch e := event.(type) {
            case *events.RunStartedEvent:
                fmt.Printf("Run started: %s\n", e.RunID)

            case *events.TextMessageStartEvent:
                fmt.Printf("\nAssistant: ")

            case *events.TextMessageContentEvent:
                fmt.Print(e.Delta)

            case *events.TextMessageEndEvent:
                fmt.Println()

            case *events.ToolCallStartEvent:
                fmt.Printf("Tool call: %s\n", e.FunctionName)

            case *events.ToolCallResultEvent:
                fmt.Printf("Tool result: %v\n", e.Result)

            case *events.RunFinishedEvent:
                fmt.Printf("Run completed: %s\n", e.RunID)
                return

            case *events.RunErrorEvent:
                fmt.Printf("Run error: %v\n", e.Error)
                return
            }

        case err := <-errors:
            if err != nil {
                log.Printf("Stream error: %v", err)
                return
            }

        case <-ctx.Done():
            fmt.Println("Context timeout")
            return
        }
    }
}

This example demonstrates:

  • Client creation and configuration
  • Context-based timeout management
  • Stream initialization with payload
  • Event decoding and type-based handling
  • Error handling
  • Graceful shutdown