doc/developer/design/20250707_persist_schemas.md
The Self Describing Persist Batches design doc from 2024 specified how Persist’s new columnar encoding should work. It covers most of the details of our columnar encoding, but says that “detailed logic for migrating the schema of a batch” is left to future work because it is “a large enough issue to warrant its own design doc”.
This design doc fleshes out the sketch from that original design, and gives a more precise semantics for how migrations can be handled in a safe and compatible way.
This is partly a backwards-looking design, since the initial work was done last year. This section sketches out a consistent general design… and the MVP section below covers what parts of this have been implemented already and the simplifying assumptions we’ve made.
As part of the columnar migration, Persist introduced a new Schema trait which controls how a particular set of rows should be translated into a columnar format. (For example, the key type of a typical shard is SourceData and its schema type is RelationDesc; the arrow datatype will generally be a struct, with fields that vary depending on the exact set of columns and values in that schema.) Persist uses schema implementations like this to map inputs and outputs to and from its columnar representation at the edges, but otherwise works internally in terms of the underlying Arrow datatypes. This allows Persist to strongly enforce its internal requirements around data ordering and encoding, while allowing relations and their encoding to vary over time.
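To make the shape of this concrete, here is a deliberately simplified sketch of such a trait. The real `Schema` trait in Persist is batched and column-oriented with a different signature, so the names and methods below are illustrative assumptions rather than Persist’s API:

```rust
use arrow::array::ArrayRef;
use arrow::datatypes::DataType;

/// Illustrative only: how a schema maps a key or value type `T` to Arrow.
trait Schema<T> {
    /// The Arrow datatype that values of `T` are encoded into. For a
    /// `RelationDesc` this is typically a struct with one field per column.
    fn data_type(&self) -> DataType;
    /// Encode a batch of values into a single Arrow array of `data_type()`.
    fn encode(&self, values: &[T]) -> ArrayRef;
    /// Decode an Arrow array of `data_type()` back into values.
    fn decode(&self, array: &ArrayRef) -> Vec<T>;
}
```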
A Persist shard maintains an ordered sequence of schemas. Clients may append a new schema to the sequence. If the sequence is empty, this sets the initial schema of the shard. If the sequence is non-empty, this will check that the schema obeys the schema-migration rules; if it does, it will add the given schema as the latest schema of the shard. A monotonically-increasing SchemaId identifies a particular schema in this sequence.
Permissible migrations are limited to:

- Adding a new nullable field to a struct.
- Deleting an existing field from a struct.
All other migrations are against the rules. This includes reordering fields, or re-creating previously-deleted fields. We may decide to add more forward-compatible migrations, like widening integer types, in the future.
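A minimal sketch of the registration flow described above, tracking only the Arrow datatypes and restricting the rule check (as in the MVP described below) to appending nullable fields at the end of a struct. Names like `ShardSchemas` and `register` are illustrative assumptions, not Persist’s actual API:

```rust
use arrow::datatypes::DataType;

/// Identifies a schema by its position in the shard's schema sequence.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaId(u64);

/// The ordered sequence of schemas registered against a shard.
pub struct ShardSchemas {
    schemas: Vec<DataType>,
}

impl ShardSchemas {
    /// Append a new schema: set it as the initial schema if the sequence is
    /// empty, otherwise check it against the migration rules first.
    pub fn register(&mut self, new: DataType) -> Result<SchemaId, String> {
        if let Some(latest) = self.schemas.last() {
            if !is_valid_migration(latest, &new) {
                return Err("not a permissible migration".to_string());
            }
        }
        self.schemas.push(new);
        Ok(SchemaId(self.schemas.len() as u64 - 1))
    }
}

/// MVP-style rule check: existing fields must be unchanged and in the same
/// order, and any appended fields must be nullable.
fn is_valid_migration(old: &DataType, new: &DataType) -> bool {
    match (old, new) {
        (DataType::Struct(old_fields), DataType::Struct(new_fields)) => {
            new_fields.len() >= old_fields.len()
                && old_fields.iter().zip(new_fields.iter()).all(|(o, n)| o == n)
                && new_fields.iter().skip(old_fields.len()).all(|f| f.is_nullable())
        }
        _ => old == new,
    }
}
```

Note that the full design also needs the deletion history described below, since the real check must reject migrations that re-create previously-deleted fields.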
Schema implementations can support more flexible migrations by being thoughtful about how they map data to datatypes. For example, we support re-using the names of deleted columns in RelationDesc by using the column id as our struct field name and ensuring column ids are not reused.
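As a concrete illustration, keying struct fields by column id makes a rename invisible at the Arrow level; the helper below is hypothetical, not Persist code:

```rust
use arrow::datatypes::{DataType, Field, Fields};

/// Hypothetical helper: build the Arrow struct for a set of columns, using the
/// stable numeric column id (not the user-visible name) as the field name.
fn struct_for_columns(cols: &[(u64, DataType)]) -> DataType {
    let fields: Fields = cols
        .iter()
        .map(|(id, ty)| Field::new(id.to_string(), ty.clone(), /* nullable */ true))
        .collect();
    DataType::Struct(fields)
}
```

Renaming a column changes only the RelationDesc, not the column id, so the Arrow datatype is untouched; and because a re-added column name gets a fresh id, it can never collide with a deleted field.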
To check whether it’s allowed to migrate a shard to a particular datatype, it’s not enough just to check the current datatype: we need to ensure that any added fields have never been used before in any of the previous schemas. IDLs like Protobuf use “reserved fields” to track this information as part of the current schema, but Arrow datatypes have no similar concept built in.
Internally, we define a new “migration type” that does track deleted fields: identical to an arrow datatype, but where struct fields include whether or not a field has been deleted. We can recover the current datatype by filtering out the deleted fields.
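One possible shape for such a migration type, sketched with hypothetical names:

```rust
use arrow::datatypes::{DataType, Field, Fields};

/// Hypothetical: an Arrow-like datatype whose struct fields remember deletions.
enum MigrationType {
    Primitive(DataType),
    Struct(Vec<MigrationField>),
}

struct MigrationField {
    name: String,
    typ: MigrationType,
    nullable: bool,
    /// Deleted fields stay in this list forever, playing the same role as
    /// Protobuf's reserved fields: migrations that re-create them are rejected.
    deleted: bool,
}

impl MigrationType {
    /// Recover the current Arrow datatype by filtering out deleted fields.
    fn to_data_type(&self) -> DataType {
        match self {
            MigrationType::Primitive(ty) => ty.clone(),
            MigrationType::Struct(fields) => DataType::Struct(
                fields
                    .iter()
                    .filter(|f| !f.deleted)
                    .map(|f| Field::new(f.name.clone(), f.typ.to_data_type(), f.nullable))
                    .collect::<Fields>(),
            ),
        }
    }
}
```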
Backward compatibility means that readers using any valid schema should be able to read data that was written using any previous schema. However, we do not enforce forward compatibility… so we may apply a schema change that invalidates existing readers. In that case, the reader should be “fenced”: it should exit with an error instead of passing along data with an unexpected shape.
However, any individual schema change may happen to be forward-compatible, and if it is, existing readers should not be fenced.
Snapshots work by reading a particular version of the shard state (perhaps blocking to wait for a particular frontier) and then returning each part that makes up the snapshot. The reader should check that the reader schema is a valid migration for the schemas in that specific version of state; if not, it should raise an error. Listens can be made to work in a similar way, though they will need to re-validate the schema whenever the schema is evolved.
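In terms of the `is_valid_migration` sketch above, the read-time check for a snapshot might look like the following (again hypothetical, not Persist’s API):

```rust
use arrow::datatypes::DataType;

/// Validate a snapshot read against a specific version of state: every schema
/// that may have written data in that state must migrate cleanly to the
/// reader's schema. (`is_valid_migration` is the sketch from earlier.)
fn validate_snapshot_read(
    reader: &DataType,
    schemas_in_state: &[DataType],
) -> Result<(), String> {
    for written in schemas_in_state {
        if !is_valid_migration(written, reader) {
            return Err(format!(
                "reader schema {reader:?} cannot read data written as {written:?}"
            ));
        }
    }
    Ok(())
}
```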
The errors raised by this read-time schema validation should not be passed along in the data stream, since that may make the results of any particular read nondeterministic. Instead, they should use a mechanism like the ErrorHandler in the persist source or report a usage error.
Some consequences of this rule:

- Whether a read succeeds is a function of the reader’s schema and a specific version of state, so a given read either always succeeds or always fails, deterministically.
- A reader may discover mid-listen that it has been fenced by a schema change, but that fencing surfaces as an out-of-band error, never as bad data in the stream.
Compaction requires consolidation: taking a bunch of runs of data — which may have different datatypes — and converting them to a new run with a single datatype. It does this by first converting all the inputs into a uniform datatype, then compacting the uniformly-typed inputs together.
Compaction always migrates the input data to the datatype defined by the latest schema in state at the time the compaction was requested. This may be a lossy process if columns are deleted, but that is acceptable: since the compaction uses an output schema only once it’s durably recorded in state, any reader that observes the compaction output will also have observed that schema, and any reads that would have depended on the now-deleted column will have been fenced out.
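For the common add-a-nullable-column case, migrating a batch to the latest datatype amounts to splicing in an all-null column. A minimal arrow-rs illustration (not Persist’s actual compaction code):

```rust
use std::sync::Arc;
use arrow::array::{new_null_array, Array, ArrayRef, Int64Array, StructArray};
use arrow::datatypes::{DataType, Field, Fields};

fn main() {
    // A batch written under the old schema: struct<"0": Int64>.
    let old = StructArray::new(
        Fields::from(vec![Field::new("0", DataType::Int64, true)]),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
        None,
    );

    // The latest schema appended a nullable column "1": Utf8. Migrate the old
    // batch by padding it with an all-null column before consolidating.
    let new_fields = Fields::from(vec![
        Field::new("0", DataType::Int64, true),
        Field::new("1", DataType::Utf8, true),
    ]);
    let mut columns = old.columns().to_vec();
    columns.push(new_null_array(&DataType::Utf8, old.len()));
    let migrated = StructArray::new(new_fields, columns, None);
    assert_eq!(migrated.len(), 3);
}
```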
Deleting fields anywhere but the end of the struct may not preserve the order of the input data: for example, if rows are sorted by columns (a, b), deleting column a leaves data sorted by b only within runs that shared a value of a. This is a serious issue for consolidation, since it means we may not be able to generate sorted and consolidated output in a single pass over the inputs.
At time of writing, we have only implemented adding nullable columns to the end of a struct. This lets us make simplifying assumptions in various places:

- Migrating a batch to a newer schema is always possible and lossless: it amounts to appending all-null columns.
- Migrations never reorder or drop existing columns, so the sort order of existing batches is preserved and consolidation is unaffected.
So far, the demand for more advanced schema migrations has been limited. When we want to allow more advanced schema migrations, we’ll want to relax some of the assumptions above.
We’ve also implemented a few ad-hoc conversions, to deal with issues we found during the initial rollout — for example, compatibility issues around nullability or maps vs. lists. We will need to keep these special cases around until all the old data has been rewritten, which may be some time.
There are a number of other places where the current implementation is less general than the design... for example, the compare_and_evolve_schema method in the Persist API allows changing the schema of the shard, but initializing the schema is done implicitly elsewhere. In cases where the implementation and this design doc disagree, we expect to evolve the Persist API to more closely resemble this design over time.
While Persist is designed to be agnostic to the exact data and schema types, in practice most collections are collections of Rows with RelationDesc schemas. Some brief notes on the nonobvious aspects of that mapping:
- The Arrow DataType is generated based on the numeric column ids in the relation desc, not the field names. This allows renaming fields without any type changes at the Persist level at all.
- The RelationDesc schema implementation just marks all columns as possibly nullable. For use-cases like tables where nullability shouldn’t change unexpectedly, the caller may need additional assertions to enforce their own higher-level semantics.

Most schema tooling for languages like Protobuf and Avro assumes you want both forward and backward compatibility: not only must new readers be allowed to read old data, but old readers should be able to understand new data, as long as the schema is evolved in legal ways. The original draft of this document was written with the idea that this was a requirement for Persist as well.
However, during review of that draft, it became clear that:

- Persist itself does not need forward compatibility to maintain its invariants: fencing a reader whose schema has been invalidated is always safe.
- Whether fencing a running dataflow is acceptable varies by use case, so it is a judgement better made by individual clients than by Persist.
So: this draft proposes not enforcing forward compatibility in Persist in general. Persist clients may choose to enforce forward compatibility and allow only a more limited set of migrations, or may allow a wider range of migrations as long as they don’t cause fencing in important dataflows… but in either case we propose leaving this decision to the client.