# @kbn/data-streams
A set of (generally) stateless tools and utilities to ease working with Elasticsearch data streams in TypeScript. Inspired by `@kbn/storage-adapter` and other data stream adapter-like implementations in alerting and the security solution.

The `DataStreamClient` provides a lightweight data-mapper pattern for CRUD operations against Elasticsearch data streams.
## API

### `create(options?)`

Create one or more documents in the data stream. Each document can optionally include an `_id` property. An optional `space` parameter is applied globally to all documents.

```ts
// Single document
const response = await client.create({
  space: 'my-space', // optional, applied to all documents
  documents: [{ _id: 'doc-123', '@timestamp': new Date().toISOString(), field: 'value' }],
  refresh: true,
});
```

```ts
// Multiple documents
const response = await client.create({
  space: 'my-space', // optional, applied to all documents
  documents: [
    { _id: 'doc-1', '@timestamp': new Date().toISOString(), field: 'value1' },
    { '@timestamp': new Date().toISOString(), field: 'value2' }, // auto-generated ID
    { _id: 'doc-3', '@timestamp': new Date().toISOString(), field: 'value3' },
  ],
  refresh: true,
});
```
### `search(query, options?)`

Search documents in the data stream. Supports an optional `space` parameter for space-aware filtering.

```ts
const response = await client.search({
  space: 'my-space', // optional
  query: { match_all: {} },
  size: 10,
});
```
### `exists()`

Check whether the data stream exists.

```ts
const exists = await client.exists();
```
## Unsupported operations

The following operations are not exposed in the API because they require knowledge of the underlying backing index names, which we keep as a private implementation detail:

- `get(id)`: use `search()` with an `ids` query instead (see example below)
- update operations: data streams are append-only; updates require targeting specific backing indices
- delete operations: deletes require specifying the backing index name

Since `get()` is not supported, use `search()` with an `ids` query to retrieve a document by ID:

```ts
// Retrieve a single document by ID
const response = await client.search({
  space: 'my-space', // optional, required if the document is space-bound
  query: { ids: { values: ['document-id'] } },
  size: 1,
});

if (response.hits.hits.length > 0) {
  const document = response.hits.hits[0]._source;
  // Use the document...
} else {
  // Document not found
}
```
This approach works across all backing indices in the data stream, unlike Elasticsearch's `get()` API, which requires a specific backing index name.
## Spaces support

All CRUD operations (`create`, `search`) accept an optional `space` parameter:

- When `space` is provided: user-supplied document IDs are prefixed as `{space}::{id}` (e.g. `myspace::abc123`), documents are decorated with `kibana.space_ids: [space]`, searches are filtered to that space, and the system property `kibana.space_ids` is stripped from responses.
- When `space` is omitted: documents receive no `kibana.space_ids` decoration and searches return only space-agnostic documents.
- IDs containing the `::` separator are rejected (it is reserved for system use).

Data streams can contain both space-bound and space-agnostic documents. The package does not handle RBAC; higher-level repositories should wrap these APIs for access control.
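To illustrate the ID convention above, here is a small sketch of the prefixing and rejection rules. These helpers are hypothetical, written for illustration only; they are not part of the package's public API.

```typescript
// Hypothetical helpers illustrating the `{space}::{id}` convention.
const SPACE_SEPARATOR = '::';

function toSpaceBoundId(space: string, id: string): string {
  if (id.includes(SPACE_SEPARATOR)) {
    // User-supplied IDs may not contain the reserved separator.
    throw new Error(`Document ID must not contain "${SPACE_SEPARATOR}"`);
  }
  return `${space}${SPACE_SEPARATOR}${id}`;
}

function fromSpaceBoundId(storedId: string): { space?: string; id: string } {
  const idx = storedId.indexOf(SPACE_SEPARATOR);
  if (idx === -1) {
    return { id: storedId }; // space-agnostic document
  }
  return {
    space: storedId.slice(0, idx),
    id: storedId.slice(idx + SPACE_SEPARATOR.length),
  };
}
```

For example, `toSpaceBoundId('myspace', 'abc123')` yields the stored ID `myspace::abc123`, which `fromSpaceBoundId` splits back into its space and user-facing ID.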
## Reserved mapping keys

When registering a data stream, the following reserved keys are automatically validated and will cause an error if found in your mappings:

- `kibana`: reserved for system properties (e.g. `kibana.space_ids`)
- `_id`: reserved for document identifiers (cannot be defined in mappings)

The `kibana.space_ids` mapping is automatically injected during registration. These validations occur via `registerDataStream()`. When the data stream version is incremented, mappings are applied to the write index.
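The reserved-key check can be pictured with a small sketch. This helper is hypothetical; the real validation lives inside `registerDataStream()` and may differ in detail.

```typescript
// Hypothetical sketch of the reserved-key validation described above.
const RESERVED_MAPPING_KEYS = ['kibana', '_id'] as const;

function assertNoReservedMappingKeys(properties: Record<string, unknown>): void {
  for (const key of RESERVED_MAPPING_KEYS) {
    if (key in properties) {
      throw new Error(`Mapping key "${key}" is reserved and cannot be defined`);
    }
  }
}
```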
## Mapping updates

Mapping updates are applied to the current write index and to your index template. This means new mappings will only apply to documents that arrive after your mappings update lands.

> **Important:** Mapping updates are only applied once you INCREMENT the template version number in your data stream definition. As you update your definition, it is highly recommended that you retain past definitions so that you can test your upgrade path before releasing new mappings.
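As an illustration of retaining past definitions and incrementing the version, a pair of definitions might look like the sketch below. The definition shape shown here is an assumption for illustration; consult the package's exported types for the real schema.

```typescript
// Hypothetical v1 definition, retained so the upgrade path can be tested.
const v1 = {
  name: 'my-data-stream',
  version: 1,
  template: {
    mappings: {
      properties: {
        '@timestamp': { type: 'date' },
        message: { type: 'keyword' },
      },
    },
  },
};

// v2 adds a backwards-compatible field; the version MUST be incremented
// for the new mappings to be applied to the write index.
const v2 = {
  ...v1,
  version: 2,
  template: {
    mappings: {
      properties: {
        ...v1.template.mappings.properties,
        status_code: { type: 'long' }, // new, additive field
      },
    },
  },
};
```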
## Data stream lifecycle

`@kbn/data-streams` supports data stream lifecycle (DSL) through index template lifecycle settings. Define `template.lifecycle` (for example `data_retention`) to configure data stream lifecycle behavior when the data stream is created or updated.
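As a sketch, a definition with a retention policy might look like the following. The placement of `lifecycle` under `template` follows the description above; the surrounding field names are assumptions.

```typescript
// Illustrative only: configure DSL retention via template.lifecycle.
const dataStreamDefinition = {
  name: 'my-data-stream',
  version: 1,
  template: {
    lifecycle: {
      data_retention: '30d', // delete data once it is older than 30 days
    },
    mappings: {
      properties: {
        '@timestamp': { type: 'date' },
      },
    },
  },
};
```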
## Backwards compatibility

These tools assume that you will introduce backwards-compatible changes to your mappings. If your changes are not backwards compatible, you will hit a runtime error when initializing your client, as it will try to update the current write index with your new mappings.

If you need to make a breaking change to mappings, consider using search-time runtime mappings. Elasticsearch supports specifying runtime mappings at search time (docs). This is a powerful tool that enables schema-on-read-like patterns: in many cases, massive data migrations or backfills can be avoided entirely.
Let's say I have indexed the following document:

```
POST my-data-stream/_doc
{
  "@timestamp": "2099-05-06T16:21:15.000Z",
  "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] GET /images/bg.jpg HTTP/1.0 200 24736"
}
```
But actually, I mapped `message` as a `keyword` field and now want to expose it under a different name. With runtime mappings you can remap the field on the fly (note that `text` is not a supported runtime field type, so we emit a `keyword` here):

```
GET my-data-stream/_search
{
  "runtime_mappings": {
    "messageV2": {
      "type": "keyword",
      "script": {
        "source": "emit(doc['message'].value);"
      }
    }
  },
  "query": {
    "match_all": {}
  },
  "fields": ["messageV2"]
}
```
...but what if you want to move and transform the value of a field, almost like a migration? To a limited degree this is possible at read time too:

```
GET my-data-stream/_search
{
  "runtime_mappings": {
    "messageV2": {
      "type": "keyword",
      "script": {
        "source": "if (params._source[\"messageV2\"] != null) {\n  emit(params._source[\"messageV2\"]);\n} else {\n  emit(doc['message'].value + \" the original, but processed\");\n}"
      }
    }
  },
  "query": {
    "match_all": {}
  },
  "fields": ["messageV2"]
}
```
Using Painless in this way is powerful, but we should be careful to ship performant and well-tested Painless in our code. That's why we expose a set of parameterised scripts for the most common use cases.
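To illustrate what "parameterised" means here, a reusable remapping request could take the field name and suffix as script `params`, keeping the script source constant so Elasticsearch can cache its compilation. The helper below is a sketch, not the package's actual script set.

```typescript
// Sketch: build a search body with a parameterised runtime mapping.
// Passing values via script params keeps the Painless source stable
// across requests, which is friendlier to the script compilation cache.
function remappedSearchRequest(field: string, suffix: string) {
  return {
    runtime_mappings: {
      [`${field}V2`]: {
        type: 'keyword',
        script: {
          source: 'emit(doc[params.field].value + params.suffix);',
          params: { field, suffix },
        },
      },
    },
    query: { match_all: {} },
    fields: [`${field}V2`],
  };
}
```

For example, `remappedSearchRequest('message', ' (processed)')` produces a body that remaps `message` into a `messageV2` runtime field with the suffix appended.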
## Testing

We can consider creating the following test utilities:

```ts
test('myDataStream should be backwards compatible', async () => {
  await integrationTestHelpers.assertBackwardsCompatible([
    {
      sampleDocs: [
        {
          /* 1 */
        },
        // and so on...
      ],
      dataStream: v1,
    },
    {
      sampleDocs: [
        {
          /* 1 */
        },
        // and so on...
      ],
      dataStream: current,
    },
  ]);
});

test('snapshot', async () => {
  expect(integrationTestHelpers.toSnapshot(myDataStream)).toMatchSnapshot();
});

test('mappings hash v1', async () => {
  expect(integrationTestHelpers.mappingsHash(myDataStream)).toMatchInlineSnapshot(`hash-1`);
});
```