Sync Server

cli/sync_server.mdx
<Code model="anthropic/claude-opus-4-5" language="rust" output="sync_server.rs" version="0.0.1" id="code"> <Output ref="prompt" /> </Code> <Doc model="anthropic/sonnet-4.5" output="../../turso-docs/sync/spec.mdx" id="doc"> Generate an MDX documentation file in Mintlify format for the sync engine open protocol
You will be provided with a prompt which is used to generate a simple implementation of the server.
Your task is to extract the protocol implementation from this prompt and compose documentation describing the PROTOCOL:
- Document the endpoints which MUST be implemented to support the sync engine protocol
- Document the SQL-over-HTTP protocol used by the `/v2/pipeline` endpoint
- Document `/pull-updates` endpoint
- The documentation MUST ONLY mention contracts, logic and flow. No code should be mentioned

Structure the documentation well and use Mintlify's rich formatting tools to make the documentation easy to read and navigate:
* Give high level overview of sync protocol and its main components in the beginning
    * This is a protocol to support bidirectional sync of a SQLite-compatible database between server and client
* Describe /v2/pipeline endpoint
    * List contract spec
* Describe /pull-updates endpoint
* Use mintlify components if appropriate
    * ```....``` code blocks to emit contracts like protobuf schemas
    * Callouts (`<Info>`, `<Warning>`, etc) to extract portion of information into separate block visible to the reader
* Use markdown markup features if appropriate
    * Headers and subheaders for structuring the docs, so Mintlify can automatically build a table of contents
    * Text emphasis (bold, italic) to highlight important words
<Output ref="prompt" />
</Doc> <Text id="prompt">

Goal

Generate a simple implementation of a sync server using tursodatabase, a rewrite of SQLite. The implementation must maintain a turso database file locally at the given path and disable checkpointing for it.

```rs
use anyhow::Result;
// ... more imports here ...

pub struct TursoSyncServer {
    // listen address (e.g. 0.0.0.0:8080)
    address: String,
    conn: Arc<Mutex<Arc<Connection>>>,
    // stop server if interrupt_count > 0 (do this check in the main server event loop)
    interrupt_count: Arc<AtomicUsize>,
}

impl TursoSyncServer {
    pub fn new(address: String, conn: Arc<Connection>, interrupt_count: Arc<AtomicUsize>) -> Self {
        Self {
            address,
            conn: Arc::new(Mutex::new(conn)),
            interrupt_count,
        }
    }

    pub fn run(&self) -> Result<()> {
        // implement logic here
    }

    fn sql_over_http(&self, query: ...) -> Result<()> { ... }
    fn pull_updates(&self, query: ...) -> Result<()> { ... }
}
```
  • Use <T as prost::Message>::decode(...) to access methods associated with the trait prost::Message
  • from_i32 method in prost is deprecated
    • use of deprecated associated function turso_sync_engine::server_proto::PageUpdatesEncodingReq::from_i32: Use the TryFrom<i32> implementation instead
  • Start a separate thread to monitor interrupt_count, because otherwise the server will be blocked in a syscall and unable to shut down
  • Add logging
    • Trace all request information with tracing::info!(...)
    • Add debug logs for more low level info about sync server internal logic
    • Add error logs if SQL over http execution statement failed or something bad happened
    • Add debug logs with executed SQL statements to simplify tracing execution
  • Use run_collect_rows/run_ignore_rows helpers to execute Statement
  • Implement simple non-chunked http/1.1 server - always respond with full Content-Length
  • Use application/protobuf as content type for protobuf payloads
  • Format the HTTP response in only one place - all internal functions must return either a raw body (without headers and the HTTP preamble) or some Rust structs
  • Server must process one request at a time (/v2/pipeline or /pull-updates) in order to provide simple and safe concurrency guarantees
    • Note that you MUST hold the lock for the whole duration of request execution
    • If you drop the lock guard early, you will allow multiple requests to be processed at the same time
    ```rs
    let conn = { self.conn.lock().unwrap().clone() }; // this is wrong!
    ```
    
  • USE rocket library for http server
  • DO NOT use tokio - use simple threads instead
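To make the lock-scope rule above concrete, here is a minimal std-only sketch; `DbConn` is a stand-in `String` for the real turso `Connection`, and `handle_request` is a hypothetical handler, not part of the actual server:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the real turso Connection type.
type DbConn = String;

fn handle_request(conn: &Arc<Mutex<Arc<DbConn>>>, body: &str) -> String {
    // Correct: the guard is held until this function returns, so only one
    // request is processed at a time.
    let guard = conn.lock().unwrap();
    format!("processed '{}' against '{}'", body, guard)
    // guard dropped here, only after the full response body has been built
}

fn main() {
    let conn = Arc::new(Mutex::new(Arc::new(String::from("local.db"))));
    println!("{}", handle_request(&conn, "SELECT 1"));
}
```

Cloning the inner `Arc<Connection>` out of a short-lived guard (as in the "this is wrong!" snippet) releases the mutex immediately and lets requests interleave; keeping the guard alive for the whole handler is what serializes them.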

Endpoints

Sync server must support 2 endpoints:

  1. POST /v2/pipeline
    • Method executes hrana (SQL-over-HTTP) commands encoded with JSON
  2. POST /pull-updates endpoint which fetches page updates since the revision provided by the client
    • Method expects a PullUpdatesRequest protobuf message and responds with a sequence of length-delimited messages
    • First it responds with a PullUpdatesResponseHeader
    • Then it sends multiple PullUpdatesPageData messages
    • The method recognizes protobuf contracts sent over HTTP/1.1
    • Implement simple http/1.1 server but use binary protobuf as payloads:
    • You MUST return an error if zstd encoding is used
    • You MUST ignore server_query_selector field
    • You MUST ignore client_pages field
    • You MUST decode server_pages_selector and send only pages from the selector if it is set
    • BE CAREFUL: the sync protocol uses zero-based page identifiers while the core API sometimes uses 1-based indexing
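The length-delimited framing can be sketched without prost: each message is prefixed with its byte length encoded as a LEB128 varint, which is the framing prost's `encode_length_delimited` produces. A std-only illustration with placeholder message bytes:

```rust
// Write one length-delimited message: LEB128 varint length prefix,
// followed by the raw message bytes.
fn write_length_delimited(out: &mut Vec<u8>, msg: &[u8]) {
    let mut len = msg.len() as u64;
    loop {
        let byte = (len & 0x7f) as u8;
        len >>= 7;
        if len == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80); // continuation bit set: more length bytes follow
    }
    out.extend_from_slice(msg);
}

fn main() {
    let mut body = Vec::new();
    // first the (placeholder) PullUpdatesResponseHeader, then a page payload
    write_length_delimited(&mut body, b"header-bytes");
    write_length_delimited(&mut body, &[0u8; 4096]);
    assert_eq!(body.len(), 1 + 12 + 2 + 4096); // 4096 needs a 2-byte varint
    println!("{}", body.len());
}
```

Because the whole body is buffered like this before sending, the server can emit an exact Content-Length header, matching the non-chunked HTTP/1.1 requirement above.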

The contracts which client uses to interact with server (protobufs and JSONs) are listed here: <File path="../sync/engine/src/server_proto.rs" />

Core API

The main database API is in the lib.rs: <Outline model="openai/gpt-4.1" id="api">

  • Generate outline of the turso core API which later can be used to write code easily without errors
  • Include information about main Database/Connection/Statement methods
  • Include Rust signatures in order to use the outline for code generation
  • Document extra WAL API as it will be important later
<File path="../core/lib.rs" /> <File path="../core/types.rs" selector="WalState|WalFrameInfo|Value|Text" /> <File path="../core/storage/sqlite3_ondisk.rs" selector="PageSize" /> </Outline>

pull updates

  • In order to implement pull-updates endpoint use extra WAL API exposed by the core database api: wal_state and wal_get_frame
  • Use the offset in the WAL file as the simplest revision string
  • wal_get_frame reads a frame from the WAL; each frame carries an additional 24-byte header with extra metadata
    • pub const WAL_FRAME_HEADER_SIZE: usize = 24;
    • Do not forget to cut this header off to get the page content
  • sync server must work only with page_size = 4096 (4 KiB)
  • If server_revision is not set - take the latest committed frame offset
  • If client_revision is not set - use zero for this
  • Pull updates logic should take all frames between [client_revision..server_revision] and send the latest versions of the unique pages changed in this range
    • Iterate backward and maintain a HashSet of changed pages in memory during request execution
    • Filter out pages which are not included in server_pages_selector if it was set
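The backward scan above can be sketched with mocked frames (std-only; `frames` stands in for frames read via wal_get_frame, each entry being a zero-based page id plus 24-byte header and page body):

```rust
use std::collections::HashSet;

const WAL_FRAME_HEADER_SIZE: usize = 24;

// Sketch: iterate frames in [client_revision..server_revision] backward and
// keep only the newest version of each changed page, honoring the selector.
fn latest_pages(
    frames: &[(u32, Vec<u8>)],       // (zero-based page id, header + page bytes)
    selector: Option<&HashSet<u32>>, // decoded server_pages_selector, if set
) -> Vec<(u32, Vec<u8>)> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for (page_id, frame) in frames.iter().rev() {
        if let Some(sel) = selector {
            if !sel.contains(page_id) {
                continue; // filtered out by server_pages_selector
            }
        }
        if seen.insert(*page_id) {
            // cut the 24-byte frame header off to get the raw page content
            out.push((*page_id, frame[WAL_FRAME_HEADER_SIZE..].to_vec()));
        }
    }
    out
}

fn main() {
    let frame = |page: u32, fill: u8| (page, vec![fill; WAL_FRAME_HEADER_SIZE + 4096]);
    // page 0 written twice; the later frame (fill = 2) must win
    let frames = vec![frame(0, 1), frame(1, 9), frame(0, 2)];
    let pages = latest_pages(&frames, None);
    assert_eq!(pages.len(), 2);
    assert_eq!(pages[0].1[0], 2); // newest version of page 0
    println!("{} pages", pages.len());
}
```

Because the scan runs backward, the first frame seen for a page is its newest version, so a plain HashSet membership check is enough to deduplicate.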

sql over http

  • Implement batch conditions - they are important, especially for auto-commit handling
  • Implement only positional arguments - named parameters are not used by the sync protocol on the client side
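For orientation, a hypothetical /v2/pipeline request body in the hrana-over-HTTP style with positional arguments only; the exact field shapes are illustrative here, the normative contracts live in server_proto.rs:

```json
{
  "baton": null,
  "requests": [
    {
      "type": "execute",
      "stmt": {
        "sql": "INSERT INTO t(x) VALUES (?)",
        "args": [{ "type": "integer", "value": "42" }]
      }
    },
    { "type": "close" }
  ]
}
```

A `batch` request additionally carries steps with condition fields (e.g. run a step only if a previous step succeeded), which is what the auto-commit handling mentioned above relies on.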

dependencies

Currently available dependencies are: <File path="Cargo.toml" />

</Text>