src/Azure/Orleans.Streaming.EventHubs/README.md
Microsoft Orleans Stream Provider for Azure Event Hubs enables Orleans applications to leverage Azure Event Hubs for reliable, scalable event processing. This provider allows you to use Event Hubs as a streaming backbone for your Orleans application to both produce and consume streams of events.
To use this package, install it via NuGet:
dotnet add package Microsoft.Orleans.Streaming.EventHubs
using Microsoft.Extensions.Hosting;
using Orleans.Configuration;
using Orleans.Hosting;
namespace ExampleGrains;
var builder = Host.CreateApplicationBuilder(args)
.UseOrleans(siloBuilder =>
{
siloBuilder
.UseLocalhostClustering()
// Configure Azure Event Hubs as a stream provider
.AddEventHubStreams(
"EventHubStreamProvider",
configurator =>
{
configurator.ConfigureEventHub(builder => builder.Configure(options =>
{
options.ConnectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
options.ConsumerGroup = "YOUR_CONSUMER_GROUP"; // Default is "$Default"
options.Path = "YOUR_EVENT_HUB_NAME";
}));
configurator.UseAzureTableCheckpointer(builder => builder.Configure(options =>
{
options.ConnectionString = "YOUR_STORAGE_CONNECTION_STRING";
options.TableName = "EventHubCheckpoints"; // Optional
}));
});
});
// Run the host
await builder.RunAsync();
// Grain interface
public interface IStreamProcessingGrain : IGrainWithGuidKey
{
Task StartProcessing();
}
// Grain implementation
public class StreamProcessingGrain : Grain, IStreamProcessingGrain
{
private IStreamProvider _streamProvider;
private IAsyncStream<MyEvent> _stream;
private StreamSubscriptionHandle<MyEvent> _subscription;
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
// Get the stream provider
_streamProvider = GetStreamProvider("EventHubStreamProvider");
// Get a reference to a specific stream
_stream = _streamProvider.GetStream<MyEvent>(this.GetPrimaryKey(), "MyStreamNamespace");
await base.OnActivateAsync(cancellationToken);
}
public async Task StartProcessing()
{
// Subscribe to the stream to process events
_subscription = await _stream.SubscribeAsync(OnNextAsync);
}
private Task OnNextAsync(MyEvent evt, StreamSequenceToken token)
{
Console.WriteLine($"Received event: {evt.Data}");
return Task.CompletedTask;
}
// Produce an event to the stream
public Task SendEvent(MyEvent evt)
{
return _stream.OnNextAsync(evt);
}
}
// Event class
public class MyEvent
{
public string Data { get; set; }
}
For more comprehensive documentation, please refer to: