Back to Sui

Bring Your Own Store (BYOS)

docs/content/guides/developer/accessing-data/custom-indexer/bring-your-own-store.mdx

latest7.5 KB
Original Source

The IndexerCluster provides a convenient way to get started with PostgreSQL, but you might want to use a different database or storage system. This requires using the manual Indexer class and implementing custom Store and Connection traits from sui-indexer-alt-framework-store-traits.

<details> <summary> `lib.rs` in `sui-indexer-alt-framework-store-traits` </summary> <ImportContent source="crates/sui-indexer-alt-framework-store-traits/src/lib.rs" mode="code" trait="Connection,Store" /> </details>

When to use BYOS:

  • Different database: MongoDB, CouchDB, or other non-PostgreSQL databases. This also applies if you prefer to use PostgreSQL but without the default Diesel ORM.
  • Custom requirements: Specialized storage logic, partitioning, or performance optimizations.

Core implementation requirements

To implement BYOS, you need to:

  1. Define your Store and Connection struct that manages connections.
  2. Implement the Store trait for connection management.
  3. Implement the Connection trait for watermark operations.
  4. Use manual Indexer instead of IndexerCluster.

###step Define your store structure

rust
use sui_indexer_alt_framework::store::{Store, Connection};
use async_trait::async_trait;

#[derive(Clone)]
pub struct MyCustomStore {
    // Your database connection details
    connection_pool: MyDatabasePool,
    config: MyConfig,
}

pub struct MyCustomConnection<'a> {
    // A connection instance
    conn: MyDatabaseConnection<'a>,
}

###step Implement the Store trait

The Store trait manages the connection lifecycle:

rust
#[async_trait]
impl Store for MyCustomStore {
    type Connection<'c> = MyCustomConnection<'c>;

    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
        // Your implementation
    }
}

###step Implement the Connection trait

The Connection trait handles watermark operations for pipeline coordination:

rust
#[async_trait]
impl Connection for MyCustomConnection<'_> {
    // Get the highest checkpoint processed by a pipeline
    async fn committer_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<CommitterWatermark>> {
        // Query your database for watermark data
        todo!("Implement based on your storage system")
    }

    // Get the lowest available checkpoint for readers
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>> {
        // Implementation depends on your database schema
        todo!("Implement based on your storage system")
    }

	  // Implement other required methods...
}

For a complete reference, study the sui-pg-db implementation on Connection:

<ImportContent source="crates/sui-pg-db/src/store.rs" mode="code" impl="Connection" />

###step Use manual indexer

Replace IndexerCluster with manual Indexer:

rust
use sui_indexer_alt_framework::{Indexer, IndexerArgs};
use sui_indexer_alt_framework::ingestion::{
    ClientArgs, IngestionConfig,
    ingestion_client::IngestionClientArgs,
};

async fn main() -> anyhow::Result<()> {
    // Initialize your custom store
    let store = MyCustomStore::new(config).await?;

    // Configure indexer manually
    let indexer = Indexer::new(
        store,
        IndexerArgs::default(),
        ClientArgs {
            ingestion: IngestionClientArgs {
                remote_store_url: Some("https://checkpoints.testnet.sui.io".to_string()),
                ..Default::default()
            },
            ..Default::default()
        },
        IngestionConfig::default(),
        &prometheus::Registry::new(),
        tokio_util::sync::CancellationToken::new(),
    ).await?;

    // Add your pipelines
    indexer.concurrent_pipeline(
        YourHandler::default(),
        ConcurrentConfig::default(),
    ).await?;

    // Start the indexer
    indexer.run().await?;
    Ok(())
}

Example: ClickHouse implementation

For a complete working example of BYOS with ClickHouse (a high-performance columnar database for analytics), see the example project in the Sui repo.

<details> <summary> ClickHouse example README </summary> <ImportContent source="examples/rust/clickhouse-sui-indexer/README.md" mode="code" style="md" /> </details>

This example demonstrates:

  • Custom store implementation using the ClickHouse Rust client.
  • Watermark persistence with ClickHouse-specific SQL syntax.
  • Transaction digest indexing similar to the built-in PostgreSQL handler.

The example includes 3 main components:

  1. store.rs - ClickHouseStore implementing Store and Connection traits.

    <details> <summary> `store.rs` </summary> <ImportContent source="examples/rust/clickhouse-sui-indexer/src/store.rs" mode="code" /> </details>
  2. handlers.rs - TxDigest handler processing checkpoint data.

    <details> <summary> `handlers.rs` </summary> <ImportContent source="examples/rust/clickhouse-sui-indexer/src/handlers.rs" mode="code" /> </details>
  3. main.rs - Manual indexer setup with ClickHouse backend.

    <details> <summary> `main.rs` </summary> <ImportContent source="examples/rust/clickhouse-sui-indexer/src/main.rs" mode="code" /> </details>

Deserializing Move events {#deserialize}

When Move smart contracts execute on Sui, they can emit events using the sui::event module. These events are stored in checkpoints as BCS-serialized bytes that your indexer needs to deserialize to extract meaningful data.

Why deserialization is needed

Move contracts emit events like the following:

rust
// Move smart contract
use sui::event;

public fun transfer_balance(...) {
    event::emit(BalanceEvent {
        balance_manager_id: id,
        asset: asset_id,
        amount: 100,
        deposit: true
    });
}

In checkpoint data, these events arrive as raw BCS bytes that need to be converted back to Rust structs for processing.

Step-by-step deserialization

  1. Add BCS dependency.

    rust
    [dependencies]
    bcs = "0.1.6"
    serde = { version = "1.0", features = ["derive"] }
    
  2. Define the Event struct in Rust.

    Define the same structure in Rust as declared in Move. You can do this manually or use move-binding to auto-generate it from on-chain packages.

    rust
    use serde::Deserialize;
    use sui_indexer_alt_framework::types::::base_types::ObjectID;
    
    #[derive(Deserialize, Debug)]
    struct BalanceEvent {
        balance_manager_id: ObjectID,
        asset: ObjectID,
        amount: u64,
        deposit: bool,
    }
    

    :::important

    Field order and types must match the Move event exactly.

    :::

  3. Extract event bytes in your processor.

    rust
    impl Processor for YourHandler {
        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let mut results = Vec::new();
    
            for transaction in &checkpoint.transactions {
                for event in &transaction.events {
                    // Get the raw BCS bytes
                    let event_bytes = &event.contents;
    
                    // Deserialize to your Rust struct
                    if let Ok(balance_event) = bcs::from_bytes::<BalanceEvent>(event_bytes) {
                        // Do something
                    }
                }
            }
    
            Ok(results)
        }
    }