GoFr provides comprehensive support for gRPC streaming, enabling efficient real-time communication between services. Streaming is particularly useful for scenarios where you need to send or receive multiple messages over a single connection, such as chat applications, real-time data feeds, or large file transfers.
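GoFr supports three types of gRPC streaming:
- Server-side streaming: the client sends one request and the server sends multiple responses
- Client-side streaming: the client sends multiple requests and the server sends one response
- Bidirectional streaming: both client and server can send multiple messages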
All streaming methods in GoFr include built-in tracing, metrics, and logging support, ensuring seamless observability for your streaming operations.
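Before implementing gRPC streaming, ensure you have:
- The Protocol Buffers compiler (protoc) installed (version 3+)
- The Go plugins for protoc installed, with the Go bin directory on your PATH:
    go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
    go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
    export PATH="$PATH:$(go env GOPATH)/bin"
- The GoFr CLI installed:
    go install gofr.dev/cli/gofr@latest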
For detailed setup instructions, refer to the gRPC with GoFr documentation.
To use streaming in your gRPC service, define your RPC methods with the stream keyword in your .proto file:
syntax = "proto3";

option go_package = "path/to/your/proto/file";

message Request {
  string message = 1;
}

message Response {
  string message = 1;
}

service ChatService {
  // Server-side streaming: client sends one request, server sends multiple responses
  rpc ServerStream(Request) returns (stream Response);

  // Client-side streaming: client sends multiple requests, server sends one response
  rpc ClientStream(stream Request) returns (Response);

  // Bidirectional streaming: both client and server can send multiple messages
  rpc BiDiStream(stream Request) returns (stream Response);
}
GoFr CLI automatically generates streaming-aware server templates. Use the gofr wrap grpc server command:
gofr wrap grpc server -proto=./path/to/your/proto/file
This command generates:
- <SERVICE_NAME>_server.go: Template file with streaming method signatures
- <SERVICE_NAME>_gofr.go: Generated wrapper with streaming instrumentation
- request_gofr.go: Request wrapper for context binding
- health_gofr.go: Health check server integration

Server-side streaming allows the server to send multiple responses to a single client request. This is useful for scenarios like real-time notifications or progressive data delivery.
Example Implementation:
func (s *ChatServiceGoFrServer) ServerStream(ctx *gofr.Context, stream ChatService_ServerStreamServer) error {
	// Bind the initial request
	req := Request{}
	if err := ctx.Bind(&req); err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
	}

	// Send multiple responses
	for i := 0; i < 5; i++ {
		// Check if context is canceled
		select {
		case <-stream.Context().Done():
			return status.Error(codes.Canceled, "client disconnected")
		default:
		}

		resp := &Response{
			Message: fmt.Sprintf("Server stream %d: %s", i, req.Message),
		}
		if err := stream.Send(resp); err != nil {
			return status.Errorf(codes.Internal, "error sending stream: %v", err)
		}

		time.Sleep(1 * time.Second) // Simulate processing delay
	}

	return nil
}
Key Points:
- Use ctx.Bind() to extract the initial request
- Use stream.Send() to send each response message
- Return nil when streaming is complete, or an error if something goes wrong

Client-side streaming allows the client to send multiple requests before receiving a single response. This is useful for batch processing or aggregating data from the client.
Example Implementation:
func (s *ChatServiceGoFrServer) ClientStream(ctx *gofr.Context, stream ChatService_ClientStreamServer) error {
	var messageCount int
	var finalMessage strings.Builder

	// Receive multiple messages from client
	for {
		// Check if context is canceled before receiving
		select {
		case <-stream.Context().Done():
			return status.Error(codes.Canceled, "client disconnected")
		default:
		}

		req, err := stream.Recv()
		if err == io.EOF {
			// Client has finished sending, send final response
			return stream.SendAndClose(&Response{
				Message: fmt.Sprintf("Received %d messages. Final: %s",
					messageCount, finalMessage.String()),
			})
		}
		if err != nil {
			return status.Errorf(codes.Internal, "error receiving stream: %v", err)
		}

		// Process each message
		messageCount++
		finalMessage.WriteString(req.Message + " ")
	}
}
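The receive loop can be exercised without a running gRPC server by putting a small interface in front of the stream. The sketch below is an illustration only: requestStream, sliceStream, and aggregate are hypothetical names, and plain strings stand in for the generated Request type. It follows the same pattern as the handler: call Recv until io.EOF and treat any other error as a failure.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// requestStream is a hypothetical stand-in for the receive side of a
// client stream; strings replace the generated Request message.
type requestStream interface {
	Recv() (string, error)
}

// sliceStream is an in-memory fake that yields its messages in order
// and then reports io.EOF, like a client that has finished sending.
type sliceStream struct {
	msgs []string
}

func (s *sliceStream) Recv() (string, error) {
	if len(s.msgs) == 0 {
		return "", io.EOF
	}
	msg := s.msgs[0]
	s.msgs = s.msgs[1:]
	return msg, nil
}

// aggregate reads until io.EOF and returns the message count and the
// messages joined by single spaces.
func aggregate(stream requestStream) (int, string, error) {
	var received []string
	for {
		msg, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return len(received), strings.Join(received, " "), nil
		}
		if err != nil {
			return 0, "", fmt.Errorf("receiving from stream: %w", err)
		}
		received = append(received, msg)
	}
}

func main() {
	count, joined, err := aggregate(&sliceStream{msgs: []string{"a", "b", "c"}})
	fmt.Println(count, joined, err)
}
```

Unlike the handler, which appends a trailing space after every message, this sketch uses strings.Join so the result has no trailing separator.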
Key Points:
- Use stream.Recv() in a loop to receive messages
- Check for io.EOF to detect when the client has finished sending
- Use stream.SendAndClose() to send the final response and close the stream

Bidirectional streaming allows both client and server to send messages independently. This is useful for real-time chat applications or interactive protocols.
Example Implementation:
func (s *ChatServiceGoFrServer) BiDiStream(ctx *gofr.Context, stream ChatService_BiDiStreamServer) error {
	// Buffered so the goroutine can always deliver its result and exit,
	// even if this handler has already returned on cancellation.
	errChan := make(chan error, 1)

	// Handle incoming messages in a goroutine
	go func() {
		for {
			// Check if context is canceled
			select {
			case <-stream.Context().Done():
				errChan <- status.Error(codes.Canceled, "client disconnected")
				return
			default:
			}

			req, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				errChan <- status.Errorf(codes.Internal, "error receiving stream: %v", err)
				return
			}

			// Process request and send response
			resp := &Response{Message: "Echo: " + req.Message}
			if err := stream.Send(resp); err != nil {
				errChan <- status.Errorf(codes.Internal, "error sending stream: %v", err)
				return
			}
		}

		// Client finished sending: report normal completion
		errChan <- nil
	}()

	// Wait for completion or cancellation
	select {
	case err := <-errChan:
		return err
	case <-stream.Context().Done():
		return status.Error(codes.Canceled, "client disconnected")
	}
}
Key Points:
- Use stream.Recv() to receive messages
- Use stream.Send() to send responses
- Monitor stream.Context().Done() to handle client disconnections

Generate the client code using:
gofr wrap grpc client -proto=./path/to/your/proto/file
This generates <SERVICE_NAME>_client.go with streaming client interfaces.
Example Implementation (receiving a server stream):
func (c *ChatHandler) ServerStreamHandler(ctx *gofr.Context) (any, error) {
	// Initiate server stream
	stream, err := c.chatClient.ServerStream(ctx, &client.Request{
		Message: "stream request",
	})
	if err != nil {
		return nil, fmt.Errorf("failed to initiate server stream: %w", err)
	}

	// Assumes Recv returns *client.Response, as generated gRPC clients do
	var responses []*client.Response

	// Receive all streamed responses
	for {
		res, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break // Stream completed
			}
			return nil, fmt.Errorf("stream receive error: %w", err)
		}

		responses = append(responses, res)
		ctx.Logger.Infof("Received: %s", res.Message)
	}

	return responses, nil
}
Example Implementation (sending a client stream):
func (c *ChatHandler) ClientStreamHandler(ctx *gofr.Context) (any, error) {
	// Get messages from request body first, so no stream is opened for an invalid request
	var requests []*client.Request
	if err := ctx.Bind(&requests); err != nil {
		return nil, fmt.Errorf("failed to bind requests: %w", err)
	}

	// Initiate client stream
	stream, err := c.chatClient.ClientStream(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to initiate client stream: %w", err)
	}

	// Send multiple messages
	for _, req := range requests {
		if err := stream.Send(req); err != nil {
			return nil, fmt.Errorf("failed to send request: %w", err)
		}
	}

	// Close stream and receive final response
	response, err := stream.CloseAndRecv()
	if err != nil {
		return nil, fmt.Errorf("failed to receive final response: %w", err)
	}

	return response, nil
}
Example Implementation (bidirectional stream):
func (c *ChatHandler) BiDiStreamHandler(ctx *gofr.Context) (any, error) {
	// Initiate bidirectional stream
	stream, err := c.chatClient.BiDiStream(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to initiate bidirectional stream: %w", err)
	}

	// Assumes Recv returns *client.Response, as generated gRPC clients do
	respChan := make(chan *client.Response)
	// Buffered so the receiver goroutine can report its result and exit
	// even if this handler has already returned.
	errChan := make(chan error, 1)

	// Receive responses in a goroutine
	go func() {
		for {
			res, err := stream.Recv()
			if err != nil {
				if errors.Is(err, io.EOF) {
					errChan <- nil
				} else {
					errChan <- err
				}
				return
			}

			// Avoid blocking forever once the request context has ended
			select {
			case respChan <- res:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Send messages
	messages := []string{"message 1", "message 2", "message 3"}
	for _, msg := range messages {
		if err := stream.Send(&client.Request{Message: msg}); err != nil {
			return nil, fmt.Errorf("failed to send message: %w", err)
		}
	}

	// Close send side
	if err := stream.CloseSend(); err != nil {
		return nil, fmt.Errorf("failed to close send: %w", err)
	}

	// Collect responses
	var responses []*client.Response
	for {
		select {
		case err := <-errChan:
			return responses, err
		case resp := <-respChan:
			responses = append(responses, resp)
		case <-time.After(5 * time.Second):
			return nil, errors.New("timeout waiting for responses")
		}
	}
}
Register your streaming service in main.go just like unary services:
package main

import (
	"gofr.dev/examples/grpc/grpc-streaming-server/server"
	"gofr.dev/pkg/gofr"
)

func main() {
	app := gofr.New()

	// Register streaming service
	server.RegisterChatServiceServerWithGofr(app, server.NewChatServiceGoFrServer())

	app.Run()
}
GoFr automatically provides observability for all streaming operations:
The following metrics are automatically registered:
Each streaming operation (Send, Recv, SendAndClose, CloseSend) automatically creates spans for distributed tracing, allowing you to track the flow of messages through your system.
Streaming operations are automatically logged with:
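Common errors in streaming handlers:
- io.EOF: Indicates the stream has ended normally. When receiving a client stream on the server, respond with SendAndClose().
- Context Cancellation: Stream was canceled or timed out. Check stream.Context().Done() for cancellation.
- Network Errors: Connection issues during streaming.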
Example Error Handling:
func (s *ChatServiceGoFrServer) ServerStream(ctx *gofr.Context, stream ChatService_ServerStreamServer) error {
	req := Request{}
	if err := ctx.Bind(&req); err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid request: %v", err)
	}

	for i := 0; i < 5; i++ {
		// Check if context is canceled
		select {
		case <-stream.Context().Done():
			return status.Error(codes.Canceled, "client disconnected")
		default:
		}

		resp := &Response{Message: fmt.Sprintf("Message %d", i)}
		if err := stream.Send(resp); err != nil {
			return status.Errorf(codes.Internal, "error sending stream: %v", err)
		}
	}

	return nil
}
For streaming RPCs (client-stream, server-stream, or bidirectional), GoFr allows you to add stream interceptors using AddGRPCServerStreamInterceptors. These are useful for handling logic that needs to span the entire lifetime of a stream.
func main() {
	app := gofr.New()

	app.AddGRPCServerStreamInterceptors(streamAuthInterceptor)

	// ... register your service

	app.Run()
}

func streamAuthInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// Example: Validate metadata for the entire stream.
	// isValidToken is an application-defined helper, not part of GoFr or gRPC.
	md, ok := metadata.FromIncomingContext(ss.Context())
	if !ok || !isValidToken(md["auth-token"]) {
		return status.Error(codes.Unauthenticated, "invalid stream token")
	}

	// If valid, continue processing the stream
	return handler(srv, ss)
}
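The interceptor above calls isValidToken, which the snippet does not define. Since gRPC metadata maps each key to a slice of strings, one possible shape for such a helper is sketched below. Everything here is an assumption for illustration: the helper's behavior, the expectedToken placeholder, and the choice of a shared-secret comparison. A real service would more likely load the secret from configuration or verify a signed token.

```go
package main

import (
	"crypto/subtle"
	"fmt"
)

// expectedToken is a placeholder for illustration only.
const expectedToken = "example-secret"

// isValidToken (hypothetical helper) reports whether the metadata
// values contain exactly one token equal to expectedToken.
// ConstantTimeCompare avoids revealing, through timing, how many
// leading bytes of the token matched.
func isValidToken(values []string) bool {
	if len(values) != 1 {
		return false
	}
	return subtle.ConstantTimeCompare([]byte(values[0]), []byte(expectedToken)) == 1
}

func main() {
	fmt.Println(isValidToken([]string{"example-secret"}))
	fmt.Println(isValidToken([]string{"wrong"}))
	fmt.Println(isValidToken(nil)) // key missing from metadata
}
```

Rejecting a missing or repeated key up front keeps the interceptor's check to a single comparison.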
For more details on adding additional interceptors and server options, refer to the official gRPC Go package.
- Always handle io.EOF: This is the normal way streams end
- Monitor stream.Context().Done() to detect client disconnections
- Call CloseSend() when done sending in bidirectional streams
- Use ctx.Logger to log stream lifecycle events

Complete working examples are available in the GoFr repository:
- gofr/examples/grpc/grpc-streaming-server
- gofr/examples/grpc/grpc-streaming-client

These examples demonstrate all three types of streaming with detailed error handling and logging.