docs/design_docs/20211109-milvus_flush_collections.md
The Flush operation ensures that inserted data is written to persistent storage. This document describes how Flush works in Milvus 2.0. The following figure shows the execution flow of Flush.
The SDK sends a Flush request to Proxy via gRPC. The proto is defined as follows:
service MilvusService {
...
rpc Flush(FlushRequest) returns (FlushResponse) {}
...
}
message FlushRequest {
common.MsgBase base = 1;
string db_name = 2;
repeated string collection_names = 3;
}
message FlushResponse {
common.Status status = 1;
string db_name = 2;
map<string, schema.LongArray> coll_segIDs = 3;
}
When Proxy receives the Flush request, it wraps the request into a FlushTask and pushes the task into DdTaskQueue. Proxy then calls WaitToFinish to block until the task has finished.
type task interface {
TraceCtx() context.Context
ID() UniqueID // return ReqID
SetID(uid UniqueID) // set ReqID
Name() string
Type() commonpb.MsgType
BeginTs() Timestamp
EndTs() Timestamp
SetTs(ts Timestamp)
OnEnqueue() error
PreExecute(ctx context.Context) error
Execute(ctx context.Context) error
PostExecute(ctx context.Context) error
WaitToFinish() error
Notify(err error)
}
type FlushTask struct {
Condition
*milvuspb.FlushRequest
ctx context.Context
dataCoord types.DataCoord
result *milvuspb.FlushResponse
}
A background service in Proxy takes the FlushTask from DdTaskQueue and executes it in three phases:
PreExecute
FlushTask does nothing in this phase and returns directly.
Execute
Proxy sends a Flush request to DataCoord via gRPC and waits for the response. The proto is defined as follows:
service DataCoord {
...
rpc Flush(FlushRequest) returns (FlushResponse) {}
...
}
message FlushRequest {
common.MsgBase base = 1;
int64 dbID = 2;
int64 collectionID = 4;
}
message FlushResponse {
common.Status status = 1;
int64 dbID = 2;
int64 collectionID = 3;
repeated int64 segmentIDs = 4;
}
PostExecute
FlushTask does nothing in this phase and returns directly.
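The three phases can be sketched as a small runner that stops at the first failing phase and always reports the outcome through Notify. The sketch assumes a reduced version of the task interface shown earlier; `phasedTask`, `processTask`, and `recordingTask` are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// phasedTask is an assumed subset of the task interface shown earlier.
type phasedTask interface {
	TraceCtx() context.Context
	PreExecute(ctx context.Context) error
	Execute(ctx context.Context) error
	PostExecute(ctx context.Context) error
	Notify(err error)
}

// processTask runs the phases in order, stops at the first error,
// and always passes the final result to Notify.
func processTask(t phasedTask) {
	ctx := t.TraceCtx()
	var err error
	defer func() { t.Notify(err) }()
	if err = t.PreExecute(ctx); err != nil {
		return
	}
	if err = t.Execute(ctx); err != nil {
		return
	}
	err = t.PostExecute(ctx)
}

// recordingTask is a hypothetical task that records which phases ran.
type recordingTask struct {
	failExecute bool
	phases      []string
	result      error
}

func (r *recordingTask) TraceCtx() context.Context { return context.Background() }

func (r *recordingTask) PreExecute(context.Context) error {
	r.phases = append(r.phases, "PreExecute") // no-op phase, as for FlushTask
	return nil
}

func (r *recordingTask) Execute(context.Context) error {
	r.phases = append(r.phases, "Execute")
	if r.failExecute {
		return errors.New("flush RPC failed") // simulated DataCoord failure
	}
	return nil
}

func (r *recordingTask) PostExecute(context.Context) error {
	r.phases = append(r.phases, "PostExecute") // no-op phase, as for FlushTask
	return nil
}

func (r *recordingTask) Notify(err error) { r.result = err }

func main() {
	task := &recordingTask{}
	processTask(task)
	fmt.Println(task.phases, task.result)
}
```

Reporting through a deferred Notify means a caller blocked in WaitToFinish is released whether the task succeeds or fails.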
After receiving the Flush request from Proxy, DataCoord calls SealAllSegments to seal all growing segments belonging to the collection, and no longer allocates new IDs for these segments. DataCoord then sends Proxy a response containing the IDs of all sealed segments.
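As a rough illustration of the sealing step, the sketch below marks every growing segment of one collection as sealed and returns the IDs. The `segment` type, its fields, and the lowercase `sealAllSegments` function are assumptions made for this example; they do not reproduce DataCoord's actual data structures.

```go
package main

import "fmt"

type segmentState int

const (
	growing segmentState = iota
	sealed
	flushed
)

// segment is a hypothetical, simplified view of a segment's metadata.
type segment struct {
	id           int64
	collectionID int64
	state        segmentState
}

// sealAllSegments seals every growing segment of the given collection
// and returns the IDs of the segments it sealed.
func sealAllSegments(segments []*segment, collectionID int64) []int64 {
	var sealedIDs []int64
	for _, s := range segments {
		if s.collectionID == collectionID && s.state == growing {
			s.state = sealed
			sealedIDs = append(sealedIDs, s.id)
		}
	}
	return sealedIDs
}

func main() {
	segs := []*segment{
		{id: 1, collectionID: 100, state: growing},
		{id: 2, collectionID: 100, state: flushed},
		{id: 3, collectionID: 200, state: growing},
		{id: 4, collectionID: 100, state: growing},
	}
	fmt.Println(sealAllSegments(segs, 100)) // prints [1 4]
}
```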
In Milvus 2.0, Flush is an asynchronous operation. When the SDK receives the Flush response, it only means that DataCoord has sealed these segments. Two problems remain:
The sealed segments may not have been written to persistent storage yet.
DataCoord no longer allocates new IDs for these sealed segments, but how can we make sure that all the allocated IDs have been consumed by DataNode?
For the first problem, the SDK should send a GetSegmentInfo request to DataCoord periodically until all sealed segments are in the Flushed state. The proto is defined as follows:
service DataCoord {
...
rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {}
...
}
message GetSegmentInfoRequest {
common.MsgBase base = 1;
repeated int64 segmentIDs = 2;
}
message GetSegmentInfoResponse {
common.Status status = 1;
repeated SegmentInfo infos = 2;
}
message SegmentInfo {
int64 ID = 1;
int64 collectionID = 2;
int64 partitionID = 3;
string insert_channel = 4;
int64 num_of_rows = 5;
common.SegmentState state = 6;
msgpb.MsgPosition dml_position = 7;
int64 max_row_num = 8;
uint64 last_expire_time = 9;
msgpb.MsgPosition start_position = 10;
}
enum SegmentState {
SegmentStateNone = 0;
NotExist = 1;
Growing = 2;
Sealed = 3;
Flushed = 4;
Flushing = 5;
}
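The client-side polling described above might look like the following sketch. The `segmentInfoGetter` function type is a hypothetical abstraction over the GetSegmentInfo RPC, and the polling interval and timeout are arbitrary choices for the demo; the state values mirror the SegmentState enum.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type segmentState int32

// Values follow the SegmentState enum (0 through 5).
const (
	segmentStateNone segmentState = iota
	notExist
	growing
	sealed
	flushed
	flushing
)

// segmentInfoGetter is a hypothetical abstraction over the GetSegmentInfo RPC.
type segmentInfoGetter func(ctx context.Context, ids []int64) (map[int64]segmentState, error)

// allFlushed reports whether every requested segment is in the Flushed state.
func allFlushed(states map[int64]segmentState, ids []int64) bool {
	for _, id := range ids {
		if states[id] != flushed {
			return false
		}
	}
	return true
}

// waitForFlushed polls until all segments are flushed, the RPC fails,
// or the context is cancelled.
func waitForFlushed(ctx context.Context, get segmentInfoGetter, ids []int64, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		states, err := get(ctx, ids)
		if err != nil {
			return err
		}
		if allFlushed(states, ids) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

func main() {
	calls := 0
	// Fake getter: segments stay Sealed for two polls, then become Flushed.
	fake := func(_ context.Context, ids []int64) (map[int64]segmentState, error) {
		calls++
		st := sealed
		if calls >= 3 {
			st = flushed
		}
		out := make(map[int64]segmentState, len(ids))
		for _, id := range ids {
			out[id] = st
		}
		return out, nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := waitForFlushed(ctx, fake, []int64{1, 4}, 10*time.Millisecond)
	fmt.Println("polls:", calls, "err:", err)
}
```

A segment missing from the response map reads as the zero state, so it counts as not flushed and polling continues.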
For the second problem, DataNode reports a timestamp to DataCoord every time it consumes a package from MsgStream. The proto is defined as follows:
message DataNodeTtMsg {
common.MsgBase base = 1;
string channel_name = 2;
uint64 timestamp = 3;
}
DataCoord runs startDataNodeTsLoop to process DataNodeTtMsg messages.
DataCoord extracts channel_name from the DataNodeTtMsg and selects all sealed segments attached to that channel. It then compares the timestamp at which each segment entered the Sealed state with DataNodeTtMsg.timestamp. If DataNodeTtMsg.timestamp is greater, all IDs belonging to that segment have been consumed by DataNode, and it is safe to notify DataNode to write the segment into persistent storage. The proto is defined as follows:
service DataNode {
...
rpc FlushSegments(FlushSegmentsRequest) returns(common.Status) {}
...
}
message FlushSegmentsRequest {
common.MsgBase base = 1;
int64 dbID = 2;
int64 collectionID = 3;
repeated int64 segmentIDs = 4;
}
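The timestamp check that decides which segments go into a FlushSegmentsRequest can be sketched as below. The `sealedSegment` type and its `sealedTs` field are assumptions for illustration: the document only says that a segment's seal time is compared with the timestamp reported by DataNode, not how that time is stored.

```go
package main

import "fmt"

// sealedSegment is a hypothetical, simplified record of a sealed segment.
type sealedSegment struct {
	id       int64
	channel  string
	sealedTs uint64 // assumed: timestamp recorded when the segment was sealed
}

// flushableSegments returns the IDs of sealed segments on the given channel
// whose seal timestamp is strictly older than the timestamp DataNode reported.
func flushableSegments(segs []sealedSegment, channel string, reportedTs uint64) []int64 {
	var ids []int64
	for _, s := range segs {
		if s.channel == channel && reportedTs > s.sealedTs {
			ids = append(ids, s.id)
		}
	}
	return ids
}

func main() {
	segs := []sealedSegment{
		{id: 1, channel: "dml-0", sealedTs: 100},
		{id: 2, channel: "dml-0", sealedTs: 250},
		{id: 3, channel: "dml-1", sealedTs: 50},
	}
	// DataNode reports timestamp 200 on channel dml-0.
	fmt.Println(flushableSegments(segs, "dml-0", 200)) // prints [1]
}
```

Segment 2 is held back because it was sealed after the reported timestamp, so DataNode may not have consumed all of its allocated IDs yet.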