Back to Orleans

Microsoft Orleans Stream Provider for NATS

src/Orleans.Streaming.NATS/README.md

10.1.04.3 KB
Original Source

Microsoft Orleans Stream Provider for NATS

Introduction

Microsoft Orleans Stream Provider for NATS enables Orleans applications to leverage NATS JetStream for reliable, scalable event processing. This provider allows you to use NATS JetStream as a streaming backbone for your Orleans application to both produce and consume streams of events.

Getting Started

To use this package, install it via NuGet:

shell
dotnet add package Microsoft.Orleans.Streaming.NATS

Example - Configuring NATS Stream Provider

csharp
using Microsoft.Extensions.Hosting;
using Orleans.Hosting;
using Orleans.Streaming.NATS.Hosting;

var builder = Host.CreateApplicationBuilder(args)
    .UseOrleans(siloBuilder =>
    {
        siloBuilder
            .UseLocalhostClustering()
            // Configure NATS JetStream as a stream provider
            .AddNatsStreams(
                "NatsStreamProvider",
                options =>
                {
                    options.StreamName = "orleans-stream";
                    // Optional: Configure NATS client options
                    // options.NatsClientOptions = new NatsOpts { Url = "nats://localhost:4222" };
                    // Optional: Configure batch size (default: 100)
                    // options.BatchSize = 100;
                    // Optional: Configure partition count (default: 8)
                    // options.PartitionCount = 8;
                });
    });

// Run the host
await builder.RunAsync();

Example - Configuring NATS Streams on Client

csharp
using Microsoft.Extensions.Hosting;
using Orleans.Streaming.NATS.Hosting;

var builder = Host.CreateApplicationBuilder(args)
    .UseOrleansClient(clientBuilder =>
    {
        clientBuilder
            .UseLocalhostClustering()
            .AddNatsStreams(
                "NatsStreamProvider",
                options =>
                {
                    options.StreamName = "orleans-stream";
                });
    });

await builder.RunAsync();

Example - Using NATS Streams in a Grain

csharp
using Orleans;
using Orleans.Streams;

// Grain interface
public interface IStreamProcessingGrain : IGrainWithGuidKey
{
    Task StartProcessing();
    Task SendEvent(MyEvent evt);
}

// Grain implementation
public class StreamProcessingGrain : Grain, IStreamProcessingGrain
{
    private IStreamProvider _streamProvider;
    private IAsyncStream<MyEvent> _stream;
    private StreamSubscriptionHandle<MyEvent> _subscription;

    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        // Get the stream provider
        _streamProvider = this.GetStreamProvider("NatsStreamProvider");
        
        // Get a reference to a specific stream
        _stream = _streamProvider.GetStream<MyEvent>(
            StreamId.Create("MyStreamNamespace", this.GetPrimaryKey()));
        
        await base.OnActivateAsync(cancellationToken);
    }

    public async Task StartProcessing()
    {
        // Subscribe to the stream to process events
        _subscription = await _stream.SubscribeAsync(OnNextAsync);
    }

    private Task OnNextAsync(MyEvent evt, StreamSequenceToken token)
    {
        Console.WriteLine($"Received event: {evt.Data}");
        return Task.CompletedTask;
    }

    // Produce an event to the stream
    public Task SendEvent(MyEvent evt)
    {
        return _stream.OnNextAsync(evt);
    }
}

// Event class
public class MyEvent
{
    public string Data { get; set; }
}

Documentation

For more comprehensive documentation, please refer to:

Feedback & Contributing