website/src/docs/mocha/v16/transports/rabbitmq.md
The RabbitMQ transport connects Mocha to a RabbitMQ broker for production messaging. It manages connections, provisions exchanges and queues automatically, handles message acknowledgement, and supports request/reply with dedicated reply endpoints. When you need durable, distributed messaging across multiple services, this is the transport to use.
By the end of this section, you will have a Mocha bus connected to RabbitMQ with automatic topology provisioning.
dotnet add package Mocha.Transport.RabbitMQ
The most common setup uses the Aspire RabbitMQ component for connection management:
dotnet add package Aspire.RabbitMQ.Client
using Mocha;
using Mocha.Transport.RabbitMQ;
var builder = WebApplication.CreateBuilder(args);
// Aspire registers IConnectionFactory from the "rabbitmq" connection resource
builder.AddRabbitMQClient("rabbitmq");
// Register the message bus with RabbitMQ transport
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ();
var app = builder.Build();
app.Run();
The Aspire component reads the connection string from configuration (typically ConnectionStrings:rabbitmq), handles health checks, and integrates with the Aspire dashboard for observability.
.AddRabbitMQ() picks up the IConnectionFactory from DI (registered by Aspire) and uses it to establish connections to the broker. Default conventions automatically create exchanges, queues, and bindings for your registered handlers.
If you are not using Aspire, register the IConnectionFactory directly:
using Mocha;
using Mocha.Transport.RabbitMQ;
using RabbitMQ.Client;
var builder = WebApplication.CreateBuilder(args);
// Register IConnectionFactory manually
builder.Services.AddSingleton<IConnectionFactory>(_ =>
new ConnectionFactory
{
HostName = "localhost",
Port = 5672,
VirtualHost = "/",
UserName = "guest",
Password = "guest"
});
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ();
var app = builder.Build();
app.Run();
To use a connection string from configuration:
builder.Services.AddSingleton<IConnectionFactory>(_ =>
{
var factory = new ConnectionFactory();
factory.Uri = new Uri(builder.Configuration.GetConnectionString("rabbitmq")!);
return factory;
});
For full control over connection lifecycle, provide a custom IRabbitMQConnectionProvider:
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
transport.ConnectionProvider(sp =>
{
return sp.GetRequiredService<MyCustomConnectionProvider>();
});
});
The IRabbitMQConnectionProvider interface exposes Host, Port, VirtualHost, and a CreateAsync method. When no custom provider is registered, the transport falls back to resolving IConnectionFactory from DI and wrapping it in a default provider.
Add an endpoint that publishes through the bus and verify the handler executes:
app.MapPost("/orders", async (IMessageBus bus) =>
{
await bus.PublishAsync(new OrderPlacedEvent
{
OrderId = Guid.NewGuid(),
CustomerId = "customer-1",
TotalAmount = 99.99m
}, CancellationToken.None);
return Results.Ok();
});
Send a POST request to /orders and check your application logs. You should see the handler process the event. You can also inspect the RabbitMQ management UI at http://localhost:15672 to see the auto-provisioned exchanges and queues.
Mocha opens two connections to the broker: one for consuming and one for dispatching.
This design prevents back-pressure from slow consumers from blocking outbound message publishing. When a consumer processes messages slowly, the RabbitMQ client applies back-pressure on that connection. Without separation, a slow consumer could prevent your application from publishing new messages entirely. With separate connections, each direction operates independently.
When the transport starts, it provisions topology on the broker automatically. Here is how message types map to RabbitMQ resources:
graph LR
P[Publisher] -->|publish| E[Exchange
order-placed-event]
E -->|binding| Q[Queue
billing-service]
Q -->|consume| C[Consumer]
Events (publish/subscribe): Each event type gets a fanout exchange. Each service that subscribes creates a queue bound to that exchange. Publishing sends the message to the exchange, which fans it out to all bound queues.
Commands (send): Each command type gets a direct exchange bound to a single queue. Sending delivers the message to exactly one consumer.
Request/reply: The transport creates a temporary reply queue per service instance. The reply address is embedded in the request message so the responder knows where to send the reply.
:::warning Message loss warning. Messages published before the transport completes its Start phase may be lost if no queue is bound to the exchange yet. During deployment, ensure consuming services start before publishing services, or use publisher confirms to detect lost messages.
If a message is published to an exchange with no bound queue - for example, when no consumer has started - that message is dropped. Mocha auto-provisions topology, but the window between exchange creation and queue binding is a real operational risk. :::
Mocha's RabbitMQ transport uses publisher confirms on dispatch, which means the broker acknowledges each published message before the publish call completes. This provides at-least-once delivery guarantees for outbound messages: if the broker does not confirm, the publish fails with an exception. See the RabbitMQ Reliability Guide for a full treatment of delivery guarantees.
When you register an event handler with AddEventHandler<T>(), the RabbitMQ transport creates this topology:
A fanout exchange named after the message type fans out to per-service exchanges, which bind to per-service queues. This allows multiple services to each receive a copy of every published event.
When you register a request handler with AddRequestHandler<T>() for send (fire-and-forget), the transport creates a single queue:
Send messages go to a dedicated queue. Only one handler processes each message - this is the point-to-point guarantee.
You can set defaults that apply to all auto-provisioned queues and exchanges. This is useful when you want consistent settings across all resources without configuring each one individually.
Use ConfigureDefaults to set queue and exchange defaults:
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
transport.ConfigureDefaults(defaults =>
{
// All queues will be quorum with a delivery limit of 5
defaults.Queue.QueueType = RabbitMQQueueType.Quorum;
defaults.Queue.Arguments["x-delivery-limit"] = 5;
// All exchanges will use topic routing
defaults.Exchange.Type = RabbitMQExchangeType.Topic;
});
});
For example, to enable quorum queues with a specific initial group size:
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
transport.ConfigureDefaults(defaults =>
{
defaults.Queue.QueueType = RabbitMQQueueType.Quorum;
defaults.Queue.Arguments["x-quorum-initial-group-size"] = 3;
});
});
Or to use stream queues for append-only log semantics:
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
transport.ConfigureDefaults(defaults =>
{
defaults.Queue.QueueType = RabbitMQQueueType.Stream;
});
});
Available queue defaults:
| Property | Type | Description |
|---|---|---|
QueueType | string | Queue type: RabbitMQQueueType.Classic, .Quorum, or .Stream |
Durable | bool? | Whether queues survive broker restarts (default: true) |
AutoDelete | bool? | Whether queues are auto-deleted when unused (default: false) |
Arguments | Dictionary<string, object> | Additional arguments (e.g., x-delivery-limit, x-max-priority) |
Available exchange defaults:
| Property | Type | Description |
|---|---|---|
Type | string | Exchange type: RabbitMQExchangeType.Fanout, .Direct, .Topic, or .Headers |
Durable | bool? | Whether exchanges survive broker restarts (default: true) |
AutoDelete | bool? | Whether exchanges are auto-deleted when unused (default: false) |
Arguments | Dictionary<string, object> | Additional arguments (e.g., alternate-exchange) |
Defaults never override explicitly configured values. If you declare a queue with a specific queue type, that setting takes precedence over the transport default. You can call ConfigureDefaults multiple times - each call accumulates settings on the same defaults object.
Mocha auto-provisions topology by default. To declare additional exchanges, queues, or bindings:
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
// Declare an exchange
transport.DeclareExchange("order-events")
.Type(RabbitMQExchangeType.Fanout)
.Durable()
.AutoProvision();
// Declare a queue (use quorum type for production)
transport.DeclareQueue("billing-orders")
.Durable()
.AutoProvision()
.WithArgument("x-queue-type", "quorum");
// Bind the exchange to the queue
transport.DeclareBinding("order-events", "billing-orders")
.AutoProvision();
});
All explicitly declared topology is provisioned when the transport starts, before receive endpoints begin consuming.
By default, the transport auto-provisions all topology resources (exchanges, queues, bindings) on the broker at startup. In production environments where infrastructure is managed externally - for example by Terraform, Ansible, or the RabbitMQ Messaging Topology Operator on Kubernetes - you can disable auto-provisioning so the transport expects resources to already exist.
Turn off auto-provisioning for the entire transport:
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ(transport =>
{
transport.AutoProvision(false);
});
With auto-provisioning disabled, the transport will not create any exchanges, queues, or bindings. All resources must already exist on the broker before the transport starts.
Individual resources can override the transport-level setting. This is useful when most topology is managed externally but a few resources need to be created dynamically:
builder.Services
.AddMessageBus()
.AddRabbitMQ(transport =>
{
// Disable globally
transport.AutoProvision(false);
// This exchange already exists on the broker - skip provisioning
transport.DeclareExchange("order-events");
// This queue should be created by the transport
transport.DeclareQueue("billing-orders")
.AutoProvision(true);
// This binding should also be created
transport.DeclareBinding("order-events", "billing-orders")
.AutoProvision(true);
});
The effective auto-provision value for each resource follows a cascading pattern:
| Resource setting | Transport setting | Result |
|---|---|---|
true | any | Provisioned |
false | any | Not provisioned |
| not set | true (default) | Provisioned |
| not set | false | Not provisioned |
When a resource does not specify AutoProvision, it inherits the transport-level default. When the transport does not specify AutoProvision, it defaults to true.
Fully managed infrastructure: Disable auto-provisioning globally and declare all resources without AutoProvision. The transport will use existing broker resources without attempting to create them.
transport.AutoProvision(false);
transport.DeclareExchange("order-events");
transport.DeclareQueue("billing-orders");
transport.DeclareBinding("order-events", "billing-orders");
Selective provisioning: Disable globally but enable for specific resources that are owned by this service.
transport.AutoProvision(false);
transport.DeclareExchange("shared-events"); // managed externally
transport.DeclareQueue("my-service-queue")
.AutoProvision(true); // owned by this service
transport.DeclareBinding("shared-events", "my-service-queue")
.AutoProvision(true); // owned by this service
Kubernetes with the Messaging Topology Operator: When the RabbitMQ Messaging Topology Operator manages your exchanges, queues, and bindings as Kubernetes custom resources, disable auto-provisioning entirely. The operator declares topology through CRDs, and the transport simply uses the existing resources:
# Kubernetes CRD - managed by the Messaging Topology Operator
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: billing-orders
spec:
name: billing-orders
durable: true
rabbitmqClusterReference:
name: my-cluster
// Application code - topology already exists on the broker
transport.AutoProvision(false);
transport.DeclareExchange("order-events");
transport.DeclareQueue("billing-orders");
transport.DeclareBinding("order-events", "billing-orders");
Opt-out individual resources: Keep auto-provisioning enabled but skip specific resources that are managed elsewhere.
transport.DeclareExchange("platform-events")
.AutoProvision(false); // managed by platform team
transport.DeclareQueue("my-queue"); // auto-provisioned (default)
transport.DeclareBinding("platform-events", "my-queue"); // auto-provisioned (default)
Use transport.Handler<T>() to claim a handler and configure prefetch and concurrency on its convention-named endpoint:
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ(transport =>
{
transport.Handler<OrderPlacedEventHandler>()
.ConfigureEndpoint(e => e.MaxPrefetch(50).MaxConcurrency(10));
});
This keeps the convention-derived endpoint name while tuning the consumer settings. ConfigureEndpoint() can be called multiple times - actions compose in declaration order:
transport.Handler<OrderPlacedEventHandler>()
.ConfigureEndpoint(e => e.MaxPrefetch(50))
.ConfigureEndpoint(e => e.MaxConcurrency(10))
.ConfigureEndpoint(e => e.FaultEndpoint("order-errors"));
For full control over the endpoint name and queue, use explicit binding with Endpoint("name"):
builder.Services
.AddMessageBus()
.AddEventHandler<OrderPlacedEventHandler>()
.AddRabbitMQ(transport =>
{
transport.BindHandlersExplicitly();
transport.Endpoint("order-processing")
.Queue("orders.processing")
.MaxPrefetch(50)
.MaxConcurrency(10)
.Handler<OrderPlacedEventHandler>();
});
MaxPrefetch controls how many unacknowledged messages RabbitMQ delivers to the consumer at once. Default: 100. Lower values reduce memory pressure under high load. Higher values improve throughput for fast handlers.
MaxConcurrency controls how many messages the endpoint processes in parallel. Set this based on your handler's throughput characteristics.
A good starting point: set MaxPrefetch equal to or slightly higher than MaxConcurrency. For slow handlers (long database operations, external API calls), lower MaxPrefetch to 10–20 to prevent messages from piling up in the consumer's unacknowledged buffer. For quorum queues specifically, avoid setting MaxPrefetch to 1 - a prefetch of 1 starves consumers while acknowledgements flow through the consensus mechanism and significantly reduces throughput.
For prefetch tuning guidance from first principles, see CloudAMQP Best Practices.
| Resource | Naming convention | Created when |
|---|---|---|
| Exchange (event) | Message type name (e.g., OrderPlacedEvent) | First publish or subscribe |
| Exchange (command) | Message type name (e.g., ReserveInventoryCommand) | First send or handler registration |
| Queue | Endpoint name derived from handler registration | Handler is bound to the transport |
| Reply queue | Instance-specific name | Transport starts |
| Bindings | Exchange-to-queue | Endpoint discovery phase |
All auto-provisioned resources are durable by default and survive broker restarts.
RabbitMQ uses a routing_key field on every published message to decide which queues receive it. When you publish to a topic exchange, the broker compares the message's routing key against binding patterns on each queue. Queues whose pattern matches get the message. Queues that don't match never see it.
Direct exchanges work the same way, but require an exact match instead of a pattern.
Fanout exchanges ignore routing keys entirely - every bound queue gets every message.
Routing keys are useful when you need to split a single message stream across different consumers based on a property of the message itself:
tenant-a.orders, tenant-b.orders)us.east, eu.west)priority.high, priority.low)For a full treatment of topic exchange routing, see the RabbitMQ Topics Tutorial.
To set a routing key on published messages, call UseRabbitMQRoutingKey<T>() when registering the message type:
builder.Services
.AddMessageBus()
.AddMessage<OrderEvent>(m => m
.UseRabbitMQRoutingKey<OrderEvent>(msg => msg.Region))
.AddRabbitMQ();
The extractor function runs at dispatch time for each message. It receives the message instance and returns the routing key string. Return null to publish without a routing key.
UseRabbitMQRoutingKey<T>() is configured on AddMessage<T>(), not on the transport or endpoint. This keeps routing key logic next to the message definition where it belongs.
Combine multiple properties into a single routing key using string interpolation:
builder.Services
.AddMessageBus()
.AddMessage<OrderEvent>(m => m
.UseRabbitMQRoutingKey<OrderEvent>(msg => $"{msg.TenantId}.{msg.Region}"))
.AddRabbitMQ();
This produces routing keys like acme.us.east or contoso.eu.west, which you can match with topic exchange binding patterns like acme.# or *.eu.*.
This example routes region-tagged events to different queues based on their routing key. The US queue receives messages matching us.*, and the EU queue receives messages matching eu.*.
graph LR
P[Publisher] -->|"region = us.east"| E[Topic Exchange
region-events]
E -->|"us.* ✓"| QA[Queue
us-orders]
E -->|"eu.* ✗"| QB[Queue
eu-orders]
QA --> CA[US Consumer]
style QB stroke-dasharray: 5 5
public sealed class RegionEvent
{
public required string Region { get; init; }
public required string Payload { get; init; }
}
builder.Services
.AddMessageBus()
.AddConsumer<UsRegionConsumer>()
.AddConsumer<EuRegionConsumer>()
.AddMessage<RegionEvent>(m => m
.UseRabbitMQRoutingKey<RegionEvent>(msg => msg.Region))
.AddRabbitMQ(transport =>
{
transport.BindHandlersExplicitly();
// Declare topology: one topic exchange, two queues, two bindings with patterns
transport.DeclareExchange("region-events")
.Type(RabbitMQExchangeType.Topic);
transport.DeclareQueue("us-orders");
transport.DeclareQueue("eu-orders");
transport.DeclareBinding("region-events", "us-orders")
.RoutingKey("us.*");
transport.DeclareBinding("region-events", "eu-orders")
.RoutingKey("eu.*");
// Bind consumers to queues
transport.Endpoint("us-ep")
.Consumer<UsRegionConsumer>()
.Queue("us-orders");
transport.Endpoint("eu-ep")
.Consumer<EuRegionConsumer>()
.Queue("eu-orders");
// Dispatch to the topic exchange
transport.DispatchEndpoint("region-dispatch")
.ToExchange("region-events")
.Publish<RegionEvent>();
});
When you publish a RegionEvent with Region = "us.east", the routing key middleware extracts "us.east" from the message and sets it on the AMQP publish. The topic exchange matches "us.east" against us.* (match) and eu.* (no match). Only the US queue receives the message.
| Pattern | Matches | Does not match |
|---|---|---|
us.* | us.east, us.west | us.east.az1, eu.west |
eu.# | eu.west, eu.west.az1 | us.east |
# | Everything | - |
*.*.az1 | us.east.az1, eu.west.az1 | us.east |
* matches exactly one word. # matches zero or more words. Words are separated by dots.
Direct exchanges use exact-match routing keys instead of patterns. A message with routing key "priority-high" reaches only queues bound with exactly "priority-high".
builder.Services
.AddMessageBus()
.AddConsumer<HighPriorityConsumer>()
.AddMessage<TaskEvent>(m => m
.UseRabbitMQRoutingKey<TaskEvent>(msg => $"priority-{msg.Priority}"))
.AddRabbitMQ(transport =>
{
transport.BindHandlersExplicitly();
transport.DeclareExchange("task-routing")
.Type(RabbitMQExchangeType.Direct);
transport.DeclareQueue("high-priority-tasks");
transport.DeclareBinding("task-routing", "high-priority-tasks")
.RoutingKey("priority-high");
transport.Endpoint("high-priority-ep")
.Consumer<HighPriorityConsumer>()
.Queue("high-priority-tasks");
transport.DispatchEndpoint("task-dispatch")
.ToExchange("task-routing")
.Publish<TaskEvent>();
});
Messages with Priority = "high" reach the queue. Messages with any other priority are dropped by the exchange (unless another queue is bound with a matching routing key).
Runnable example: RabbitMQ
Full demo: All three Demo services use RabbitMQ in production mode with .NET Aspire. See Demo.AppHost for the Aspire orchestration and Demo.Catalog for a complete service using
.AddRabbitMQ()with outbox, inbox, sagas, and multiple handler types.