doc/developer/design/20240328_persist_columnar.md
We want Persist to have an understanding of the data it contains. The main
motivating factor is supporting schema migrations, concretely, being able to
run `ALTER TABLE ... ADD COLUMN ...` or `ALTER TABLE ... DROP COLUMN ...`.
Today Persist writes Parquet files with the following schema:

```
{
    "k": bytes,
    "v": bytes,
    "t": i64,
    "d": i64,
}
```
Where "k" is a Row (technically SourceData) encoded as Protobuf (via the
Codec
trait) and "v" is always the unit type (), which gets encoded as an empty
byte array. The root of the problem is Row is not self describing, we need
some extra information (i.e. a RelationDesc) to know what columns the
Datums in a Row map to. Consider the following scenario:
```sql
CREATE TABLE t1 (a text, b int);
INSERT INTO t1 VALUES ('hello', 5);
ALTER TABLE t1 DROP COLUMN b;
ALTER TABLE t1 ADD COLUMN b int DEFAULT 50;
DELETE FROM t1 WHERE b = 5;
```
To properly handle the `DELETE` we need enough information in the persisted
batch to realize that the initially inserted `Row('hello', 5)` should not be
deleted, because its value `5` belongs to the previously dropped column named
`b`, not the new one.
This work also unblocks future improvements like `PARTITION BY` and making
`persist_source` faster (#25901).

The proposal: require that data written to Persist (e.g. `Row`s) be
transformed into Apache Arrow data types, then use this columnar format to
write Apache Parquet files to S3.
Arrow is a relatively new in-memory columnar data format. It is designed for efficient (e.g. SIMD vectorized) analytical operations and to be a standard that all "data libraries" can interface with. Persist currently collects statistics for filter-pushdown using Arrow.
Parquet is a columnar format designed for efficient storage, based on the Dremel paper from Google and built as a collaboration between Twitter and Cloudera for Hadoop. Its goal is space-efficient storage of data that allows fetching individual columns from a collection of rows. Crucially, it is also self-describing and supports arbitrary additional metadata.
Parquet also has a number of nice features that we won't use immediately but could enable fairly easily.
Note: Arrow and Parquet are pretty tightly coupled; mapping from one format to the other is already handled by any open-source library we would use. For the most part our codebase will interact with Arrow, with only the lowest layer of Persist needing to know about Parquet.
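To make the shape of this pipeline concrete, here is a minimal sketch (not Persist's actual writer) of writing one batch in today's `("k", "v", "t", "d")` schema with the arrow-rs and parquet crates; the function and its signature are illustrative only:

```rust
use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, BinaryArray, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

/// Write one batch of (k, v, t, d) updates to a Parquet file.
fn write_batch(
    file: File,
    ks: Vec<&[u8]>,
    vs: Vec<&[u8]>,
    ts: Vec<i64>,
    ds: Vec<i64>,
) -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("k", DataType::Binary, false),
        Field::new("v", DataType::Binary, false),
        Field::new("t", DataType::Int64, false),
        Field::new("d", DataType::Int64, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(BinaryArray::from(ks)),
        Arc::new(BinaryArray::from(vs)),
        Arc::new(Int64Array::from(ts)),
        Arc::new(Int64Array::from(ds)),
    ];
    let batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;

    // `ArrowWriter` handles the Arrow -> Parquet mapping for us.
    let mut writer = ArrowWriter::try_new(file, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```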
Currently we durably persist `Datum`s using the `ProtoDatum` protobuf message.
Arrow's data types are not as rich as protobuf's, so we need to figure out how
exactly we would represent `Datum`s in an Arrow format. These decisions are not
set in stone, though: we describe below how migrations to other Arrow data
types would be possible, but it's less work if we get this right from the
start. Below, for each `Datum` type, we list its current Rust representation,
the proposed Arrow format, and relevant notes.
**Numeric**

```rust
struct Decimal<const N: usize> {
    digits: u32,
    exponent: i32,
    bits: u8,
    lsu: [u16; N],
}
```

Arrow format:

```
StructArray<{
    lossy: f64,
    actual: Bytes,
}>
```

Encoded as two values: a lossy `f64` for sorting and filtering, then a serialized representation of `Decimal` as opaque bytes. See https://speleotrove.com/decimal/dnnumb.html for an explanation of what the fields in the Rust type represent.

Note: Arrow does have `Decimal` types, but we opt not to use them because they can't represent the full range of values that can be represented by `Numeric`. Specifically, the `Decimal` types are fixed-point and the largest variant, `Decimal256`, has a maximum precision of 76 digits. `Numeric` is floating-point with a maximum precision of 39 digits, which means we would need a fixed-point number capable of storing 78 digits, which Arrow doesn't have.
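A sketch of how the proposed two-field `Numeric` column could be assembled with arrow-rs (assuming a recent version, where `StructArray` implements `From<Vec<(FieldRef, ArrayRef)>>`); the helper name is hypothetical:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BinaryArray, Float64Array, StructArray};
use arrow::datatypes::{DataType, Field};

/// Build the proposed two-field representation of a `Numeric` column: a
/// lossy `f64` for sorting/filtering plus the exact value as opaque bytes.
fn numeric_column(lossy: Vec<f64>, actual: Vec<&[u8]>) -> StructArray {
    StructArray::from(vec![
        (
            Arc::new(Field::new("lossy", DataType::Float64, false)),
            Arc::new(Float64Array::from(lossy)) as ArrayRef,
        ),
        (
            Arc::new(Field::new("actual", DataType::Binary, false)),
            Arc::new(BinaryArray::from(actual)) as ArrayRef,
        ),
    ])
}
```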
**Date**

```rust
struct Date {
    // Days since the Postgres epoch.
    days: i32,
}
```

Arrow format: `PrimitiveArray<i32>`

Directly encode the number of days since the UNIX epoch (1970-01-01).

Alternative: We could encode this as the number of days since the Postgres epoch, so it would be a direct representation of the Rust type, but I'm leaning towards encoding as days since the UNIX epoch for consistency with `Timestamp`, which is also relative to the UNIX epoch. The max value supported for `Date` in Postgres is the year 5,874,897 AD, which can be represented with either offset.
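For illustration, the conversion between the Rust type's Postgres-epoch offset and the proposed UNIX-epoch encoding is a single constant shift:

```rust
/// Days between the UNIX epoch (1970-01-01) and the Postgres epoch
/// (2000-01-01): 30 years * 365 days + 7 leap days.
const POSTGRES_EPOCH_OFFSET_DAYS: i32 = 10_957;

/// Convert `Date`'s "days since the Postgres epoch" to the proposed
/// on-disk value, "days since the UNIX epoch".
fn to_unix_days(days_since_postgres_epoch: i32) -> i32 {
    days_since_postgres_epoch + POSTGRES_EPOCH_OFFSET_DAYS
}
```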
**Time**

```rust
struct NaiveTime {
    secs: u32,
    frac: u32,
}
```

Arrow format: `FixedSizeBinary[8]`

Represented as the `secs` field and `frac` field encoded in that order as big-endian.

Alternative: We could represent this as the number of nanoseconds since midnight, which is a bit more general but more costly to encode at runtime. Ideally Persist encoding is as fast as possible, so I'm leaning towards the more direct-from-Rust approach.

Note: We only need 47 bits to represent the total range, leaving 17 bits unused. In the future, if we support the `TIMETZ` type, we could probably also represent it in a `u64`, using these extra bits to store the timezone.
**Timestamp**

```rust
struct NaiveDate {
    // (year << 13) | day
    ymdf: i32,
}

struct NaiveDateTime {
    date: NaiveDate,
    time: NaiveTime,
}
```

Arrow format: `PrimitiveArray<i64>`

chrono (our underlying date-time library) uses a memory-efficient encoding of date, squeezing both year and day into a single `i32`; combined with a `NaiveTime` this ends up being 12 bytes. We can represent this same range of time as the number of microseconds since the UNIX epoch in an `i64`. Postgres does something very similar; the only difference is it uses an offset of 2000-01-01.
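A sketch of the proposed conversion using chrono (assumes chrono 0.4.31+ for `and_utc`):

```rust
use chrono::NaiveDateTime;

/// Convert a `NaiveDateTime` to the proposed on-disk representation:
/// microseconds since the UNIX epoch, stored in an `i64`.
fn encode_timestamp(ts: NaiveDateTime) -> i64 {
    ts.and_utc().timestamp_micros()
}
```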
**TimestampTz**

```rust
struct DateTime<Tz: TimeZone> {
    datetime: NaiveDateTime,
    // Purely type info.
    offset: Tz::Offset,
}
```

Arrow format: `PrimitiveArray<i64>`

Just like `Timestamp`, we'll encode this as the number of microseconds since the UNIX epoch. We don't actually need to store any timezone information; instead we convert to the session timezone when loaded. This is how Postgres works.
**Interval**

```rust
struct Interval {
    months: i32,
    days: i32,
    micros: i64,
}
```

Arrow format: `FixedSizeBinary[16]`

Represented by encoding the `months`, `days`, and `micros` fields, in that order, as big-endian.

Alternative: The smallest possible representation for an interval would be 11 bytes, or 12 if we don't want to do bit swizzling. But other than the space savings I don't believe there is a benefit to this approach. In fact it would incur some computational overhead to encode, and there are no benefits from a SIMD perspective either.

Alternative: We could represent `Interval`s in a `StructArray`, but we don't expose the internal details of `Interval`, so this wouldn't aid in filtering or pushdown. The only benefit of structuring an interval would be space reduction if we enable dictionary encoding.
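A sketch of the proposed 16-byte encoding, with the field order and endianness described above:

```rust
/// Encode an `Interval` as the proposed 16-byte value: `months`, `days`,
/// then `micros`, each big-endian.
fn encode_interval(months: i32, days: i32, micros: i64) -> [u8; 16] {
    let mut buf = [0u8; 16];
    buf[..4].copy_from_slice(&months.to_be_bytes());
    buf[4..8].copy_from_slice(&days.to_be_bytes());
    buf[8..].copy_from_slice(&micros.to_be_bytes());
    buf
}
```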
**Jsonb**

```rust
// JsonbRef<'a>(Datum<'a>)
enum Value {
    Null,
    Boolean(bool),
    Number(f64),
    String(String),
    Array(Vec<Value>),
    Map(BTreeMap<String, Value>),
}
```

Arrow format: `BinaryArray`

Serialize the JSON with the existing protobuf types, i.e. `ProtoDatum`, and store this binary blob.

Structured data: An option is to structure the JSON data using an Arrow union type. What is nice about this approach is it would allow us to do some form of projection pushdown on the JSON data. The main issue is that Arrow does not really support recursive data types; in fact, it is impossible to statically define the above `Value` enum in Arrow. The only option is to dynamically generate a `DataType`/schema given a column of values; see [1] for an example of this approach. I don't believe dynamically generating the schema is a good option because it is relatively complex, and we would end up with arbitrarily deep schemas based on user-provided data. The arbitrarily deep schemas particularly concern me because they would have unpredictable performance.

Alternative: An alternative to fully structuring the data is structuring it with a depth limit, for example structuring up to X levels deep and then binary encoding the rest. This gets us predictable performance with the ability to do limited pushdown, at the cost of code complexity. This is probably the best approach in the long term, but in my opinion the additional technical complexity makes it out of scope for the initial implementation.

Alternative: Instead of serializing the JSON data with protobuf, we could use a different serialization format like BSON. This approach is nice because it gets us a path to entirely eliminating `ProtoDatum` (🔥), but I am slightly leaning away from it given protobuf is already used so heavily in our codebase. If we do use a different serialization format, we'll need to be careful about how we encode numeric data; currently our JSON `Datum`s use `ProtoNumeric`, which has very high precision. I am also leaning away because we already use protobuf internally so it's well understood, and there are a few tricks we can use to greatly improve deserialization performance, e.g. zero-copy strings, lazy deserialization, and skipping fields we don't care about.

[1] https://gist.github.com/ParkMyCar/594f647a1bc5a146bb54ca46e6e95680
**UUID**

```rust
// uuid::Uuid
struct Uuid([u8; 16]);
```

Arrow format: `FixedSizeBinary[16]`

Encode the bytes from the `Uuid` directly into a fixed-size buffer.
**Array**

```rust
struct Array {
    elements: DatumList,
    dims: ArrayDimensions,
}
```

Arrow format:

```
ArrayDimensions: StructArray<{
    lower_bound: i64,
    length: u64,
}>

Array: StructArray<{
    elements: VariableListArray<T>,
    dimensions: VariableListArray<ArrayDimensions>,
}>
```

Store all arrays (including multidimensional ones) linearly in row-major order, with their metadata structured.

Arrays are a bit tricky: each value's shape must be rectangular, but the values in a column don't all need to have the same shape, and users can specify a logical lower bound other than 1. For example, the following is valid:

```sql
CREATE TABLE t1 (a int[]);
INSERT INTO t1 VALUES (ARRAY[1]), (ARRAY[ARRAY[2], ARRAY[3]]);
```

Even though column `a` is defined as a single-dimension `int[]`, it's valid to insert a multi-dimensional array. This is because arrays in Postgres are all a single type; in other words, `int[]` and `int[][][]` are the same type.

Alternative: We could binary encode the `ArrayDimensions` data, but the Arrow types aren't too complex, so it's not clear this would be a better solution.
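To illustrate the row-major linearization, here is roughly what the two inserted values above would look like; `LinearizedArray` is a hypothetical type for illustration, not the actual Arrow representation:

```rust
/// Hypothetical linearized form of a single array value: elements flattened
/// in row-major order, plus one (lower_bound, length) pair per dimension.
struct LinearizedArray {
    elements: Vec<i32>,
    dimensions: Vec<(i64, u64)>,
}

fn examples() -> Vec<LinearizedArray> {
    vec![
        // ARRAY[1]: one dimension of length 1.
        LinearizedArray {
            elements: vec![1],
            dimensions: vec![(1, 1)],
        },
        // ARRAY[ARRAY[2], ARRAY[3]]: a 2x1 array, elements stored row-major.
        LinearizedArray {
            elements: vec![2, 3],
            dimensions: vec![(1, 2), (1, 1)],
        },
    ]
}
```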
**List**

```rust
// DatumList<'a>.
Vec<T>
```

Arrow format: `VariableSizeList<T>`

A list of values.

Note: Unlike `Array`, all the values in a column of `List`s must have the same number of dimensions. Also, internally Arrow represents nested lists in a row-major format.
**Record**

```rust
Vec<(ColumnName, ColumnType)>
```

Arrow format: `StructArray`

All `Record` values in a column have the same schema, so at creation time we can define the schema of the column. This is different from JSON, where every value can have a different schema/shape.
**Map**

```rust
// DatumMap<'a>.
HashMap<String, T>
```

Arrow format: `MapArray`

The Arrow spec does not include the concept of a map, but the arrow2 and arrow-rs crates have a `MapArray` type that is a list of tuples.

Alternative: We could encode maps to some binary format, e.g. protobuf, and store them as a binary blob. While this might be simpler, it prevents us from pushing down optimizations into the map.
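For reference, a sketch of building such a column with arrow-rs's `MapBuilder`; the keys, values, and helper name are illustrative:

```rust
use arrow::array::{Int32Builder, MapArray, MapBuilder, StringBuilder};
use arrow::error::ArrowError;

/// Build a map column with two rows: {"a": 1, "b": 2} and {"c": 3}.
fn map_column() -> Result<MapArray, ArrowError> {
    let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
    builder.keys().append_value("a");
    builder.values().append_value(1);
    builder.keys().append_value("b");
    builder.values().append_value(2);
    builder.append(true)?; // finish the first row
    builder.keys().append_value("c");
    builder.values().append_value(3);
    builder.append(true)?; // finish the second row
    Ok(builder.finish())
}
```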
**MzTimestamp**

```rust
struct MzTimestamp(u64);
```

Arrow format: `PrimitiveArray<u64>`

Number of milliseconds since the UNIX epoch.
**Range**

```rust
struct Range<T> {
    lower: RangeBound<T>,
    upper: RangeBound<T>,
}

struct RangeBound<T> {
    inclusive: bool,
    bound: Option<T>,
}
```

Arrow format:

```
RangeBound: StructArray<{
    inclusive: bool,
    bound: T,
}>

Range: StructArray<{
    lower: RangeBound,
    upper: RangeBound,
}>
```

Structure the data as it is in Rust.

Ranges seem pretty interesting and powerful, so Persist having an understanding of the data seems worthwhile for the long term. They could also be entirely unused (I'm not sure), in which case the complexity of encoding them in a structured way might not be worth it.

Alternative: Encode a `Range` into a binary format and store it as a blob.
**MzAclItem**

```rust
struct MzAclItem {
    // String
    grantee: RoleId,
    // String
    grantor: RoleId,
    // u64
    acl_mode: AclMode,
}
```

Arrow format:

```
StructArray<{
    grantee: String,
    grantor: String,
    acl_mode: u64,
}>
```

Structure the data as it is in Rust.

Alternative: Encode an `MzAclItem` into a binary format and store it as a blob.
**AclItem**

```rust
struct AclItem {
    // u32
    grantee: Oid,
    // u32
    grantor: Oid,
    // u64
    acl_mode: AclMode,
}
```

Arrow format:

```
StructArray<{
    grantee: u32,
    grantor: u32,
    acl_mode: u64,
}>
```

Structure the data as it is in Rust.

Alternative: It would be relatively easy to stitch together the three values that make up an `AclItem` into a `FixedSizeBinary<16>`; it should even sort the same as its Rust counterpart.
**Int2Vector**

```rust
Vec<i16>
```

Arrow format: `VariableSizeList<i16>`

Structure the data as it is in Rust.

Arrow has the concept of extension types, which do not change the physical layout of a column but allow you to identify the kind of data stored in a column by tagging it with metadata. At the time of writing there are only two accepted canonical extension types, but we are free to create our own.
As part of our mapping from `Datum` to Arrow column we could include an
extension type in the column's metadata, specifically:

```
'ARROW:extension:name': 'materialize.persist.<version>.<datum_name>'
```

To start, `<version>` will just be `1`, but it could be used in the future to
evolve how we represent `Datum`s in Arrow. And we include the `persist`
namespacing to make sure we don't collide with any other Materialize Arrow
formats, e.g. for `COPY TO ...`.
Note: The guidance provided by the Arrow project is to name extension types with a prefix to prevent collisions with other applications. I don't think this really matters for Persist, but I don't see a reason not to follow the pattern.
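A sketch of tagging a field this way with arrow-rs; the field name, data type, and datum name (`date`) are placeholders:

```rust
use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

/// Tag a field with the proposed extension-type metadata.
fn tagged_field() -> Field {
    Field::new("col", DataType::Int32, true).with_metadata(HashMap::from([(
        "ARROW:extension:name".to_string(),
        "materialize.persist.1.date".to_string(),
    )]))
}
```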
When writing Arrow columns as Parquet you need to include a schema, which requires each column to be named. Instead of using the externally visible column names, we could name the Arrow fields with their column index. For example, a table with two columns `foo` and `bar` would be persisted with column names `"0"` and `"1"`. This is a bit forward looking, but it decouples the persisted schema from user-visible column names, e.g. renaming a column would not require rewriting any data.
Like extension types, schemas can also be tagged with metadata. We already
write some limited metadata to our Parquet files in the form of a protobuf
message. I propose we extend this message to include a "version", which
initially will just be 1 and can be incremented at its own pace. The goal of
this "batch version" is to give us a way to entirely ignore the new columnar
data if necessary. For example, say we incorrectly encode the new structured
format and should always ignore the bad data on decode; bumping the version
number gives us an easy way to do that.
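For illustration, Parquet's key/value footer metadata can be set through `WriterProperties` when constructing the `ArrowWriter`; the `batch_version` key here is hypothetical, since the real proposal extends our existing protobuf metadata message:

```rust
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;

/// Writer properties that stamp a version into the Parquet footer's
/// key/value metadata.
fn writer_props() -> WriterProperties {
    WriterProperties::builder()
        .set_key_value_metadata(Some(vec![KeyValue::new(
            "batch_version".to_string(),
            "1".to_string(),
        )]))
        .build()
}
```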
Today data is encoded with the `Codec` trait and written to S3 with the
following Parquet schema:

```
{
    "k": bytes,
    "v": bytes,
    "t": i64,
    "d": i64,
}
```
To migrate to the new structured data I propose we extend the above schema to:

```
{
    "k": bytes,
    "v": bytes,
    "t": i64,
    "d": i64,
    "k_s": {
        // ... nested structured data
    },
    "v_s": {
        // ... nested structured data
    },
}
```
In other words, we dual encode blobs with both `Codec` and Arrow and write them
to a single Parquet file. This has the following benefits:

- We can validate that the `Codec` and Arrow data decode to the same `Row`s, and reading only the Arrow data could be turned on or off via a LaunchDarkly flag.
- We can incrementally move the read path onto the Arrow data while still writing the `Codec` format. Currently decoding protobuf (i.e. `ProtoDatum`) is computationally expensive, so getting an incremental win by improving decode time would be nice.
- It requires no additional `PUT` or `GET` requests to S3.
- Readers that don't understand the structured data can continue to read the `'k'` and `'v'` columns.

The current max blob size is 128MiB. We have two options for how to handle the
"max" size when we write both `Codec` and Arrow data at the same time:
1. Keep the 128MiB limit per encoding, allowing a dual-encoded blob to grow to roughly twice today's max size.
2. Keep the 128MiB limit for the entire blob, splitting writes into more, smaller, blobs. This increases the number of `GET`s and `PUT`s, which could impact cost, and requires `ConsolidatingIter` to handle a larger number of Parts.

Neither option is great, but I am leaning towards [1] since it puts the pressure/complexity on S3 and the network more than on Persist internals. To make sure this solution is stable we can add Prometheus metrics tracking the size of blobs we read from S3 and monitor them during rollout to staging and canaries.
To start we can write both the `Codec` and Arrow versions of the data, and do
compaction based on the bytes from `Codec`. Eventually we'll need to
consolidate the Arrow data directly, which requires sorting the data. Sorting
an Arrow `StructArray` is possible (e.g. the pyarrow library supports it), but
neither the arrow-rs nor arrow2 crates support it. I filed an
issue with arrow-rs, but
realistically this is something we'll probably need to implement. Once data is
sorted, consolidating an Arrow array is relatively easy and fast! To delete a
value, all we need to do is unset the bit in the
validity bitmap
for that value.
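A sketch of the validity-bitmap trick with arrow-rs (assumes a recent version with `StructArray::into_parts`/`StructArray::new`, and for simplicity that the column has no pre-existing nulls, otherwise the two bitmaps would need to be AND-ed):

```rust
use arrow::array::StructArray;
use arrow::buffer::NullBuffer;

/// "Delete" rows from a column by replacing its validity bitmap; the
/// underlying data buffers are left untouched.
fn delete_rows(col: &StructArray, keep: Vec<bool>) -> StructArray {
    let (fields, arrays, _nulls) = col.clone().into_parts();
    StructArray::new(fields, arrays, Some(NullBuffer::from(keep)))
}
```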
There are three different scenarios we need to handle consolidation for:

1. Two batches written with different versions of our encoding, e.g. after we change how we map `Datum`s to Arrow Arrays.
2. Batch A written with `Codec` data, Batch B with Arrow Arrays.
3. Schema evolution of the data, e.g. `ALTER TABLE .. ADD COLUMN ...`.

For all of these scenarios I believe we can handle consolidation by first
decoding to the Rust type (e.g. `Row`) and consolidating based on that. It will
be relatively inefficient, but none of these scenarios should happen too often.
Note: In the current state of the world we'll need to support compacting
`Codec` and Arrow data forever, since we can't guarantee all batches written
with `Codec` have been compacted yet. The ability to make this guarantee is
something we could add to Compaction 2.0.

Note: While technically out of scope for this design doc, we can implement efficient consolidation for case 3 (table evolution) without decoding to the Rust type. For columns that have been dropped we can drop the corresponding Arrow Array, and for new columns with a default value we can add a run-end encoded array; both should take O(1) time.
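A sketch of the run-end encoded default column with arrow-rs's `RunArray`; a single run covers the whole batch, so the size is constant regardless of row count:

```rust
use arrow::array::{Int64Array, RunArray};
use arrow::datatypes::Int64Type;
use arrow::error::ArrowError;

/// A newly added column whose every row is the same default value, stored
/// as one run: O(1) space no matter how many rows the batch has.
fn default_column(num_rows: i64, default: i64) -> Result<RunArray<Int64Type>, ArrowError> {
    let run_ends = Int64Array::from(vec![num_rows]);
    let values = Int64Array::from(vec![default]);
    RunArray::try_new(&run_ends, &values)
}
```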
While not a working prototype, I have three draft PRs that validate the ideas
described in this design doc, e.g. writing `'k_s'` and `'v_s'` alongside our
current Parquet schema.

A goal of this project is to break up the work to provide incremental value. This makes code easier to review, allows changes to bake for longer periods of time, and lets us pause the work if something else arises.
An approximate ordering of the work would be:

1. Write structured data with a "V0" encoding that mirrors the `ProtoDatum` representation and collects no stats. This unblocks writing structured columnar data, and asynchronously we can nail down the right Arrow encoding formats.
2. Read structured data for the `Datum`s that have had non-V0 Arrow encodings implemented. This allows us to avoid decoding `ProtoDatum` in the read path, which currently is relatively expensive. Crucially, this is possible without supporting consolidation via Arrow because we don't yet consolidate on read.
3. Implement the remaining `Datum`-to-Arrow encodings and migrate from the "V0" encoding introduced in step 1. They can be implemented independently of one another, and would allow us to begin collecting stats for filter pushdown for the new types and further optimize our reads.
4. Support consolidation, replacing `Codec` with Arrow. This allows us to stop writing `Codec` for new batches, saving on S3 costs.

After all 4 of these are completed, we would be able to stop encoding with
`ProtoDatum` entirely.
The following changes are out of scope for this project, but our goal is to unblock these features with the described work:

- `ALTER TABLE ... [ADD | DROP] COLUMN ...`
- User-specified sorting of data, i.e. `PARTITION BY`
- Projection pushdown
I found serialization formats generally fell into two categories: formats designed to encode individual objects or messages (e.g. protobuf), and formats designed to store large collections of data (e.g. Parquet). While we could store data in the group 1 formats (e.g. today we store a list of protobuf-encoded objects), they lack the features of group 2 that work well for large amounts of data, e.g. compression and dictionary encoding.

Out of the formats in group 2, it's generally accepted that columnar-formatted data is better for analytical workloads. That eliminates row-based formats like Apache Avro, CSV, and JSONL.
The only serialization format, other than Apache Parquet, that I could find that was designed for large, columnar-formatted, tabular data was Apache ORC. I didn't research ORC too deeply, nor do I think it's the right fit for us, because it doesn't have much support in the open-source community; there is only a partial Rust implementation, which has little usage.
I briefly explored the idea of inventing our own serialization format. Because we're serializing a specific kind of data structure with a specific access pattern there are probably some optimizations we could make for the format. But in my opinion these possible benefits are not worth the time and maintenance required to invent our own format, considering where we are today.
There are not many in-memory columnar formats other than Arrow. I did find
lance, which is a fairly popular columnar
format written in Rust, but it's geared towards machine learning and LLMs. In
fact, when I looked a bit deeper, parts of lance seemed to use Arrow directly?
Given this and the different goals, it doesn't seem like it would be a good fit.
Currently we only use Arrow Arrays to collect stats on the data we're writing,
so we're not really benefiting from Arrow's high-performance compute kernels.
It is possible to go from `Datum`s straight to Parquet types, but when
exploring this I found Parquet to be a much lower-level interface than desired,
and Arrow offered the appropriate abstraction that made mapping `Datum`s
relatively easy.

Arrow also allows for faster random access than Parquet. While not used today, it's a nice property to have.
There is one other alternative for how we could migrate to the new columnar format: write two blobs to S3 instead of one. We could continue to write blobs as they are today and begin writing a second blob that contains only our columnar data. This avoids the blob-size issues of the proposed solution, but otherwise has a number of its own:

- It doubles the number of `GET`s and `PUT`s to S3, increasing cost.
- Not every Part would have two blobs associated with it, so we would need to keep track of which do at some layer.

Open questions: None currently.