docs/concepts/data.md
Common built-in data types include:
OrderBookDelta (L1/L2/L3): Represents the most granular order book updates.OrderBookDeltas (L1/L2/L3): Batches multiple order book deltas for more efficient processing.OrderBookDepth10: Aggregated order book snapshot (up to 10 levels per bid and ask side).QuoteTick: Represents the best bid and ask prices along with their sizes at the top-of-book.TradeTick: A single trade/match event between counterparties.Bar: OHLCV (Open, High, Low, Close, Volume) bar/candle, aggregated using a specified aggregation method.MarkPriceUpdate: The current mark price for an instrument (typically used in derivatives trading).IndexPriceUpdate: The index price for an instrument (underlying price used for mark price calculations).FundingRateUpdate: The funding rate for perpetual contracts (periodic payments between long and short positions).InstrumentStatus: An instrument-level status event.InstrumentClose: The closing price of an instrument.See the API reference for the full set of built-in data classes and wrappers.
NautilusTrader operates primarily on granular order book data for the highest realism in execution simulations. Backtests can also run on any supported market data type, depending on the desired simulation fidelity.
A high-performance order book implemented in Rust is available to maintain order book state based on provided data.
OrderBook instances are maintained per instrument for both backtesting and live trading, with the following book types available:
L3_MBO: Market by order (MBO) or L3 data, uses every order book event at every price level, keyed by order ID.L2_MBP: Market by price (MBP) or L2 data, aggregates order book events by price level.L1_MBP: Market by price (MBP) or L1 data, also known as best bid and offer (BBO), captures only top-level updates.:::note
Top-of-book data, such as QuoteTick, TradeTick and Bar, can also be used for backtesting, with markets operating on L1_MBP book types.
:::
Each OrderBookDelta carries a flags field using RecordFlag bitmask values
to signal event boundaries to the DataEngine:
F_LAST: Marks the final delta in a logical event group. When buffer_deltas
is enabled, the DataEngine accumulates deltas and only publishes to
subscribers when it encounters F_LAST. Every event group must end with
a delta that has F_LAST set.F_SNAPSHOT: Marks deltas that belong to a snapshot (as opposed to an
incremental update). Snapshot sequences begin with a Clear action followed
by Add deltas reconstructing the full book state. The last delta in a
snapshot has both F_SNAPSHOT | F_LAST set.:::warning
A missing F_LAST on the final delta in an event group causes buffered consumers
to accumulate deltas indefinitely without publishing. This applies to incremental
updates and snapshots alike, including empty book snapshots where only a Clear
delta is emitted.
:::
NautilusTrader supports a variety of instrument types across spot, derivatives, and specialty markets:
flowchart TD
I[Instrument Types]
I --> Spot
I --> Derivatives
I --> Other
Spot --> Equity
Spot --> CurrencyPair
Spot --> Commodity
Spot --> IndexInstrument
Derivatives --> Futures
Derivatives --> Options
Derivatives --> Cfd
Futures --> FuturesContract
Futures --> FuturesSpread
Futures --> CryptoFuture
Futures --> CryptoPerpetual
Futures --> PerpetualContract
Options --> OptionContract
Options --> OptionSpread
Options --> CryptoOption
Options --> BinaryOption
Other --> BettingInstrument
Other --> SyntheticInstrument
| Instrument | Description |
|---|---|
Equity | Generic equity instrument. |
CurrencyPair | Currency pair in a spot/cash market. |
Commodity | Commodity in a spot/cash market. |
IndexInstrument | Spot index (reference price, not directly tradable). |
FuturesContract | Generic deliverable futures contract. |
FuturesSpread | Deliverable futures spread. |
CryptoFuture | Deliverable futures with crypto assets as underlying and settlement. |
CryptoPerpetual | Crypto perpetual futures (perpetual swap). |
PerpetualContract | Asset‑class agnostic perpetual swap (any underlying). |
OptionContract | Generic option contract. |
OptionSpread | Generic option spread. |
CryptoOption | Crypto option contract. |
BinaryOption | Binary option instrument. |
Cfd | Contract for Difference (CFD). |
BettingInstrument | Instrument in a betting market. |
SyntheticInstrument | Synthetic instrument with prices derived from component instruments via formula. |
A bar (also known as a candle, candlestick or kline) is a data structure that represents price and volume information over a specific period, including:
The system generates bars using an aggregation method that groups data by specific criteria.
Data aggregation in NautilusTrader transforms granular market data into structured bars or candles for several reasons:
The platform implements various aggregation methods:
| Name | Description | Category |
|---|---|---|
TICK | Aggregation of a number of ticks. | Threshold |
TICK_IMBALANCE | Aggregation of the buy/sell imbalance of ticks. | Threshold |
TICK_RUNS | Aggregation of sequential buy/sell runs of ticks. | Information |
VOLUME | Aggregation of traded volume. | Threshold |
VOLUME_IMBALANCE | Aggregation of the buy/sell imbalance of traded volume. | Threshold |
VOLUME_RUNS | Aggregation of sequential runs of buy/sell traded volume. | Information |
VALUE | Aggregation of the notional value of trades (also known as "Dollar bars"). | Threshold |
VALUE_IMBALANCE | Aggregation of the buy/sell imbalance of trading by notional value. | Threshold |
VALUE_RUNS | Aggregation of sequential buy/sell runs of trading by notional value. | Information |
RENKO | Aggregation based on fixed price movements (brick size in ticks). | Threshold |
MILLISECOND | Aggregation of time intervals with millisecond granularity. | Time |
SECOND | Aggregation of time intervals with second granularity. | Time |
MINUTE | Aggregation of time intervals with minute granularity. | Time |
HOUR | Aggregation of time intervals with hour granularity. | Time |
DAY | Aggregation of time intervals with day granularity. | Time |
WEEK | Aggregation of time intervals with week granularity. | Time |
MONTH | Aggregation of time intervals with month granularity. | Time |
YEAR | Aggregation of time intervals with year granularity. | Time |
Information-driven bars adapt their sampling frequency to market activity rather than using fixed intervals. They are based on the concept of aggressor side (whether the trade initiator was a buyer or seller) and come in two families: imbalance and runs.
Imbalance bars close when the net buy/sell activity reaches a threshold. Each trade contributes a signed value: positive for buyer-initiated trades and negative for seller-initiated. The bar closes when the absolute imbalance reaches the configured step. This means that opposing trades cancel each other out, so imbalance bars tend to form more slowly in balanced markets and faster during directional moves.
Runs bars close when consecutive activity from the same aggressor side reaches a threshold. Unlike imbalance bars, runs bars reset their counter when the aggressor side changes. This makes them sensitive to sustained one-sided pressure rather than net imbalance.
Both families have three variants based on what is measured:
| Variant | Imbalance | Runs | What is measured |
|---|---|---|---|
| Tick | TICK_IMBALANCE | TICK_RUNS | Number of trades (each trade counts as 1) |
| Volume | VOLUME_IMBALANCE | VOLUME_RUNS | Traded volume (quantity) |
| Value | VALUE_IMBALANCE | VALUE_RUNS | Notional value (price x quantity) |
:::note
Information-driven bars require TradeTick data because they need the aggressor_side field
to classify each trade. They cannot be aggregated from QuoteTick data alone.
:::
NautilusTrader implements three distinct data aggregation methods:
Trade-to-bar aggregation: Creates bars from TradeTick objects (executed trades)
LAST price type in the bar specification.Quote-to-bar aggregation: Creates bars from QuoteTick objects (bid/ask prices)
BID, ASK, or MID price types in the bar specification.Bar-to-bar aggregation: Creates larger-timeframe Bar objects from smaller-timeframe Bar objects
@ symbol in the specification.NautilusTrader defines a unique bar type (BarType class) based on the following components:
InstrumentId): Specifies the particular instrument for the bar.BarSpecification):
step: Defines the interval or frequency of each bar.aggregation: Specifies the method used for data aggregation (see the above table).price_type: Indicates the price basis of the bar (e.g., bid, ask, mid, last).AggregationSource): Indicates whether the bar was aggregated internally (within Nautilus).Bar types can also be classified as either standard or composite:
Bar data aggregation can be either internal or external:
INTERNAL: The bar is aggregated inside the local Nautilus system boundary.EXTERNAL: The bar is aggregated outside the local Nautilus system boundary (typically by a trading venue or data provider).For bar-to-bar aggregation, the target bar type is always INTERNAL (since you're doing the aggregation within NautilusTrader),
but the source bars can be either INTERNAL or EXTERNAL, i.e., you can aggregate externally provided bars or already
aggregated internal bars.
You can define standard bar types from strings using the following convention:
{instrument_id}-{step}-{aggregation}-{price_type}-{INTERNAL | EXTERNAL}
For example, to define a BarType for AAPL trades (last price) on Nasdaq (XNAS) using a 5-minute interval
aggregated from trades locally by Nautilus:
bar_type = BarType.from_str("AAPL.XNAS-5-MINUTE-LAST-INTERNAL")
Composite bars are derived by aggregating higher-granularity bars into the desired bar type. To define a composite bar, use this convention:
{instrument_id}-{step}-{aggregation}-{price_type}-INTERNAL@{step}-{aggregation}-{INTERNAL | EXTERNAL}
Notes:
INTERNAL aggregation source (since this is how the bar is aggregated).INTERNAL or EXTERNAL aggregation sources.For example, to define a BarType for AAPL trades (last price) on Nasdaq (XNAS) using a 5-minute interval
aggregated locally by Nautilus, from 1-minute interval bars aggregated externally:
bar_type = BarType.from_str("AAPL.XNAS-5-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL")
The BarType string format encodes both the target bar type and, optionally, the source data type:
{instrument_id}-{step}-{aggregation}-{price_type}-{source}@{step}-{aggregation}-{source}
The part after the @ symbol is optional and only used for bar-to-bar aggregation:
@: Aggregates from TradeTick objects (when price_type is LAST) or QuoteTick objects (when price_type is BID, ASK, or MID).@: Aggregates from existing Bar objects (specifying the source bar type).def on_start(self) -> None:
# Define a bar type for aggregating from TradeTick objects
# Uses price_type=LAST which indicates TradeTick data as source
bar_type = BarType.from_str("6EH4.XCME-50-VOLUME-LAST-INTERNAL")
start = self.clock.utc_now() - timedelta(days=30)
# Request historical data (will receive bars in on_historical_data handler)
self.request_bars(bar_type, start=start)
# Subscribe to live data (will receive bars in on_bar handler)
self.subscribe_bars(bar_type)
def on_start(self) -> None:
# Create 1-minute bars from ASK prices (in QuoteTick objects)
bar_type_ask = BarType.from_str("6EH4.XCME-1-MINUTE-ASK-INTERNAL")
# Create 1-minute bars from BID prices (in QuoteTick objects)
bar_type_bid = BarType.from_str("6EH4.XCME-1-MINUTE-BID-INTERNAL")
# Create 1-minute bars from MID prices (middle between ASK and BID prices in QuoteTick objects)
bar_type_mid = BarType.from_str("6EH4.XCME-1-MINUTE-MID-INTERNAL")
start = self.clock.utc_now() - timedelta(days=30)
# Request historical data and subscribe to live data
self.request_bars(bar_type_ask, start=start) # Historical bars processed in on_historical_data
self.subscribe_bars(bar_type_ask) # Live bars processed in on_bar
def on_start(self) -> None:
# Create 5-minute bars from 1-minute bars (Bar objects)
# Format: target_bar_type@source_bar_type
# Note: price type (LAST) is only needed on the left target side, not on the source side
bar_type = BarType.from_str("6EH4.XCME-5-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL")
start = self.clock.utc_now() - timedelta(days=30)
# Request historical data by providing the dependency-ordered aggregation chain
self.request_aggregated_bars([bar_type], start=start)
# Subscribe to live updates (processed in on_bar(...) handler)
self.subscribe_bars(bar_type)
You can create complex aggregation chains where you aggregate from already aggregated bars:
# First create 1-minute bars from TradeTick objects (LAST indicates TradeTick source)
primary_bar_type = BarType.from_str("6EH4.XCME-1-MINUTE-LAST-INTERNAL")
# Then create 5-minute bars from 1-minute bars
# Note the @1-MINUTE-INTERNAL part identifying the source bars
intermediate_bar_type = BarType.from_str("6EH4.XCME-5-MINUTE-LAST-INTERNAL@1-MINUTE-INTERNAL")
# Then create hourly bars from 5-minute bars
# Note the @5-MINUTE-INTERNAL part identifying the source bars
hourly_bar_type = BarType.from_str("6EH4.XCME-1-HOUR-LAST-INTERNAL@5-MINUTE-INTERNAL")
NautilusTrader provides two distinct operations for working with bars:
request_bars(): Fetches historical data for a standard BarType, processed by the
on_historical_data() handler.request_aggregated_bars(): Fetches historical data for a dependency-ordered list of bar
types, building internal bars on the fly.subscribe_bars(): Establishes a real-time data feed processed by the on_bar() handler.These methods work together in a typical workflow:
request_bars() loads historical data to initialize indicators or state of strategy with past market behavior.subscribe_bars() ensures the strategy continues receiving new bars as they form in real-time.Example usage in on_start():
def on_start(self) -> None:
# Define bar type
bar_type = BarType.from_str("6EH4.XCME-5-MINUTE-LAST-INTERNAL")
start = self.clock.utc_now() - timedelta(days=30)
# Register indicators before requesting history so they receive historical updates too
self.register_indicator_for_bars(bar_type, self.my_indicator)
# Request historical data to initialize indicators
# These bars will be delivered to the on_historical_data(...) handler in strategy
self.request_bars(bar_type, start=start)
# Subscribe to real-time updates
# New bars will be delivered to the on_bar(...) handler in strategy
self.subscribe_bars(bar_type)
Required handlers in your strategy to receive the data:
def on_historical_data(self, data):
# Processes historical Data objects from request_bars() or request_aggregated_bars()
# Note: indicators registered with register_indicator_for_bars
# are updated automatically with historical data
pass
def on_bar(self, bar):
# Processes individual bars in real-time from subscribe_bars()
# Indicators registered with this bar type will update automatically and they will be updated before this handler is called
pass
When requesting historical bars for backtesting or initializing indicators, use
request_bars() for standard bar types and request_aggregated_bars() for
on-the-fly aggregation:
start = self.clock.utc_now() - timedelta(days=30)
# Request raw 1-minute bars (aggregated from TradeTick objects as indicated by LAST price type)
self.request_bars(
BarType.from_str("6EH4.XCME-1-MINUTE-LAST-EXTERNAL"),
start=start,
)
# Request bars that are aggregated from historical trade ticks
self.request_aggregated_bars(
[BarType.from_str("6EH4.XCME-100-VOLUME-LAST-INTERNAL")],
start=start,
)
# Request 5-minute bars aggregated from 1-minute bars
self.request_aggregated_bars(
[BarType.from_str("6EH4.XCME-5-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL")],
start=start,
)
Register indicators before requesting data: Ensure indicators are registered before requesting historical data so they get updated properly.
start = self.clock.utc_now() - timedelta(days=30)
# Correct order
self.register_indicator_for_bars(bar_type, self.ema)
self.request_bars(bar_type, start=start)
# Incorrect order
self.request_bars(bar_type, start=start) # Indicator won't receive historical data
self.register_indicator_for_bars(bar_type, self.ema)
Bar aggregators track OHLC prices via the fixed-point Price type. Threshold comparisons for
tick and volume aggregators use integer arithmetic, while value-based and imbalance/runs aggregators
currently use f64 for notional value and signed accumulation (these are being migrated to
fixed-point integer arithmetic). The choice of aggregation method has a modest impact on per-update
overhead:
Time bar behavior is controlled through DataEngineConfig. The following options
apply to all time-based aggregation (millisecond through year):
| Option | Type | Default | Description |
|---|---|---|---|
time_bars_interval_type | str | "left-open" | "left-open": start excluded, end included. "right-open": start included, end excluded. |
time_bars_timestamp_on_close | bool | True | When True, ts_event is the bar close time. When False, ts_event is the bar open time. |
time_bars_skip_first_non_full_bar | bool | False | Skip emitting a bar when aggregation starts mid‑interval, avoiding partial bars on startup. |
time_bars_build_with_no_updates | bool | True | When True, bars are emitted even if no market updates arrived during the interval. |
time_bars_origin_offset | dict | None | Maps BarAggregation types to pd.Timedelta or pd.DateOffset values for shifting bar alignment (e.g., align to 09:30 market open). |
time_bars_build_delay | int | 0 | Delay in microseconds before building a bar. Useful in backtests to ensure data at bar boundary timestamps is processed before the timer fires. |
from nautilus_trader.data.config import DataEngineConfig
config = DataEngineConfig(
time_bars_timestamp_on_close=True,
time_bars_build_with_no_updates=False,
time_bars_skip_first_non_full_bar=True,
)
The platform uses two fundamental timestamp fields that appear across many objects, including market data, orders, and events. These timestamps serve distinct purposes and help maintain precise timing information throughout the system:
ts_event: UNIX timestamp (nanoseconds) representing when an event actually occurred.ts_init: UNIX timestamp (nanoseconds) representing when Nautilus created the internal object representing that event.| Event Type | ts_event | ts_init |
|---|---|---|
TradeTick | Time when trade occurred at the exchange. | Time when Nautilus received the trade data. |
QuoteTick | Time when quote occurred at the exchange. | Time when Nautilus received the quote data. |
OrderBookDelta | Time when order book update occurred at the exchange. | Time when Nautilus received the order book update. |
Bar | Time of the bar's closing (exact minute/hour). | Time when Nautilus generated (for internal bars) or received the bar data (for external bars). |
OrderFilled | Time when order was filled at the exchange. | Time when Nautilus received and processed the fill confirmation. |
OrderCanceled | Time when cancellation was processed at the exchange. | Time when Nautilus received and processed the cancellation confirmation. |
NewsEvent | Time when the news was published. | Time when the event object was created (if internal event) or received (if external event) in Nautilus. |
| Custom event | Time when event conditions actually occurred. | Time when the event object was created (if internal event) or received (if external event) in Nautilus. |
:::note
The ts_init field represents a more general concept than "time of reception" for events.
It denotes the timestamp when an object, such as a data point or command, was initialized within Nautilus.
This distinction is important because ts_init is not exclusive to "received events". It applies to any internal
initialization process.
For example, the ts_init field is also used for commands, where the concept of reception does not apply.
This broader definition ensures consistent handling of initialization timestamps across various object types in the system.
:::
The dual timestamp system enables latency analysis within the platform:
ts_init - ts_event.ts_init using a stable sort.ts_init is typically when Nautilus creates the local object after receiving the update.ts_event reflects the time the event occurred externally, enabling accurate comparisons between external event timing and system reception.ts_init and ts_event to detect network or processing delays.ts_init is usually the local receipt or normalization time,
but clock skew means it is not guaranteed to be greater than or equal to ts_event.ts_init and ts_event can be the same because the object is initialized at the same time the event happens.ts_init field necessarily has a ts_event field. This reflects cases where:
The ts_init field preserves the original initialization timestamp. For venue data this is
typically receipt time; for internally created data it is the creation time of that object.
From the DataEngine onward, data follows the same pathway regardless of
environment context (backtest, sandbox,
live). In live and sandbox modes a venue adapter creates a normalized data
object and sends it through a channel; in backtests the engine feeds data
directly. Either way the DataEngine stores it in the Cache (for cached
types) and publishes it on the MessageBus to subscribed handlers.
For a step-by-step trace with a sequence diagram, see
Data flow: life of a quote tick.
For users who need more flexibility, the platform also supports the creation of custom data types. For details on how to implement user-defined data types, see the Custom Data section below.
NautilusTrader supports data loading and conversion for three main use cases:
BacktestEngine to run backtests.ParquetDataCatalog.write_data(...) to be later used with a BacktestNode.Regardless of the destination, the process remains the same: converting diverse external data formats into Nautilus data structures.
To achieve this, two main components are necessary:
pd.DataFrame with the correct schema for the desired Nautilus object.pd.DataFrame and returns a list[Data] of Nautilus objects.Data loader components are typically specific for the raw source/format and per integration. For instance, Binance order book data is stored in its raw CSV file form with an entirely different format to Databento Binary Encoding (DBN) files.
Data wranglers are implemented per specific Nautilus data type, and can be found in the nautilus_trader.persistence.wranglers module.
Common v1 wranglers include:
OrderBookDeltaDataWranglerQuoteTickDataWranglerTradeTickDataWranglerBarDataWranglerFor Arrow v2 / PyO3 workflows, the v2 module also provides OrderBookDepth10DataWranglerV2.
:::warning
There are a number of DataWrangler v2 components, which will take a pd.DataFrame typically
with a different fixed width Nautilus Arrow v2 schema, and output PyO3 Nautilus objects which are only compatible with the new version
of the Nautilus core, currently in development.
These PyO3 data objects are not compatible where v1 legacy Cython objects are expected (e.g., adding directly to a BacktestEngine).
:::
NautilusTrader uses fixed-point arithmetic for Price and Quantity types for precise financial calculations without floating-point errors. Understanding how raw values work is essential when creating data or working with catalogs.
When constructing Price or Quantity using from_raw(), the raw value must be a valid multiple of the scale factor for the given precision. Valid raw values should come from:
.raw field of an existing value (e.g., price.raw).:::warning
Raw values that are not valid multiples will cause a panic. The raw value must be divisible by 10^(FIXED_PRECISION - precision) where FIXED_PRECISION is 9 (standard mode) or 16 (high-precision mode).
:::
Catalog data can contain raw values with floating-point precision errors.
This happens when raw values are produced with int(value * FIXED_SCALAR)
instead of precision-aware conversion:
int(value * FIXED_SCALAR) # Introduces floating-point errors
round(value * 10**precision) * scale # Correct precision-aware conversion
For example, int(0.67068 * 1e9) produces 670680000000001 instead of
the expected 670680000000000.
The Arrow decode path automatically corrects these values by rounding to the nearest valid multiple, so affected catalogs work without data migration.
:::note This correction adds a small amount of overhead during data decoding. :::
Process flow:
pd.DataFrame.pd.DataFrame to generate a list of Nautilus objects.list[Data] is the output of the data loading process.The following diagram illustrates how raw data is transformed into Nautilus data structures:
flowchart LR
raw["Raw data (CSV)"]
loader[DataLoader]
wrangler[DataWrangler]
output["Nautilus list[Data]"]
raw --> loader
loader -->|"pd.DataFrame"| wrangler
wrangler --> output
Concretely, this would involve:
BinanceOrderBookDeltaDataLoader.load(...) which reads CSV files provided by Binance from disk, and returns a pd.DataFrame.OrderBookDeltaDataWrangler.process(...) which takes the pd.DataFrame and returns list[OrderBookDelta].The following example shows how to accomplish the above in Python:
from nautilus_trader import TEST_DATA_DIR
from nautilus_trader.adapters.binance.loaders import BinanceOrderBookDeltaDataLoader
from nautilus_trader.persistence.wranglers import OrderBookDeltaDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider
# Load raw data
data_path = TEST_DATA_DIR / "binance" / "btcusdt-depth-snap.csv"
df = BinanceOrderBookDeltaDataLoader.load(data_path)
# Set up a wrangler
instrument = TestInstrumentProvider.btcusdt_binance()
wrangler = OrderBookDeltaDataWrangler(instrument)
# Process to a list `OrderBookDelta` Nautilus objects
deltas = wrangler.process(df)
The data catalog is a central store for Nautilus data, persisted in the Parquet file format. It is the primary data management system for both backtesting and live trading scenarios, providing efficient storage, retrieval, and streaming capabilities for market data.
The NautilusTrader data catalog is built on a dual-backend architecture that combines the performance of Rust with the flexibility of Python:
Core components:
OrderBookDelta,
OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick, Bar,
MarkPriceUpdate) and registered same-binary Rust custom data.Key benefits:
Storage format advantages:
The Arrow schemas used for the Parquet format are defined in two places: the Rust model and persistence crates for core market data types, and the Python serialization/arrow/schema.py module for additional types.
The data catalog can be initialized from a NAUTILUS_PATH environment variable, or by explicitly passing in a path like object.
:::note[NAUTILUS_PATH environment variable]
The NAUTILUS_PATH environment variable should point to the root directory containing your Nautilus data. The catalog will automatically append /catalog to this path.
For example:
NAUTILUS_PATH=/home/user/trading_data./home/user/trading_data/catalog.This is a common pattern when using ParquetDataCatalog.from_env() - make sure your NAUTILUS_PATH points to the parent directory, not the catalog directory itself.
:::
The following example shows how to initialize a data catalog where there is pre-existing data already written to disk at the given path.
from pathlib import Path
from nautilus_trader.persistence.catalog import ParquetDataCatalog
CATALOG_PATH = Path.cwd() / "catalog"
# Create a new catalog instance
catalog = ParquetDataCatalog(CATALOG_PATH)
# Alternative: Environment-based initialization
catalog = ParquetDataCatalog.from_env() # Uses NAUTILUS_PATH environment variable
The catalog supports multiple filesystem protocols through fsspec integration, working across local and cloud storage systems.
Local filesystem (file):
catalog = ParquetDataCatalog(
path="/path/to/catalog",
fs_protocol="file", # Default protocol
)
Amazon S3 (s3):
catalog = ParquetDataCatalog(
path="s3://my-bucket/nautilus-data/",
fs_protocol="s3",
fs_storage_options={
"key": "your-access-key-id",
"secret": "your-secret-access-key",
"endpoint_url": "https://s3.amazonaws.com", # Optional custom endpoint
}
)
Google Cloud Storage (gcs):
catalog = ParquetDataCatalog(
path="gcs://my-bucket/nautilus-data/",
fs_protocol="gcs",
fs_storage_options={
"project": "my-project-id",
"token": "/path/to/service-account.json", # Or "cloud" for default credentials
}
)
Azure Blob Storage :
abfs protocol
catalog = ParquetDataCatalog(
path="abfs://[email protected]/nautilus-data/",
fs_protocol="abfs",
fs_storage_options={
"account_name": "your-storage-account",
"account_key": "your-account-key",
# Or use SAS token: "sas_token": "your-sas-token"
}
)
az protocol
catalog = ParquetDataCatalog(
path="az://container/nautilus-data/",
fs_protocol="az",
fs_storage_options={
"account_name": "your-storage-account",
"account_key": "your-account-key",
# Or use SAS token: "sas_token": "your-sas-token"
}
)
For convenience, you can use URI strings that automatically parse protocol and storage options:
# Local filesystem
catalog = ParquetDataCatalog.from_uri("/path/to/catalog")
# S3 bucket
catalog = ParquetDataCatalog.from_uri("s3://my-bucket/nautilus-data/")
# With storage options
catalog = ParquetDataCatalog.from_uri(
"s3://my-bucket/nautilus-data/",
fs_storage_options={
"access_key_id": "your-key",
"secret_access_key": "your-secret"
}
)
Store data in the catalog using the write_data() method. All Nautilus built-in Data objects are supported, and any data which inherits from Data can be written.
# Write a list of data objects
catalog.write_data(quote_ticks)
# Write with custom timestamp range
catalog.write_data(
trade_ticks,
start=1704067200000000000, # Optional start timestamp override (UNIX nanoseconds)
end=1704153600000000000, # Optional end timestamp override (UNIX nanoseconds)
)
# Skip disjoint check for overlapping data
catalog.write_data(bars, skip_disjoint_check=True)
The catalog automatically generates filenames based on the timestamp range of the data being
written. Files are named using the pattern {start_timestamp}_{end_timestamp}.parquet, where
each timestamp is an ISO 8601 value converted to a filename-safe form by replacing : and .
with -.
Data is organized in directories by data type and identifier
(instrument ID, bar type, or custom identifier). Identifiers are made URI-safe by removing /:
catalog/
├── data/
│ ├── quote_ticks/
│ │ └── EURUSD.SIM/
│ │ └── 2024-01-01T00-00-00-000000000Z_2024-01-01T23-59-59-999999999Z.parquet
│ └── trade_ticks/
│ └── BTCUSD.BINANCE/
│ └── 2024-01-01T00-00-00-000000000Z_2024-01-01T23-59-59-999999999Z.parquet
Rust backend data types (enhanced performance):
The following data types use optimized Rust implementations:
OrderBookDelta.OrderBookDeltas.OrderBookDepth10.QuoteTick.TradeTick.Bar.MarkPriceUpdate.:::warning
By default, overlapping writes raise a ValueError to maintain data integrity.
Use skip_disjoint_check=True in write_data() to bypass this check when needed.
:::
Use the query() method to read data back from the catalog:
from nautilus_trader.model import QuoteTick, TradeTick
# Query quote ticks for a specific instrument and time range
quotes = catalog.query(
data_cls=QuoteTick,
identifiers=["EUR/USD.SIM"],
start="2024-01-01T00:00:00Z",
end="2024-01-02T00:00:00Z"
)
# Query trade ticks for a specific instrument and time range
trades = catalog.query(
data_cls=TradeTick,
identifiers=["BTC/USD.BINANCE"],
start="2024-01-01",
end="2024-01-02",
)
BacktestDataConfig - data specification for backtestsThe BacktestDataConfig class is the primary mechanism for specifying data requirements before a backtest starts. It defines what data should be loaded from the catalog and how it should be filtered and processed during the backtest execution.
Required parameters:
catalog_path: Path to the data catalog directory.data_cls: The data type class (e.g., QuoteTick, TradeTick, OrderBookDelta, Bar).Optional parameters:
catalog_fs_protocol: Filesystem protocol ('file', 's3', 'gcs', etc.).catalog_fs_storage_options: Storage-specific options (credentials, region, etc.).catalog_fs_rust_storage_options: Storage-specific options for the Rust backend.instrument_id: Specific instrument to load data for.instrument_ids: List of instruments (alternative to single instrument_id).start_time: Start time for data filtering (ISO string or UNIX nanoseconds).end_time: End time for data filtering (ISO string or UNIX nanoseconds).filter_expr: Additional PyArrow filter expressions.client_id: Client ID for custom data types.metadata: Additional metadata for data queries.bar_spec: Bar specification for bar data (e.g., "1-MINUTE-LAST"). When combined
with instrument_id or instrument_ids, this builds ...-EXTERNAL bar identifiers.bar_types: Explicit list of full bar types. Use this for INTERNAL bars or composite bars.optimize_file_loading: Load directories instead of individual files when supported.Loading quote ticks:
from nautilus_trader.config import BacktestDataConfig
from nautilus_trader.model import QuoteTick, InstrumentId
data_config = BacktestDataConfig(
catalog_path="/path/to/catalog",
data_cls=QuoteTick,
instrument_id=InstrumentId.from_str("EUR/USD.SIM"),
start_time="2024-01-01T00:00:00Z",
end_time="2024-01-02T00:00:00Z",
)
Loading multiple instruments:
data_config = BacktestDataConfig(
catalog_path="/path/to/catalog",
data_cls=TradeTick,
instrument_ids=["BTC/USD.BINANCE", "ETH/USD.BINANCE"],
start_time="2024-01-01T00:00:00Z",
end_time="2024-01-02T00:00:00Z",
)
Loading Bar Data:
data_config = BacktestDataConfig(
catalog_path="/path/to/catalog",
data_cls=Bar,
instrument_id=InstrumentId.from_str("AAPL.NASDAQ"),
bar_spec="5-MINUTE-LAST", # Loads AAPL.NASDAQ-5-MINUTE-LAST-EXTERNAL
start_time="2024-01-01",
end_time="2024-01-31",
)
Cloud Storage with Custom Filtering:
data_config = BacktestDataConfig(
catalog_path="s3://my-bucket/nautilus-data/",
catalog_fs_protocol="s3",
catalog_fs_storage_options={
"key": "your-access-key",
"secret": "your-secret-key",
"region": "us-east-1"
},
data_cls=OrderBookDelta,
instrument_id=InstrumentId.from_str("BTC/USD.COINBASE"),
start_time="2024-01-01T09:30:00Z",
end_time="2024-01-01T16:00:00Z",
)
Custom Data with Client ID:
data_config = BacktestDataConfig(
catalog_path="/path/to/catalog",
data_cls="my_package.data.NewsEventData",
client_id="NewsClient",
metadata={"source": "reuters", "category": "earnings"},
start_time="2024-01-01",
end_time="2024-01-31",
)
The BacktestDataConfig objects are integrated into the backtesting framework through BacktestRunConfig:
from nautilus_trader.config import BacktestRunConfig, BacktestVenueConfig
# Define multiple data configurations
data_configs = [
BacktestDataConfig(
catalog_path="/path/to/catalog",
data_cls=QuoteTick,
instrument_id="EUR/USD.SIM",
start_time="2024-01-01",
end_time="2024-01-02",
),
BacktestDataConfig(
catalog_path="/path/to/catalog",
data_cls=TradeTick,
instrument_id="EUR/USD.SIM",
start_time="2024-01-01",
end_time="2024-01-02",
),
]
# Create backtest run configuration
run_config = BacktestRunConfig(
venues=[BacktestVenueConfig(name="SIM", oms_type="HEDGING")],
data=data_configs, # List of data configurations
start="2024-01-01T00:00:00Z",
end="2024-01-02T00:00:00Z",
)
When a backtest runs, the BacktestNode processes each BacktestDataConfig:
ParquetDataCatalog instance from the config.The system automatically handles:
The DataCatalogConfig class provides configuration for on-the-fly data loading scenarios, particularly useful for backtests where the number of possible instruments is vast,
Unlike BacktestDataConfig which pre-specifies data for backtests, DataCatalogConfig enables flexible catalog access during runtime.
Catalogs defined this way can also be used for requesting historical data.
Required Parameters:
path: Path to the data catalog directory.Optional Parameters:
fs_protocol: Filesystem protocol ('file', 's3', 'gcs', 'azure', etc.).fs_storage_options: Protocol-specific storage options.fs_rust_storage_options: Protocol-specific storage options for the Rust backend.name: Optional name identifier for the catalog configuration.Local Catalog Configuration:
from nautilus_trader.persistence.config import DataCatalogConfig
catalog_config = DataCatalogConfig(
path="/path/to/catalog",
fs_protocol="file",
name="local_market_data"
)
# Convert to catalog instance
catalog = catalog_config.as_catalog()
Cloud storage configuration:
catalog_config = DataCatalogConfig(
path="s3://my-bucket/market-data/",
fs_protocol="s3",
fs_storage_options={
"key": "your-access-key",
"secret": "your-secret-key",
"region": "us-west-2",
"endpoint_url": "https://s3.us-west-2.amazonaws.com"
},
name="cloud_market_data"
)
DataCatalogConfig is commonly used in live trading configurations for historical data access:
from nautilus_trader.config import TradingNodeConfig
from nautilus_trader.persistence.config import DataCatalogConfig
# Configure catalog for live system
catalog_config = DataCatalogConfig(
path="/data/nautilus/catalog",
fs_protocol="file",
name="historical_data"
)
# Use in trading node configuration
node_config = TradingNodeConfig(
# ... other configurations
catalogs=[catalog_config], # Enable historical data access
)
For streaming data to catalogs during live trading or backtesting, use StreamingConfig:
from nautilus_trader.persistence.config import StreamingConfig, RotationMode
import pandas as pd
streaming_config = StreamingConfig(
catalog_path="/path/to/streaming/catalog",
fs_protocol="file",
flush_interval_ms=1000, # Flush every second
replace_existing=False,
rotation_mode=RotationMode.INTERVAL,
rotation_interval=pd.Timedelta(hours=1),
max_file_size=1024 * 1024 * 100, # 100MB max file size
)
Historical Data Analysis:
Dynamic data loading:
Research and development:
The catalog's query system uses a dual-backend architecture that selects the query engine based on data type and query parameters.
Rust backend (high performance):
files parameter is None (automatic file discovery).PyArrow backend (flexible):
files parameter is specified.Core query parameters:
catalog.query(
data_cls=QuoteTick, # Data type to query
identifiers=["EUR/USD.SIM"], # Instrument identifiers
start="2024-01-01T00:00:00Z", # Start time (various formats supported)
end="2024-01-02T00:00:00Z", # End time
files=None, # Leave unset for automatic file discovery
)
where= passes a DataFusion SQL predicate to Rust-backed queries.filter_expr= passes a parsed PyArrow dataset expression to PyArrow-backed queries.Time format support:
"2024-01-01T00:00:00Z".1704067200000000000 (or ISO format: "2024-01-01T00:00:00Z").pd.Timestamp("2024-01-01", tz="UTC").Filtering notes:
where= for Rust-backed built-in market data queries.filter_expr= for PyArrow-backed queries, including custom data and queries
forced onto the PyArrow path with files=.The catalog provides several operation functions for maintaining and organizing data files. These operations help optimize storage, improve query performance, and maintain data integrity.
Reset parquet file names to match their actual content timestamps. This ensures filename-based filtering works correctly.
Reset all files in catalog:
# Reset all parquet files in the catalog
catalog.reset_all_file_names()
Reset specific data type:
# Reset filenames for all quote tick files
catalog.reset_data_file_names(QuoteTick)
# Reset filenames for specific instrument's trade files
catalog.reset_data_file_names(TradeTick, "BTC/USD.BINANCE")
Combine multiple small parquet files into larger files to improve query performance and reduce storage overhead.
Consolidate entire catalog:
# Consolidate all files in the catalog
catalog.consolidate_catalog()
# Consolidate files within a specific time range
catalog.consolidate_catalog(
start="2024-01-01T00:00:00Z",
end="2024-01-02T00:00:00Z",
ensure_contiguous_files=True
)
Consolidate specific data type:
# Consolidate all quote tick files
catalog.consolidate_data(QuoteTick)
# Consolidate specific instrument's files
catalog.consolidate_data(
TradeTick,
identifier="BTC/USD.BINANCE",
start="2024-01-01",
end="2024-01-31"
)
Split data files into fixed time periods for standardized file organization.
Consolidate entire catalog by period:
import pandas as pd
# Consolidate all files by 1-day periods
catalog.consolidate_catalog_by_period(
period=pd.Timedelta(days=1)
)
# Consolidate by 1-hour periods within time range
catalog.consolidate_catalog_by_period(
period=pd.Timedelta(hours=1),
start="2024-01-01T00:00:00Z",
end="2024-01-02T00:00:00Z"
)
Consolidate specific data by period:
# Consolidate quote data by 4-hour periods
catalog.consolidate_data_by_period(
data_cls=QuoteTick,
period=pd.Timedelta(hours=4)
)
# Consolidate specific instrument by 30-minute periods
catalog.consolidate_data_by_period(
data_cls=TradeTick,
identifier="EUR/USD.SIM",
period=pd.Timedelta(minutes=30),
start="2024-01-01",
end="2024-01-31"
)
Remove data within a specified time range for specific data types and instruments. This operation permanently deletes data and handles file intersections intelligently.
Delete entire catalog range:
# Delete all data within a time range across the entire catalog
catalog.delete_catalog_range(
start="2024-01-01T00:00:00Z",
end="2024-01-02T00:00:00Z"
)
# Delete all data from the beginning up to a specific time
catalog.delete_catalog_range(end="2024-01-01T00:00:00Z")
Delete specific data type:
# Delete all quote tick data for a specific instrument
catalog.delete_data_range(
data_cls=QuoteTick,
identifier="BTC/USD.BINANCE"
)
# Delete trade data within a specific time range
catalog.delete_data_range(
data_cls=TradeTick,
identifier="EUR/USD.SIM",
start="2024-01-01T00:00:00Z",
end="2024-01-31T23:59:59Z"
)
:::warning Delete operations permanently remove data and cannot be undone. Files that partially overlap the deletion range are split to preserve data outside the range. :::
The catalog supports streaming data to temporary feather files during backtests, which can then be converted to permanent parquet format for efficient querying.
Example: option greeks streaming
from option_trader.greeks import GreeksData
from nautilus_trader.persistence.config import StreamingConfig
# 1. Configure streaming for custom data
streaming = StreamingConfig(
catalog_path=catalog.path,
include_types=[GreeksData],
flush_interval_ms=1000,
)
# 2. Run backtest with streaming enabled
engine_config = BacktestEngineConfig(streaming=streaming)
results = node.run()
# 3. Convert streamed data to permanent catalog
catalog.convert_stream_to_data(
results[0].instance_id,
GreeksData,
)
# 4. Query converted data
greeks_data = catalog.query(
data_cls=GreeksData,
start="2024-01-01",
end="2024-01-31",
where="delta > 0.5",
)
The NautilusTrader data catalog provides market data management:
Core features:
Key use cases:
NautilusTrader defines an internal data format specified in the nautilus_model crate.
These models are serialized into Arrow record batches and written to Parquet files.
Nautilus backtesting is most efficient when using these Nautilus-format Parquet files.
However, migrating the data model between precision modes and schema changes can be challenging. This guide explains how to handle data migrations using our utility tools.
The nautilus_persistence crate provides two key utilities:
to_jsonConverts Parquet files to JSON while preserving metadata:
Creates two files:
<input>.json: Contains the deserialized data<input>.metadata.json: Contains schema metadata and row group configurationAutomatically detects data type from filename:
OrderBookDelta (contains "deltas" or "order_book_delta")QuoteTick (contains "quotes" or "quote_tick")TradeTick (contains "trades" or "trade_tick")Bar (contains "bars")to_parquetConverts JSON back to Parquet format:
<input>.parquet.The following migration examples both use trades data (you can also migrate the other data types in the same way).
All commands should be run from the root of the persistence crate directory.
This example describes a scenario where you want to migrate from standard-precision schema to high-precision schema.
:::note
If you're migrating from a catalog that used the Int64 and UInt64 Arrow data types for prices and sizes,
be sure to check out commit e284162
before compiling the code that writes the initial JSON.
:::
1. Convert from standard-precision Parquet to JSON:
cargo run --bin to_json trades.parquet
This will create trades.json and trades.metadata.json files.
2. Convert from JSON to high-precision Parquet:
Add the --features high-precision flag to write data as high-precision (128-bit) schema Parquet.
cargo run --features high-precision --bin to_parquet trades.json
This will create a trades.parquet file with high-precision schema data.
This example describes a scenario where you want to migrate from one schema version to another.
1. Convert from old schema Parquet to JSON:
Add the --features high-precision flag if the source data uses a high-precision (128-bit) schema.
cargo run --bin to_json trades.parquet
This will create trades.json and trades.metadata.json files.
2. Switch to new schema version:
git checkout <new-version>
3. Convert from JSON back to new schema Parquet:
cargo run --features high-precision --bin to_parquet trades.json
This will create a trades.parquet file with the new schema.
Due to the modular nature of the Nautilus design, it is possible to set up systems with very flexible data streams, including custom user-defined data types. This guide covers some possible use cases for this functionality.
It's possible to create custom data types within the Nautilus system. First you
will need to define your data by subclassing from Data.
:::info
As Data holds no state, it is not strictly necessary to call super().__init__().
:::
from nautilus_trader.core import Data
class MyDataPoint(Data):
"""
This is an example of a user-defined data class, inheriting from the base class `Data`.
The fields `label`, `x`, `y`, and `z` in this class are examples of arbitrary user data.
"""
def __init__(
self,
label: str,
x: int,
y: int,
z: int,
ts_event: int,
ts_init: int,
) -> None:
self.label = label
self.x = x
self.y = y
self.z = z
self._ts_event = ts_event
self._ts_init = ts_init
@property
def ts_event(self) -> int:
"""
UNIX timestamp (nanoseconds) when the data event occurred.
Returns
-------
int
"""
return self._ts_event
@property
def ts_init(self) -> int:
"""
UNIX timestamp (nanoseconds) when the object was initialized.
Returns
-------
int
"""
return self._ts_init
The Data abstract base class acts as a contract within the system and requires two properties
for all types of data: ts_event and ts_init. These represent the UNIX nanosecond timestamps
for when the event occurred and when the object was initialized, respectively.
The recommended approach to satisfy the contract is to assign ts_event and ts_init
to backing fields, and then implement the @property for each as shown above
(for completeness, the docstrings are copied from the Data base class).
:::info
These timestamps enable Nautilus to correctly order data streams for backtests
using monotonically increasing ts_init UNIX nanoseconds.
:::
We can now work with this data type for backtesting and live trading. For instance,
we could now create an adapter which is able to parse and create objects of this
type - and send them back to the DataEngine for consumption by subscribers.
You can publish a custom data type within your actor/strategy using the message bus in the following way:
self.publish_data(
DataType(MyDataPoint, metadata={"some_optional_category": 1}),
MyDataPoint(...),
)
The metadata dictionary optionally adds more granular information that is used in the
topic name to publish data with the message bus.
Extra metadata information can also be passed to a BacktestDataConfig configuration object in order to
enrich and describe custom data objects used in a backtesting context:
from nautilus_trader.config import BacktestDataConfig
data_config = BacktestDataConfig(
catalog_path=str(catalog.path),
data_cls=MyDataPoint,
metadata={"some_optional_category": 1},
)
You can subscribe to custom data types within your actor/strategy in the following way:
self.subscribe_data(
data_type=DataType(MyDataPoint,
metadata={"some_optional_category": 1}),
client_id=ClientId("MY_ADAPTER"),
)
The client_id provides an identifier to route the data subscription to a specific client.
This will result in your actor/strategy passing these received MyDataPoint
objects to your on_data method. You will need to check the type, as this
method acts as a flexible handler for all custom data.
def on_data(self, data: Data) -> None:
# First check the type of data
if isinstance(data, MyDataPoint):
# Do something with the data
Here is an example of publishing and receiving signal data using the MessageBus from an actor or strategy.
A signal is an automatically generated custom data identified by a name containing only one value of a basic type
(str, float, int, bool or bytes).
self.publish_signal("signal_name", value, ts_event)
self.subscribe_signal("signal_name")
def on_signal(self, signal):
print("Signal", signal)
This example demonstrates how to create a custom data type for option Greeks, specifically the delta.
By following these steps, you can create custom data types, subscribe to them, publish them, and store
them in the Cache or ParquetDataCatalog for efficient retrieval.
import msgspec
from nautilus_trader.core import Data
from nautilus_trader.core.datetime import unix_nanos_to_iso8601
from nautilus_trader.model import DataType
from nautilus_trader.serialization.base import register_serializable_type
from nautilus_trader.serialization.arrow.serializer import register_arrow
import pyarrow as pa
from nautilus_trader.model import InstrumentId
from nautilus_trader.core.datetime import dt_to_unix_nanos, unix_nanos_to_dt, format_iso8601
class GreeksData(Data):
def __init__(
self, instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX"),
ts_event: int = 0,
ts_init: int = 0,
delta: float = 0.0,
) -> None:
self.instrument_id = instrument_id
self._ts_event = ts_event
self._ts_init = ts_init
self.delta = delta
def __repr__(self):
return (f"GreeksData(ts_init={unix_nanos_to_iso8601(self._ts_init)}, instrument_id={self.instrument_id}, delta={self.delta:.2f})")
@property
def ts_event(self):
return self._ts_event
@property
def ts_init(self):
return self._ts_init
def to_dict(self):
return {
"instrument_id": self.instrument_id.value,
"ts_event": self._ts_event,
"ts_init": self._ts_init,
"delta": self.delta,
}
@classmethod
def from_dict(cls, data: dict):
return GreeksData(InstrumentId.from_str(data["instrument_id"]), data["ts_event"], data["ts_init"], data["delta"])
def to_bytes(self):
return msgspec.msgpack.encode(self.to_dict())
@classmethod
def from_bytes(cls, data: bytes):
return cls.from_dict(msgspec.msgpack.decode(data))
def to_catalog(self):
return pa.RecordBatch.from_pylist([self.to_dict()], schema=GreeksData.schema())
@classmethod
def from_catalog(cls, table: pa.Table):
return [GreeksData.from_dict(d) for d in table.to_pylist()]
@classmethod
def schema(cls):
return pa.schema(
{
"instrument_id": pa.string(),
"ts_event": pa.int64(),
"ts_init": pa.int64(),
"delta": pa.float64(),
}
)
Here is an example of publishing and receiving data using the MessageBus from an actor or strategy:
register_serializable_type(GreeksData, GreeksData.to_dict, GreeksData.from_dict)
def publish_greeks(self, greeks_data: GreeksData):
self.publish_data(DataType(GreeksData), greeks_data)
def subscribe_to_greeks(self):
self.subscribe_data(DataType(GreeksData))
def on_data(self, data):
if isinstance(data, GreeksData):
print("Data", data)
Here is an example of writing and reading data using the Cache from an actor or strategy:
def greeks_key(instrument_id: InstrumentId):
return f"{instrument_id}_GREEKS"
def cache_greeks(self, greeks_data: GreeksData):
self.cache.add(greeks_key(greeks_data.instrument_id), greeks_data.to_bytes())
def greeks_from_cache(self, instrument_id: InstrumentId):
return GreeksData.from_bytes(self.cache.get(greeks_key(instrument_id)))
For streaming custom data to feather files or writing it to parquet files in a catalog
(register_arrow needs to be used):
register_arrow(GreeksData, GreeksData.schema(), GreeksData.to_catalog, GreeksData.from_catalog)
from nautilus_trader.persistence.catalog import ParquetDataCatalog
catalog = ParquetDataCatalog('.')
catalog.write_data([GreeksData()])
The @customdataclass decorator enables the creation of a custom data class with default
implementations for all the features described above.
Each method can also be overridden if needed. Here is an example of its usage:
from nautilus_trader.model.custom import customdataclass
@customdataclass
class GreeksTestData(Data):
instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX")
delta: float = 0.0
GreeksTestData(
instrument_id=InstrumentId.from_str("CL.GLBX"),
delta=1000.0,
ts_event=1,
ts_init=2,
)
To use custom data with the Rust-backed catalog (ParquetDataCatalog from nautilus_pyo3), use the
@customdataclass_pyo3() decorator instead of @customdataclass. This adds the methods the Rust catalog
expects (JSON and Arrow IPC serialization). After defining your class, register it once. You can pass either
the type (recommended) or a sample instance:
from nautilus_trader.core.nautilus_pyo3 import ParquetDataCatalog
from nautilus_trader.core.nautilus_pyo3.model import CustomData
from nautilus_trader.core.nautilus_pyo3.model import DataType
from nautilus_trader.core.nautilus_pyo3.model import register_custom_data_class
from nautilus_trader.model.custom import customdataclass_pyo3
@customdataclass_pyo3()
class MarketTickPython:
symbol: str = ""
price: float = 0.0
volume: int = 0
# Register by type (no instance needed; call once, e.g. at startup)
register_custom_data_class(MarketTickPython)
catalog = ParquetDataCatalog("/path/to/catalog")
data_type = DataType("MarketTickPython", metadata={"exchange": "NASDAQ"})
wrapped = [
CustomData(
data_type,
MarketTickPython(ts_event=1, ts_init=1, symbol="AAPL", price=150.5, volume=1000),
),
]
catalog.write_custom_data(wrapped)
result = catalog.query("MarketTickPython")
ticks = [item.data for item in result]
See nautilus_trader.model.custom.customdataclass_pyo3 for details.
For better IDE code suggestions, you can create a .pyi
stub file with the proper constructor signature for your custom data types as well as type hints for attributes.
This is particularly useful when the constructor is dynamically generated at runtime, as it allows the IDE to recognize
and provide suggestions for the class's methods and attributes.
For instance, if you have a custom data class defined in greeks.py, you can create a corresponding greeks.pyi file
with the following constructor signature:
from nautilus_trader.core import Data
from nautilus_trader.model import InstrumentId
class GreeksData(Data):
instrument_id: InstrumentId
delta: float
def __init__(
self,
ts_event: int = 0,
ts_init: int = 0,
instrument_id: InstrumentId = InstrumentId.from_str("ES.GLBX"),
delta: float = 0.0,
) -> GreeksData: ...