The Inventory Sync module uses Google FlatBuffers to serialize and deserialize the synchronization messages exchanged between Wazuh agents and the manager. FlatBuffers offer zero-copy deserialization and a compact binary representation, both well suited to high-throughput inventory synchronization.
The inventory synchronization protocol is defined in FlatBuffers schema files that specify the message structure and types.
table Message {
    content: MessageType;
}

union MessageType {
    DataValue,
    DataClean,
    ChecksumModule,
    Start,
    StartAck,
    End,
    EndAck,
    ReqRet,
    DataContext
}

root_type Message;
All synchronization messages are wrapped in the Message table with the specific message type in the content union field.
Initiates a synchronization session with mode and agent information:
table Start {
    module: string;
    mode: Mode;
    size: ulong;
    index: [string];
    option: Option;
    architecture: string;
    hostname: string;
    osname: string;
    osplatform: string;
    ostype: string;
    osversion: string;
    agentversion: string;
    agentname: string;
    agentid: string;
    groups: [string];
    global_version: ulong;
}

enum Mode: byte {
    ModuleFull,
    ModuleDelta,
    ModuleCheck,
    MetadataDelta,
    MetadataCheck,
    GroupDelta,
    GroupCheck
}

enum Option: byte {
    Sync,
    VDFirst,
    VDSync
}
Fields:
- module: Module name (syscollector, fim, sca)
- mode: Synchronization type (see modes below)
- size: Number of messages to be sent in this session
- index: Target index names for the synchronization
- option: Synchronization option (used by Vulnerability Scanner integration)
- architecture, hostname, osname, osplatform, ostype, osversion: Agent OS information
- agentversion, agentname, agentid: Agent identification information
- groups: Agent group memberships
- global_version: Version counter for agent context updates

Synchronization Modes:

Module Synchronization (syscollector, FIM, SCA):
- ModuleFull: Complete inventory replacement - triggers delete-by-query before indexing
- ModuleDelta: Incremental updates - only processes changes
- ModuleCheck: Integrity verification using checksums

Agent Context Synchronization (agent-info module):
- MetadataDelta: Updates agent metadata (name, version, IP, OS) on existing documents
- MetadataCheck: Disaster recovery - scans all indices and fixes metadata inconsistencies
- GroupDelta: Updates agent group membership on existing documents
- GroupCheck: Disaster recovery - scans all indices and fixes group inconsistencies

The protocol supports multiple data message types:
Standard inventory data message with versioning:
table DataValue {
    seq: ulong;
    session: ulong;
    operation: Operation;
    id: string;
    index: string;
    version: ulong;
    data: [byte];
}

enum Operation : byte {
    Upsert = 0,
    Delete = 1
}
Fields:
- seq: Sequence number for the message
- session: Session identifier linking messages to a synchronization session
- operation: Type of operation (Upsert for insert/update, Delete for removal)
- id: Unique document identifier within the index
- index: Target Elasticsearch/OpenSearch index name
- version: Module-specific version number for integrity checks
- data: JSON document payload as byte array

Usage: Used by syscollector, FIM, and SCA modules for inventory synchronization.
Context message for vulnerability scanning metadata:
table DataContext {
    seq: ulong;
    session: ulong;
    id: string;
    index: string;
    data: [byte];
}
Fields:
- seq: Sequence number for ordering and gap detection
- session: Session identifier linking to the synchronization session
- id: Context document identifier
- index: Target index name for context association
- data: Context payload as byte array (typically JSON)

Storage:
- {session}_{seq}_context

Usage: Reserved for future Vulnerability Detector integration to provide additional context for vulnerability scanning without indexing the data.
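The `{session}_{seq}_context` pattern can be illustrated with a small formatting helper. This is a minimal sketch, not the module's implementation: it assumes the pattern is a storage key and that both numbers are written in plain decimal without padding. The function name `makeContextKey` is hypothetical.

```cpp
#include <cstdint>
#include <string>

// Hypothetical helper (not part of the Wazuh codebase): formats a key for a
// DataContext message following the {session}_{seq}_context pattern.
// Assumption: session and seq are rendered in decimal with no zero-padding.
std::string makeContextKey(std::uint64_t session, std::uint64_t seq)
{
    return std::to_string(session) + "_" + std::to_string(seq) + "_context";
}
```

Under these assumptions, `makeContextKey(42, 7)` yields `42_7_context`.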
Delete-by-query message for bulk data removal:
table DataClean {
    seq: ulong;
    session: ulong;
    index: string;
}
Fields:
- seq: Sequence number
- session: Session identifier
- index: Target index to clean

Usage: Used to remove outdated data.
Checksum verification for integrity checks:
table ChecksumModule {
    session: ulong;
    index: string;
    checksum: string;
}
Fields:
- session: Session identifier
- index: Target index name
- checksum: Calculated checksum for integrity verification

Usage: Used in ModuleCheck mode to determine if full sync is needed.
Signals completion of data transmission:
table End {
    session: ulong;
}
Fields:
- session: Session identifier to complete

Sent by the agent after all data messages have been transmitted. This triggers the manager to process and index all session data.
The manager sends acknowledgment responses back to agents:
Acknowledgment for session creation:
table StartAck {
    status: Status;
    session: ulong;
}

enum Status: byte {
    Ok,
    Error,
    Offline
}
Fields:
- status: Result of session creation
  - Ok: Session created successfully
  - Error: Session creation failed
  - Offline: Reserved for future use
- session: Unique session identifier assigned by the manager

Sent by the manager in response to a Start message. The agent uses this session ID for all subsequent data messages.
Acknowledgment for completed synchronization:
table EndAck {
    status: Status;
    session: ulong;
}
Fields:
- status: Final synchronization result
  - Ok: All data successfully indexed
  - Error: Synchronization failed
  - Offline: Indexer unavailable
- session: Session identifier

Sent by the manager after processing all session data and completing indexing operations.
Request for message retransmission:
table ReqRet {
    seq: [Pair];
    session: ulong;
}

table Pair {
    begin: ulong;
    end: ulong;
}
Fields:
- seq: Array of sequence number ranges to retransmit (each Pair represents a range from begin to end)
- session: Session identifier

Sent by the manager when sequence gaps are detected in received data messages. The agent retransmits the requested messages to ensure data completeness.
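To make the gap detection concrete, the sketch below computes the ranges a `ReqRet` message would carry from the set of sequence numbers already received. It is an illustration under stated assumptions, not the manager's actual logic: it assumes sequence numbers run from 0 to `size - 1` (with `size` taken from the Start message) and that each range is inclusive on both ends. The function name `findMissingRanges` is hypothetical.

```cpp
#include <cstdint>
#include <set>
#include <utility>
#include <vector>

// Hypothetical helper (not part of the Wazuh codebase): returns the inclusive
// [begin, end] ranges of sequence numbers in [0, expected) that are absent
// from `received`.
// Assumptions: sequence numbers start at 0, and Pair ranges are inclusive.
std::vector<std::pair<std::uint64_t, std::uint64_t>>
findMissingRanges(const std::set<std::uint64_t>& received, std::uint64_t expected)
{
    std::vector<std::pair<std::uint64_t, std::uint64_t>> gaps;
    std::uint64_t next = 0; // lowest sequence number not yet accounted for

    // std::set iterates in ascending order, so one pass is enough.
    for (const auto seq : received)
    {
        if (seq >= expected)
        {
            break; // ignore anything beyond the announced session size
        }
        if (seq > next)
        {
            gaps.emplace_back(next, seq - 1); // everything in between is missing
        }
        next = seq + 1;
    }

    // Trailing gap: messages at the end of the session never arrived.
    if (next < expected)
    {
        gaps.emplace_back(next, expected - 1);
    }
    return gaps;
}
```

For example, with sequence numbers 0, 1, 2, 5, 6, and 9 received out of 12 expected, the helper returns the ranges 3-4, 7-8, and 10-11. Each pair could then be passed to `CreatePair` when building the `ReqRet` message.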
The Inventory Sync module validates all incoming FlatBuffer messages:
// Verify the FlatBuffer structure before reading any field
flatbuffers::Verifier verifier(buffer.data(), buffer.size());
if (!Wazuh::SyncSchema::VerifyMessageBuffer(verifier))
{
    throw InventorySyncException("Invalid message buffer");
}

// Parse the message
auto syncMessage = Wazuh::SyncSchema::GetMessage(buffer.data());

// Validate content type
switch (syncMessage->content_type())
{
    case Wazuh::SyncSchema::MessageType_Start:
        // Validate Start message fields
        if (syncMessage->content_as_Start()->module() == nullptr)
        {
            throw InventorySyncException("Missing module name");
        }
        break;

    case Wazuh::SyncSchema::MessageType_DataValue:
    {
        // Braces scope dataValue to this case, so later case labels
        // do not jump over its initialization
        auto dataValue = syncMessage->content_as_DataValue();
        if (dataValue->id() == nullptr || dataValue->index() == nullptr)
        {
            throw InventorySyncException("Missing required fields");
        }
        break;
    }

    // ... other message types
}
Validation checks:
FlatBuffers provide several performance advantages for inventory synchronization:
// Build a StartAck response and wrap it in the Message root table
auto fbBuilder = std::make_shared<flatbuffers::FlatBufferBuilder>();
auto startAckOffset = Wazuh::SyncSchema::CreateStartAck(
    *fbBuilder,
    Wazuh::SyncSchema::Status_Ok,
    sessionId);
auto messageOffset = Wazuh::SyncSchema::CreateMessage(
    *fbBuilder,
    Wazuh::SyncSchema::MessageType_StartAck,
    startAckOffset.Union());
fbBuilder->Finish(messageOffset);
// Read a DataValue message in place, without copying the payload
auto message = Wazuh::SyncSchema::GetMessage(buffer.data());
if (message->content_type() == Wazuh::SyncSchema::MessageType_DataValue)
{
    // Typed accessor generated for the union; equivalent to casting content()
    auto dataValue = message->content_as_DataValue();
    const auto seq = dataValue->seq();
    const auto session = dataValue->session();
    const auto version = dataValue->version();
    const auto operation = dataValue->operation();

    // Access JSON payload; the vector field may be absent, so check for nullptr
    auto data = dataValue->data();
    if (data != nullptr)
    {
        std::string_view jsonPayload(reinterpret_cast<const char*>(data->data()), data->size());
        // Process inventory data
    }
}
// Build an EndAck response and wrap it in the Message root table
flatbuffers::FlatBufferBuilder builder;
auto endAck = Wazuh::SyncSchema::CreateEndAck(
    builder,
    Wazuh::SyncSchema::Status_Ok,
    sessionId);
auto message = Wazuh::SyncSchema::CreateMessage(
    builder,
    Wazuh::SyncSchema::MessageType_EndAck,
    endAck.Union());
builder.Finish(message);
// Build a ReqRet message asking the agent to retransmit missing ranges
flatbuffers::FlatBufferBuilder builder;

// Create pairs of sequence ranges to retransmit
std::vector<flatbuffers::Offset<Wazuh::SyncSchema::Pair>> pairs;
pairs.push_back(Wazuh::SyncSchema::CreatePair(builder, 10, 15)); // Request seq 10-15
pairs.push_back(Wazuh::SyncSchema::CreatePair(builder, 20, 25)); // Request seq 20-25

auto reqRet = Wazuh::SyncSchema::CreateReqRet(
    builder,
    builder.CreateVector(pairs),
    sessionId);
auto message = Wazuh::SyncSchema::CreateMessage(
    builder,
    Wazuh::SyncSchema::MessageType_ReqRet,
    reqRet.Union());
builder.Finish(message);
The module implements robust error handling for flatbuffer operations:
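FlatBuffer messages reach the module through the Wazuh Router: the module subscribes to the inventory sync topic and pushes a copy of each received message onto its workers queue: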
m_inventorySubscription = std::make_unique<TRouterSubscriber>(
    INVENTORY_SYNC_TOPIC,
    INVENTORY_SYNC_SUBSCRIBER_ID);
m_inventorySubscription->subscribe(
    [queue = m_workersQueue.get()](const std::vector<char>& message) {
        auto copy = message;
        queue->push(std::move(copy));
    });
This integration allows for efficient message routing and processing across the Wazuh infrastructure while maintaining type safety and performance through FlatBuffers.