showcase/shell-docs/src/content/ag-ui/sdk/go/client/sse-client.mdx
The SSE Client provides real-time streaming connectivity to AG-UI agents using Server-Sent Events (SSE). It handles connection management, authentication, and event streaming with built-in error handling and timeout configuration.
The SSE client is configured using a Config struct with the following fields:
| Field | Type | Default | Description |
|---|---|---|---|
Endpoint | string | Required | The agent endpoint URL |
APIKey | string | Optional | API key for authentication |
AuthHeader | string | "Authorization" | Header name for authentication |
AuthScheme | string | "Bearer" | Authentication scheme (used with Authorization header) |
ConnectTimeout | time.Duration | 30s | Timeout for establishing connection |
ReadTimeout | time.Duration | 5m | Timeout for reading from stream |
BufferSize | int | 100 | Size of the frame channel buffer |
Logger | *logrus.Logger | New logger | Logger instance for debugging |
Initialize a new SSE client with your configuration:
client := sse.NewClient(sse.Config{
Endpoint: "https://api.example.com/agent",
APIKey: "your-api-key",
})
With custom configuration:
client := sse.NewClient(sse.Config{
Endpoint: "https://api.example.com/agent",
APIKey: "your-api-key",
AuthHeader: "X-API-Key",
ConnectTimeout: 60 * time.Second,
ReadTimeout: 10 * time.Minute,
BufferSize: 200,
Logger: logrus.New(),
})
The Stream method establishes an SSE connection and returns channels for receiving frames and errors.
Configure the stream with StreamOptions:
type StreamOptions struct {
Context context.Context // Context for cancellation
Payload interface{} // Request payload (will be JSON encoded)
Headers map[string]string // Additional HTTP headers
}
frames, errors, err := client.Stream(sse.StreamOptions{
Context: context.Background(),
Payload: map[string]interface{}{
"threadId": "thread_123",
"messages": []map[string]interface{}{
{
"role": "user",
"content": "Hello, agent!",
},
},
},
})
The method returns:
frames <-chan Frame: Channel for receiving SSE frameserrors <-chan error: Channel for receiving stream errorserr error: Immediate error if connection failsThe Frame struct contains SSE data and metadata:
type Frame struct {
Data []byte // Raw SSE data
Timestamp time.Time // Timestamp when frame was received
}
Process frames using the event decoder:
decoder := events.NewEventDecoder()
for {
select {
case frame := <-frames:
if frame.Data == nil {
// Stream ended
return
}
event, err := decoder.Decode(frame.Data)
if err != nil {
log.Printf("decode error: %v", err)
continue
}
switch e := event.(type) {
case *events.TextMessageStartEvent:
fmt.Printf("Message started: %s\n", e.MessageID)
case *events.TextMessageContentEvent:
fmt.Printf("Content: %s\n", e.Delta)
case *events.TextMessageEndEvent:
fmt.Printf("Message ended: %s\n", e.MessageID)
}
case err := <-errors:
if err != nil {
log.Printf("stream error: %v", err)
return
}
}
}
Use context for graceful shutdown:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
frames, errors, err := client.Stream(sse.StreamOptions{
Context: ctx,
Payload: payload,
})
// Cancel the stream when needed
cancel()
The client automatically manages timeouts:
ConnectTimeout: Applied when establishing the initial connectionReadTimeout: Applied to each read operation from the streamClose the client to clean up resources:
defer client.Close()
The client supports multiple authentication methods:
client := sse.NewClient(sse.Config{
Endpoint: "https://api.example.com/agent",
APIKey: "your-api-key",
// Uses Authorization: Bearer your-api-key
})
client := sse.NewClient(sse.Config{
Endpoint: "https://api.example.com/agent",
APIKey: "your-api-key",
AuthScheme: "Token",
// Uses Authorization: Token your-api-key
})
client := sse.NewClient(sse.Config{
Endpoint: "https://api.example.com/agent",
APIKey: "your-api-key",
AuthHeader: "X-API-Key",
// Uses X-API-Key: your-api-key
})
Pass custom headers via StreamOptions:
frames, errors, err := client.Stream(sse.StreamOptions{
Context: ctx,
Payload: payload,
Headers: map[string]string{
"X-Request-ID": "req_123",
"X-Session-ID": "session_456",
},
})
The client provides errors through the error channel during streaming:
for {
select {
case frame := <-frames:
// Process frame
case err := <-errors:
if err != nil {
// Handle error based on type
if strings.Contains(err.Error(), "timeout") {
// Handle timeout
log.Println("Stream timeout, reconnecting...")
// Implement reconnection logic
} else if strings.Contains(err.Error(), "read error") {
// Handle read error
log.Printf("Read error: %v", err)
return
}
}
}
}
Implement automatic reconnection for resilient streaming:
func streamWithReconnect(client *sse.Client, opts sse.StreamOptions) {
maxRetries := 5
retryDelay := time.Second
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
log.Printf("Reconnecting... (attempt %d/%d)", attempt+1, maxRetries)
time.Sleep(retryDelay)
retryDelay *= 2 // Exponential backoff
}
frames, errors, err := client.Stream(opts)
if err != nil {
log.Printf("Connection failed: %v", err)
continue
}
// Process frames
for {
select {
case frame := <-frames:
if frame.Data == nil {
// Stream ended, reconnect
break
}
// Process frame
case err := <-errors:
if err != nil {
log.Printf("Stream error: %v", err)
break
}
case <-opts.Context.Done():
log.Println("Context cancelled, stopping")
return
}
}
}
log.Printf("Max retries exceeded, giving up")
}
Here's a complete example demonstrating the SSE client:
package main
"context"
"fmt"
"log"
"time"
"github.com/ag-ui-protocol/ag-ui/sdks/community/go/pkg/client/sse"
"github.com/ag-ui-protocol/ag-ui/sdks/community/go/pkg/core/events"
)
func main() {
// Create SSE client
client := sse.NewClient(sse.Config{
Endpoint: "https://api.example.com/agent",
APIKey: "your-api-key",
})
defer client.Close()
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Start streaming
frames, errors, err := client.Stream(sse.StreamOptions{
Context: ctx,
Payload: map[string]interface{}{
"threadId": events.NewThreadID(),
"runId": events.NewRunID(),
"messages": []map[string]interface{}{
{
"role": "user",
"content": "What's the weather like?",
},
},
},
})
if err != nil {
log.Fatalf("Failed to start stream: %v", err)
}
// Create event decoder
decoder := events.NewEventDecoder()
// Process stream
for {
select {
case frame := <-frames:
if frame.Data == nil {
fmt.Println("Stream completed")
return
}
event, err := decoder.Decode(frame.Data)
if err != nil {
log.Printf("Decode error: %v", err)
continue
}
// Handle different event types
switch e := event.(type) {
case *events.RunStartedEvent:
fmt.Printf("Run started: %s\n", e.RunID)
case *events.TextMessageStartEvent:
fmt.Printf("\nAssistant: ")
case *events.TextMessageContentEvent:
fmt.Print(e.Delta)
case *events.TextMessageEndEvent:
fmt.Println()
case *events.ToolCallStartEvent:
fmt.Printf("Tool call: %s\n", e.FunctionName)
case *events.ToolCallResultEvent:
fmt.Printf("Tool result: %v\n", e.Result)
case *events.RunFinishedEvent:
fmt.Printf("Run completed: %s\n", e.RunID)
return
case *events.RunErrorEvent:
fmt.Printf("Run error: %v\n", e.Error)
return
}
case err := <-errors:
if err != nil {
log.Printf("Stream error: %v", err)
return
}
case <-ctx.Done():
fmt.Println("Context timeout")
return
}
}
}
This example demonstrates: