docs/content/guides/developer/accessing-data/custom-indexer/bring-your-own-store.mdx
The IndexerCluster provides a convenient way to get started with PostgreSQL, but you might want to use a different database or storage system. This requires using the manual Indexer class and implementing custom Store and Connection traits from sui-indexer-alt-framework-store-traits.
When to use BYOS:
To implement BYOS, you need to:
Store and Connection struct that manages connections.Store trait for connection management.Connection trait for watermark operations.Indexer instead of IndexerCluster.###step Define your store structure
use sui_indexer_alt_framework::store::{Store, Connection};
use async_trait::async_trait;
#[derive(Clone)]
pub struct MyCustomStore {
// Your database connection details
connection_pool: MyDatabasePool,
config: MyConfig,
}
pub struct MyCustomConnection<'a> {
// A connection instance
conn: MyDatabaseConnection<'a>,
}
###step Implement the Store trait
The Store trait manages the connection lifecycle:
#[async_trait]
impl Store for MyCustomStore {
type Connection<'c> = MyCustomConnection<'c>;
async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
// Your implementation
}
}
###step Implement the Connection trait
The Connection trait handles watermark operations for pipeline coordination:
#[async_trait]
impl Connection for MyCustomConnection<'_> {
// Get the highest checkpoint processed by a pipeline
async fn committer_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<CommitterWatermark>> {
// Query your database for watermark data
todo!("Implement based on your storage system")
}
// Get the lowest available checkpoint for readers
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<ReaderWatermark>> {
// Implementation depends on your database schema
todo!("Implement based on your storage system")
}
// Implement other required methods...
}
For a complete reference, study the sui-pg-db implementation on Connection:
###step Use manual indexer
Replace IndexerCluster with manual Indexer:
use sui_indexer_alt_framework::{Indexer, IndexerArgs};
use sui_indexer_alt_framework::ingestion::{
ClientArgs, IngestionConfig,
ingestion_client::IngestionClientArgs,
};
async fn main() -> anyhow::Result<()> {
// Initialize your custom store
let store = MyCustomStore::new(config).await?;
// Configure indexer manually
let indexer = Indexer::new(
store,
IndexerArgs::default(),
ClientArgs {
ingestion: IngestionClientArgs {
remote_store_url: Some("https://checkpoints.testnet.sui.io".to_string()),
..Default::default()
},
..Default::default()
},
IngestionConfig::default(),
&prometheus::Registry::new(),
tokio_util::sync::CancellationToken::new(),
).await?;
// Add your pipelines
indexer.concurrent_pipeline(
YourHandler::default(),
ConcurrentConfig::default(),
).await?;
// Start the indexer
indexer.run().await?;
Ok(())
}
For a complete working example of BYOS with ClickHouse (a high-performance columnar database for analytics), see the example project in the Sui repo.
<details> <summary> ClickHouse example README </summary> <ImportContent source="examples/rust/clickhouse-sui-indexer/README.md" mode="code" style="md" /> </details>This example demonstrates:
The example includes 3 main components:
store.rs - ClickHouseStore implementing Store and Connection traits.
handlers.rs - TxDigest handler processing checkpoint data.
main.rs - Manual indexer setup with ClickHouse backend.
When Move smart contracts execute on Sui, they can emit events using the sui::event module. These events are stored in checkpoints as BCS-serialized bytes that your indexer needs to deserialize to extract meaningful data.
Move contracts emit events like the following:
// Move smart contract
use sui::event;
public fun transfer_balance(...) {
event::emit(BalanceEvent {
balance_manager_id: id,
asset: asset_id,
amount: 100,
deposit: true
});
}
In checkpoint data, these events arrive as raw BCS bytes that need to be converted back to Rust structs for processing.
Add BCS dependency.
[dependencies]
bcs = "0.1.6"
serde = { version = "1.0", features = ["derive"] }
Define the Event struct in Rust.
Define the same structure in Rust as declared in Move. You can do this manually or use move-binding to auto-generate it from on-chain packages.
use serde::Deserialize;
use sui_indexer_alt_framework::types::::base_types::ObjectID;
#[derive(Deserialize, Debug)]
struct BalanceEvent {
balance_manager_id: ObjectID,
asset: ObjectID,
amount: u64,
deposit: bool,
}
:::important
Field order and types must match the Move event exactly.
:::
Extract event bytes in your processor.
impl Processor for YourHandler {
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
let mut results = Vec::new();
for transaction in &checkpoint.transactions {
for event in &transaction.events {
// Get the raw BCS bytes
let event_bytes = &event.contents;
// Deserialize to your Rust struct
if let Ok(balance_event) = bcs::from_bytes::<BalanceEvent>(event_bytes) {
// Do something
}
}
}
Ok(results)
}
}