src/Orleans.DurableJobs/README.md
Microsoft Orleans Durable Jobs provides a distributed, scalable system for scheduling one-time jobs that execute at a specific time. Unlike Orleans Reminders, which are designed for recurring tasks, Durable Jobs are ideal for one-time future events such as appointment notifications, delayed processing, scheduled workflow steps, and time-based triggers.
Key Features:
To use this package, install it via NuGet:
dotnet add package Microsoft.Orleans.DurableJobs
For production scenarios with persistence, also install a storage provider:
dotnet add package Microsoft.Orleans.DurableJobs.AzureStorage
using Microsoft.Extensions.Hosting;
using Orleans.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Single-silo development setup. In-memory Durable Jobs keep no persistence:
// scheduled jobs do not survive a silo restart.
builder.UseOrleans(silo =>
{
    silo.UseLocalhostClustering();
    silo.UseInMemoryDurableJobs();
});

var host = builder.Build();
await host.RunAsync();
using Microsoft.Extensions.Hosting;
using Orleans.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Read the storage connection string from configuration (appsettings.json, environment
// variables, user secrets, ...) instead of hard-coding a secret in source, and fail fast
// at startup if it has not been provided.
var storageConnectionString = builder.Configuration["ConnectionStrings:DurableJobsStorage"]
    ?? throw new InvalidOperationException(
        "Missing configuration value 'ConnectionStrings:DurableJobsStorage'.");

builder.UseOrleans(siloBuilder =>
{
    siloBuilder
        .UseLocalhostClustering()
        // Configure Azure Storage Durable Jobs (jobs are persisted in the blob container below)
        .UseAzureStorageDurableJobs(options =>
        {
            options.Configure(o =>
            {
                o.BlobServiceClient = new Azure.Storage.Blobs.BlobServiceClient(storageConnectionString);
                o.ContainerName = "durable-jobs";
            });
        });
});

await builder.Build().RunAsync();
builder.UseOrleans(silo =>
{
    silo.UseLocalhostClustering()
        .UseInMemoryDurableJobs()
        .ConfigureServices(services => services.Configure<DurableJobsOptions>(options =>
        {
            // Jobs are partitioned into time-based shards of this length.
            options.ShardDuration = TimeSpan.FromMinutes(5);

            // Upper bound on the number of jobs executing at the same time on one silo.
            options.MaxConcurrentJobsPerSilo = 100;

            // Custom retry policy: exponential backoff of 2^DequeueCount seconds while
            // DequeueCount < 3; returning null means "do not retry".
            // NOTE(review): whether this allows 2 or 3 retries depends on whether
            // DequeueCount starts at 0 or 1 on the first attempt — confirm.
            options.ShouldRetry = (context, exception) =>
            {
                if (context.DequeueCount >= 3)
                {
                    return null; // Give up.
                }

                var backoff = TimeSpan.FromSeconds(Math.Pow(2, context.DequeueCount));
                return DateTimeOffset.UtcNow.Add(backoff);
            };
        }));
});
using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.DurableJobs;

public interface INotificationGrain : IGrainWithStringKey
{
    Task ScheduleNotification(string message, DateTimeOffset sendTime);
    Task CancelScheduledNotification();
}

/// <summary>
/// Schedules a one-time notification for the user identified by the grain's string key
/// and sends it when the corresponding durable job executes.
/// </summary>
/// <remarks>
/// The grain tracks only the most recently scheduled job, and only in memory. Scheduling
/// again stops tracking (but does not cancel) the previous job. The reference is also lost
/// if the grain deactivates. Persist it in grain state if cancellation must survive deactivation.
/// </remarks>
public class NotificationGrain : Grain, INotificationGrain, IDurableJobHandler
{
    private readonly ILocalDurableJobManager _jobManager;
    private readonly ILogger<NotificationGrain> _logger;

    // Most recently scheduled job; null when this activation knows of no pending job.
    private IDurableJob? _durableJob;

    public NotificationGrain(
        ILocalDurableJobManager jobManager,
        ILogger<NotificationGrain> logger)
    {
        _jobManager = jobManager;
        _logger = logger;
    }

    /// <summary>Schedules <paramref name="message"/> to be sent at <paramref name="sendTime"/>.</summary>
    public async Task ScheduleNotification(string message, DateTimeOffset sendTime)
    {
        var userId = this.GetPrimaryKeyString();
        var metadata = new Dictionary<string, string>
        {
            ["Message"] = message
        };

        _durableJob = await _jobManager.ScheduleJobAsync(
            new ScheduleJobRequest
            {
                Target = this.GetGrainId(),
                JobName = "SendNotification",
                DueTime = sendTime,
                Metadata = metadata
            },
            CancellationToken.None);

        _logger.LogInformation(
            "Scheduled notification for user {UserId} at {SendTime} (JobId: {JobId})",
            userId, sendTime, _durableJob.Id);
    }

    /// <summary>Attempts to cancel the tracked notification job, if there is one.</summary>
    public async Task CancelScheduledNotification()
    {
        if (_durableJob is null)
        {
            _logger.LogWarning("No scheduled notification to cancel");
            return;
        }

        var canceled = await _jobManager.TryCancelDurableJobAsync(_durableJob);
        _logger.LogInformation("Notification {JobId} canceled: {Canceled}", _durableJob.Id, canceled);

        if (canceled)
        {
            _durableJob = null;
        }
    }

    /// <summary>Called when the durable job executes; sends the notification.</summary>
    public Task ExecuteJobAsync(IJobRunContext context, CancellationToken cancellationToken)
    {
        var userId = this.GetPrimaryKeyString();

        // Tolerate absent metadata or a missing key rather than throwing KeyNotFoundException.
        var message = context.Job.Metadata is { } metadata && metadata.TryGetValue("Message", out var value)
            ? value
            : null;

        _logger.LogInformation(
            "Sending notification to user {UserId}: {Message} (Job: {JobId}, Run: {RunId}, Attempt: {DequeueCount})",
            userId, message, context.Job.Id, context.RunId, context.DequeueCount);

        // Send the notification here.
        // If this throws an exception, the job can be retried based on your retry policy.

        // Only forget the tracked job if it is the one that just ran. A newer job scheduled
        // in the meantime must remain cancellable.
        if (_durableJob is not null && _durableJob.Id == context.Job.Id)
        {
            _durableJob = null;
        }

        return Task.CompletedTask;
    }
}
public interface IOrderGrain : IGrainWithGuidKey
{
    Task PlaceOrder(OrderDetails details);
    Task CancelOrder();
}

/// <summary>
/// Order workflow: on placement, schedules a delivery reminder (24 hours before delivery)
/// and an expiration check (24 hours after placement). Cancelling the order cancels both jobs.
/// </summary>
public class OrderGrain : Grain, IOrderGrain, IDurableJobHandler
{
    private readonly ILocalDurableJobManager _jobManager;
    private readonly IOrderService _orderService;
    private readonly IGrainFactory _grainFactory;
    private readonly ILogger<OrderGrain> _logger;

    // Jobs scheduled by PlaceOrder, kept so CancelOrder can cancel them.
    // NOTE(review): these are in-memory only and are lost if the grain deactivates. Persist them
    // (or re-check the order status in HandleDeliveryReminder) if that matters for your scenario.
    private IDurableJob? _deliveryReminderJob;
    private IDurableJob? _expirationJob;

    public OrderGrain(
        ILocalDurableJobManager jobManager,
        IOrderService orderService,
        IGrainFactory grainFactory,
        ILogger<OrderGrain> logger)
    {
        _jobManager = jobManager;
        _orderService = orderService;
        _grainFactory = grainFactory;
        _logger = logger;
    }

    /// <summary>Creates the order and schedules its follow-up jobs.</summary>
    public async Task PlaceOrder(OrderDetails details)
    {
        var orderId = this.GetPrimaryKey();

        // Create the order
        await _orderService.CreateOrderAsync(orderId, details);

        // Schedule delivery reminder for 24 hours before delivery
        var reminderTime = details.DeliveryDate.AddHours(-24);
        _deliveryReminderJob = await _jobManager.ScheduleJobAsync(
            new ScheduleJobRequest
            {
                Target = this.GetGrainId(),
                JobName = "DeliveryReminder",
                DueTime = reminderTime,
                Metadata = new Dictionary<string, string>
                {
                    ["Step"] = "DeliveryReminder",
                    ["CustomerId"] = details.CustomerId,
                    ["OrderNumber"] = details.OrderNumber
                }
            },
            CancellationToken.None);

        // Schedule order expiration if payment not received
        var expirationTime = DateTimeOffset.UtcNow.AddHours(24);
        _expirationJob = await _jobManager.ScheduleJobAsync(
            new ScheduleJobRequest
            {
                Target = this.GetGrainId(),
                JobName = "OrderExpiration",
                DueTime = expirationTime,
                Metadata = new Dictionary<string, string>
                {
                    ["Step"] = "OrderExpiration"
                }
            },
            CancellationToken.None);
    }

    /// <summary>
    /// Cancels the order and its pending jobs, so the customer is not sent a delivery
    /// reminder for an order that no longer exists.
    /// </summary>
    public async Task CancelOrder()
    {
        var orderId = this.GetPrimaryKey();
        await _orderService.CancelOrderAsync(orderId);

        await TryCancelJobAsync(_deliveryReminderJob);
        _deliveryReminderJob = null;

        await TryCancelJobAsync(_expirationJob);
        _expirationJob = null;
    }

    /// <summary>Dispatches an executing job to the handler named by its "Step" metadata.</summary>
    public async Task ExecuteJobAsync(IJobRunContext context, CancellationToken cancellationToken)
    {
        // Missing metadata cannot be fixed by retrying, so treat it like an unknown step
        // instead of throwing.
        var step = context.Job.Metadata is { } metadata && metadata.TryGetValue("Step", out var value)
            ? value
            : null;

        switch (step)
        {
            case "DeliveryReminder":
                await HandleDeliveryReminder(context, cancellationToken);
                break;
            case "OrderExpiration":
                await HandleOrderExpiration(cancellationToken);
                break;
            default:
                _logger.LogWarning("Job {JobId} has unknown step {Step}; ignoring", context.Job.Id, step);
                break;
        }
    }

    // Cancels a previously scheduled job (no-op when null) and logs the outcome.
    private async Task TryCancelJobAsync(IDurableJob? job)
    {
        if (job is null)
        {
            return;
        }

        var canceled = await _jobManager.TryCancelDurableJobAsync(job);
        _logger.LogInformation("Order job {JobId} canceled: {Canceled}", job.Id, canceled);
    }

    // Asks the customer's notification grain to send the "delivered tomorrow" message now.
    private async Task HandleDeliveryReminder(IJobRunContext context, CancellationToken ct)
    {
        var customerId = context.Job.Metadata!["CustomerId"];
        var orderNumber = context.Job.Metadata["OrderNumber"];
        var notificationGrain = _grainFactory.GetGrain<INotificationGrain>(customerId);
        await notificationGrain.ScheduleNotification(
            $"Your order #{orderNumber} will be delivered tomorrow!",
            DateTimeOffset.UtcNow);
    }

    // Cancels the order if it is still pending (i.e. payment never arrived).
    private async Task HandleOrderExpiration(CancellationToken ct)
    {
        var orderId = this.GetPrimaryKey();
        var order = await _orderService.GetOrderAsync(orderId, ct);
        if (order?.Status == OrderStatus.Pending)
        {
            await _orderService.CancelOrderAsync(orderId, ct);
            _logger.LogInformation("Order {OrderId} expired and canceled", orderId);
        }
    }
}
/// <summary>
/// Processes a payment when its durable job executes, logging transient and permanent
/// failures differently before handing them to the retry policy.
/// </summary>
public class PaymentProcessorGrain : Grain, IDurableJobHandler
{
    private readonly IPaymentService _paymentService;
    private readonly ILogger<PaymentProcessorGrain> _logger;

    // Without a constructor these fields would never be assigned (NullReferenceException).
    public PaymentProcessorGrain(
        IPaymentService paymentService,
        ILogger<PaymentProcessorGrain> logger)
    {
        _paymentService = paymentService;
        _logger = logger;
    }

    // Must be 'async' because the body awaits the payment service.
    public async Task ExecuteJobAsync(IJobRunContext context, CancellationToken cancellationToken)
    {
        if (context.Job.Metadata is not { } metadata || !metadata.TryGetValue("PaymentId", out var paymentId))
        {
            // Retrying cannot fix missing metadata, so complete the job instead of throwing.
            _logger.LogError("Job {JobId} has no PaymentId metadata; skipping", context.Job.Id);
            return;
        }

        _logger.LogInformation(
            "Processing payment {PaymentId} (Attempt {Attempt})",
            paymentId, context.DequeueCount);

        try
        {
            await _paymentService.ProcessPaymentAsync(paymentId, cancellationToken);
        }
        catch (TransientException ex)
        {
            _logger.LogWarning(ex, "Payment processing failed with transient error, will retry");
            throw; // Let the retry policy handle it
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Payment processing failed with permanent error");
            // Rethrowing hands the failure to DurableJobsOptions.ShouldRetry. The default policy
            // retries, so to make permanent errors final, have ShouldRetry return null for them.
            throw;
        }
    }
}
/// <summary>
/// Schedules workflow steps as durable jobs and lets callers await the completion of a step.
/// </summary>
/// <remarks>
/// The pending-completion map is in-memory, so waiters are lost if the grain deactivates.
/// NOTE(review): Orleans grains are non-reentrant by default. While a call is awaiting
/// <see cref="WaitForJobCompletion"/>, the grain presumably cannot process the job callback,
/// so the wait would only end on timeout. Mark the grain <c>[Reentrant]</c> (or the interface
/// method <c>[AlwaysInterleave]</c>) when using this pattern.
/// </remarks>
public class WorkflowGrain : Grain, IDurableJobHandler
{
    private readonly ILocalDurableJobManager _jobManager;

    // Job id -> completion signal for callers waiting on that step.
    private readonly Dictionary<string, TaskCompletionSource> _pendingJobs = new();

    public WorkflowGrain(ILocalDurableJobManager jobManager)
    {
        _jobManager = jobManager;
    }

    /// <summary>Schedules <paramref name="stepName"/> to run at <paramref name="executeAt"/>.</summary>
    public async Task<IDurableJob> ScheduleWorkflowStep(string stepName, DateTimeOffset executeAt)
    {
        var job = await _jobManager.ScheduleJobAsync(
            new ScheduleJobRequest
            {
                Target = this.GetGrainId(),
                JobName = stepName,
                DueTime = executeAt,
                Metadata = null
            },
            CancellationToken.None);

        // RunContinuationsAsynchronously keeps waiters' continuations from running inline
        // inside ExecuteJobAsync when the result is set.
        _pendingJobs[job.Id] = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        return job;
    }

    /// <summary>
    /// Waits until the job with <paramref name="jobId"/> has executed. Returns immediately if the
    /// id is unknown (never scheduled by this activation, or already completed). Throws an
    /// <see cref="OperationCanceledException"/> if <paramref name="timeout"/> elapses first.
    /// </summary>
    public async Task WaitForJobCompletion(string jobId, TimeSpan timeout)
    {
        if (_pendingJobs.TryGetValue(jobId, out var tcs))
        {
            using var cts = new CancellationTokenSource(timeout);
            await tcs.Task.WaitAsync(cts.Token);
        }
    }

    /// <summary>Runs the workflow step and signals anyone waiting on it.</summary>
    public Task ExecuteJobAsync(IJobRunContext context, CancellationToken cancellationToken)
    {
        // Execute the workflow step...

        // Mark as complete. Dictionary<TKey, TValue> has no TryRemove;
        // Remove(key, out value) is the equivalent.
        if (_pendingJobs.Remove(context.Job.Id, out var tcs))
        {
            tcs.TrySetResult();
        }

        return Task.CompletedTask;
    }
}
The `MaxConcurrentJobsPerSilo` setting limits concurrent job execution.
┌─────────────┐
│ Scheduled │ ──▶ Job is created and added to appropriate shard
└─────────────┘
│
▼
┌─────────────┐
│ Waiting │ ──▶ Job waits in queue until due time
└─────────────┘
│
▼
┌─────────────┐
│ Executing │ ──▶ Job handler is invoked on target grain
└─────────────┘
│
├──▶ Success ──▶ Job is removed
│
└──▶ Failure ──▶ Retry policy decides:
• Retry: Job is re-queued with new due time
• No Retry: Job is removed
| Property | Type | Default | Description |
|---|---|---|---|
| `ShardDuration` | `TimeSpan` | 1 minute | Duration of each job shard. Smaller values reduce latency but increase overhead. |
| `MaxConcurrentJobsPerSilo` | `int` | 100 | Maximum number of jobs that can execute simultaneously on a silo. |
| `ShouldRetry` | `Func<IJobRunContext, Exception, DateTimeOffset?>` | 3 retries with exponential backoff | Determines whether a failed job should be retried. Return the new due time, or `null` to stop retrying. |
Set Reasonable Concurrency Limits: Prevent resource exhaustion
options.MaxConcurrentJobsPerSilo = 100; // Adjust based on your workload (100 is also the default)
Implement Idempotent Job Handlers: Jobs may be retried, so make sure handlers can safely run more than once.
public async Task ExecuteJobAsync(IJobRunContext context, CancellationToken ct)
{
    var jobId = context.Job.Id;

    // A job may run more than once (retries), so skip work that already completed.
    // NOTE: _state should be durable (e.g. persistent grain state); an in-memory flag
    // would not survive a grain reactivation.
    if (await _state.IsProcessed(jobId))
    {
        return;
    }

    // Process job...

    // Record completion only after the work succeeded, so a failed attempt is retried.
    await _state.MarkProcessed(jobId);
}
Use Metadata Wisely: Keep metadata lightweight
// Good: store IDs and load the full data when the job runs
var goodMetadata = new Dictionary<string, string> { ["OrderId"] = "12345" };
// Bad: store large serialized objects
// (distinct variable names so both lines compile if pasted into the same scope)
var badMetadata = new Dictionary<string, string> { ["Order"] = JsonSerializer.Serialize(largeOrder) };
Handle Cancellation: Respect the cancellation token by passing it to every long-running or asynchronous operation.
public async Task ExecuteJobAsync(IJobRunContext context, CancellationToken ct)
{
    // Flow the token into downstream calls so the operation stops promptly when cancellation is requested.
    await SomeLongRunningOperation(ct);
}
For more comprehensive documentation, please refer to: