docs/developer_guide/adapters.md
This developer guide provides specifications for how to build an integration adapter for the NautilusTrader platform.
Adapters connect to trading venues and data providers, translating their native APIs into the platform’s unified interface and normalized domain model.
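NautilusTrader adapters follow a layered architecture pattern with a Rust core and a Python layer.
Rust core (crates/adapters/your_adapter/)
The Rust layer handles the HTTP and WebSocket clients, parsing of venue data into Nautilus domain models, and the PyO3 Python bindings.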
Typical Rust structure:
crates/adapters/your_adapter/
├── src/
│ ├── common/ # Shared types and utilities
│ │ ├── consts.rs # Venue constants / broker IDs
│ │ ├── credential.rs # API key storage and signing helpers
│ │ ├── enums.rs # Venue enums mirrored in REST/WS payloads
│ │ ├── error.rs # Adapter-level error aggregation (when applicable)
│ │ ├── models.rs # Shared model types
│ │ ├── parse.rs # Shared parsing helpers
│ │ ├── retry.rs # Retry classification (when applicable)
│ │ ├── urls.rs # Environment & product aware base-url resolvers
│ │ └── testing.rs # Fixtures reused across unit tests
│ ├── http/ # HTTP client implementation
│ │ ├── client.rs # HTTP client with authentication
│ │ ├── error.rs # HTTP-specific error types
│ │ ├── models.rs # Structs for REST payloads
│ │ ├── parse.rs # Response parsing functions
│ │ └── query.rs # Request and query builders
│ ├── websocket/ # WebSocket implementation
│ │ ├── client.rs # WebSocket client
│ │ ├── dispatch.rs # Execution event dispatch and order routing
│ │ ├── enums.rs # WebSocket-specific enums
│ │ ├── error.rs # WebSocket-specific error types
│ │ ├── handler.rs # Feed handler (I/O boundary)
│ │ ├── messages.rs # Frame and message enums
│ │ ├── parse.rs # Message parsing functions
│ │ └── subscription.rs # Subscription topic helpers (optional)
│ ├── python/ # PyO3 Python bindings
│ │ ├── enums.rs # Python-exposed enums
│ │ ├── http.rs # Python HTTP client bindings
│ │ ├── urls.rs # Python URL helpers
│ │ ├── websocket.rs # Python WebSocket client bindings
│ │ └── mod.rs # Module exports
│ ├── config.rs # Configuration structures
│ ├── data.rs # Data client implementation
│ ├── execution.rs # Execution client implementation
│ ├── factories.rs # Factory functions
│ └── lib.rs # Library entry point
├── tests/ # Integration tests with mock servers
│ ├── data_client.rs # Data client integration tests
│ ├── exec_client.rs # Execution client integration tests
│ ├── http.rs # HTTP client integration tests
│ └── websocket.rs # WebSocket client integration tests
└── test_data/ # Canonical venue payloads
Python layer (nautilus_trader/adapters/your_adapter)
The Python layer provides the integration interface through these components:
- Instrument provider: InstrumentProvider.
- Data client: LiveDataClient and LiveMarketDataClient.
- Execution client: LiveExecutionClient.
Typical Python structure:
nautilus_trader/adapters/your_adapter/
├── config.py # Configuration classes
├── constants.py # Adapter constants
├── data.py # LiveDataClient/LiveMarketDataClient
├── execution.py # LiveExecutionClient
├── factories.py # Instrument factories
├── providers.py # InstrumentProvider
└── __init__.py # Package initialization
Follow this dependency-driven order when building an adapter. Each phase builds on the previous one. Implement the Rust core before any Python layer.
Build the low-level networking and parsing foundation.
| Step | Component | Description |
|---|---|---|
| 1.1 | HTTP error types | Define HTTP‑specific error enum with retryable/non‑retryable variants (http/error.rs). |
| 1.2 | HTTP client | Implement credentials, request signing, rate limiting, and retry logic. |
| 1.3 | HTTP API models | Define request/response structs for REST endpoints (http/models.rs, http/query.rs). |
| 1.4 | HTTP parsing | Convert venue responses to Nautilus domain models (http/parse.rs, common/parse.rs). |
| 1.5 | WebSocket error types | Define WebSocket‑specific error enum (websocket/error.rs). |
| 1.6 | WebSocket client | Implement connection lifecycle, authentication, heartbeat, and reconnection. |
| 1.7 | WebSocket messages | Define streaming payload types (websocket/messages.rs). |
| 1.8 | WebSocket parsing | Convert stream messages to Nautilus domain models (websocket/parse.rs). |
| 1.9 | Python bindings | Expose Rust functionality via PyO3 (python/mod.rs). |
Milestone: Rust crate compiles, unit tests pass, HTTP/WebSocket clients can authenticate and stream/request raw data.
Instruments are the foundation: both data and execution clients depend on them.
| Step | Component | Description |
|---|---|---|
| 2.1 | Instrument parsing | Parse venue instrument definitions into Nautilus types (spot, perpetual, future, option). |
| 2.2 | Instrument provider | Implement InstrumentProvider to load, filter, and cache instruments. |
| 2.3 | Symbol mapping | Handle venue‑specific symbol formats and Nautilus InstrumentId conversion. |
Milestone: InstrumentProvider.load_all_async() returns valid Nautilus instruments.
Build data subscriptions and historical data requests.
| Step | Component | Description |
|---|---|---|
| 3.1 | Public WebSocket streams | Subscribe to order books, trades, tickers, and other public channels. |
| 3.2 | Historical data requests | Fetch historical bars, trades, and order book snapshots via HTTP. |
| 3.3 | Data client (Python) | Implement LiveDataClient or LiveMarketDataClient wiring Rust clients to the data engine. |
Milestone: Data client connects, subscribes to instruments, and emits market data to the platform.
Build order management and account state.
| Step | Component | Description |
|---|---|---|
| 4.1 | Private WebSocket streams | Subscribe to order updates, fills, positions, and account balance changes. |
| 4.2 | Basic order submission | Implement market and limit orders via HTTP or WebSocket. |
| 4.3 | Order modification/cancel | Implement order amendment and cancellation. |
| 4.4 | Execution client (Python) | Implement LiveExecutionClient wiring Rust clients to the execution engine. |
| 4.5 | Execution reconciliation | Generate order, fill, and position status reports for startup reconciliation. |
Milestone: Execution client submits orders, receives fills, and reconciles state on connect.
Extend coverage based on venue capabilities.
| Step | Component | Description |
|---|---|---|
| 5.1 | Advanced order types | Conditional orders, stop‑loss, take‑profit, trailing stops, iceberg, etc. |
| 5.2 | Batch operations | Batch order submission, batch cancellation, mass cancel. |
| 5.3 | Venue‑specific features | Options chains, funding rates, liquidations, or other venue‑specific data. |
Wire everything together for production usage.
| Step | Component | Description |
|---|---|---|
| 6.1 | Configuration classes | Create LiveDataClientConfig and LiveExecClientConfig subclasses. |
| 6.2 | Factory functions | Implement factory functions to instantiate clients from configuration. |
| 6.3 | Environment variables | Support credential resolution from environment variables. |
Validate the integration and document usage.
| Step | Component | Description |
|---|---|---|
| 7.1 | Rust unit tests | Test parsers, signing helpers, and business logic in #[cfg(test)] blocks. |
| 7.2 | Rust integration tests | Test HTTP/WebSocket clients against mock Axum servers in tests/. |
| 7.3 | Python integration tests | Test data/execution clients in tests/integration_tests/adapters/<adapter>/. |
| 7.4 | Example scripts | Provide runnable examples demonstrating data subscription and order execution. |
See the Testing section for detailed test organization guidelines.
common/
Group venue constants, credential helpers, enums, and reusable parsers under src/common.
Adapters such as OKX keep submodules like consts, credential, enums, and urls alongside a testing module
for fixtures, providing a single place for cross-cutting pieces.
When an adapter has multiple environments or product categories, add a dedicated common::urls helper so
REST/WebSocket base URLs stay in sync with the Python layer.
common/symbol.rs
When a venue uses a different symbol format than Nautilus InstrumentId, place bidirectional
conversion helpers in common/symbol.rs. Two functions form the standard interface:
- format_instrument_id(venue_symbol, product_type) converts a venue symbol string to a Nautilus InstrumentId, appending or transforming product-type suffixes as needed (e.g., "BTCUSDT" + Linear becomes "BTCUSDT-LINEAR.BYBIT").
- format_venue_symbol(instrument_id) strips Nautilus suffixes to recover the venue-native symbol for API calls.
Common patterns across adapters:
- Product-type suffixes (Bybit): -SPOT, -LINEAR, -INVERSE, -OPTION. A BybitSymbol wrapper validates the suffix and normalizes to uppercase on construction.
- Perpetual suffixes: some products gain -PERP at the Nautilus layer while COIN-M keeps the venue's existing _PERP suffix.
- Ustr interning: Store normalized symbols as Ustr for zero-cost comparison.
For venues where the raw symbol maps 1:1 to an InstrumentId (no suffix gymnastics), inline
helpers in common/parse.rs are sufficient and a dedicated symbol.rs is not needed.
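As a rough illustration of the two-function interface, the sketch below works on plain strings rather than the real InstrumentId type. The ProductType enum, the VENUE constant, and the suffix table are assumptions made for this example and are not taken from any adapter's source.

```rust
/// Hypothetical product categories; real adapters define their own enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ProductType {
    Spot,
    Linear,
    Inverse,
    Option,
}

impl ProductType {
    /// Suffix appended to the venue symbol at the Nautilus layer.
    pub fn suffix(self) -> &'static str {
        match self {
            Self::Spot => "-SPOT",
            Self::Linear => "-LINEAR",
            Self::Inverse => "-INVERSE",
            Self::Option => "-OPTION",
        }
    }
}

/// Assumed venue code used as the instrument ID's venue part.
const VENUE: &str = "BYBIT";

/// Venue symbol to instrument ID string, normalizing to uppercase.
pub fn format_instrument_id(venue_symbol: &str, product_type: ProductType) -> String {
    format!("{}{}.{}", venue_symbol.to_uppercase(), product_type.suffix(), VENUE)
}

/// Instrument ID string back to the venue-native symbol.
pub fn format_venue_symbol(instrument_id: &str) -> String {
    // Drop the ".VENUE" part, then strip a known product suffix if present.
    let symbol = instrument_id
        .rsplit_once('.')
        .map_or(instrument_id, |(symbol, _venue)| symbol);
    for suffix in ["-SPOT", "-LINEAR", "-INVERSE", "-OPTION"] {
        if let Some(stripped) = symbol.strip_suffix(suffix) {
            return stripped.to_string();
        }
    }
    symbol.to_string()
}
```

Keeping both directions in one module makes it easy to unit test that a symbol survives a round trip.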
Define URL constants and resolution functions in common/urls.rs:
const VENUE_WS_URL: &str = "wss://stream.venue.com/ws";
const VENUE_TESTNET_WS_URL: &str = "wss://testnet-stream.venue.com/ws";
pub const fn get_ws_base_url(testnet: bool) -> &'static str {
if testnet { VENUE_TESTNET_WS_URL } else { VENUE_WS_URL }
}
Config structs should provide override fields (base_url_http, base_url_ws, etc.) that fall back
to these defaults when unset.
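One way the override fallback could look is sketched below. The VenueWsConfig struct and its ws_url method are hypothetical names for this example; only the URL constants and get_ws_base_url mirror the snippet above.

```rust
const VENUE_WS_URL: &str = "wss://stream.venue.com/ws";
const VENUE_TESTNET_WS_URL: &str = "wss://testnet-stream.venue.com/ws";

pub const fn get_ws_base_url(testnet: bool) -> &'static str {
    if testnet { VENUE_TESTNET_WS_URL } else { VENUE_WS_URL }
}

/// Hypothetical config carrying an optional WebSocket URL override.
#[derive(Clone, Debug, Default)]
pub struct VenueWsConfig {
    pub testnet: bool,
    pub base_url_ws: Option<String>,
}

impl VenueWsConfig {
    /// Returns the override when set, otherwise the environment default.
    pub fn ws_url(&self) -> String {
        self.base_url_ws
            .clone()
            .unwrap_or_else(|| get_ws_base_url(self.testnet).to_string())
    }
}
```

With this shape the config never stores a resolved default, so a change to the constants in common/urls.rs is picked up everywhere.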
config.rs
Expose typed config structs in src/config.rs so Python callers can toggle venue-specific behaviour
(see how OKX wires demo URLs, retries, and channel flags).
Keep defaults minimal and delegate URL selection to helpers in common::urls.
For the user-facing design rationale, see the Configuration
concept guide.
Config structs derive bon::Builder and implement Default. The builder owns all default
values via #[builder(default = value)] annotations. The Default impl delegates to the
builder so defaults are defined in exactly one place:
#[derive(Clone, Debug, bon::Builder)]
pub struct VenueDataClientConfig {
pub api_key: Option<String>,
#[builder(default = 60)]
pub http_timeout_secs: u64,
#[builder(default = 3)]
pub max_retries: u32,
}
impl Default for VenueDataClientConfig {
fn default() -> Self {
Self::builder().build()
}
}
This prevents drift between builder defaults and Default output. Never duplicate
default values in the Default impl body.
Bon always defaults Option<T> fields to None. For the rare case where an
Option<T> field should default to Some(value), override it in the Default impl
and delegate everything else to the builder:
impl Default for VenueDataClientConfig {
fn default() -> Self {
Self {
poll_interval_secs: Some(60),
..Self::builder().build()
}
}
}
Use plain T with #[builder(default = value)] when a field always has a sensible
default and downstream code consumes the value directly:
#[builder(default = 60)]
pub http_timeout_secs: u64,
Use Option<T> (no builder annotation) when None carries distinct meaning such as
"feature disabled", "unbounded", or "inherit from environment":
/// Interval in seconds between open order checks.
/// When `None`, open order polling is disabled.
pub open_check_interval_secs: Option<f64>,
Choose the type based on the config's own semantics, not downstream function
signatures. If None means "this feature is off" at the config level, use
Option<T>. If the field always resolves to a concrete value, use plain T
even when a downstream constructor still accepts Option<T> and the call site
wraps with Some(config.field).
The py_new constructor accepts Option<T> for all configurable fields (Python callers
pass None to mean "use default"). For plain T fields, unwrap against the default:
fn py_new(http_timeout_secs: Option<u64>) -> Self {
let defaults = Self::default();
Self {
http_timeout_secs: http_timeout_secs.unwrap_or(defaults.http_timeout_secs),
..defaults
}
}
For Option<T> fields, use .or() to fall back to the default option value.
When the default is None, this preserves the caller's None. When the default
is Some(value), this fills in the default if the caller passed None:
open_check_interval_secs: open_check_interval_secs.or(defaults.open_check_interval_secs),
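The two merging rules can be seen side by side in the following sketch. It is a simplified stand-in with no PyO3 or bon attributes; the VenueConfigSketch struct and the from_optional constructor are hypothetical names chosen for this example.

```rust
/// Simplified stand-in for a config struct (no PyO3 or bon attributes).
#[derive(Clone, Debug, PartialEq)]
pub struct VenueConfigSketch {
    pub http_timeout_secs: u64,
    pub open_check_interval_secs: Option<f64>,
}

impl Default for VenueConfigSketch {
    fn default() -> Self {
        Self {
            http_timeout_secs: 60,
            open_check_interval_secs: None,
        }
    }
}

impl VenueConfigSketch {
    /// Mirrors a `py_new` constructor: every argument arrives as `Option<T>`.
    pub fn from_optional(
        http_timeout_secs: Option<u64>,
        open_check_interval_secs: Option<f64>,
    ) -> Self {
        let defaults = Self::default();
        Self {
            // Plain `T` field: `None` from the caller means "use the default value".
            http_timeout_secs: http_timeout_secs.unwrap_or(defaults.http_timeout_secs),
            // `Option<T>` field: keep the caller's value, else fall back to the default option.
            open_check_interval_secs: open_check_interval_secs
                .or(defaults.open_check_interval_secs),
        }
    }
}
```

Because the default for open_check_interval_secs is None here, a caller passing None leaves polling disabled, which matches the "None carries meaning" rule described earlier.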
Use sensible production defaults: credentials as None (resolved from environment at
runtime), mainnet URLs, standard timeouts. For trader_id and account_id, use
placeholder values like TraderId::from("TRADER-001") and AccountId::from("VENUE-001").
The ..Default::default() pattern keeps examples and tests focused on fields that
differ from defaults:
let config = VenueExecClientConfig {
trader_id,
account_id,
environment: VenueEnvironment::Testnet,
..Default::default()
};
common/error.rs
For adapters with multiple client types, define an adapter-level error enum in common/error.rs that
aggregates component errors:
#[derive(Debug, thiserror::Error)]
pub enum VenueError {
#[error("HTTP error: {0}")]
Http(#[from] VenueHttpError),
#[error("WebSocket error: {0}")]
WebSocket(#[from] VenueWsError),
#[error("Build error: {0}")]
Build(#[from] VenueBuildError),
}
This enables unified error handling at the adapter boundary while preserving component-specific error details for debugging.
common/retry.rs
When an adapter needs sophisticated retry logic, define a retry classification module in common/retry.rs
that distinguishes between retryable, non-retryable, and fatal errors:
#[derive(Debug, thiserror::Error)]
pub enum VenueError {
#[error("Retryable error: {source}")]
Retryable {
#[source]
source: VenueRetryableError,
retry_after: Option<Duration>,
},
#[error("Non-retryable error: {source}")]
NonRetryable {
#[source]
source: VenueNonRetryableError,
},
#[error("Fatal error: {source}")]
Fatal {
#[source]
source: VenueFatalError,
},
}
Include helper methods like from_http_status(), from_rate_limit_headers(), is_retryable(),
is_fatal(), and retry_after() to enable consistent error classification across the adapter.
See BitMEX and Bybit adapters for reference implementations.
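The helper methods might be shaped roughly as follows. This is a minimal sketch using a flattened RetryClass enum instead of typed source errors. The status-code mapping (429 and 5xx retryable, 401 and 403 fatal) and the from_http_status signature are assumptions for illustration, not the behaviour of any specific adapter.

```rust
use std::time::Duration;

/// Simplified classification; real adapters wrap typed source errors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RetryClass {
    Retryable { retry_after: Option<Duration> },
    NonRetryable,
    Fatal,
}

impl RetryClass {
    /// Assumed mapping: 429 and 5xx retry, 401/403 are fatal, everything else does not retry.
    pub fn from_http_status(status: u16, retry_after_secs: Option<u64>) -> Self {
        match status {
            429 | 500..=599 => Self::Retryable {
                retry_after: retry_after_secs.map(Duration::from_secs),
            },
            401 | 403 => Self::Fatal,
            _ => Self::NonRetryable,
        }
    }

    pub fn is_retryable(&self) -> bool {
        matches!(self, Self::Retryable { .. })
    }

    pub fn is_fatal(&self) -> bool {
        matches!(self, Self::Fatal)
    }

    /// Server-suggested delay, present only for retryable errors that carried one.
    pub fn retry_after(&self) -> Option<Duration> {
        match self {
            Self::Retryable { retry_after } => *retry_after,
            _ => None,
        }
    }
}
```

Centralizing the mapping in one constructor keeps HTTP and WebSocket call sites from drifting apart in how they treat the same status.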
python/mod.rs
Mirror the Rust surface area through PyO3 modules by re-exporting clients, enums, and helper functions.
When new functionality lands in Rust, add it to python/mod.rs so the Python layer stays in sync
(the OKX adapter is a good reference).
python/
Expose Rust functionality to Python through PyO3.
Mark venue-specific structs that need Python access with #[pyclass] and implement #[pymethods] blocks with
#[getter] attributes for field access.
For async methods in the HTTP client, use pyo3_async_runtimes::tokio::future_into_py to convert Rust futures
into Python awaitables.
When returning lists of custom types, map each item with Py::new(py, item) before constructing the Python list.
Register all exported classes and enums in python/mod.rs using m.add_class::<YourType>() so they're available
to Python code.
Follow the pattern established in other adapters: prefix Python-facing methods with py_* in Rust and use
#[pyo3(name = "method_name")] to expose them to Python without the prefix.
When delivering instruments from WebSocket to Python, use instrument_any_to_pyobject() which returns PyO3 types
for caching.
For the reverse direction (Python→Rust), use pyobject_to_instrument_any() in cache_instrument() methods.
Never call .into_py_any() directly on InstrumentAny as it doesn't implement the required trait.
Adapter-specific types (enums, structs) and Nautilus domain types should not be fully qualified.
Import them at the module level and use short names (e.g., OKXContractType instead of
crate::common::enums::OKXContractType, InstrumentId instead of nautilus_model::identifiers::InstrumentId).
This keeps code concise and readable.
Only fully qualify types from anyhow and tokio to avoid ambiguity with similarly-named types from other crates.
Use ustr::Ustr for any non-unique strings the platform stores repeatedly (venues, symbols, instrument IDs) to
minimise allocations and comparisons.
All clients that cache instruments must implement three methods with standardized names: cache_instruments()
(plural, bulk replace), cache_instrument() (singular, upsert), and get_instrument() (retrieve by symbol).
WebSocket clients store instruments in Arc<DashMap<Ustr, InstrumentAny>> on the outer client for
thread-safe access across clones.
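The three standardized methods can be sketched with std types only. In this example Arc<RwLock<HashMap>> stands in for Arc<DashMap<Ustr, InstrumentAny>>, and the Instrument struct is a placeholder for InstrumentAny; both substitutions are assumptions made to keep the block self-contained.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Placeholder for `InstrumentAny`.
#[derive(Clone, Debug, PartialEq)]
pub struct Instrument {
    pub symbol: String,
    pub price_precision: u8,
}

/// Cloning shares the same underlying map, as with `Arc<DashMap<..>>`.
#[derive(Clone, Default)]
pub struct InstrumentCache {
    instruments: Arc<RwLock<HashMap<String, Instrument>>>,
}

impl InstrumentCache {
    /// Bulk replace: drops existing entries, then inserts the new set.
    pub fn cache_instruments(&self, instruments: Vec<Instrument>) {
        let mut map = self.instruments.write().unwrap();
        map.clear();
        for instrument in instruments {
            map.insert(instrument.symbol.clone(), instrument);
        }
    }

    /// Upsert: inserts or overwrites a single entry.
    pub fn cache_instrument(&self, instrument: Instrument) {
        self.instruments
            .write()
            .unwrap()
            .insert(instrument.symbol.clone(), instrument);
    }

    /// Retrieves a copy by symbol.
    pub fn get_instrument(&self, symbol: &str) -> Option<Instrument> {
        self.instruments.read().unwrap().get(symbol).cloned()
    }
}
```

A clone handed to a Python binding or a spawned task sees every later upsert, which is the property the shared Arc is there to provide.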
common/testing.rs
Store shared fixtures and payload loaders in src/common/testing.rs for use across HTTP and WebSocket unit tests.
This keeps #[cfg(test)] helpers out of production modules and encourages reuse.
common/status.rs
When a data client polls instrument status via REST, place the diff logic in common/status.rs
rather than inlining it in the data client. The standard function signature is:
pub fn diff_and_emit_statuses(
new_statuses: &AHashMap<InstrumentId, MarketStatusAction>,
cached_statuses: &mut AHashMap<InstrumentId, MarketStatusAction>,
subscriptions: Option<&DashSet<InstrumentId>>,
sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
ts_event: UnixNanos,
ts_init: UnixNanos,
)
The function compares each entry in new_statuses against cached_statuses, emitting an
InstrumentStatus event for any instrument whose MarketStatusAction changed. Instruments
present in the cache but absent from the new snapshot are treated as removed and emit
NotAvailableForTrading. The cache always reflects the full API state.
Pass subscriptions as Some(&set) to restrict emissions to subscribed instruments, or
None to emit all changes unconditionally. The data client stores the cache in an
Arc<RwLock<AHashMap<InstrumentId, MarketStatusAction>>> and calls this function on each
poll cycle.
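The diff rules described above can be sketched as a pure function. This version returns the events instead of sending them on a channel and uses std HashMap and HashSet with string IDs; those simplifications, the diff_statuses name, and the choice to treat a first-seen instrument as a change are assumptions for this example.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MarketStatusAction {
    Trading,
    Halt,
    NotAvailableForTrading,
}

/// Returns (instrument, action) pairs to emit and updates the cache in place.
pub fn diff_statuses(
    new_statuses: &HashMap<String, MarketStatusAction>,
    cached_statuses: &mut HashMap<String, MarketStatusAction>,
    subscriptions: Option<&HashSet<String>>,
) -> Vec<(String, MarketStatusAction)> {
    // `None` means emit for every instrument; `Some(set)` filters by subscription.
    let wanted = |id: &String| subscriptions.map_or(true, |subs| subs.contains(id));
    let mut events = Vec::new();

    // Changed instruments (assumption: a first-seen instrument counts as changed).
    for (id, action) in new_statuses {
        if cached_statuses.get(id) != Some(action) && wanted(id) {
            events.push((id.clone(), *action));
        }
    }

    // Instruments that disappeared from the snapshot are treated as removed.
    for id in cached_statuses.keys() {
        if !new_statuses.contains_key(id) && wanted(id) {
            events.push((id.clone(), MarketStatusAction::NotAvailableForTrading));
        }
    }

    // The cache mirrors the full snapshot, regardless of the subscription filter.
    *cached_statuses = new_statuses.clone();
    events
}
```

Updating the cache for unsubscribed instruments as well means a later subscription does not replay stale transitions.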
factories.rs
Complex adapters may define a factories.rs module for converting venue data to Nautilus types.
This centralizes transformation logic that would otherwise be scattered across HTTP and WebSocket
parsers:
// factories.rs
pub fn create_instrument(
venue_instrument: &VenueInstrument,
ts_init: UnixNanos,
) -> anyhow::Result<InstrumentAny> {
match venue_instrument.instrument_type {
InstrumentType::Perpetual => parse_perpetual(venue_instrument, ts_init),
InstrumentType::Future => parse_future(venue_instrument, ts_init),
InstrumentType::Option => parse_option(venue_instrument, ts_init),
}
}
Use this pattern when the same venue data structures are parsed in multiple places (HTTP responses, WebSocket updates, historical data).
connect
Both data and execution clients follow a strict initialization order during connect() to prevent
race conditions with reconciliation and strategy startup. The platform waits for all clients to
signal connected before running reconciliation or starting strategies, so all initialization must
complete within connect().
Data clients emit events to the platform through an unbounded channel obtained at construction:
let data_sender = get_data_event_sender();
The DataEvent enum carries all data types the client produces:
| Variant | Usage |
|---|---|
| DataEvent::Instrument | Instrument definitions during bootstrap and updates. |
| DataEvent::InstrumentStatus | Market status changes from polling or WS streams. |
| DataEvent::Data | Market data (trades, quotes, book deltas, bars). |
| DataEvent::Response | Responses to historical data requests. |
| DataEvent::FundingRate | Funding rate updates for derivatives. |
Send events with self.data_sender.send(DataEvent::Instrument(instrument)). Log warnings
on send failure but do not propagate the error since a closed receiver means the system
is shutting down. Clone the sender for spawned tasks that emit data from async work.
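The "warn but do not propagate" rule is sketched below with a std channel standing in for the Tokio unbounded sender. The two-variant DataEvent enum and the emit helper are hypothetical simplifications for this example.

```rust
use std::sync::mpsc::Sender;

/// Reduced stand-in for the platform's `DataEvent` enum.
#[derive(Debug, PartialEq)]
pub enum DataEvent {
    Instrument(String),
    Data(String),
}

/// Sends an event and reports whether the receiver was still alive.
pub fn emit(sender: &Sender<DataEvent>, event: DataEvent) -> bool {
    match sender.send(event) {
        Ok(()) => true,
        Err(e) => {
            // A closed receiver means shutdown: warn and carry on.
            eprintln!("WARN: failed to send data event: {e}");
            false
        }
    }
}
```

A spawned task would receive its own sender.clone() and call the same helper, so the failure handling stays in one place.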
Data client connect sequence:
1. Fetch instruments via bootstrap_instruments() or equivalent.
2. Emit each instrument as DataEvent::Instrument via data_sender. These events are queued during startup and processed before reconciliation runs.
3. Cache instruments with ws.cache_instruments() so the handler can parse messages.
async fn connect(&mut self) -> anyhow::Result<()> {
let instruments = self.bootstrap_instruments().await?;
ws.cache_instruments(instruments);
ws.connect().await?;
ws.wait_until_active(10.0).await?;
// ...
}
Execution client connect sequence:
1. Call ensure_instruments_initialized_async(), which checks self.core.instruments_initialized() and returns early if instruments are already cached. Otherwise it fetches instruments via REST and caches them to the HTTP client, WebSocket client, and any broadcaster clients.
2. Call refresh_account_state(), which requests balances and margins via REST, builds an AccountState, and emits it through the ExecutionEventEmitter.
3. Call await_account_registered(timeout_secs), which polls self.core.cache().account(&account_id) at 10ms intervals until the account appears or the timeout expires. This step blocks connect so the portfolio can process orders during reconciliation.
4. Call self.core.set_connected().
async fn connect(&mut self) -> anyhow::Result<()> {
self.ensure_instruments_initialized_async().await?;
self.ws_client.connect().await?;
self.ws_client.wait_until_active(10.0).await?;
// ... subscribe channels, start stream ...
self.refresh_account_state().await?;
self.await_account_registered(30.0).await?;
self.core.set_connected();
Ok(())
}
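The polling step can be sketched as a small blocking helper. This is a simplified, synchronous stand-in: the real method is async and reads the cache, whereas here a caller-supplied closure plays that role. The await_registered name and the String error are assumptions for this example.

```rust
use std::time::{Duration, Instant};

/// Polls `is_registered` every 10 ms until it returns true or the timeout expires.
pub fn await_registered<F>(mut is_registered: F, timeout_secs: f64) -> Result<(), String>
where
    F: FnMut() -> bool,
{
    let deadline = Instant::now() + Duration::from_secs_f64(timeout_secs);
    loop {
        if is_registered() {
            return Ok(());
        }
        if Instant::now() >= deadline {
            return Err(format!("account not registered within {timeout_secs}s"));
        }
        std::thread::sleep(Duration::from_millis(10));
    }
}
```

Returning an error on timeout lets connect() fail loudly instead of reporting a connected client whose account is missing from the cache.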
The ExecutionEventEmitter provides two methods for emitting account state:
- emit_account_state(balances, margins, reported, ts_event) builds an AccountState from raw parameters using the internal OrderEventFactory, then dispatches it. Use this when the adapter has individual balance and margin values to combine.
- send_account_state(state) dispatches a pre-built AccountState. Use this when the adapter already has a fully constructed state from parsing an HTTP or WebSocket payload.
Adapters use a two-layer HTTP client architecture: a raw client for low-level API operations and a domain client for high-level logic. The split also enables efficient cloning for Python bindings.
The architecture consists of two complementary clients:
- Raw client (MyRawHttpClient) - Low-level API methods matching venue endpoints.
- Domain client (MyHttpClient) - High-level methods using Nautilus domain types.
use std::sync::Arc;
use nautilus_network::http::HttpClient;
// Raw HTTP client - low-level API methods matching venue endpoints
pub struct MyRawHttpClient {
base_url: String,
client: HttpClient, // Use nautilus_network::http::HttpClient, not reqwest directly
credential: Option<Credential>,
retry_manager: RetryManager<MyHttpError>,
cancellation_token: CancellationToken,
}
// Domain HTTP client - wraps raw client with Arc, provides high-level API
pub struct MyHttpClient {
pub(crate) inner: Arc<MyRawHttpClient>,
// Additional domain-specific state (e.g., instrument cache)
instruments: DashMap<InstrumentId, InstrumentAny>,
}
Key points:
- Raw client (MyRawHttpClient) contains low-level HTTP methods named to match venue endpoints (e.g., get_instruments, get_balance, place_order). These methods take venue-specific query objects and return venue-specific response types.
- Domain client (MyHttpClient) wraps the raw client in an Arc for efficient cloning (required for Python bindings). It provides high-level methods that accept Nautilus domain types (e.g., InstrumentId, ClientOrderId) and return domain objects. It may also cache instruments or other venue metadata.
- Use nautilus_network::http::HttpClient instead of reqwest::Client directly for rate limiting, retry logic, and consistent error handling.
Parser functions convert venue-specific data structures into Nautilus domain objects. Place them in
common/parse.rs for cross-cutting conversions (instruments, trades, bars) or http/parse.rs for
REST-specific transformations. Each parser takes venue data plus context (account IDs, timestamps,
instrument references) and returns a Nautilus domain type wrapped in Result.
Standard patterns:
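- Convert string fields to numbers with .parse::<f64>() and anyhow::Context.
- Treat empty strings as absent values: venues may send "" instead of omitting fields.
- Map venue enums to Nautilus enums with explicit match statements rather than implementing automatic conversions that could hide mapping errors.
- Name parser functions after the type they produce: parse_position_status_report, parse_order_status_report, parse_trade_tick.
Place parsing helpers (parse_price_with_precision, parse_timestamp) in the same module as private functions when they're reused across multiple parsers.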
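A few of these parsing patterns are sketched below using only std. String errors stand in for anyhow::Context, and the OrderSide enum, the field names, and the lowercase "buy"/"sell" wire values are assumptions for this example.

```rust
/// Parses a required numeric string, naming the field in the error.
pub fn parse_f64(field: &str, raw: &str) -> Result<f64, String> {
    raw.parse::<f64>()
        .map_err(|e| format!("failed to parse `{field}` from {raw:?}: {e}"))
}

/// Parses an optional numeric string, treating "" as an absent value.
pub fn parse_optional_f64(field: &str, raw: &str) -> Result<Option<f64>, String> {
    if raw.is_empty() {
        return Ok(None);
    }
    parse_f64(field, raw).map(Some)
}

/// Stand-in for a Nautilus enum.
#[derive(Debug, PartialEq, Eq)]
pub enum OrderSide {
    Buy,
    Sell,
}

/// Explicit mapping: unknown values are an error, not a silent default.
pub fn parse_order_side(raw: &str) -> Result<OrderSide, String> {
    match raw {
        "buy" => Ok(OrderSide::Buy),
        "sell" => Ok(OrderSide::Sell),
        other => Err(format!("unknown order side: {other:?}")),
    }
}
```

Naming the field in each error makes a malformed venue payload much quicker to trace back to its source.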
The raw client mirrors venue endpoints with venue-specific parameter and response types. The domain client wraps it and exposes high-level methods that accept Nautilus domain types.
Naming conventions:
- Raw client methods: named to match venue endpoints (e.g., get_instruments, get_balance, place_order). These methods are internal to the raw client and take venue-specific types (builders, JSON values).
- Domain client methods: named for the Nautilus operation (e.g., request_instruments, submit_order, cancel_order). These are the methods exposed to Python and take Nautilus domain objects (InstrumentId, ClientOrderId, OrderSide, etc.).
Domain method flow:
Domain methods follow a three-step pattern: build venue-specific parameters from Nautilus types, call the corresponding raw client method, then parse the response. For endpoints returning domain objects (positions, orders, trades), call parser functions from common/parse. For endpoints returning raw venue data (fee rates, balances), extract the result directly from the response envelope. Methods prefixed with request_* indicate they return domain data, while methods like submit_*, cancel_*, or modify_* perform actions and return acknowledgments.
The domain client wraps the raw client in an Arc for efficient cloning required by Python bindings.
Use the derive_builder crate with proper defaults and ergonomic Option handling:
use derive_builder::Builder;
#[derive(Clone, Debug, Deserialize, Serialize, Builder)]
#[serde(rename_all = "camelCase")]
#[builder(setter(into, strip_option), default)]
pub struct InstrumentsInfoParams {
pub category: ProductType,
#[serde(skip_serializing_if = "Option::is_none")]
pub symbol: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<u32>,
}
impl Default for InstrumentsInfoParams {
fn default() -> Self {
Self {
category: ProductType::Linear,
symbol: None,
limit: None,
}
}
}
Key attributes:
- #[builder(setter(into, strip_option), default)] - enables clean API: .symbol("BTCUSDT") instead of .symbol(Some("BTCUSDT".to_string())).
- #[serde(skip_serializing_if = "Option::is_none")] - omits optional fields from query strings.
- Implement Default for builder parameters.
Keep signing logic in a Credential struct under common/credential.rs:
- Store API keys as Ustr for efficient comparison, secrets in Box<[u8]> with #[zeroize].
- Provide sign() and sign_bytes() methods that compute HMAC-SHA256 signatures.
For WebSocket authentication, the handler constructs login messages using the same Credential::sign() method with a WebSocket-specific timestamp format.
Each adapter's common/credential.rs must provide two things:
- credential_env_vars() free function: returns environment variable names as a tuple.
- Credential::resolve() method: resolves credentials from config values or environment variables using resolve_env_var_pair from nautilus_core::env.
Config structs are DTOs and must not contain credential resolution logic. All resolution belongs in credential.rs.
Standard layout:
use nautilus_core::env::resolve_env_var_pair;
/// Returns the environment variable names for API credentials.
pub fn credential_env_vars(is_testnet: bool) -> (&'static str, &'static str) {
if is_testnet {
("{VENUE}_TESTNET_API_KEY", "{VENUE}_TESTNET_API_SECRET")
} else {
("{VENUE}_API_KEY", "{VENUE}_API_SECRET")
}
}
impl Credential {
/// Resolves credentials from provided values or environment variables.
pub fn resolve(
api_key: Option<String>,
api_secret: Option<String>,
is_testnet: bool,
) -> Option<Self> {
let (key_var, secret_var) = credential_env_vars(is_testnet);
let (k, s) = resolve_env_var_pair(api_key, api_secret, key_var, secret_var)?;
Some(Self::new(k, s))
}
}
Adapters load API credentials from environment variables when not provided directly, avoiding hardcoded secrets.
Naming conventions:
| Environment | API Key Variable | API Secret Variable |
|---|---|---|
| Mainnet/Live | {VENUE}_API_KEY | {VENUE}_API_SECRET |
| Testnet | {VENUE}_TESTNET_API_KEY | {VENUE}_TESTNET_API_SECRET |
| Demo | {VENUE}_DEMO_API_KEY | {VENUE}_DEMO_API_SECRET |
Some venues require additional credentials:
- OKX: OKX_API_PASSPHRASE
Key principles:
- Define environment variable names once in credential_env_vars(), never duplicated as string literals across files.
- Use get_or_env_var_opt for optional credentials (returns None if missing).
- Use get_or_env_var when credentials are required (returns error if missing).
Use the RetryManager from nautilus_network for consistent retry behavior.
Configure rate limiting through HttpClient using LazyLock<Quota> static variables.
Naming conventions:
- REST quotas: {VENUE}_REST_QUOTA (e.g., OKX_REST_QUOTA, BYBIT_REST_QUOTA)
- WebSocket quotas: {VENUE}_WS_{OPERATION}_QUOTA (e.g., OKX_WS_CONNECTION_QUOTA, OKX_WS_ORDER_QUOTA)
- Rate limit keys: {VENUE}_RATE_LIMIT_KEY_{OPERATION} (e.g., OKX_RATE_LIMIT_KEY_SUBSCRIPTION, OKX_RATE_LIMIT_KEY_ORDER)
Standard rate limit keys for WebSocket:
| Key | Operations |
|---|---|
| *_RATE_LIMIT_KEY_SUBSCRIPTION | Subscribe, unsubscribe, login. |
| *_RATE_LIMIT_KEY_ORDER | Place orders (regular and algo). |
| *_RATE_LIMIT_KEY_CANCEL | Cancel orders, mass cancel. |
| *_RATE_LIMIT_KEY_AMEND | Amend/modify orders. |
Example:
pub static OKX_REST_QUOTA: LazyLock<Quota> =
LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
pub static OKX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
LazyLock::new(|| Quota::per_hour(NonZeroU32::new(480).unwrap()));
pub const OKX_RATE_LIMIT_KEY_ORDER: &str = "order";
Pass rate limit keys when sending WebSocket messages to enforce per-operation quotas:
self.send_with_retry(payload, Some(vec![OKX_RATE_LIMIT_KEY_ORDER.to_string()])).await
WebSocket clients handle real-time streaming data. They manage connection state, authentication, subscriptions, and reconnection logic.
WebSocket adapters use a two-layer architecture to separate Python-accessible state from high-performance async I/O.
Track connection state using Arc<ArcSwap<AtomicU8>> to provide lock-free, race-free visibility across all clones:
use arc_swap::ArcSwap;
pub struct MyWebSocketClient {
connection_mode: Arc<ArcSwap<AtomicU8>>, // Shared connection mode (lock-free)
signal: Arc<AtomicBool>, // Cancellation signal for graceful shutdown
// ...
}
Pattern breakdown:
- Arc: Shared across all clones (Python bindings clone clients before async operations).
- ArcSwap: Enables atomic pointer replacement via .store() without replacing the outer Arc.
- Arc<AtomicU8>: The actual connection state from WebSocketClient::connection_mode_atomic().
Initialize with a placeholder atomic (ConnectionMode::Closed), then in connect() call
.store(client.connection_mode_atomic()) to atomically swap to the real client's state.
All clones see updates instantly through lock-free .load() calls in is_active().
The underlying WebSocketClient sends a RECONNECTED sentinel message when reconnection completes, triggering resubscription logic in the handler.
Outer client ({Venue}WebSocketClient):
- Keeps Python-queryable state in Arc<DashMap<K, V>>.
- Sends commands to the handler via the cmd_tx channel.
- Receives events from the handler via the out_rx channel.
Inner handler ({Venue}WsFeedHandler):
- Owns the WebSocketClient exclusively (no RwLock needed).
- Receives commands from cmd_rx → serializes to JSON → sends via WebSocket.
- Parses raw frames into {Venue}WsFrame → converts to {Venue}WsMessage → emits via out_tx.
- Tracks pending requests in AHashMap<K, V> (single-threaded, no locking).
- Uses a VecDeque<{Venue}WsMessage> to buffer multi-message yields from a single frame parse.
Some venues expose separate WebSocket endpoints for market data and order management
(different URLs, authentication flows, or message protocols). In this case, split into
two client+handler pairs under websocket/data/ and websocket/orders/ subdirectories,
each following the same two-layer pattern. Name them {Venue}MdWebSocketClient /
{Venue}MdWsFeedHandler and {Venue}OrdersWebSocketClient / {Venue}OrdersWsFeedHandler.
Communication pattern:
flowchart LR
subgraph client["Client (orchestrator)"]
cmd_tx["cmd_tx
├ Subscribe { args }
├ PlaceOrder { params }
└ MassCancel { id }"]
out_rx["out_rx
← {Venue}WsMessage
← Authenticated
← ChannelData"]
end
subgraph handler["Handler (I/O boundary)"]
cmd_rx[cmd_rx]
out_tx[out_tx]
ws[WebSocket]
end
cmd_tx --> cmd_rx
cmd_rx -->|"serialize"| ws
ws -->|"parse → transform"| out_tx
out_tx --> out_rx
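The handler's pending-message queue can be sketched as follows. It is a synchronous simplification: a std channel of strings stands in for the WebSocket frame stream, next() returns None when nothing is ready rather than awaiting, and the ';'-separated frame format is invented for this example.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::Receiver;

/// Reduced stand-in for `{Venue}WsMessage`.
#[derive(Debug, PartialEq)]
pub enum WsMessage {
    Trade(String),
}

/// Handler side: owns the frame source and a queue of parsed messages.
pub struct FeedHandler {
    raw_rx: Receiver<String>, // stand-in for the WebSocket frame stream
    pending_messages: VecDeque<WsMessage>,
}

impl FeedHandler {
    pub fn new(raw_rx: Receiver<String>) -> Self {
        Self {
            raw_rx,
            pending_messages: VecDeque::new(),
        }
    }

    /// One frame may carry several trades, separated by ';' in this toy format.
    fn parse_frame(frame: &str) -> Vec<WsMessage> {
        frame
            .split(';')
            .filter(|part| !part.is_empty())
            .map(|part| WsMessage::Trade(part.to_string()))
            .collect()
    }

    /// Drains queued messages first, then polls for the next frame.
    pub fn next(&mut self) -> Option<WsMessage> {
        loop {
            if let Some(msg) = self.pending_messages.pop_front() {
                return Some(msg);
            }
            match self.raw_rx.try_recv() {
                Ok(frame) => self.pending_messages.extend(Self::parse_frame(&frame)),
                // Nothing ready (or sender gone): yield back to the caller.
                Err(_) => return None,
            }
        }
    }
}
```

Draining the queue before polling preserves message order when one frame expands into several outputs.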
Key principles:
- Handler owns the WebSocketClient, client sends commands via lock-free mpsc channel.
- Commands are defined in a HandlerCommand enum.
- Handler emits {Venue}WsMessage events (including Authenticated), client maintains state from events.
- Handler uses AHashMap for matching responses (no Arc<DashMap> between layers).
- Handler keeps a VecDeque<{Venue}WsMessage> for frames that produce multiple output messages. The next() method drains the queue before polling channels.
- Client uses Arc<DashMap> only for state Python might query; handler uses AHashMap for internal matching.
Authentication state is managed through events:
- Login response → returns {Venue}WsMessage::Authenticated immediately.
- AuthTracker (from nautilus_network::websocket::auth) tracks auth state across threads.
The AuthTracker struct from nautilus_network provides thread-safe authentication state:
pub struct AuthTracker {
tx: Arc<Mutex<Option<AuthResultSender>>>,
authenticated: Arc<AtomicBool>,
}
AuthTracker is internally Arc-based, so cloning shares state. Both client and handler
store auth_tracker: AuthTracker and receive a .clone() of the same instance. The tracker
exposes a four-method lifecycle: begin() starts an attempt and returns a one-shot receiver,
succeed() sets the authenticated flag and notifies the receiver, fail(message) clears
the flag with an error, and invalidate() clears the flag on disconnect. Downstream
consumers query is_authenticated() for lock-free reads via the internal AtomicBool.
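The lifecycle described above can be modelled with a small stand-in. This is a sketch under stated assumptions, not the nautilus_network type: AuthTrackerSketch is a hypothetical name, a std::sync::mpsc channel replaces the one-shot sender, and the result type is assumed to be Result<(), String>.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

type AuthResult = Result<(), String>;

/// Simplified stand-in for `AuthTracker`; cloning shares the same state.
#[derive(Clone, Default)]
pub struct AuthTrackerSketch {
    tx: Arc<Mutex<Option<Sender<AuthResult>>>>,
    authenticated: Arc<AtomicBool>,
}

impl AuthTrackerSketch {
    /// Starts an attempt and returns the receiver for its outcome.
    pub fn begin(&self) -> Receiver<AuthResult> {
        let (tx, rx) = channel();
        *self.tx.lock().unwrap() = Some(tx);
        rx
    }

    /// Sets the flag and notifies the waiting receiver.
    pub fn succeed(&self) {
        self.authenticated.store(true, Ordering::Release);
        self.notify(Ok(()));
    }

    /// Clears the flag and reports the error to the waiting receiver.
    pub fn fail(&self, message: &str) {
        self.authenticated.store(false, Ordering::Release);
        self.notify(Err(message.to_string()));
    }

    /// Clears the flag on disconnect without notifying anyone.
    pub fn invalidate(&self) {
        self.authenticated.store(false, Ordering::Release);
    }

    /// Lock-free read of the current state.
    pub fn is_authenticated(&self) -> bool {
        self.authenticated.load(Ordering::Acquire)
    }

    fn notify(&self, result: AuthResult) {
        // Taking the sender makes each attempt resolve at most once.
        if let Some(tx) = self.tx.lock().unwrap().take() {
            let _ = tx.send(result);
        }
    }
}
```

Because both fields sit behind Arc, the clone held by the handler and the clone held by the client observe the same flag.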
Note: The Authenticated message is consumed in the client's spawn loop for reconnection
flow coordination and is not forwarded to downstream consumers (data/execution clients).
Downstream consumers can query authentication state via AuthTracker if needed. The execution
client's Authenticated handler only logs at debug level with no important logic depending
on this event.
SubscriptionState pattern

The SubscriptionState struct from nautilus_network::websocket is shared between client and handler using Arc<DashMap<>> internally for thread-safe access:
- SubscriptionState is shared via Arc: Both client and handler receive .clone() of the same instance (shallow clone of Arc pointers).
- Client tracks user intent (mark_subscribe, mark_unsubscribe), handler tracks server confirmations (confirm_subscribe, confirm_unsubscribe, mark_failure).

A subscription represents any topic in one of two states:
| State | Description |
|---|---|
| Pending | Subscription request sent to venue, awaiting acknowledgment. |
| Confirmed | Venue acknowledged subscription and is actively streaming data. |
State transitions follow this lifecycle:
| Trigger | Method Called | From State | To State | Notes |
|---|---|---|---|---|
| User subscribes | mark_subscribe() | — | Pending | Topic added to pending set. |
| Venue confirms | confirm_subscribe() | Pending | Confirmed | Moved from pending to confirmed. |
| Venue rejects | mark_failure() | Pending | Pending | Stays pending for retry on reconnect. |
| User unsubscribes | mark_unsubscribe() | Confirmed | Pending | Temporarily pending until ack. |
| Unsubscribe ack | confirm_unsubscribe() | Pending | Removed | Topic fully removed. |
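The transitions in the table can be traced with a single-threaded model. This is only a sketch: SubscriptionStateSketch is a hypothetical name, it uses plain HashSet fields rather than the concurrent maps of the real type, and the method names follow the client and handler methods listed for SubscriptionState above.

```rust
use std::collections::HashSet;

/// Simplified, single-threaded model of the subscription lifecycle.
#[derive(Debug, Default)]
pub struct SubscriptionStateSketch {
    confirmed: HashSet<String>,
    pending_subscribe: HashSet<String>,
    pending_unsubscribe: HashSet<String>,
}

impl SubscriptionStateSketch {
    /// User subscribes: the topic enters the pending set.
    pub fn mark_subscribe(&mut self, topic: &str) {
        self.pending_subscribe.insert(topic.to_string());
    }

    /// Venue confirms: pending -> confirmed.
    pub fn confirm_subscribe(&mut self, topic: &str) {
        if self.pending_subscribe.remove(topic) {
            self.confirmed.insert(topic.to_string());
        }
    }

    /// Venue rejects: the topic stays pending so it is retried on reconnect.
    pub fn mark_failure(&mut self, topic: &str) {
        self.confirmed.remove(topic);
        self.pending_subscribe.insert(topic.to_string());
    }

    /// User unsubscribes: confirmed -> pending until the venue acknowledges.
    pub fn mark_unsubscribe(&mut self, topic: &str) {
        if self.confirmed.remove(topic) {
            self.pending_unsubscribe.insert(topic.to_string());
        }
    }

    /// Unsubscribe ack: the topic is fully removed.
    pub fn confirm_unsubscribe(&mut self, topic: &str) {
        self.pending_unsubscribe.remove(topic);
    }

    /// Counts confirmed subscriptions only, never pending ones.
    pub fn subscription_count(&self) -> usize {
        self.confirmed.len()
    }
}
```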
Key principles:
- subscription_count() reports only confirmed subscriptions, not pending ones.
- Check the op field in acknowledgments to avoid re-confirming topics.

Adapters use venue-specific delimiters to structure subscription topics:
| Adapter | Delimiter | Example | Pattern |
|---|---|---|---|
| BitMEX | : | trade:XBTUSD | {channel}:{symbol} |
| OKX | : | trades:BTC-USDT-SWAP | {channel}:{symbol} |
| Bybit | . | orderbook.50.BTCUSDT | {channel}.{depth}.{symbol} |
Parse topics using split_once() with the appropriate delimiter to extract channel and symbol components.
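As a small illustration of the parsing step, the helpers below wrap str::split_once and str::rsplit_once. The function names are hypothetical, and splitting from the right for the three-part Bybit pattern is an assumption about which component the caller wants.

```rust
/// Splits a `{channel}:{symbol}` style topic at the first delimiter.
pub fn split_topic(topic: &str, delimiter: char) -> Option<(&str, &str)> {
    topic.split_once(delimiter)
}

/// Splits at the last delimiter, for topics such as `{channel}.{depth}.{symbol}`
/// where the symbol is the final component.
pub fn split_topic_symbol_last(topic: &str, delimiter: char) -> Option<(&str, &str)> {
    topic.rsplit_once(delimiter)
}
```

With split_once, `orderbook.50.BTCUSDT` yields `orderbook` and `50.BTCUSDT`; the right-hand variant yields `orderbook.50` and `BTCUSDT`. A topic without the delimiter returns None in both cases.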
On reconnection, restore authentication and subscriptions:
Track subscriptions: Preserve original subscription arguments in collections (e.g., Arc<DashMap>) to avoid parsing topics back to arguments.
Reconnection flow:
- Receive {Venue}WsMessage::Reconnected from handler.

Preserving subscription arguments:
Store original subscription arguments in a separate collection to enable deterministic reconnection replay without parsing topics back into arguments:
pub struct MyWebSocketClient {
subscription_state: Arc<SubscriptionState>,
subscription_args: Arc<DashMap<String, SubscriptionArgs>>, // topic → original args
// ...
}
impl MyWebSocketClient {
async fn subscribe(&self, args: SubscriptionArgs) -> Result<(), Error> {
let topic = args.to_topic();
self.subscription_state.mark_subscribe(&topic);
self.subscription_args.insert(topic.clone(), args.clone());
self.send_cmd(HandlerCommand::Subscribe(args)).await
}
async fn unsubscribe(&self, topic: &str) -> Result<(), Error> {
self.subscription_state.mark_unsubscribe(topic);
self.subscription_args.remove(topic);
self.send_cmd(HandlerCommand::Unsubscribe(topic.to_string())).await
}
async fn restore_subscriptions(&self) {
for entry in self.subscription_args.iter() {
let _ = self.send_cmd(HandlerCommand::Subscribe(entry.value().clone())).await;
}
}
}
This avoids complex topic parsing and ensures subscriptions are replayed exactly as originally requested.
Support both WebSocket control frame pings and application-level text pings:
- Control frame pings: handled by WebSocketClient via the PingHandler callback.
- Application-level pings: some venues use "ping"/"pong" text messages. Configure heartbeat_msg: Some(TEXT_PING.to_string()) in WebSocketConfig and respond to incoming TEXT_PING with TEXT_PONG in the handler.

The handler should check for ping messages early in the message processing loop and respond immediately to maintain connection health.
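The early ping check can be sketched as a small classifier that runs before any JSON parsing. The constant values, the FrameAction enum, and the choice to ignore an incoming pong are assumptions for illustration.

```rust
/// Assumed text values; venues may use different literals.
pub const TEXT_PING: &str = "ping";
pub const TEXT_PONG: &str = "pong";

/// What the handler should do with an incoming text frame (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum FrameAction<'a> {
    /// Send this text back to the venue immediately.
    Reply(&'static str),
    /// Heartbeat response from the venue; nothing to do.
    Ignore,
    /// Regular payload; continue with deserialization.
    Parse(&'a str),
}

/// Classifies a text frame before any JSON parsing happens.
pub fn classify_text_frame(text: &str) -> FrameAction<'_> {
    match text {
        TEXT_PING => FrameAction::Reply(TEXT_PONG),
        TEXT_PONG => FrameAction::Ignore,
        other => FrameAction::Parse(other),
    }
}
```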
Shutdown (close)

The close() method follows a three-step shutdown sequence: command, signal, await.
impl MyWebSocketClient {
pub async fn close(&mut self) -> Result<(), MyWsError> {
tracing::debug!("Starting close process");
// 1. Send disconnect command so handler can clean up gracefully
if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
tracing::warn!("Failed to send disconnect command to handler: {e}");
}
// 2. Set stop signal so handler loop exits after processing disconnect
self.signal.store(true, Ordering::Release);
// 3. Await task handle with timeout, abort if stuck
if let Some(task_handle) = self.task_handle.take() {
match Arc::try_unwrap(task_handle) {
Ok(handle) => {
let abort_handle = handle.abort_handle();
match tokio::time::timeout(Duration::from_secs(2), handle).await {
Ok(Ok(())) => tracing::debug!("Handler task completed"),
Ok(Err(e)) => tracing::error!("Handler task error: {e:?}"),
Err(_) => {
tracing::warn!("Timeout waiting for handler task, aborting");
abort_handle.abort();
}
}
}
Err(arc_handle) => {
tracing::debug!("Cannot unwrap task handle, aborting");
arc_handle.abort();
}
}
}
Ok(())
}
}
Key points:
- Send Disconnect before setting the stop signal so the handler processes it before exiting.
- Return Result<(), {Venue}WsError> so callers can handle failures.
- Use Ordering::Release on the signal store so the handler sees the write.
- Capture abort_handle before awaiting so it remains available after timeout.
- If Arc::try_unwrap fails (other clones exist), abort directly.

Message stream (stream)

The outer client exposes a stream() method that hands ownership of out_rx to the
caller as an async stream. Data and execution clients call this once to drive their
message processing loop:
impl MyWebSocketClient {
pub fn stream(&mut self) -> impl Stream<Item = MyWsMessage> + 'static {
let rx = self
.out_rx
.take()
.expect("Stream receiver already taken or not connected");
let mut rx = Arc::try_unwrap(rx)
.expect("Cannot take ownership - other references exist");
async_stream::stream! {
while let Some(msg) = rx.recv().await {
yield msg;
}
}
}
}
The data/execution client consumes the stream in a tokio::select! loop with a
cancellation token or stop signal, matching on {Venue}WsMessage variants and calling
parse functions to produce Nautilus domain types.
Subscription helpers (subscription.rs)

When a venue's subscription topics have complex structure (multiple parameter types,
instrument type / family / ID variants, candle width encoding), extract topic building
and parsing into websocket/subscription.rs. This keeps client.rs focused on
connection lifecycle and handler.rs focused on I/O.
For venues with simple {channel}:{symbol} topics, inline helpers in the client are
sufficient and a separate module is not needed.
Define handler-specific tuning constants for consistent behavior:
| Constant | Purpose | Typical value |
|---|---|---|
| DEFAULT_HEARTBEAT_SECS | Interval for sending keep-alive messages. | 15-30 |
| WEBSOCKET_AUTH_WINDOW_MS | Maximum age for authentication timestamps. | 5000-30000 |
| BATCH_PROCESSING_LIMIT | Maximum messages processed per event loop cycle. | 100-1000 |
Place these in websocket/handler.rs or common/consts.rs depending on scope.
The handler uses two message enums to separate wire deserialization from emitted events. The data and execution client layers convert emitted events into Nautilus domain types.
Define two enums:
{Venue}WsFrame: Serde-deserialized wire frames. Contains every JSON shape the venue
can send (login responses, subscription acks, channel data, order responses, errors, pings).
Typically pub(super) since only the handler uses it.
{Venue}WsMessage: Handler output events emitted on out_tx. Contains the subset of
wire data the client needs plus synthetic control variants (Reconnected, Authenticated,
SendFailed) that have no wire representation. This is the pub type consumers match on.
The handler deserializes raw text into {Venue}WsFrame, handles control frames internally
(subscription acks, login, pings), and converts relevant frames into {Venue}WsMessage events
sent via out_tx. The client receives from out_rx and routes to data/execution callbacks,
which convert venue types to Nautilus domain types using parse functions.
Types prefixed with the venue name (e.g., OKX, Bitmex) contain raw exchange-specific types.
Types prefixed with Nautilus contain normalized domain types ready for the trading system.
Wire frame enum (serde-deserialized, handler-internal):
pub(super) enum MyWsFrame {
Login { event, code, msg, conn_id },
Subscription { event, arg, conn_id, code, msg },
OrderResponse { id, op, code, msg, data },
BookData { arg, action, data: Vec<MyBookMsg> },
Data { arg, data: Value },
Error { code, msg },
Ping,
Reconnected,
}
Handler output enum (emitted to client):
pub enum MyWsMessage {
BookData { arg, action, data: Vec<MyBookMsg> },
ChannelData { channel, inst_id, data: Value },
Orders(Vec<MyOrderMsg>),
OrderResponse { id, op, code, msg, data },
SendFailed { request_id, client_order_id, op, error },
Instruments(Vec<MyInstrument>),
Error(MyWebSocketError),
Reconnected,
Authenticated,
}
The frame enum includes every wire shape (login acks, subscription acks, pings) for
deserialization. The output enum drops shapes the handler consumes internally and adds synthetic
variants (Authenticated, SendFailed) that originate in handler logic, not on the wire.
Include OrderResponse for venue acknowledgements (place, cancel, amend) and SendFailed for
WebSocket send failures after retries are exhausted. The execution client dispatch layer converts
these into Nautilus rejection events (OrderRejected, OrderCancelRejected, etc.).
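The split between the two enums can be illustrated with a reduced conversion function. This is a sketch, not a venue implementation: the variants are trimmed to plain String fields, and treating login code "0" as success is an assumption.

```rust
/// Reduced wire-frame enum (no serde here; fields simplified to strings).
#[derive(Debug)]
pub enum MyWsFrame {
    Login { code: String, msg: String },
    Subscription { topic: String },
    Data { channel: String, payload: String },
    Ping,
}

/// Reduced handler output enum.
#[derive(Debug, PartialEq)]
pub enum MyWsMessage {
    ChannelData { channel: String, payload: String },
    Error(String),
    Authenticated,
}

/// Converts a wire frame into an output event.
/// Returns `None` for frames the handler consumes internally.
pub fn frame_to_message(frame: MyWsFrame) -> Option<MyWsMessage> {
    match frame {
        // Assumption: the venue signals a successful login with code "0".
        MyWsFrame::Login { code, .. } if code == "0" => Some(MyWsMessage::Authenticated),
        MyWsFrame::Login { msg, .. } => Some(MyWsMessage::Error(msg)),
        // Subscription acks and pings never leave the handler.
        MyWsFrame::Subscription { .. } | MyWsFrame::Ping => None,
        MyWsFrame::Data { channel, payload } => {
            Some(MyWsMessage::ChannelData { channel, payload })
        }
    }
}
```

Authenticated has no wire shape of its own; it is produced by handler logic from the login frame, which is what makes it a synthetic variant.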
Conversion in data/exec client:
The data client's message loop matches on {Venue}WsMessage variants and calls parse functions
to produce Nautilus domain types (Data, OrderBookDeltas, etc.). The execution client's
dispatch layer handles OrderResponse, SendFailed, and Orders variants. This keeps
the handler focused on I/O and deserialization while the client layers own domain conversion.
The execution dispatch converts order and fill messages using a two-tier routing contract:
- Order updates arrive as a {Venue}WsMessage variant (e.g., Orders(Vec<MyOrderMsg>)).
- Tracked orders: emit typed order events (OrderAccepted, OrderCanceled,
  OrderFilled, etc.) and synthesize any missing lifecycle events (e.g., OrderAccepted
  before a fast fill).
- External orders: emit reports (OrderStatusReport or FillReport) for
  downstream reconciliation.

WsDispatchState

Execution dispatch state lives in a WsDispatchState struct defined in websocket/dispatch.rs.
It tracks which lifecycle events have already been emitted to prevent duplicates across
reconnections and fast-fill races:
#[derive(Debug, Default)]
pub struct WsDispatchState {
pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
pub emitted_accepted: DashSet<ClientOrderId>,
pub triggered_orders: DashSet<ClientOrderId>,
pub filled_orders: DashSet<ClientOrderId>,
clearing: AtomicBool,
}
| Field | Purpose |
|---|---|
| order_identities | Maps client order ID to identity metadata set at submission. |
| emitted_accepted | Prevents duplicate OrderAccepted events. |
| triggered_orders | Tracks conditional orders that have triggered. |
| filled_orders | Prevents duplicate OrderFilled events on reconnect replay. |
| clearing | Guards concurrent eviction when sets reach capacity. |
Each DashSet is bounded by a DEDUP_CAPACITY constant (typically 10,000). When a set
reaches capacity, evict_if_full() clears it atomically using a compare-exchange on the
clearing flag to prevent concurrent clears.
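The compare-exchange guard can be sketched as follows. This is an illustration only: DedupSketch is a hypothetical type, a Mutex<HashSet<u64>> stands in for DashSet<ClientOrderId>, and the insert helper is an assumption about how callers would use the set.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

pub const DEDUP_CAPACITY: usize = 10_000;

/// Stand-in for one bounded dedup set plus its `clearing` guard.
#[derive(Debug, Default)]
pub struct DedupSketch {
    emitted: Mutex<HashSet<u64>>,
    clearing: AtomicBool,
}

impl DedupSketch {
    /// Returns `true` the first time `id` is seen since the last clear.
    pub fn insert(&self, id: u64) -> bool {
        self.evict_if_full();
        self.emitted.lock().unwrap().insert(id)
    }

    /// Clears the set once it reaches capacity.
    fn evict_if_full(&self) {
        if self.emitted.lock().unwrap().len() < DEDUP_CAPACITY {
            return;
        }
        // Only the caller that flips `clearing` from false to true performs the clear.
        if self
            .clearing
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            self.emitted.lock().unwrap().clear();
            self.clearing.store(false, Ordering::Release);
        }
    }

    /// Number of IDs currently tracked.
    pub fn len(&self) -> usize {
        self.emitted.lock().unwrap().len()
    }
}
```

With a Mutex the guard is redundant; it is kept here only to show the shape of the check that matters when the underlying set allows concurrent writers.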
The dispatch_ws_message() free function in the same module routes {Venue}WsMessage
variants to the appropriate order event builders, using WsDispatchState for dedup
and OrderIdentity for tracked-vs-external classification.
WsDispatchState prevents duplicate lifecycle events within a single stream. When an
adapter receives fills from multiple sources (WebSocket user data and HTTP reconciliation),
a separate trade-ID-level dedup is needed to prevent the same fill from being emitted twice.
The BoundedDedup<T> pattern addresses this with a fixed-capacity set backed by a
VecDeque for insertion order and an AHashSet for O(1) lookup. When the set reaches
capacity, the oldest entry is evicted (FIFO). The insert() method returns true if the
value was already present, signaling a duplicate:
struct BoundedDedup<T> {
order: VecDeque<T>,
set: AHashSet<T>,
capacity: usize,
}
Use this in the execution client to track trade IDs (typically as (Ustr, i64) tuples
of symbol and trade ID). A capacity of 10,000 provides sufficient coverage for most
venues without unbounded memory growth.
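A minimal version of the pattern might look like the sketch below. It substitutes std::collections::HashSet for AHashSet to stay dependency-free, and the new() constructor and trait bounds are assumptions; a capacity greater than zero is assumed.

```rust
use std::collections::{HashSet, VecDeque};
use std::hash::Hash;

/// Fixed-capacity dedup set with FIFO eviction.
pub struct BoundedDedup<T> {
    order: VecDeque<T>,
    set: HashSet<T>,
    capacity: usize,
}

impl<T: Clone + Eq + Hash> BoundedDedup<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            order: VecDeque::with_capacity(capacity),
            set: HashSet::with_capacity(capacity),
            capacity,
        }
    }

    /// Returns `true` if `value` was already present (a duplicate).
    pub fn insert(&mut self, value: T) -> bool {
        if self.set.contains(&value) {
            return true;
        }
        // At capacity: evict the oldest entry before adding the new one.
        if self.order.len() >= self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.set.remove(&oldest);
            }
        }
        self.order.push_back(value.clone());
        self.set.insert(value);
        false
    }
}
```

Note that the return value is inverted relative to HashSet::insert: here true means "skip this fill", which reads naturally at the call site.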
Channel send failures (client → handler) should propagate loudly as Result<(), Error>:
impl MyWebSocketClient {
async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), Error> {
self.cmd_tx.read().await.send(cmd)
.map_err(|e| Error::ClientError(format!("Handler not available: {e}")))
}
pub async fn submit_order(...) -> Result<(), Error> {
let cmd = HandlerCommand::PlaceOrder { ... };
self.send_cmd(cmd).await // Propagates channel failures
}
}
WebSocket send failures (handler → network) should be retried by the handler using RetryManager:
pub struct MyWsFeedHandler {
inner: Option<WebSocketClient>,
retry_manager: RetryManager<MyWsError>,
// ...
}
impl MyWsFeedHandler {
async fn send_with_retry(&self, payload: String, rate_limit_keys: Option<Vec<String>>) -> Result<(), MyWsError> {
if let Some(client) = &self.inner {
self.retry_manager.execute_with_retry(
"websocket_send",
|| async {
client.send_text(payload.clone(), rate_limit_keys.clone())
.await
.map_err(|e| MyWsError::ClientError(format!("Send failed: {e}")))
},
should_retry_error,
create_timeout_error,
).await
} else {
Err(MyWsError::ClientError("No active WebSocket client".to_string()))
}
}
async fn handle_place_order(...) -> anyhow::Result<()> {
let payload = serde_json::to_string(&request)?;
match self.send_with_retry(payload, Some(vec![RATE_LIMIT_KEY.to_string()])).await {
Ok(()) => Ok(()),
Err(e) => {
// Emit SendFailed so the exec client dispatch can produce OrderRejected
let _ = self.out_tx.send(MyWsMessage::SendFailed {
request_id: request_id.clone(),
client_order_id: Some(client_order_id),
op: Some(MyWsOperation::Order),
error: e.to_string(),
});
Err(anyhow::anyhow!("Failed to send order: {e}"))
}
}
}
}
fn should_retry_error(error: &MyWsError) -> bool {
match error {
MyWsError::NetworkError(_) | MyWsError::Timeout(_) => true,
MyWsError::AuthenticationError(_) | MyWsError::ParseError(_) => false,
}
}
Key principles:
- Handler emits SendFailed when retries are exhausted; the exec client dispatch converts
  these into Nautilus rejection events (OrderRejected, OrderCancelRejected).
- Use RetryManager from nautilus_network::retry for consistent backoff.

Adapters follow standardized naming conventions for consistency across all venue integrations.

Channel naming: raw → out

WebSocket message channels follow a two-stage transformation pipeline within the handler:
| Stage | Type | Description | Example |
|---|---|---|---|
| raw | Raw WebSocket frames | Bytes/text from the network layer. | raw_rx: UnboundedReceiver<Message> |
| out | Venue-specific messages | Parsed venue message types. | out_tx: UnboundedSender<MyWsMessage> |
The handler deserializes raw frames into venue-specific types and emits them on out_tx.
The data and execution client layers then convert venue types into Nautilus domain types.
Example flow:
// Client creates output channel for venue messages
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel(); // Venue messages (MyWsMessage)
// Handler receives raw frames, outputs venue messages
let handler = MyWsFeedHandler::new(
cmd_rx,
raw_rx, // Input: Message (raw WebSocket frames)
out_tx, // Output: MyWsMessage
// ...
);
Channel names reflect the data transformation stage, not the destination. Use raw_* for raw
WebSocket frames (Message) and out_* for venue-specific message types.
WebSocket channels on latency-sensitive paths are intentionally unbounded. The platform prioritizes latency and prefers an explicit crash (OOM) over delaying or dropping data.
:::note
Do not add bounded channels, buffering limits, or backpressure unless the latency requirement changes.
:::
inner and command channels

Structs holding references to lower-level components follow these conventions:
| Field | Type | Description |
|---|---|---|
| inner | Option<WebSocketClient> | Network-level WebSocket client (handler only, exclusively owned). |
| cmd_tx | Arc<tokio::sync::RwLock<UnboundedSender<...>>> | Command channel to handler (client side). |
| cmd_rx | UnboundedReceiver<HandlerCommand> | Command channel from client (handler side). |
| out_tx | UnboundedSender<{Venue}WsMessage> | Output channel to client (handler side). |
| out_rx | Option<Arc<UnboundedReceiver<{Venue}WsMessage>>> | Output channel from handler (client side). |
| task_handle | Option<Arc<JoinHandle<()>>> | Handler task handle. |
Example:
// Client struct
pub struct MyWebSocketClient {
cmd_tx: Arc<tokio::sync::RwLock<UnboundedSender<HandlerCommand>>>,
out_rx: Option<Arc<UnboundedReceiver<MyWsMessage>>>,
task_handle: Option<Arc<JoinHandle<()>>>,
connection_mode: Arc<ArcSwap<AtomicU8>>, // Lock-free connection state
// ...
}
impl MyWebSocketClient {
async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), Error> {
self.cmd_tx.read().await.send(cmd)
.map_err(|e| Error::ClientError(format!("Handler not available: {e}")))
}
}
// Handler struct
pub(super) struct MyWsFeedHandler {
inner: Option<WebSocketClient>, // Exclusively owned - no RwLock
cmd_rx: UnboundedReceiver<HandlerCommand>,
raw_rx: UnboundedReceiver<Message>,
out_tx: UnboundedSender<MyWsMessage>,
pending_requests: AHashMap<String, RequestData>, // Single-threaded - no locks
pending_messages: VecDeque<MyWsMessage>, // Multi-message buffer
// ...
}
The handler exclusively owns WebSocketClient without locks. The client sends commands via
cmd_tx (wrapped in RwLock to allow reconnection channel replacement) and receives events
via out_rx. Use a send_cmd() helper to standardize command sending.
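The role of the pending_messages buffer can be shown with a reduced, synchronous handler. This is a sketch under stated assumptions: HandlerSketch is hypothetical, a blocking std::sync::mpsc receiver replaces the async raw_rx, and a comma-separated frame stands in for a venue frame that carries several records.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::Receiver;

#[derive(Debug, PartialEq)]
pub enum MyWsMessage {
    Trade(String),
}

/// Reduced handler: one input channel and the multi-message buffer.
pub struct HandlerSketch {
    raw_rx: Receiver<String>,
    pending_messages: VecDeque<MyWsMessage>,
}

impl HandlerSketch {
    pub fn new(raw_rx: Receiver<String>) -> Self {
        Self {
            raw_rx,
            pending_messages: VecDeque::new(),
        }
    }

    /// Returns the next output message, draining buffered messages
    /// before reading another frame. Returns `None` once the input closes.
    pub fn next(&mut self) -> Option<MyWsMessage> {
        loop {
            if let Some(msg) = self.pending_messages.pop_front() {
                return Some(msg);
            }
            // Blocks until a frame arrives; `None` when the sender is dropped.
            let frame = self.raw_rx.recv().ok()?;
            // Hypothetical wire format: several trades in one frame.
            self.pending_messages.extend(
                frame
                    .split(',')
                    .filter(|s| !s.is_empty())
                    .map(|s| MyWsMessage::Trade(s.to_string())),
            );
        }
    }
}
```

Draining the queue first preserves the order in which the venue sent the records, even when one frame expands into many output messages.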
{Venue}Ws{TypeSuffix}

All WebSocket-related types follow a standardized naming pattern: {Venue}Ws{TypeSuffix}
- {Venue}: Capitalized venue name (e.g., OKX, Bybit, Bitmex, Hyperliquid).
- Ws: Abbreviated "WebSocket" (not fully spelled out).
- {TypeSuffix}: Full type descriptor (e.g., Message, Error, Request, Response).

Examples:
// Correct - abbreviated Ws, full type suffix
pub enum OKXWsMessage { ... }
pub enum BybitWsError { ... }
pub struct HyperliquidWsRequest { ... }
Standard type suffixes:
- Message: WebSocket message enums.
- Error: WebSocket error types.
- Request: Request message types.
- Response: Response message types.

Tokio channel qualification:
Always fully qualify tokio channel types as tokio::sync::mpsc:: to avoid ambiguity with
similarly-named types from other crates. Never import mpsc directly at module level.
// Correct
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<MyMessage>();
Some venues expose multiple WebSocket endpoints with distinct protocols or encodings.
When a venue requires separate connections for market data and order management, split
the websocket/ module into submodules that mirror the connection boundaries:
src/
├── websocket/
│ ├── mod.rs # Re-exports from submodules
│ ├── streams/ # Market data pub/sub connection
│ │ ├── client.rs # Streams client
│ │ ├── handler.rs # Streams feed handler
│ │ ├── messages.rs # Streams message types
│ │ └── mod.rs
│ └── trading/ # Order management + user data (authenticated WS API)
│ ├── client.rs # Trading client
│ ├── handler.rs # Trading handler
│ ├── messages.rs # Trading message types
│ ├── user_data.rs # User data stream venue types (execution reports, etc.)
│ ├── parse.rs # Parse functions for user data -> Nautilus types
│ ├── error.rs # Trading error types
│ └── mod.rs
Each submodule follows the same two-layer client/handler pattern described above. The
parent websocket/mod.rs re-exports the public client types.
The trading/ module handles both order operations (place, cancel, modify) and the
user data stream (execution reports, account updates). When the venue's authenticated
WebSocket API supports session.logon and inline user data subscriptions, both
concerns share a single authenticated connection. This avoids a separate execution/
module and the deprecated REST listenKey lifecycle.
For venues where user data events arrive on a separate stream connection (e.g.,
futures APIs that return a listenKey for a dedicated stream URL), the streams/
handler dispatches both market data and user data events from the combined connection.
Type names include the submodule qualifier to avoid ambiguity:
| Submodule | Command type | Message type |
|---|---|---|
| streams/ | {Venue}WsStreamsCommand | {Venue}WsMessage (venue types) |
| trading/ | {Venue}WsTradingCommand | {Venue}WsTradingMessage |
The {Venue}Ws prefix follows the standard type naming convention. The qualifier
(Streams, Trading) distinguishes types that would otherwise collide across
submodules.
Split the WebSocket module when the venue has:
- A request/response API (ws-api style) alongside pub/sub streams

Do not split when a single connection handles all message types through channel-based multiplexing (the common pattern for OKX, Bybit, and similar venues).
Some venues use the same WebSocket protocol for all product types but serve them on separate endpoints (e.g., Bybit provides distinct URLs for Linear, Spot, and Inverse). In this case the data client creates one WebSocket client per product type and manages them in a map:
pub struct MyDataClient {
ws_clients: AHashMap<MyProductType, MyWebSocketClient>,
}
Each client follows the same two-layer client/handler pattern. Subscription routing inspects the instrument's product type to select the correct client. On connect, the data client iterates the map to connect all clients; on disconnect, it closes them all.
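Subscription routing by product type can be sketched as below. The names are illustrative: StubWsClient stands in for MyWebSocketClient and only records topics, std HashMap replaces AHashMap, and the product type is passed in directly rather than read from an instrument.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MyProductType {
    Linear,
    Spot,
    Inverse,
}

/// Stand-in for a per-product WebSocket client; records requested topics.
#[derive(Debug, Default)]
pub struct StubWsClient {
    pub topics: Vec<String>,
}

impl StubWsClient {
    pub fn subscribe(&mut self, topic: &str) {
        self.topics.push(topic.to_string());
    }
}

#[derive(Debug, Default)]
pub struct MyDataClient {
    ws_clients: HashMap<MyProductType, StubWsClient>,
}

impl MyDataClient {
    /// Creates one client per configured product type.
    pub fn new(product_types: &[MyProductType]) -> Self {
        let ws_clients = product_types
            .iter()
            .map(|p| (*p, StubWsClient::default()))
            .collect();
        Self { ws_clients }
    }

    /// Routes the subscription to the client for the given product type.
    pub fn subscribe(&mut self, product_type: MyProductType, topic: &str) -> Result<(), String> {
        let client = self
            .ws_clients
            .get_mut(&product_type)
            .ok_or_else(|| format!("no WebSocket client for {:?}", product_type))?;
        client.subscribe(topic);
        Ok(())
    }

    /// Topics requested so far for a product type (empty if not configured).
    pub fn topics(&self, product_type: MyProductType) -> &[String] {
        self.ws_clients
            .get(&product_type)
            .map(|c| c.topics.as_slice())
            .unwrap_or(&[])
    }
}
```

Returning an error for an unconfigured product type keeps a misrouted subscription visible instead of silently dropping it.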
This differs from the split architecture (streams/ vs trading/) which separates by
protocol or purpose. Multi-product management separates by product type while sharing
the same protocol.
Use the following conventions when mirroring upstream schemas in Rust.
REST models (http::models and http::query)

- Put REST payload structs in src/http/models.rs and derive serde::Deserialize (add serde::Serialize when the adapter sends data back).
- Mirror upstream field names with blanket casing attributes such as #[serde(rename_all = "camelCase")] or #[serde(rename_all = "snake_case")]; only add per-field renames when the upstream key would be an invalid Rust identifier or collide with a keyword (for example #[serde(rename = "type")] pub order_type: String).
- Keep request and query builders in src/http/query.rs, deriving serde::Serialize to remain type-safe and reusing constants from common::consts instead of duplicating literals.

WebSocket messages (websocket::messages)

- Define streaming payload types in src/websocket/messages.rs, giving each venue topic a struct or enum that mirrors the upstream JSON.
- Model message envelopes with #[serde(tag = "op")] or #[serde(flatten)] and document the choice.

Task spawning (spawn_task)

Data and execution clients spawn background tasks for WebSocket stream processing,
periodic polling, and order submission. Wrap all spawned work with a spawn_task()
method that provides error logging and handle tracking:
fn spawn_task<F>(&self, description: &'static str, fut: F)
where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let runtime = get_runtime();
let handle = runtime.spawn(async move {
if let Err(e) = fut.await {
log::warn!("{description} failed: {e:?}");
}
});
let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
tasks.retain(|handle| !handle.is_finished());
tasks.push(handle);
}
Store task handles in pending_tasks: Mutex<Vec<JoinHandle<()>>>. Each call to
spawn_task prunes finished handles before pushing the new one, preventing unbounded
growth. On disconnect, abort all remaining handles.
block_on in trait methods

The live runner calls sync ExecutionClient and DataClient trait methods from within a
tokio runtime. Using runtime.block_on() in these methods panics with
"Cannot start a runtime from within a runtime". Use spawn_task instead:
// Wrong: panics at runtime
fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
get_runtime().block_on(async { self.http_client.get_order(&id).await })
}
// Correct: clone what you need, spawn, return immediately
fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
let http_client = self.http_client.clone();
let emitter = self.emitter.clone();
self.spawn_task("query_order", async move {
let report = http_client.get_order(&id).await?;
emitter.send_order_status_report(report);
Ok(())
});
Ok(())
}
block_on is valid in contexts that run outside a tokio runtime:
| Context | Why safe |
|---|---|
| PyO3 #[pymethods] | Called from Python, no ambient runtime |
| Binary main() functions | Top-level entry point, runtime not yet started |
| Dedicated background threads | Thread created outside tokio's worker pool |
| block_in_place wrapper | Moves the thread out of the worker pool first |
| Test code with own runtime | Runtime::new() creates an isolated runtime |
CancellationToken

Use tokio_util::sync::CancellationToken to coordinate shutdown across multiple spawned
tasks. The client creates a token at construction and passes clones to each spawned task.
Tasks select on the token alongside their primary work:
tokio::select! {
msg = stream.next() => { /* process */ }
_ = cancellation_token.cancelled() => { break; }
}
On disconnect, the client cancels the token, which signals all tasks to exit their loops.
This complements the handler-level signal: Arc<AtomicBool> pattern: AtomicBool gates
the handler's I/O loop, while CancellationToken coordinates shutdown of tasks the client
spawned outside the handler (polling loops, reconciliation tasks, stream consumers).
Reset the token on reconnect by replacing it with a fresh CancellationToken::new() so
subsequent tasks are not born cancelled.
Adapters should ship two layers of coverage: the Rust crate that talks to the venue and the Python glue that exposes it to the wider platform. Keep the suites deterministic and colocated with the production code they protect.
Key principle: The tests/ directory is reserved for integration tests that require external infrastructure (mock Axum servers, simulated network conditions).
Unit tests for parsing, serialization, and business logic belong in #[cfg(test)] blocks within source modules.
crates/adapters/your_adapter/
├── src/
│ ├── http/
│ │ ├── client.rs # HTTP client + unit tests
│ │ └── parse.rs # REST payload parsers + unit tests
│ └── websocket/
│ ├── client.rs # WebSocket client + unit tests
│ └── parse.rs # Streaming parsers + unit tests
├── tests/ # Integration tests (mock servers)
│ ├── data_client.rs # Data client integration tests
│ ├── exec_client.rs # Execution client integration tests
│ ├── http.rs # HTTP client integration tests
│ └── websocket.rs # WebSocket client integration tests
└── test_data/ # Canonical venue payloads used by the suites
├── http_{method}_{endpoint}.json # Full venue responses with retCode/result/time
└── ws_{message_type}.json # WebSocket message samples
| File | Purpose |
|---|---|
| tests/data_client.rs | Integration tests for the data client. Validates data subscriptions, historical data requests, and market data parsing. |
| tests/exec_client.rs | Integration tests for the execution client. Validates order submission, modification, cancellation, and execution reports. |
| tests/http.rs | Low-level HTTP client tests. Validates request signing, error handling, and response parsing against mock Axum servers. |
| tests/websocket.rs | WebSocket client tests. Validates connection lifecycle, authentication, subscriptions, and message routing. |
Guidelines:
- Keep unit tests next to the code they cover (#[cfg(test)] blocks). Use src/common/testing.rs (or an equivalent helper module) for shared fixtures so production files stay tidy.
- Place integration tests in crates/adapters/<adapter>/tests/, mirroring the public APIs (HTTP client, WebSocket client, data client, execution client).
- Client tests (data_client.rs, exec_client.rs) should focus on higher-level behavior: subscription workflows, order lifecycle, and domain model transformations. HTTP and WebSocket tests (http.rs, websocket.rs) focus on transport-level concerns.
- Store canonical venue payloads in test_data/ and reference them from both unit and integration tests. Name test data files consistently: http_get_{endpoint_name}.json for REST responses, ws_{message_type}.json for WebSocket messages. Include complete venue response envelopes (status codes, timestamps, result wrappers) rather than just the data payload. Provide multiple realistic examples in each file. For instance, position data should include long, short, and flat positions to exercise all parser branches.

Unit tests belong in #[cfg(test)] blocks within source modules, not in the tests/ directory.
What to test (in source modules):
What NOT to test:
Tests should exercise production code paths. If a test only verifies that Vec::extend() works or that chrono can parse a date string, it provides no value.
WebSocket unit tests exercise three areas: message deserialization, parse dispatch, and handler
logic. Each area lives in a #[cfg(test)] block within the module it tests.
Message types (messages.rs):
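- Deserialize message fixtures from test_data/.
- Add serialization round-trip tests; they catch problems with skip_serializing_if attributes and precision
  loss that deserialization-only tests miss.

Parse functions (parse.rs):
Handler logic (handler.rs):
- The reconnection sentinel is emitted as the Reconnected variant.
- Buffered messages drain through next().

Integration tests belong in the tests/ directory and exercise the public API against mock infrastructure.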
What to test (in tests/ directory):
At a minimum, review existing adapter test suites for reference patterns and verify every adapter proves the same core behaviours.
- Venue errors surface as an OkxError/BitmexHttpError variant so the retry policy can react.
- Pagination and query parameters (after, before, limit, etc.).
- Message routing for each {Venue}WsMessage variant.

CI robustness:
- Avoid tokio::time::sleep() with arbitrary durations. Tests become flaky under CI load and slower than necessary.
- Use the wait_until_async test helper to poll for conditions with timeout. Tests return
  immediately when the condition is met and fail deterministically on timeout rather than
  relying on arbitrary sleep durations.
- Prefer event-driven assertions on shared state (collect subscription_events, track pending/confirmed topics, wait for connection_count transitions).

Data (tests/data_client.rs) and execution (tests/exec_client.rs) client integration tests verify the full message flow from WebSocket through parsing to event emission.
Test infrastructure:
| Component | Purpose |
|---|---|
| Mock Axum server | Serves HTTP endpoints (instruments, fee rates, positions) and WebSocket channels. |
| TestServerState | Tracks connections, subscriptions, and authentication state for assertions. |
| Thread‑local event channels | set_data_event_sender() / set_exec_event_sender() for capturing emitted events. |
| wait_until_async | Polls conditions with timeout for deterministic async assertions. |
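The polling idea behind wait_until_async can be shown with a synchronous, std-only analogue. This is not the project's helper, whose signature is not reproduced in this guide: wait_until, its 5 ms poll interval, and its boolean return value are assumptions for illustration.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls `condition` until it returns true or `timeout` elapses.
/// Returns `true` as soon as the condition holds, `false` on timeout.
pub fn wait_until<F: FnMut() -> bool>(mut condition: F, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if condition() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Short, fixed poll interval (an arbitrary choice for this sketch).
        thread::sleep(Duration::from_millis(5));
    }
}
```

A test would call it with a closure over shared state, for example a counter the mock server increments on each subscription, and assert on the returned flag. The test finishes as soon as the state changes instead of waiting out a fixed sleep.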
Data client coverage:
| Test scenario | Validates |
|---|---|
| Connect/disconnect | Connection lifecycle, WebSocket establishment, clean shutdown. |
| Subscribe trades | Trade tick events emitted to data channel. |
| Subscribe quotes | Quote events from ticker (LINEAR) or orderbook (SPOT). |
| Subscribe book deltas | OrderBookDeltas events from orderbook snapshots/updates. |
| Subscribe mark/index prices | Filtered by subscription state (only emit when subscribed). |
| Reset state | Subscription tracking cleared, connection terminated. |
| Instruments on connect | Instrument events emitted during connection setup. |
Execution client coverage:
| Test scenario | Validates |
|---|---|
| Connect/disconnect | Auth handshake, private + trade WS connections, subscriptions. |
| Demo mode | Only private WS connects (trade WS skipped for HTTP fallback). |
| Order submission | Order accepted/rejected events, venue ID correlation. |
| Order modification/cancel | Update and cancel acknowledgment events. |
| Position/wallet updates | PositionStatusReport and AccountState events. |
Key patterns:
- `#[tokio::test]` runs on a fresh thread, ensuring thread-local channel isolation.
- Use `wait_until_async` for subscription/connection state instead of arbitrary sleeps.
- Check `TestServerState` before asserting on emitted events.

tests/integration_tests/adapters/your_adapter/
├── conftest.py # Shared fixtures (mock clients, test instruments)
├── test_data.py # Data client integration tests
├── test_execution.py # Execution client integration tests
├── test_providers.py # Instrument provider tests
├── test_factories.py # Factory and configuration tests
└── __init__.py # Package initialization
| File | Purpose |
|---|---|
| `test_data.py` | Tests for `LiveDataClient` and `LiveMarketDataClient`. Validates subscriptions, data parsing, and message handling. |
| `test_execution.py` | Tests for `LiveExecutionClient`. Validates order submission, modification, cancellation, and execution reports. |
| `test_providers.py` | Tests for `InstrumentProvider`. Validates instrument loading, filtering, and caching behavior. |
| `test_factories.py` | Tests for factory functions. Validates client instantiation and configuration wiring. |
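
As a rough illustration of the style of test that `test_execution.py` might contain, the sketch below stubs the venue client with `unittest.mock.AsyncMock` so no network or Rust code is involved. `ExampleExecClient`, its `submit_order` method, and the event strings are hypothetical stand-ins invented for this example; they are not NautilusTrader classes or events.

```python
import asyncio
from unittest.mock import AsyncMock


class ExampleExecClient:
    """Hypothetical stand-in for an adapter execution client."""

    def __init__(self, http_client) -> None:
        self._http_client = http_client
        self.events: list[str] = []

    async def submit_order(self, client_order_id: str) -> None:
        try:
            venue_order_id = await self._http_client.submit_order(client_order_id)
        except RuntimeError as e:
            # Venue errors become rejection events rather than propagating
            self.events.append(f"rejected:{client_order_id}:{e}")
            return
        self.events.append(f"accepted:{client_order_id}:{venue_order_id}")


def test_submit_order_accepted() -> None:
    http_client = AsyncMock()  # Stubbed venue client
    http_client.submit_order.return_value = "V-1"
    client = ExampleExecClient(http_client)

    asyncio.run(client.submit_order("O-1"))

    http_client.submit_order.assert_awaited_once_with("O-1")
    assert client.events == ["accepted:O-1:V-1"]


def test_submit_order_rejected() -> None:
    http_client = AsyncMock()
    http_client.submit_order.side_effect = RuntimeError("insufficient margin")
    client = ExampleExecClient(http_client)

    asyncio.run(client.submit_order("O-1"))

    assert client.events == ["rejected:O-1:insufficient margin"]
```

Covering both the accepted and the rejected path against the same stub keeps the error-handling branch under test without needing a live venue.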
Guidelines:
- tests/integration_tests/adapters/<adapter>/.
- nautilus_pyo3 shims, stubbed Rust clients) so tests stay fast while verifying that configuration, factory wiring, and error handling match the exported Rust API.

All adapter documentation (module-level docs, doc comments, and inline comments) should follow the Documentation Style Guide.
Every Rust module, struct, and public method must have documentation comments. Use third-person declarative voice (e.g., "Returns the account ID" not "Return the account ID").
- Use `//!` doc comments at the top of each file (after the license header) to describe the module's purpose.
- Use `///` doc comments above struct definitions. Keep descriptions concise; one sentence is often sufficient.
- Every `pub fn` and `pub async fn` must have a `///` doc comment describing what the method does.

Do not document individual parameters in a separate # Arguments section. The type signatures and names should be self-explanatory.
Parameters may be mentioned in the description when behavior is complex or non-obvious.

What NOT to document:

- `python/` module (PyO3 bindings). Documentation conventions are TBD (may use numpydoc specification).

Step-by-step guide to building the Python layer of an adapter using the provided template.
When implementing adapter classes, group methods by category in this order:
1. `_connect`, `_disconnect`
2. `_subscribe`, `_subscribe_*`
3. `_unsubscribe`, `_unsubscribe_*`
4. `_request`, `_request_*`

This keeps related functionality together rather than interleaving subscribe/unsubscribe pairs.
The InstrumentProvider loads instrument definitions from the venue: all instruments, specific
instruments by ID, or a filtered subset.

```python
from nautilus_trader.common.providers import InstrumentProvider
from nautilus_trader.model import InstrumentId


class TemplateInstrumentProvider(InstrumentProvider):
    """Example `InstrumentProvider` showing the minimal overrides required for a complete integration."""

    async def load_all_async(self, filters: dict | None = None) -> None:
        raise NotImplementedError("implement `load_all_async` in your adapter subclass")

    async def load_ids_async(self, instrument_ids: list[InstrumentId], filters: dict | None = None) -> None:
        raise NotImplementedError("implement `load_ids_async` in your adapter subclass")

    async def load_async(self, instrument_id: InstrumentId, filters: dict | None = None) -> None:
        raise NotImplementedError("implement `load_async` in your adapter subclass")
```

| Method | Description |
|---|---|
| `load_all_async` | Loads all instruments asynchronously, optionally with filters. |
| `load_ids_async` | Loads specific instruments by their IDs. |
| `load_async` | Loads a single instrument by its ID. |
The LiveDataClient handles data feeds that are not market data: news feeds, custom data streams,
or other non-market sources.

```python
from nautilus_trader.data.messages import RequestData
from nautilus_trader.data.messages import SubscribeData
from nautilus_trader.data.messages import UnsubscribeData
from nautilus_trader.live.data_client import LiveDataClient
from nautilus_trader.model import DataType


class TemplateLiveDataClient(LiveDataClient):
    """Example `LiveDataClient` showing the overridable abstract methods."""

    async def _connect(self) -> None:
        raise NotImplementedError("implement `_connect` in your adapter subclass")

    async def _disconnect(self) -> None:
        raise NotImplementedError("implement `_disconnect` in your adapter subclass")

    async def _subscribe(self, command: SubscribeData) -> None:
        raise NotImplementedError("implement `_subscribe` in your adapter subclass")

    async def _unsubscribe(self, command: UnsubscribeData) -> None:
        raise NotImplementedError("implement `_unsubscribe` in your adapter subclass")

    async def _request(self, request: RequestData) -> None:
        raise NotImplementedError("implement `_request` in your adapter subclass")
```

| Method | Description |
|---|---|
| `_connect` | Establishes a connection to the data provider. |
| `_disconnect` | Closes the connection to the data provider. |
| `_subscribe` | Subscribes to a specific data type. |
| `_unsubscribe` | Unsubscribes from a specific data type. |
| `_request` | Requests data from the provider. |
The MarketDataClient handles market-specific data: order books, top-of-book quotes and trades,
instrument status updates, and historical data requests.

```python
from nautilus_trader.data.messages import RequestBars
from nautilus_trader.data.messages import RequestData
from nautilus_trader.data.messages import RequestInstrument
from nautilus_trader.data.messages import RequestInstruments
from nautilus_trader.data.messages import RequestOrderBookDeltas
from nautilus_trader.data.messages import RequestOrderBookDepth
from nautilus_trader.data.messages import RequestOrderBookSnapshot
from nautilus_trader.data.messages import RequestQuoteTicks
from nautilus_trader.data.messages import RequestTradeTicks
from nautilus_trader.data.messages import SubscribeBars
from nautilus_trader.data.messages import SubscribeData
from nautilus_trader.data.messages import SubscribeFundingRates
from nautilus_trader.data.messages import SubscribeIndexPrices
from nautilus_trader.data.messages import SubscribeInstrument
from nautilus_trader.data.messages import SubscribeInstrumentClose
from nautilus_trader.data.messages import SubscribeInstruments
from nautilus_trader.data.messages import SubscribeInstrumentStatus
from nautilus_trader.data.messages import SubscribeMarkPrices
from nautilus_trader.data.messages import SubscribeOptionGreeks
from nautilus_trader.data.messages import SubscribeOrderBook
from nautilus_trader.data.messages import SubscribeQuoteTicks
from nautilus_trader.data.messages import SubscribeTradeTicks
from nautilus_trader.data.messages import UnsubscribeBars
from nautilus_trader.data.messages import UnsubscribeData
from nautilus_trader.data.messages import UnsubscribeFundingRates
from nautilus_trader.data.messages import UnsubscribeIndexPrices
from nautilus_trader.data.messages import UnsubscribeInstrument
from nautilus_trader.data.messages import UnsubscribeInstrumentClose
from nautilus_trader.data.messages import UnsubscribeInstruments
from nautilus_trader.data.messages import UnsubscribeInstrumentStatus
from nautilus_trader.data.messages import UnsubscribeMarkPrices
from nautilus_trader.data.messages import UnsubscribeOptionGreeks
from nautilus_trader.data.messages import UnsubscribeOrderBook
from nautilus_trader.data.messages import UnsubscribeQuoteTicks
from nautilus_trader.data.messages import UnsubscribeTradeTicks
from nautilus_trader.live.data_client import LiveMarketDataClient


class TemplateLiveMarketDataClient(LiveMarketDataClient):
    """Example `LiveMarketDataClient` showing the overridable abstract methods."""

    # Connection handlers

    async def _connect(self) -> None:
        raise NotImplementedError("implement `_connect` in your adapter subclass")

    async def _disconnect(self) -> None:
        raise NotImplementedError("implement `_disconnect` in your adapter subclass")

    # Subscribe handlers

    async def _subscribe(self, command: SubscribeData) -> None:
        raise NotImplementedError("implement `_subscribe` in your adapter subclass")

    async def _subscribe_instruments(self, command: SubscribeInstruments) -> None:
        raise NotImplementedError("implement `_subscribe_instruments` in your adapter subclass")

    async def _subscribe_instrument(self, command: SubscribeInstrument) -> None:
        raise NotImplementedError("implement `_subscribe_instrument` in your adapter subclass")

    async def _subscribe_order_book_deltas(self, command: SubscribeOrderBook) -> None:
        raise NotImplementedError("implement `_subscribe_order_book_deltas` in your adapter subclass")

    async def _subscribe_order_book_depth(self, command: SubscribeOrderBook) -> None:
        raise NotImplementedError("implement `_subscribe_order_book_depth` in your adapter subclass")

    async def _subscribe_quote_ticks(self, command: SubscribeQuoteTicks) -> None:
        raise NotImplementedError("implement `_subscribe_quote_ticks` in your adapter subclass")

    async def _subscribe_trade_ticks(self, command: SubscribeTradeTicks) -> None:
        raise NotImplementedError("implement `_subscribe_trade_ticks` in your adapter subclass")

    async def _subscribe_mark_prices(self, command: SubscribeMarkPrices) -> None:
        raise NotImplementedError("implement `_subscribe_mark_prices` in your adapter subclass")

    async def _subscribe_index_prices(self, command: SubscribeIndexPrices) -> None:
        raise NotImplementedError("implement `_subscribe_index_prices` in your adapter subclass")

    async def _subscribe_bars(self, command: SubscribeBars) -> None:
        raise NotImplementedError("implement `_subscribe_bars` in your adapter subclass")

    async def _subscribe_funding_rates(self, command: SubscribeFundingRates) -> None:
        raise NotImplementedError("implement `_subscribe_funding_rates` in your adapter subclass")

    async def _subscribe_instrument_status(self, command: SubscribeInstrumentStatus) -> None:
        raise NotImplementedError("implement `_subscribe_instrument_status` in your adapter subclass")

    async def _subscribe_instrument_close(self, command: SubscribeInstrumentClose) -> None:
        raise NotImplementedError("implement `_subscribe_instrument_close` in your adapter subclass")

    async def _subscribe_option_greeks(self, command: SubscribeOptionGreeks) -> None:
        raise NotImplementedError("implement `_subscribe_option_greeks` in your adapter subclass")

    # Unsubscribe handlers

    async def _unsubscribe(self, command: UnsubscribeData) -> None:
        raise NotImplementedError("implement `_unsubscribe` in your adapter subclass")

    async def _unsubscribe_instruments(self, command: UnsubscribeInstruments) -> None:
        raise NotImplementedError("implement `_unsubscribe_instruments` in your adapter subclass")

    async def _unsubscribe_instrument(self, command: UnsubscribeInstrument) -> None:
        raise NotImplementedError("implement `_unsubscribe_instrument` in your adapter subclass")

    async def _unsubscribe_order_book_deltas(self, command: UnsubscribeOrderBook) -> None:
        raise NotImplementedError("implement `_unsubscribe_order_book_deltas` in your adapter subclass")

    async def _unsubscribe_order_book_depth(self, command: UnsubscribeOrderBook) -> None:
        raise NotImplementedError("implement `_unsubscribe_order_book_depth` in your adapter subclass")

    async def _unsubscribe_quote_ticks(self, command: UnsubscribeQuoteTicks) -> None:
        raise NotImplementedError("implement `_unsubscribe_quote_ticks` in your adapter subclass")

    async def _unsubscribe_trade_ticks(self, command: UnsubscribeTradeTicks) -> None:
        raise NotImplementedError("implement `_unsubscribe_trade_ticks` in your adapter subclass")

    async def _unsubscribe_mark_prices(self, command: UnsubscribeMarkPrices) -> None:
        raise NotImplementedError("implement `_unsubscribe_mark_prices` in your adapter subclass")

    async def _unsubscribe_index_prices(self, command: UnsubscribeIndexPrices) -> None:
        raise NotImplementedError("implement `_unsubscribe_index_prices` in your adapter subclass")

    async def _unsubscribe_bars(self, command: UnsubscribeBars) -> None:
        raise NotImplementedError("implement `_unsubscribe_bars` in your adapter subclass")

    async def _unsubscribe_funding_rates(self, command: UnsubscribeFundingRates) -> None:
        raise NotImplementedError("implement `_unsubscribe_funding_rates` in your adapter subclass")

    async def _unsubscribe_instrument_status(self, command: UnsubscribeInstrumentStatus) -> None:
        raise NotImplementedError("implement `_unsubscribe_instrument_status` in your adapter subclass")

    async def _unsubscribe_instrument_close(self, command: UnsubscribeInstrumentClose) -> None:
        raise NotImplementedError("implement `_unsubscribe_instrument_close` in your adapter subclass")

    async def _unsubscribe_option_greeks(self, command: UnsubscribeOptionGreeks) -> None:
        raise NotImplementedError("implement `_unsubscribe_option_greeks` in your adapter subclass")

    # Request handlers

    async def _request(self, request: RequestData) -> None:
        raise NotImplementedError("implement `_request` in your adapter subclass")

    async def _request_instrument(self, request: RequestInstrument) -> None:
        raise NotImplementedError("implement `_request_instrument` in your adapter subclass")

    async def _request_instruments(self, request: RequestInstruments) -> None:
        raise NotImplementedError("implement `_request_instruments` in your adapter subclass")

    async def _request_order_book_deltas(self, request: RequestOrderBookDeltas) -> None:
        raise NotImplementedError("implement `_request_order_book_deltas` in your adapter subclass")

    async def _request_order_book_depth(self, request: RequestOrderBookDepth) -> None:
        raise NotImplementedError("implement `_request_order_book_depth` in your adapter subclass")

    async def _request_order_book_snapshot(self, request: RequestOrderBookSnapshot) -> None:
        raise NotImplementedError("implement `_request_order_book_snapshot` in your adapter subclass")

    async def _request_quote_ticks(self, request: RequestQuoteTicks) -> None:
        raise NotImplementedError("implement `_request_quote_ticks` in your adapter subclass")

    async def _request_trade_ticks(self, request: RequestTradeTicks) -> None:
        raise NotImplementedError("implement `_request_trade_ticks` in your adapter subclass")

    async def _request_bars(self, request: RequestBars) -> None:
        raise NotImplementedError("implement `_request_bars` in your adapter subclass")
```

| Method | Description |
|---|---|
| `_connect` | Establishes a connection to the venue APIs. |
| `_disconnect` | Closes the connection to the venue APIs. |
| `_subscribe` | Subscribes to generic data (base for custom types). |
| `_subscribe_instruments` | Subscribes to market data for multiple instruments. |
| `_subscribe_instrument` | Subscribes to market data for a single instrument. |
| `_subscribe_order_book_deltas` | Subscribes to order book delta updates. |
| `_subscribe_order_book_depth` | Subscribes to order book depth updates. |
| `_subscribe_quote_ticks` | Subscribes to top‑of‑book quote updates. |
| `_subscribe_trade_ticks` | Subscribes to trade tick updates. |
| `_subscribe_mark_prices` | Subscribes to mark price updates. |
| `_subscribe_index_prices` | Subscribes to index price updates. |
| `_subscribe_bars` | Subscribes to bar/candlestick updates. |
| `_subscribe_funding_rates` | Subscribes to funding rate updates. |
| `_subscribe_instrument_status` | Subscribes to instrument status updates. |
| `_subscribe_instrument_close` | Subscribes to instrument close price updates. |
| `_subscribe_option_greeks` | Subscribes to option greeks updates. |
| `_unsubscribe` | Unsubscribes from generic data (base for custom types). |
| `_unsubscribe_instruments` | Unsubscribes from market data for multiple instruments. |
| `_unsubscribe_instrument` | Unsubscribes from market data for a single instrument. |
| `_unsubscribe_order_book_deltas` | Unsubscribes from order book delta updates. |
| `_unsubscribe_order_book_depth` | Unsubscribes from order book depth updates. |
| `_unsubscribe_quote_ticks` | Unsubscribes from quote tick updates. |
| `_unsubscribe_trade_ticks` | Unsubscribes from trade tick updates. |
| `_unsubscribe_mark_prices` | Unsubscribes from mark price updates. |
| `_unsubscribe_index_prices` | Unsubscribes from index price updates. |
| `_unsubscribe_bars` | Unsubscribes from bar updates. |
| `_unsubscribe_funding_rates` | Unsubscribes from funding rate updates. |
| `_unsubscribe_instrument_status` | Unsubscribes from instrument status updates. |
| `_unsubscribe_instrument_close` | Unsubscribes from instrument close price updates. |
| `_unsubscribe_option_greeks` | Unsubscribes from option greeks updates. |
| `_request` | Requests generic data (base for custom types). |
| `_request_instrument` | Requests historical data for a single instrument. |
| `_request_instruments` | Requests historical data for multiple instruments. |
| `_request_order_book_snapshot` | Requests an order book snapshot. |
| `_request_order_book_depth` | Requests order book depth. |
| `_request_order_book_deltas` | Requests historical order book deltas. |
| `_request_quote_ticks` | Requests historical quote tick data. |
| `_request_trade_ticks` | Requests historical trade tick data. |
| `_request_bars` | Requests historical bar data. |
| `_request_funding_rates` | Requests historical funding rate data. |

When implementing `_subscribe_order_book_deltas` or streaming order book
data, adapters must set `RecordFlag` flags correctly on each
`OrderBookDelta`. See also Delta flags and event boundaries.

- `F_LAST`: Set on the last delta of every logical event group. The
  `DataEngine` uses this flag as the flush signal when `buffer_deltas` is
  enabled. Without it, deltas accumulate indefinitely and are never
  published to subscribers.
- `F_SNAPSHOT`: Set on all deltas that belong to a snapshot sequence
  (a `Clear` action followed by `Add` actions reconstructing the book).
- Empty book snapshots: When emitting a snapshot for an empty book,
  the `Clear` delta must have `F_SNAPSHOT | F_LAST`. Otherwise buffered
  consumers never receive it.
- Incremental updates: Each venue update message ends with a delta
  that has `F_LAST` set. If the venue batches multiple updates into one
  message, terminate each logical group with `F_LAST`.

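```python
from nautilus_trader.model.data import NULL_ORDER
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.enums import BookAction
from nautilus_trader.model.enums import RecordFlag

# `instrument_id`, `order`, `last_order`, `sequence`, `ts_event`, and `ts_init`
# are assumed to come from the venue message being parsed.

# Incremental update (single event)
delta = OrderBookDelta(
    instrument_id=instrument_id,
    action=BookAction.UPDATE,
    order=order,
    flags=RecordFlag.F_LAST,  # Last (and only) delta in this event
    sequence=sequence,
    ts_event=ts_event,
    ts_init=ts_init,
)

# Snapshot sequence
clear_delta = OrderBookDelta(
    instrument_id=instrument_id,
    action=BookAction.CLEAR,
    order=NULL_ORDER,
    flags=RecordFlag.F_SNAPSHOT,  # Not the last delta
    sequence=sequence,
    ts_event=ts_event,
    ts_init=ts_init,
)
last_add_delta = OrderBookDelta(
    instrument_id=instrument_id,
    action=BookAction.ADD,
    order=last_order,
    flags=RecordFlag.F_SNAPSHOT | RecordFlag.F_LAST,  # End of snapshot
    sequence=sequence,
    ts_event=ts_event,
    ts_init=ts_init,
)
```
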
:::warning
A missing F_LAST is a silent bug: no error is raised, but subscribers
never receive the data when buffering is enabled.
:::
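
To make the flush behavior concrete, here is a self-contained sketch that uses stand-in types rather than the real NautilusTrader classes. `SketchRecordFlag`, `Delta`, `snapshot_deltas`, and `BufferingConsumer` are hypothetical names introduced for illustration, and the flag values are illustrative only; the real `DataEngine` buffering is more involved.

```python
from dataclasses import dataclass
from enum import IntFlag


class SketchRecordFlag(IntFlag):
    """Stand-in for `RecordFlag` (values are illustrative only)."""

    F_SNAPSHOT = 32
    F_LAST = 128


@dataclass(frozen=True)
class Delta:
    """Simplified stand-in for `OrderBookDelta`."""

    action: str
    flags: SketchRecordFlag


def snapshot_deltas(n_levels: int) -> list[Delta]:
    """Build a snapshot: one CLEAR, then ADDs, with F_LAST on the final delta only."""
    actions = ["CLEAR"] + ["ADD"] * n_levels
    deltas = []
    for i, action in enumerate(actions):
        flags = SketchRecordFlag.F_SNAPSHOT
        if i == len(actions) - 1:
            # For an empty book this is the CLEAR itself
            flags |= SketchRecordFlag.F_LAST
        deltas.append(Delta(action, flags))
    return deltas


class BufferingConsumer:
    """Hypothetical consumer that publishes buffered deltas only when F_LAST arrives."""

    def __init__(self) -> None:
        self._buffer: list[Delta] = []
        self.published: list[list[Delta]] = []

    def handle(self, delta: Delta) -> None:
        self._buffer.append(delta)
        if delta.flags & SketchRecordFlag.F_LAST:
            self.published.append(self._buffer)
            self._buffer = []


consumer = BufferingConsumer()
for delta in snapshot_deltas(n_levels=2):
    consumer.handle(delta)
```

In this sketch the three snapshot deltas are published as one group, while a delta that arrives without `F_LAST` stays in the buffer and nothing is published.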
The ExecutionClient manages order submission, modification, and cancellation against the venue
trading system.

```python
from nautilus_trader.execution.messages import BatchCancelOrders
from nautilus_trader.execution.messages import CancelAllOrders
from nautilus_trader.execution.messages import CancelOrder
from nautilus_trader.execution.messages import GenerateFillReports
from nautilus_trader.execution.messages import GenerateOrderStatusReport
from nautilus_trader.execution.messages import GenerateOrderStatusReports
from nautilus_trader.execution.messages import GeneratePositionStatusReports
from nautilus_trader.execution.messages import ModifyOrder
from nautilus_trader.execution.messages import SubmitOrder
from nautilus_trader.execution.messages import SubmitOrderList
from nautilus_trader.execution.reports import ExecutionMassStatus
from nautilus_trader.execution.reports import FillReport
from nautilus_trader.execution.reports import OrderStatusReport
from nautilus_trader.execution.reports import PositionStatusReport
from nautilus_trader.live.execution_client import LiveExecutionClient


class TemplateLiveExecutionClient(LiveExecutionClient):
    """Example `LiveExecutionClient` outlining the required overrides."""

    async def _connect(self) -> None:
        raise NotImplementedError("implement `_connect` in your adapter subclass")

    async def _disconnect(self) -> None:
        raise NotImplementedError("implement `_disconnect` in your adapter subclass")

    async def generate_order_status_report(
        self,
        command: GenerateOrderStatusReport,
    ) -> OrderStatusReport | None:
        raise NotImplementedError("method `generate_order_status_report` must be implemented in the subclass")

    async def generate_order_status_reports(
        self,
        command: GenerateOrderStatusReports,
    ) -> list[OrderStatusReport]:
        raise NotImplementedError("method `generate_order_status_reports` must be implemented in the subclass")

    async def generate_fill_reports(
        self,
        command: GenerateFillReports,
    ) -> list[FillReport]:
        raise NotImplementedError("method `generate_fill_reports` must be implemented in the subclass")

    async def generate_position_status_reports(
        self,
        command: GeneratePositionStatusReports,
    ) -> list[PositionStatusReport]:
        raise NotImplementedError("method `generate_position_status_reports` must be implemented in the subclass")

    async def generate_mass_status(
        self,
        lookback_mins: int | None = None,
    ) -> ExecutionMassStatus | None:
        raise NotImplementedError("method `generate_mass_status` must be implemented in the subclass")

    async def _submit_order(self, command: SubmitOrder) -> None:
        raise NotImplementedError("implement `_submit_order` in your adapter subclass")

    async def _submit_order_list(self, command: SubmitOrderList) -> None:
        raise NotImplementedError("implement `_submit_order_list` in your adapter subclass")

    async def _modify_order(self, command: ModifyOrder) -> None:
        raise NotImplementedError("implement `_modify_order` in your adapter subclass")

    async def _cancel_order(self, command: CancelOrder) -> None:
        raise NotImplementedError("implement `_cancel_order` in your adapter subclass")

    async def _cancel_all_orders(self, command: CancelAllOrders) -> None:
        raise NotImplementedError("implement `_cancel_all_orders` in your adapter subclass")

    async def _batch_cancel_orders(self, command: BatchCancelOrders) -> None:
        raise NotImplementedError("implement `_batch_cancel_orders` in your adapter subclass")
```

| Method | Description |
|---|---|
| `_connect` | Establishes a connection to the venue APIs. |
| `_disconnect` | Closes the connection to the venue APIs. |
| `generate_order_status_report` | Generates a report for a specific order on the venue. |
| `generate_order_status_reports` | Generates reports for all orders on the venue. |
| `generate_fill_reports` | Generates reports for filled orders on the venue. |
| `generate_position_status_reports` | Generates reports for position status on the venue. |
| `generate_mass_status` | Generates execution mass status reports. |
| `_submit_order` | Submits a new order to the venue. |
| `_submit_order_list` | Submits a list of orders to the venue. |
| `_modify_order` | Modifies an existing order on the venue. |
| `_cancel_order` | Cancels a specific order on the venue. |
| `_cancel_all_orders` | Cancels all orders for an instrument on the venue. |
| `_batch_cancel_orders` | Cancels a batch of orders for an instrument on the venue. |
Configuration classes hold adapter-specific settings like API keys and connection details.

```python
from nautilus_trader.config import LiveDataClientConfig
from nautilus_trader.config import LiveExecClientConfig


class TemplateDataClientConfig(LiveDataClientConfig):
    """Configuration for `TemplateDataClient` instances."""

    api_key: str
    api_secret: str
    base_url: str


class TemplateExecClientConfig(LiveExecClientConfig):
    """Configuration for `TemplateExecClient` instances."""

    api_key: str
    api_secret: str
    base_url: str
```

Key attributes:

- `api_key`: The API key for authenticating with the data provider.
- `api_secret`: The API secret for authenticating with the data provider.
- `base_url`: The base URL for connecting to the data provider's API.

Exercise adapters across every venue behaviour they claim to support. Incorporate these scenarios into the Rust and Python suites.
Test each supported product family.
See the full Data Testing Spec for the DataTester test matrix.
See the full Execution Testing Spec for the ExecTester test matrix.