docs/concepts/message_bus.md
The MessageBus enables communication between system components through message passing.
This design creates a loosely coupled architecture where components interact without
direct dependencies.
The messaging patterns include:

- Point-to-Point
- Publish/Subscribe
- Request/Response

Messages exchanged via the `MessageBus` fall into three categories:

- Data
- Events
- Commands
Nautilus keeps market data topics under the data root. Live data publications use the direct
data.<kind>... topics, for example data.book.deltas.XCME.ESZ24.
When requested, replayed, or workflow-generated data flows over the message bus as
topic-addressable data, the DataEngine publishes it under data.pipeline.<kind>....
Long requests, grouped requests, and aggregation chains can split, transform, and fan data back in
before the parent request completes. These messages are still data messages, but they do not claim
the same live ordering and timing semantics as normal real-time publications. For example, book
deltas on the pipeline path use
data.pipeline.book.deltas.XCME.ESZ24.
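The relationship between the two topic families can be sketched with plain string handling. The helper names below (`live_topic`, `to_pipeline_topic`) are hypothetical and not part of the NautilusTrader API; the sketch assumes only the `data.<kind>...` and `data.pipeline.<kind>...` layouts described above.

```python
DATA_ROOT = "data"
PIPELINE_ROOT = "data.pipeline"


def live_topic(kind: str, venue: str, symbol: str) -> str:
    """Build a live data topic such as data.book.deltas.XCME.ESZ24."""
    return f"{DATA_ROOT}.{kind}.{venue}.{symbol}"


def to_pipeline_topic(topic: str) -> str:
    """Map a live data topic onto the pipeline path (hypothetical helper)."""
    if topic == PIPELINE_ROOT or topic.startswith(PIPELINE_ROOT + "."):
        return topic  # already a pipeline topic
    if not topic.startswith(DATA_ROOT + "."):
        raise ValueError(f"not a data topic: {topic!r}")
    # Insert the "pipeline" segment directly after the data root
    return PIPELINE_ROOT + topic[len(DATA_ROOT):]


live = live_topic("book.deltas", "XCME", "ESZ24")
pipeline = to_pipeline_topic(live)
print(live)      # data.book.deltas.XCME.ESZ24
print(pipeline)  # data.pipeline.book.deltas.XCME.ESZ24
```

Keeping the two prefixes distinct lets a subscriber choose whether it wants live semantics only, pipeline data only, or both.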
Correlated request responses are delivered through response handlers keyed by correlation ID. The
data.response topic is a capture channel for response publications, not the pipeline data path.
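As a rough illustration of handlers keyed by correlation ID, the sketch below keeps a dictionary from ID to callback. The function names and the single-use behavior (a handler is removed once its response arrives) are assumptions made for this example, not a description of the DataEngine internals.

```python
import uuid
from typing import Any, Callable

# Hypothetical registry: correlation ID -> response handler
_pending: dict[uuid.UUID, Callable[[Any], None]] = {}


def send_request(handler: Callable[[Any], None]) -> uuid.UUID:
    """Register a handler and return the correlation ID for the request."""
    correlation_id = uuid.uuid4()
    _pending[correlation_id] = handler
    return correlation_id


def deliver_response(correlation_id: uuid.UUID, payload: Any) -> bool:
    """Route a response to its handler; return False if nothing is waiting."""
    handler = _pending.pop(correlation_id, None)
    if handler is None:
        return False
    handler(payload)
    return True


received: list[Any] = []
request_id = send_request(received.append)
delivered = deliver_response(request_id, {"bars": 3})
duplicate = deliver_response(request_id, {"bars": 3})  # handler already consumed
```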
Once a message is created, its fields must not be mutated. This includes container fields such as
params maps. Components can read a message and derive local state from it, but they must not
rewrite the original.
Immutable messages keep every consumer seeing the same input, preserve what was true at emission time, and remove a class of shared-state races. Replay, debugging, and audit all depend on messages remaining stable after dispatch.
Three ownership rules follow from this:
When a component needs a derived message, it creates a new one with the required values instead of rewriting the original.
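A minimal sketch of this rule in plain Python, assuming a hypothetical `RequestMessage` type (not a NautilusTrader class): a frozen dataclass blocks attribute writes, a read-only mapping view protects the `params` container, and `dataclasses.replace` builds the derived message.

```python
from dataclasses import FrozenInstanceError, dataclass, field, replace
from types import MappingProxyType
from typing import Any, Mapping


@dataclass(frozen=True)
class RequestMessage:
    """Hypothetical message type used only for this illustration."""

    topic: str
    ts_init: int
    # A read-only view guards the container field as well as the attribute
    params: Mapping[str, Any] = field(default_factory=lambda: MappingProxyType({}))


original = RequestMessage(
    topic="data.bars",
    ts_init=1,
    params=MappingProxyType({"limit": 100}),
)

# Derive a new message instead of rewriting the original
derived = replace(original, params=MappingProxyType({**original.params, "limit": 10}))

attribute_mutation_blocked = False
try:
    original.ts_init = 2  # attribute mutation is rejected
except FrozenInstanceError:
    attribute_mutation_blocked = True

container_mutation_blocked = False
try:
    original.params["limit"] = 5  # container mutation is rejected too
except TypeError:
    container_mutation_blocked = True
```

Every consumer holding `original` still sees `limit == 100`, while the component that needed a different limit works with `derived`.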
While the MessageBus is a lower-level component that users typically interact with indirectly,
Actor and Strategy classes provide convenient methods built on top of it:
```python
def publish_data(self, data_type: DataType, data: Data) -> None: ...
def publish_signal(self, name: str, value, ts_event: int = 0) -> None: ...
```
These methods allow you to publish custom data and signals efficiently without needing to work directly with the MessageBus interface.
For advanced users or specialized use cases, direct access to the message bus is available within Actor and Strategy
classes through the self.msgbus reference, which provides the full message bus interface.
To publish a custom message directly, you can specify a topic as a str and any Python object as the message payload, for example:
```python
self.msgbus.publish("MyTopic", "MyMessage")
```
NautilusTrader is an event-driven framework where components communicate by sending and receiving messages. Understanding the different messaging styles helps when building trading systems.
This guide explains the three primary messaging patterns available in NautilusTrader:
| Messaging Style | Purpose | Best For |
|---|---|---|
| MessageBus - Publish/Subscribe to topics | Low‑level, direct access to the message bus | Custom events, system‑level communication |
| Actor‑Based - Publish/Subscribe Data | Structured trading data exchange | Trading metrics, indicators, data needing persistence |
| Actor‑Based - Publish/Subscribe Signal | Lightweight notifications | Simple alerts, flags, status updates |
Each approach serves different purposes. This section helps you decide which pattern to use.
The MessageBus is the central hub for all messages in NautilusTrader. It enables a publish/subscribe pattern
where components can publish events to named topics, and other components can subscribe to receive those messages.
This decouples components, allowing them to interact indirectly via the message bus.
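The publish/subscribe flow described above can be sketched with a toy bus. `MiniBus` is a stand-in written for this example and is far simpler than the real `MessageBus` (no wildcards, priorities, or request/response); it only shows how publishers and subscribers meet through a topic name.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any], None]


class MiniBus:
    """Toy publish/subscribe bus; a sketch, not the NautilusTrader MessageBus."""

    def __init__(self) -> None:
        self._subscriptions: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscriptions[topic].append(handler)

    def publish(self, topic: str, msg: Any) -> None:
        # Publishers never reference subscribers directly, only the topic
        for handler in list(self._subscriptions.get(topic, [])):
            handler(msg)


bus = MiniBus()
risk_log: list[str] = []
audit_log: list[str] = []

bus.subscribe("MyTopic", risk_log.append)
bus.subscribe("MyTopic", audit_log.append)
bus.publish("MyTopic", "MyMessage")
bus.publish("OtherTopic", "ignored")  # no subscribers, so nothing happens
```

Both subscribers receive the same message, and the publisher needs no knowledge of either.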
The message bus approach is ideal when you need:
- Events that do not fit within the predefined `Actor` model.

```python
from nautilus_trader.core.message import Event


# Define a custom event
class Each10thBarEvent(Event):
    TOPIC = "each_10th_bar"  # Topic name

    def __init__(self, bar):
        self.bar = bar


# Subscribe in a component (in Strategy)
self.msgbus.subscribe(Each10thBarEvent.TOPIC, self.on_each_10th_bar)

# Publish an event (in Strategy)
event = Each10thBarEvent(bar)
self.msgbus.publish(Each10thBarEvent.TOPIC, event)


# Handler (in Strategy)
def on_each_10th_bar(self, event: Each10thBarEvent):
    self.log.info(f"Received 10th bar: {event.bar}")
```
This approach exchanges trading-specific data between `Actor`s in the system
(note that every `Strategy` inherits from `Actor`). The published objects inherit from `Data`, which ensures proper timestamping
and ordering of events, both crucial for correct backtest processing.
The Data publish/subscribe approach works well when you need:
- Proper event ordering via built-in timestamps (`ts_event`, `ts_init`), crucial for backtest accuracy.
- Data persistence and serialization through the `@customdataclass` decorator, integrating with NautilusTrader's data catalog system.

This approach requires a class that inherits from `Data` or uses `@customdataclass`.

Inheriting from `Data` vs. using `@customdataclass`

Inheriting from the `Data` class:

- Defines abstract properties `ts_event` and `ts_init` that must be implemented by the subclass. These ensure proper data ordering in backtests based on timestamps.

The `@customdataclass` decorator:

- Adds `ts_event` and `ts_init` attributes if they are not already present.
- Provides serialization functions: `to_dict()`, `from_dict()`, `to_bytes()`, `to_arrow()`, etc.

```python
from nautilus_trader.core.data import Data
from nautilus_trader.model.custom import customdataclass
from nautilus_trader.model.data import DataType


@customdataclass
class GreeksData(Data):
    delta: float
    gamma: float


# Publish data (in Actor / Strategy)
data = GreeksData(
    delta=0.75,
    gamma=0.1,
    ts_event=1_630_000_000_000_000_000,
    ts_init=1_630_000_000_000_000_000,
)
# publish_data takes a DataType, so wrap the class
self.publish_data(DataType(GreeksData), data)

# Subscribe to receiving data (in Actor / Strategy)
self.subscribe_data(DataType(GreeksData))


# Handler (a callback with a fixed name)
def on_data(self, data: Data):
    if isinstance(data, GreeksData):
        self.log.info(f"Delta: {data.delta}, Gamma: {data.gamma}")
```
Signals are a lightweight way to publish and subscribe to simple notifications within the actor framework. This is the simplest messaging approach, requiring no custom class definitions.
The Signal messaging approach works well when you need:
- Notifications that carry a basic value (`int`, `float`, or `str`).
- A minimal API (`publish_signal`, `subscribe_signal`).

Signals come with two constraints:

- Each signal carries a single value of type `int`, `float`, or `str`. That means no support for complex data structures or other Python types.
- In the `on_signal` handler, you can only differentiate between signals using `signal.value`, as the signal name is not accessible in the handler.

```python
# Define signal constants for better organization (optional but recommended)
import types

from nautilus_trader.common.enums import LogColor
from nautilus_trader.core.datetime import unix_nanos_to_dt

signals = types.SimpleNamespace()
signals.NEW_HIGHEST_PRICE = "NewHighestPriceReached"
signals.NEW_LOWEST_PRICE = "NewLowestPriceReached"

# Subscribe to signals (in Actor/Strategy)
self.subscribe_signal(signals.NEW_HIGHEST_PRICE)
self.subscribe_signal(signals.NEW_LOWEST_PRICE)

# Publish a signal (in Actor/Strategy)
self.publish_signal(
    name=signals.NEW_HIGHEST_PRICE,
    value=signals.NEW_HIGHEST_PRICE,  # value can be the same as name for simplicity
    ts_event=bar.ts_event,  # timestamp from triggering event
)


# Handler (a callback with a fixed name)
def on_signal(self, signal):
    # IMPORTANT: We match against signal.value, not signal.name
    match signal.value:
        case signals.NEW_HIGHEST_PRICE:
            self.log.info(
                f"New highest price was reached. | "
                f"Signal value: {signal.value} | "
                f"Signal time: {unix_nanos_to_dt(signal.ts_event)}",
                color=LogColor.GREEN,
            )
        case signals.NEW_LOWEST_PRICE:
            self.log.info(
                f"New lowest price was reached. | "
                f"Signal value: {signal.value} | "
                f"Signal time: {unix_nanos_to_dt(signal.ts_event)}",
                color=LogColor.RED,
            )
```
Here's a quick reference to help you decide which messaging style to use:
| Use Case | Recommended Approach | Setup required |
|---|---|---|
| Custom events or system‑level communication | MessageBus + Pub/Sub to topic | Topic + Handler management |
| Structured trading data | Actor + Pub/Sub Data + optional @customdataclass if serialization is needed | New class definition inheriting from Data (handler on_data is predefined) |
| Simple alerts/notifications | Actor + Pub/Sub Signal | Signal name only |
The MessageBus can write serialized messages to external streams. This section describes the
external egress and ingress sides of the external bus. Rust-native live nodes use injected
MessageBusExternalEgress and MessageBusExternalIngress surfaces, so the core node does not
depend on Redis, a broker, shared-memory implementation, or socket protocol.
:::info
Redis is currently supported as one external backing for serializable messages.
The minimum supported Redis version is 6.2, required for streams functionality.
:::
When external egress is configured, outgoing publish messages are first dispatched to in-process
subscribers, then serialized into the existing BusMessage wire record:

- `topic`: the exact message bus topic used by the internal publish call, for example `data.quotes.BINANCE.BTCUSDT` or `events.order.S-001`.
- `type`: the canonical payload type name, for example `QuoteTick` or `OrderEventAny`.
- `encoding`: the payload encoding selected from the message bus encoding policy.
- `payload`: serialized bytes encoded with the selected encoding.

External egress receives that record as `publish(BusMessage)`. This outbound call must not block the
node's bus thread. Bounded egress implementations drop on a full queue instead of applying
back-pressure to the trading loop. Closing the message bus closes the configured egress.
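The drop-on-full behavior can be sketched with a bounded queue. `BoundedEgress` below is a hypothetical Python stand-in for a Rust egress implementation, and the `dropped` counter is an assumption added so the effect is observable.

```python
import queue
from dataclasses import dataclass


@dataclass(frozen=True)
class BusMessage:
    """Sketch of the wire record fields named above."""

    topic: str
    type: str
    encoding: str
    payload: bytes


class BoundedEgress:
    """Hypothetical egress that never blocks the publishing thread."""

    def __init__(self, capacity: int) -> None:
        self._queue = queue.Queue(maxsize=capacity)
        self.dropped = 0

    def publish(self, msg: BusMessage) -> None:
        try:
            self._queue.put_nowait(msg)  # returns immediately
        except queue.Full:
            self.dropped += 1  # drop instead of applying back-pressure

    def pending(self) -> int:
        return self._queue.qsize()


egress = BoundedEgress(capacity=2)
for _ in range(5):
    egress.publish(BusMessage("data.quotes.BINANCE.BTCUSDT", "QuoteTick", "json", b"{}"))
```

With nothing draining the queue, the first two messages are buffered and the remaining three are dropped, while `publish` returns immediately each time.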
Inbound external streams are exposed through the separate Rust MessageBusExternalIngress trait.
Ingress yields the same BusMessage { topic, type, encoding, payload } shape.
republish_external_message decodes supported inbound messages and republishes them internally
without forwarding the message back out. The inbound payload type must first be registered for
streaming on the receiving message bus; unregistered types are skipped without decoding.
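The inbound path can be approximated as follows. `InboundBridge` and `register_streaming_type` are illustrative names rather than the Rust API, and the sketch handles only JSON payloads; it shows the two properties described above: unregistered types are skipped before decoding, and republished messages go to the internal bus only.

```python
import json
from typing import Any, Callable, NamedTuple


class BusMessage(NamedTuple):
    topic: str
    type: str
    encoding: str
    payload: bytes


class InboundBridge:
    """Hypothetical stand-in for the republish step; names are illustrative."""

    def __init__(self, publish_internal: Callable[[str, Any], None]) -> None:
        self._publish_internal = publish_internal
        self._decoders: dict[str, Callable[[dict], Any]] = {}
        self.skipped = 0

    def register_streaming_type(self, type_name: str, decoder: Callable[[dict], Any]) -> None:
        self._decoders[type_name] = decoder

    def republish(self, msg: BusMessage) -> None:
        decoder = self._decoders.get(msg.type)
        if decoder is None or msg.encoding != "json":
            self.skipped += 1  # skipped before any decoding work happens
            return
        # Publish internally only; nothing is forwarded back to egress
        self._publish_internal(msg.topic, decoder(json.loads(msg.payload)))


internal: list[tuple[str, Any]] = []
bridge = InboundBridge(lambda topic, obj: internal.append((topic, obj)))
bridge.register_streaming_type("QuoteTick", lambda d: d)

bridge.republish(BusMessage("data.quotes.BINANCE.BTCUSDT", "QuoteTick", "json", b'{"bid": 1.0}'))
bridge.republish(BusMessage("events.order.S-001", "OrderEventAny", "json", b"{}"))
```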
For Redis, messages are sent over a multiple-producer, single-consumer (MPSC) channel to a separate Rust task, which writes them to Redis streams.
Offloading this I/O to a separate task keeps the main thread unblocked.
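The MPSC hand-off can be illustrated in Python with a thread-safe queue and one writer thread. This is an analogy to the Rust channel and task, not the actual implementation; the `written` list stands in for Redis stream writes.

```python
import queue
import threading

channel: queue.Queue = queue.Queue()
written: list[str] = []
_STOP = object()  # sentinel that tells the writer to exit


def writer() -> None:
    """Single consumer: the only place where (simulated) stream I/O happens."""
    while True:
        item = channel.get()
        if item is _STOP:
            break
        written.append(item)  # stand-in for a Redis stream write


def producer(name: str, count: int) -> None:
    for i in range(count):
        channel.put(f"{name}-{i}")  # enqueue and return; no I/O on this thread


consumer = threading.Thread(target=writer)
consumer.start()
producers = [threading.Thread(target=producer, args=(f"p{n}", 3)) for n in range(2)]
for t in producers:
    t.start()
for t in producers:
    t.join()
channel.put(_STOP)
consumer.join()
```

Messages from different producers may interleave, but each one is written exactly once by the single consumer.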
With MessagePack or JSON, Rust-native external egress forwards serializable typed publications. This
includes instruments, quotes, trades, bars, book deltas, depth-10 snapshots, mark/index/funding
updates, option greeks, account state, portfolio snapshots, order events, position events, and
custom data. With the defi feature this also includes DeFi blocks, pools, liquidity updates, fee
collects, and flash events. Full order book snapshots, greeks data, option chain slices, and DeFi
pool swaps are not forwarded because those types do not implement Serde serialization.
With SBE or Cap'n Proto, Rust-native external egress forwards the built-in market data payloads with schema codecs: quotes, trades, bars, book deltas, depth-10 snapshots, mark price updates, index price updates, funding rate updates, and option greeks. Other payload types are dropped with a debug log when those schema encodings are selected.
Nautilus supports serialization for:

- Nautilus built-in types (serialized as dictionaries, `dict[str, Any]`, containing serializable primitives).
- Python primitive types (`str`, `int`, `float`, `bool`, `bytes`).

You can add serialization support for custom types by registering them through the `serialization` subpackage.

```python
def register_serializable_type(
    cls,
    to_dict: Callable[[Any], dict[str, Any]],
    from_dict: Callable[[dict[str, Any]], Any],
):
    ...
```

- `cls`: The type to register.
- `to_dict`: The delegate to instantiate a dict of primitive types from the object.
- `from_dict`: The delegate to instantiate the object from a dict of primitive types.

An external backing is configured with a behavior config plus a technology-owned backing
config. `MessageBusConfig` controls message bus behavior. `RedisMessageBusConfig` owns Redis
connection settings and implements `MessageBusBackingFactory`.
```rust
use nautilus_common::{
    enums::SerializationEncoding,
    msgbus::{backing::MessageBusBackingFactory, config::MessageBusConfig},
};
use nautilus_infrastructure::redis::msgbus::RedisMessageBusConfig;

let config = MessageBusConfig {
    encoding: SerializationEncoding::Json,
    encoding_market_data: Some(SerializationEncoding::Sbe),
    timestamps_as_iso8601: true,
    buffer_interval_ms: Some(100),
    autotrim_mins: Some(30),
    use_trader_prefix: true,
    use_trader_id: true,
    use_instance_id: false,
    streams_prefix: "streams".to_string(),
    types_filter: Some(vec!["QuoteTick".to_string(), "TradeTick".to_string()]),
    ..Default::default()
};

let backing = RedisMessageBusConfig::default();
let message_bus_backing = backing.create(trader_id, instance_id, config.clone())?;
```
A RedisMessageBusConfig is required when using the built-in Redis backing. For a default Redis
setup on the local loopback you can pass RedisMessageBusConfig::default().
Redis selection is explicit in the Rust type. The config does not use a user-facing selector such
as type = "redis" or backing_type = "redis".
Rust-native callers that inject MessageBusExternalEgress pass concrete connection details when
they construct that egress surface. The core message bus does not require a RedisMessageBusConfig
for injected egress.
The Rust live runtime accepts external_streams in MessageBusConfig, and consumes inbound
BusMessages when callers inject a MessageBusExternalIngress with
LiveNodeBuilder::with_external_ingress. The config names the external stream keys; the injected
ingress is the concrete runtime source. Rust-native factory wiring from config to a backing remains
the caller's responsibility.
Rust-native external message bus egress supports these encoding names:

- JSON (`json`)
- MessagePack (`msgpack`)
- Cap'n Proto (`capnp`, with the Rust `capnp` feature)
- SBE (`sbe`, with the Rust `sbe` feature)

Use the `encoding` config option to control the message writing encoding.
Use encoding_market_data to override the encoding for market data payloads backed by the external
bus binary codecs. Use encoding_builtin to override account state, portfolio snapshot, order
event, and position event payloads. Custom and unmapped payload types always use encoding.
MessageBusConfig::validate requires the default encoding to support custom payloads, so it must
be JSON or MessagePack. Category overrides must be supported by every published payload type in
that category. SBE and Cap'n Proto can currently be used only for encoding_market_data, and only
when the matching Rust feature is enabled. encoding_builtin = "sbe" and
encoding_builtin = "capnp" fail validation until those schema codecs cover the built-in event
category.
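The selection and validation rules above can be condensed into a small sketch. `EncodingPolicy` is a hypothetical Python mirror of the three options, and the type-name sets are illustrative assumptions rather than the full category lists; Rust feature gating is not modeled.

```python
from dataclasses import dataclass
from typing import Optional

CUSTOM_CAPABLE = {"json", "msgpack"}
SCHEMA_ENCODINGS = {"sbe", "capnp"}
# Illustrative subsets only; the real categories contain more types
MARKET_DATA_TYPES = {"QuoteTick", "TradeTick", "Bar", "OrderBookDeltas"}
BUILTIN_EVENT_TYPES = {"AccountState", "OrderEventAny"}


@dataclass
class EncodingPolicy:
    """Hypothetical mirror of the three encoding options described above."""

    encoding: str = "json"
    encoding_market_data: Optional[str] = None
    encoding_builtin: Optional[str] = None

    def validate(self) -> None:
        if self.encoding not in CUSTOM_CAPABLE:
            raise ValueError("default encoding must be json or msgpack")
        if self.encoding_builtin in SCHEMA_ENCODINGS:
            raise ValueError("sbe/capnp do not yet cover built-in events")

    def select(self, type_name: str) -> str:
        if type_name in MARKET_DATA_TYPES and self.encoding_market_data:
            return self.encoding_market_data
        if type_name in BUILTIN_EVENT_TYPES and self.encoding_builtin:
            return self.encoding_builtin
        return self.encoding  # custom and unmapped payload types


policy = EncodingPolicy(encoding="json", encoding_market_data="sbe")
policy.validate()

rejected = False
try:
    EncodingPolicy(encoding_builtin="sbe").validate()
except ValueError:
    rejected = True
```

With this policy, quotes use SBE while account state and custom data fall back to JSON.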
The legacy Python/Cython Redis serializer and the Redis cache payload path support MessagePack and JSON. SBE and Cap'n Proto are schema payload encodings for Rust-native external message bus egress, not Redis cache encodings.
:::tip
The json encoding is used by default for human readability and interoperability.
Use msgpack when payload size and serialization performance are a primary concern.
:::
By default, timestamps are formatted as UNIX epoch nanosecond integers. Alternatively, you can
configure ISO 8601 string formatting by setting `timestamps_as_iso8601` to `true`.
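To make the two representations concrete, the sketch below converts a nanosecond timestamp to an ISO 8601 string. `format_iso8601` is a local helper written for this example, and the exact string layout Nautilus emits (precision, suffix) may differ.

```python
from datetime import datetime, timezone


def format_iso8601(ts: int) -> str:
    """Format UNIX epoch nanoseconds as an ISO 8601 UTC string."""
    seconds, nanos = divmod(ts, 1_000_000_000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # datetime only carries microseconds, so append the nanosecond part manually
    return f"{dt:%Y-%m-%dT%H:%M:%S}.{nanos:09d}Z"


ts_event = 1_630_000_000_000_000_000
print(ts_event)                  # integer form (the default)
print(format_iso8601(ts_event))  # 2021-08-26T17:46:40.000000000Z
```

The integer form is compact and lossless; the string form is easier to read when inspecting streams by hand.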
Message stream keys are essential for identifying individual trader nodes and organizing messages within streams. They can be tailored to meet your specific requirements and use cases. In the context of message bus streams, a trader key is typically structured as follows:
```
trader:{trader_id}:{instance_id}:{streams_prefix}
```
These options control Redis stream keys. They do not rewrite the topic passed to an injected
MessageBusExternalEgress; that topic remains the internal message bus publish topic. When
stream_per_topic is True, Redis egress appends the topic to the stream key. When it is
False, Redis stores all messages on the base stream key and keeps the topic as a message field.
The following options are available for configuring message stream keys:

- `use_trader_prefix`: whether the key begins with the `trader` string.
- `use_trader_id`: whether the key includes the trader ID for the node.
- `use_instance_id`: whether the key includes the instance ID. Each trader node is assigned a unique instance ID, which is a UUIDv4. It distinguishes individual traders when messages are distributed across multiple streams, which is particularly useful for tracking and identifying traders in a multi-node trading system. Set `use_instance_id` to `True` to include it.
- `streams_prefix`: a string that groups all streams for a single trader instance or organizes messages for multiple instances. Pass a string to the `streams_prefix` option, ensuring other prefixes are set to false.
- `stream_per_topic`: whether the producer writes a separate stream for each topic. This is particularly useful for Redis backings, which do not support wildcard topics when listening to streams. If set to `False`, all messages are written to the same stream.
:::info
Redis does not support wildcard stream topics. For better compatibility with Redis, it is recommended to set this option to False.
:::
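How the key options combine can be sketched with a small builder. `build_stream_key` is a hypothetical helper, its default values are arbitrary, and joining the topic with `:` when `stream_per_topic` is enabled is an assumption; the real Redis key layout may differ in detail.

```python
from typing import Optional


def build_stream_key(
    trader_id: str,
    instance_id: str,
    streams_prefix: str = "streams",
    use_trader_prefix: bool = True,
    use_trader_id: bool = True,
    use_instance_id: bool = False,
    stream_per_topic: bool = False,
    topic: Optional[str] = None,
) -> str:
    """Assemble a stream key from the options described above (sketch only)."""
    parts: list[str] = []
    if use_trader_prefix:
        parts.append("trader")
    if use_trader_id:
        parts.append(trader_id)
    if use_instance_id:
        parts.append(instance_id)
    if streams_prefix:
        parts.append(streams_prefix)
    if stream_per_topic and topic is not None:
        parts.append(topic)  # assumed separator; the real layout may differ
    return ":".join(parts)


full_key = build_stream_key("TRADER-001", "a1b2", use_instance_id=True)
simple_key = build_stream_key(
    "TRADER-001",
    "a1b2",
    streams_prefix="binance",
    use_trader_prefix=False,
    use_trader_id=False,
)
print(full_key)    # trader:TRADER-001:a1b2:streams
print(simple_key)  # binance
```

Disabling the trader-specific parts yields a short, predictable key that other nodes can be configured to read.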
When messages are published on the message bus, they are serialized and written to a stream if a backing for the message bus is configured and enabled. To prevent flooding the stream with data like high-frequency quotes, you may filter out certain types of messages from external publication.
To enable this filtering mechanism, pass a list of type objects to the types_filter parameter in the message bus configuration,
specifying which types of messages should be excluded from external publication.
```python
from nautilus_trader.config import MessageBusConfig
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick

# Create a MessageBusConfig instance with types filtering
message_bus = MessageBusConfig(
    types_filter=[QuoteTick, TradeTick],
)
```
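The effect of the filter can be pictured with placeholder classes. The classes and the `should_publish_externally` helper below are stand-ins defined for this example, and matching by `isinstance` is an assumption about how the filter is applied.

```python
class QuoteTick:
    """Placeholder standing in for the Nautilus type of the same name."""


class TradeTick:
    """Placeholder standing in for the Nautilus type of the same name."""


class OrderFilled:
    """Placeholder for an event type that is not filtered."""


def should_publish_externally(msg: object, types_filter: list[type]) -> bool:
    """Return False for messages whose type is excluded from external streams."""
    return not isinstance(msg, tuple(types_filter))


types_filter = [QuoteTick, TradeTick]
messages = [QuoteTick(), OrderFilled(), TradeTick()]
external = [
    type(m).__name__ for m in messages if should_publish_externally(m, types_filter)
]
print(external)  # ['OrderFilled']
```

Filtered messages still reach in-process subscribers; only external publication is suppressed.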
The autotrim_mins configuration parameter allows you to specify the lookback window in minutes for automatic stream trimming in your message streams.
Automatic stream trimming helps manage the size of your message streams by removing older messages, ensuring that the streams remain manageable in terms of storage and performance.
:::info
The current Redis implementation maintains `autotrim_mins` as a maximum stream width (plus roughly a minute, as streams are trimmed no more than once per minute),
rather than as a maximum lookback window based on the current wall clock time.
:::
The message bus within a TradingNode (node) is referred to as the "internal message bus".
A producer node is one which publishes messages onto an external stream (see external egress and ingress).
The consumer node listens to external streams to receive and publish deserialized message payloads on its internal message bus.
```mermaid
flowchart TB
    producer[Producer Node]
    stream[Stream]
    consumer1[Consumer Node 1]
    consumer2[Consumer Node 2]

    producer --> stream
    stream --> consumer1
    stream --> consumer2
```
:::tip
Set the LiveDataEngineConfig.external_clients with the list of client_ids intended to represent the external streaming clients.
The DataEngine will filter out subscription commands for these clients, ensuring that the external streaming provides the necessary data for any subscriptions to these clients.
When the Rust DataEngine skips an external-client subscription, it registers the corresponding streaming payload type for inbound republishing on the message bus.
:::
The following example details a streaming setup where a producer node publishes Binance data externally, and a downstream consumer node publishes these data messages onto its internal message bus.
We configure the MessageBus of the producer node to publish to a "binance" stream.
The settings use_trader_id, use_trader_prefix, and use_instance_id are all set to false
to ensure a simple and predictable stream key that the consumer nodes can register for.
```rust
let message_bus = MessageBusConfig {
    use_trader_id: false,
    use_trader_prefix: false,
    use_instance_id: false,
    streams_prefix: "binance".to_string(), // <---
    stream_per_topic: false,
    autotrim_mins: Some(30),
    ..Default::default()
};

let backing = RedisMessageBusConfig {
    connection_timeout: 2,
    response_timeout: 2,
    ..Default::default()
};
```
We configure the MessageBus of the consumer node to receive messages from the same "binance"
stream. The node listens to the external stream keys when a MessageBusExternalIngress is injected
into the LiveNodeBuilder, then publishes these messages onto its internal message bus. We declare
the client ID "BINANCE_EXT" as an external client so the DataEngine does not attempt to send
data commands to this client ID.
```rust
let data_engine = LiveDataEngineConfig {
    external_clients: Some(vec![ClientId::from("BINANCE_EXT")]),
    ..Default::default()
};

let message_bus = MessageBusConfig {
    external_streams: Some(vec!["binance".to_string()]), // <---
    ..Default::default()
};

let backing = RedisMessageBusConfig {
    connection_timeout: 2,
    response_timeout: 2,
    ..Default::default()
};
```