pkg/dataobj/README.md
The dataobj package provides a container format for storing and retrieving structured data from object storage. It's designed specifically for Loki's log storage needs, enabling efficient columnar storage and retrieval of log data with support for multiple tenants, metadata indexing, and flexible querying.
The dataobj package provides a hierarchical container format:

```
┌─────────────────────────────────────┐
│          Data Object File           │
├─────────────────────────────────────┤
│       Header (Magic: "THOR")        │
├─────────────────────────────────────┤
│           Section 1 Data            │
│           Section 2 Data            │
│                ...                  │
│           Section N Data            │
├─────────────────────────────────────┤
│         Section 1 Metadata          │
│         Section 2 Metadata          │
│                ...                  │
│         Section N Metadata          │
├─────────────────────────────────────┤
│      File Metadata (Protobuf)       │
│        - Dictionary                 │
│        - Section Types              │
│        - Section Layout Info        │
├─────────────────────────────────────┤
│      Footer                         │
│        - File Format Version        │
│        - File Metadata Size (4 B)   │
│        - Magic: "THOR"              │
└─────────────────────────────────────┘
```
A data object is a self-contained file stored in object storage. Sections are the primary organizational unit within a data object, and each section is split into two regions: a data region holding the payload and a metadata region describing it. This separation allows reading section metadata without loading the entire data payload.

Most section implementations use columnar storage via the internal/dataset package.
| Offset | Content |
|--------|---------|
| 0 | Magic bytes: "THOR" (4 bytes) |
| 4 | Section data region (all sections, concatenated) |
| ... | Section metadata region (all sections, concatenated) |
| ... | Format version (varint) |
| ... | File metadata (protobuf-encoded) |
| -8 | Metadata size (uint32, little-endian) |
| -4 | Magic bytes: "THOR" (4 bytes) |
The file metadata structure (protobuf) is defined in the pkg/dataobj/internal/metadata package.
```
┌──────────────┐
│ Log Records  │
└──────┬───────┘
       │
       ▼
┌──────────────────┐
│ Section Builder  │  (e.g., logs.Builder)
│ - Buffer records │
│ - Create stripes │
│ - Merge stripes  │
└──────┬───────────┘
       │ Flush
       ▼
┌──────────────────┐
│ Columnar Encoder │
│ - Encode columns │
│ - Compress pages │
└──────┬───────────┘
       │ WriteSection
       ▼
┌──────────────────┐
│ dataobj.Builder  │
│ - Append section │
│ - Build metadata │
└──────┬───────────┘
       │ Flush
       ▼
┌──────────────────┐
│ Snapshot         │
│ - In-memory/disk │
│ - Ready to upload│
└──────┬───────────┘
       │
       ▼
┌──────────────────┐
│ Object Storage   │
└──────────────────┘
```
A section builder buffers incoming records, creating stripes as its buffer fills (bounded by the configured `BufferSize`) and merging those stripes when flushing. On flush, the columnar encoder encodes the columns and compresses their pages, and the data object builder appends the encoded section and builds the file metadata, including per-column metadata.

The encoding uses a multi-level compression strategy:
- Fastest speed, used for quick buffering
- Default speed, for better compression

Reading a data object follows this path:

```
┌──────────────────┐
│ Object Storage   │
└──────┬───────────┘
       │ Open
       ▼
┌──────────────────┐
│ dataobj.Object   │
│ - Read metadata  │
│ - Parse sections │
└──────┬───────────┘
       │ Open Section
       ▼
┌──────────────────┐
│ Section (e.g.    │
│ logs.Section)    │
│ - Decode metadata│
│ - List columns   │
└──────┬───────────┘
       │ NewReader
       ▼
┌──────────────────┐
│ logs.Reader      │
│ - Read pages     │
│ - Apply filters  │
│ - Decompress     │
└──────┬───────────┘
       │ Read batches
       ▼
┌──────────────────┐
│ Arrow Records    │
└──────────────────┘
```
Opening a data object reads and parses the file metadata to discover its sections; because section metadata is stored separately from section data, the decoder only reads the byte ranges it needs. Opening a section decodes that section's metadata and lists its columns. Reading then proceeds page by page: the reader fetches the pages for the requested columns, applies predicates as filters, decompresses the values, and returns batches of Arrow records.
Values in pages can use different encodings.
The package provides several built-in section types:
Namespace: github.com/grafana/loki
Kind: logs
Location: pkg/dataobj/sections/logs/
Stores log records in columnar format.
Columns:
- `stream_id` (int64): Stream identifier
- `timestamp` (int64): Nanosecond timestamp
- `metadata` (binary): Structured metadata key-value pairs, one column per key
- `message` (binary): Log line content

Namespace: github.com/grafana/loki
Kind: streams
Location: pkg/dataobj/sections/streams/
Stores stream metadata and statistics.
Columns:
- `stream_id` (int64): Unique stream identifier
- `min_timestamp` (int64): Earliest timestamp in stream
- `max_timestamp` (int64): Latest timestamp in stream
- `labels` (binary): Label key-value pairs for the stream, one column per key
- `rows` (uint64): Number of log records
- `uncompressed_size` (uint64): Uncompressed data size

Namespace: github.com/grafana/loki
Kind: pointers
Location: pkg/dataobj/sections/pointers/
Used in index objects to point to other objects that contain logs, via stream or predicate lookup.
Columns:
- `path` (binary): Path to data object in object storage
- `section` (int64): The section number within the referenced data object
- `pointer_kind` (int64): The type of pointer entry, either stream or column; determines which of the following fields are set

Fields present for a stream pointer:

- `stream_id` (int64): Stream identifier in this index object
- `stream_id_ref` (int64): Stream identifier in the target data object's streams section
- `min_timestamp` (int64): Min timestamp in the object for this stream
- `max_timestamp` (int64): Max timestamp in the object for this stream
- `row_count` (int64): Total rows for this stream in the target object
- `uncompressed_size` (int64): Total bytes for this stream in the target object

Fields present for a column pointer:

- `column_name` (binary): The name of the column in the referenced object
- `column_index` (int64): The index number of the column in the referenced object
- `values_bloom_filter` (binary): A bloom filter of the unique values seen in the referenced column

Namespace: github.com/grafana/loki
Kind: indexpointers
Location: pkg/dataobj/sections/indexpointers/
Used in Table of Contents (toc) objects to point to index objects, via a time range lookup.
Columns:
- `path` (binary): Path to the index file
- `min_time` (int64): Minimum time covered by the referenced index object
- `max_time` (int64): Maximum time covered by the referenced index object

Location: pkg/dataobj/consumer/
The consumer reads log data from Kafka and builds data objects.
Key Features:
Location: pkg/dataobj/metastore/
Manages an index of data objects and their contents for efficient querying.
The metastore serves queries using this index.
Location: pkg/dataobj/index/
Builds index objects containing indexes over the data objects that hold the logs.
Location: pkg/dataobj/explorer/
HTTP service for inspecting data objects.
```go
package main

import (
	"context"
	"time"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
	"github.com/grafana/loki/v3/pkg/dataobj/uploader"
	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	ctx := context.Background()

	// Create the logs section builder.
	logsBuilder := logs.NewBuilder(nil, logs.BuilderOptions{
		PageSizeHint:   256 * 1024,
		BufferSize:     10 * 1024 * 1024,
		AppendStrategy: logs.AppendUnordered,
		SortOrder:      logs.SortStreamASC,
	})

	// Append log records.
	logsBuilder.Append(logs.Record{
		StreamID:  12345,
		Timestamp: time.Now(),
		Metadata:  labels.Labels{{Name: "level", Value: "error"}},
		Line:      []byte("error occurred"),
	})

	// Create the data object.
	objBuilder := dataobj.NewBuilder(nil)
	objBuilder.Append(logsBuilder)
	obj, closer, err := objBuilder.Flush()
	if err != nil {
		panic(err)
	}
	defer closer.Close()

	// Upload to object storage. uploaderCfg, bucket, and logger are assumed
	// to be initialized elsewhere.
	uploader := uploader.New(uploaderCfg, bucket, logger)
	objectPath, err := uploader.Upload(ctx, obj)
	if err != nil {
		panic(err)
	}
	println("Uploaded to:", objectPath)
}
```
```go
package main

import (
	"context"
	"io"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/scalar"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
)

func main() {
	ctx := context.Background()

	// Open the data object. bucket is assumed to be an initialized object
	// storage client.
	obj, err := dataobj.FromBucket(ctx, bucket, "objects/ab/cd123...", 0)
	if err != nil {
		panic(err)
	}

	// Find the first logs section.
	var logsSection *dataobj.Section
	for _, sec := range obj.Sections() {
		if logs.CheckSection(sec) {
			logsSection = sec
			break
		}
	}

	// Open the logs section.
	section, err := logs.Open(ctx, logsSection)
	if err != nil {
		panic(err)
	}

	// Find the timestamp and message columns.
	var timestampCol, messageCol *logs.Column
	for _, col := range section.Columns() {
		switch col.Type {
		case logs.ColumnTypeTimestamp:
			timestampCol = col
		case logs.ColumnTypeMessage:
			messageCol = col
		}
	}

	// Create a reader with a predicate. startTime is assumed to be a
	// time.Time chosen by the caller.
	reader := logs.NewReader(logs.ReaderOptions{
		Columns: []*logs.Column{timestampCol, messageCol},
		Predicates: []logs.Predicate{
			logs.GreaterThanPredicate{
				Column: timestampCol,
				Value:  scalar.NewInt64Scalar(startTime.UnixNano()),
			},
		},
	})
	defer reader.Close()

	if err := reader.Open(ctx); err != nil {
		panic(err)
	}

	// Read batches of Arrow records until EOF.
	for {
		batch, err := reader.Read(ctx, 1000)
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		for i := 0; i < int(batch.NumRows()); i++ {
			timestamp := batch.Column(0).(*array.Timestamp).Value(i)
			message := batch.Column(1).(*array.String).Value(i)
			println(timestamp, message)
		}
		batch.Release()
	}
}
```
```sh
# Inspect data object structure
go run ./pkg/dataobj/tools/inspect.go -path objects/ab/cd123...

# Show statistics
go run ./pkg/dataobj/tools/stats.go -path objects/ab/cd123...
```
The explorer provides a web UI for browsing data objects:
```sh
# Start explorer
./loki -target=dataobj-explorer

# Access UI
curl http://localhost:3100/dataobj/api/v1/list
```